Delta Lakehouse Design Uses Medallion (Bronze, Silver, and Gold) Zones

By: John Miner | Updated: 2023-11-02 | Related: Azure Databricks


Problem

The Delta Lakehouse design uses a medallion (bronze, silver, and gold) architecture for data quality. How can we abstract the read and write actions in Spark to create a dynamic notebook to process data files?

Solution

The data movement between the bronze and silver zones is a consistent pattern. Therefore, we will build generic read and write functions to handle various file types. Once these functions are tested, we can put the pieces together to create and schedule a dynamic notebook.

Business Problem

The top management at the Adventure Works company is interested in creating a Delta Lakehouse. The image below shows how the data quality improves when files are processed from left to right. In my design, I will use a stage zone. This storage container contains just today's data file, while the bronze zone will keep a copy of all data files. This may be a requirement for highly regulated industries that need a file audit trail.

(Image 1: medallion architecture zones - stage, bronze, silver, and gold)

We will store the metadata for a given job as a row in a delta table. Please see my previous article on why SQL-based storage is important when many parameters are passed to a program. The following tasks will be covered in four articles. Tasks 1-7 are focused on building a toolbox, which is where we will focus our attention today. A quick sketch of reading such a parameter row follows the task table below.

Task Id | Description | Article
1 | Abstract logging | 1
2 | Test logging | 1
3 | Abstract file reading | 1
4 | Additional test files | 1
5 | Test file reading | 1
6 | Abstract file writing | 1
7 | Test file writing | 1
8 | Full load notebook | 2
9 | Designing Workflows | 2
10 | Scheduling Workflows | 2
11 | Upserting data | 3
12 | Identifying primary keys | 3
13 | Soft vs. hard deletes | 3
14 | Incremental load notebook | 3
15 | Creating gold layer delta tables | 4
16 | Reading from delta tables using Power BI | 4
17 | Creating gold layer parquet files | 4
18 | Reading from the data lake using Power BI | 4
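Since this metadata table drives the dynamic notebooks in the later articles, here is a minimal sketch of how one job's parameter row might be read from a delta table into a Python dictionary. The table location, job name, and column names are placeholder assumptions for illustration, not the final design.

#
# Sketch - read one job's parameter row from a metadata delta table
#

# hypothetical metadata location and job name
meta_path = '/mnt/advwrks/datalake/meta/job_parameters'
job_name = 'dim-customer-full-load'

# grab the single row for this job and convert it to a dictionary
row = spark.read.format("delta").load(meta_path).filter("job_name = '{}'".format(job_name)).first()
job_parms = row.asDict() if row is not None else {}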

Looking back at our dashboard, we can see a whole system overview. Here are the components used in our Delta Lake design:

  • Azure Key Vault – save secrets.
  • Azure Storage – store files.
  • Azure Log Analytics – keep audit logs.
  • Azure Databricks – data engineering notebooks.

(Image 2: system overview of the Delta Lake components)

At the end of the article, we will have a toolbox with completely tested code. These functions will be used to create a dynamic notebook that moves data between the stage, bronze, and silver zones.

Abstracting Logging

Azure Log Analytics allows system designers to centralize auditing from a variety of programs running on various Azure services. To make a REST API call, we need to store the workspace ID, primary key, and secondary key shown below in a key vault. This information can be found under the Settings -> Agents menu selections.

(Image 3: Log Analytics workspace ID and keys under Settings -> Agents)

The code snippet below brings in the libraries needed to make the REST API call. Please see the Microsoft documentation for details. Of course, we need to pull the workspace ID and key from the key vault for the code to work. Why not pass this information as parameters to the toolbox function? Since we are trying to create a centralized logging area, there is no need to support multiple Log Analytics workspaces.

#
# Include libraries, read secrets
#

# libraries for the REST API call
import base64, datetime, hashlib, hmac, json, requests, uuid

# pull workspace id + key from the secret scope
workspace_id = dbutils.secrets.get("ss4tips", "sec-la-workspace-id")
shared_key = dbutils.secrets.get("ss4tips", "sec-la-workspace-key")

The hardest part of making a REST API call is creating the header. This usually involves encoding the secret and passing other required information for a successful call. The child function named build_signature returns an authorization string.

#
# build_signature() - this is the authorization string for the api call
#

def build_signature(date, content_length, method, content_type, resource):

    # define headers
    x_headers = 'x-ms-date:' + date
    string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource

    # encode header
    bytes_to_hash = str.encode(string_to_hash, 'utf-8')
    decoded_key = base64.b64decode(shared_key)
    encoded_hash = (base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest())).decode()

    # return authorization
    authorization = "SharedKey {}:{}".format(workspace_id, encoded_hash)
    return authorization

The parent function, post_log_data, takes two parameters: the body and the log type. When designing a function, one must decide what to do if the call fails. I have decided to print a message that will be shown in the data engineering notebook. Either the JSON document was logged or not. Please see the Python code below for full details.

#
# post_log_data() - save the body to the log table
#

def post_log_data(body, log_type):

    # static variables
    method = 'POST'
    content_type = 'application/json'
    resource = '/api/logs'

    # grab date/time
    rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')

    # size of msg
    content_length = len(body)

    # grab auth signature
    signature = build_signature(rfc1123date, content_length, method, content_type, resource)

    # define url + header
    uri = 'https://' + workspace_id + '.ods.opinsights.azure.com' + resource + '?api-version=2016-04-01'
    headers = {
        'content-type': content_type,
        'Authorization': signature,
        'Log-Type': log_type,
        'x-ms-date': rfc1123date
    }

    # make https call
    response = requests.post(uri, data=body, headers=headers)

    # check return value
    if (response.status_code >= 200 and response.status_code <= 299):
        print('post_log_data() - accepted and stored message')
    else:
        print("post_log_data() - error processing message, response code: {}".format(response.status_code))

Now that we have defined our function, it is time to test it.

Test Logging

The toolbox notebook is a work in progress. In my last article, we talked about abstracting parameters. Today, we will focus on logging, reading, and writing files. The header from the test notebook is shown below.

(Image 4: header of the test toolbox notebook)

For this simple testing notebook, I chose to pass the parameters using a JSON document. However, in the final data engineering notebook, we will store the parameters in a metadata table using the DELTA file format. The code below parses the folder path to retrieve the schema and table name for the dimensional model files, representing each table in the SQL Server source. I will show the first call that logs the start of the notebook. Just change the action property to log the end of the notebook. Having recorded these two events, we can calculate the total time the notebook executes.
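Since the cell below references an app_config variable, here is a minimal sketch of what that parameter document might contain. The keys and values are illustrative assumptions, and only folder_path is used in this test.

#
# Sample parameter document - hypothetical values for this test
#

# illustrative keys and values only; folder_path follows a schema/table pattern
app_config = {
    "folder_path": "dim/customer",
    "file_type": "PARQUET"
}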

#
# Write start msg to log analytics
#

# split dir path into parts
path_parts = app_config['folder_path'].split("/")

# which file are we processing
nb_parm = "process [{}].[{}] file".format(path_parts[0], path_parts[1])

# custom log in workspace
log_type = 'AdvWrksDlp'

# create start message
json_data = [
    {
        "id": str(uuid.uuid4()),
        "platform": "Azure Data Bricks",
        "software": "Adventure Works - Data Platform",
        "component": "nb-test-toolbox",
        "action": "Start Program",
        "parms": nb_parm,
        "version": 1.01
    }
]
body = json.dumps(json_data)

# post message to log analytics
post_log_data(body, log_type)

We can use the Kusto Query Language to look at the entries in the Adventure Works Data Lake Platform custom log (AdvWrksDlp_CL).

(Image 5: Kusto query results for the AdvWrksDlp_CL custom log)

Unless you think there will be a potential bottleneck in the notebook code, adding a log entry for the start and end of the Spark session is good enough to calculate the total run time. Additional entries can be added to determine how long it takes to move from one quality zone to another, if needed.
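For completeness, here is a minimal sketch of the matching end-of-notebook message. It simply reuses the json_data list and the post_log_data helper from the cells above with a new id and the action changed, as described earlier.

#
# Write end msg to log analytics
#

# reuse the start message with a new id and action
json_data[0]["id"] = str(uuid.uuid4())
json_data[0]["action"] = "End Program"
body = json.dumps(json_data)

# post message to log analytics
post_log_data(body, log_type)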

Abstracting File Reading

The Python Spark library can read the following file formats: AVRO, CSV, DELTA, JSON, ORC, PARQUET, and TEXT. We want to provide our generic read_data function with enough parameters to read any supported file type. The function will return a Spark dataframe as an output. The code snippet below implements our user-defined function. File types can be classified as strong or weak. Strong file types are self-contained and do not require additional information. Weak file types may require additional parameters, such as a schema definition, a header line flag, and/or a delimiter character, to perform the read.

#
# read_data - given a file format, read in the data
#

def read_data(file_path, file_type = 'PARQUET', file_header = None, file_schema = None, file_delimiter = None):

    # convert to upper case
    file_type = file_type.upper()

    # valid file types
    file_types = ['AVRO', 'CSV', 'DELTA', 'JSON', 'ORC', 'PARQUET', 'TEXT']

    # read avro
    if (file_type == 'AVRO'):
        df = spark.read.format("avro").load(file_path)
        return df

    # read delta
    if (file_type == 'DELTA'):
        df = spark.read.format("delta").load(file_path)
        return df

    # read orc
    if (file_type == 'ORC'):
        df = spark.read.format("orc").load(file_path)
        return df

    # read parquet
    if (file_type == 'PARQUET'):
        df = spark.read.format("parquet").load(file_path)
        return df

    # read json
    if (file_type == 'JSON'):
        if (file_schema is None):
            df = spark.read.format("json").option("inferSchema", "true").load(file_path)
        else:
            df = spark.read.format("json").schema(file_schema).load(file_path)
        return df

    # do we have a header line?
    if (file_header is None or file_header.upper() == 'TRUE'):
        header_flag = "true"
    else:
        header_flag = "false"

    # do we have a delimiter?
    if (file_delimiter is None and file_type == 'CSV'):
        sep_char = ","
    elif (file_delimiter is None and file_type == 'TEXT'):
        sep_char = "\r\n"
    else:
        sep_char = file_delimiter

    # read csv
    if (file_type == 'CSV'):
        if (file_schema is None):
            df = spark.read.format("csv").option("header", header_flag).option("inferSchema", "true").option("delimiter", sep_char).load(file_path)
        else:
            df = spark.read.format("csv").option("header", header_flag).schema(file_schema).option("delimiter", sep_char).load(file_path)
        return df

    # read text
    if (file_type == 'TEXT'):
        df = spark.read.format("text").option("lineSep", sep_char).option("wholetext", "false").load(file_path)
        return df

    # unsupported file type - return an empty dataframe
    if (file_type not in file_types):
        from pyspark.sql.types import StructType
        df = spark.createDataFrame([], StructType([]))
        return df

Note: An invalid file type results in an empty dataframe return value. Now that we have our user-defined read function, we need to create test cases for each file type.

Additional Test Files

The Azure Databricks workspace contains a folder on the file system named databricks-datasets. This folder contains data in various formats. However, I could not find data in the AVRO and/or ORC formats. Therefore, we need to read in a known dataset and create temporary copies of the file to work with.

I found an online retail dataset saved in a CSV format. The Python code below uses our new function to read the file into a variable named df0.

#
# Setup 1 - read csv data file
#

# the folder
var_path = '/databricks-datasets/online_retail/data-001'

# the type
var_type = 'CSV'

# read files & infer schema
df0 = read_data(var_path, var_type)

# show data
display(df0)

The display command shows the information in a scrollable grid. We can see the online retail data in the image below.

(Image 6: online retail data displayed in a scrollable grid)

Because I have not defined a generic write_data function yet, we will use the Databricks utility and PySpark commands to solve the problem. The code below creates a sub-directory named online_retail under the tmp root directory. I will repartition the dataframe to one file and save the information in a sub-directory named avro.

#
# Setup 2 - write avro file
#

# remove dir
tmp_dir = '/tmp/online_retail/avro'
try:
    dbutils.fs.rm(tmp_dir, recurse=True)
except:
    pass

# write dir
df0.repartition(1).write.format('avro').save(tmp_dir)

# show dir
display(dbutils.fs.ls(tmp_dir))

The list command was used to display the files in the avro sub-directory. The one file starting with part-00000 represents the partitioned dataframe saved in AVRO format. Please see the image below for details.

(Image 7: AVRO files in the /tmp/online_retail/avro directory)

We need to repeat this process for the ORC file format.

#
# Setup 3 - write orc file
#

# remove dir
tmp_dir = '/tmp/online_retail/orc'
try:
    dbutils.fs.rm(tmp_dir, recurse=True)
except:
    pass

# write dir
df0.repartition(1).write.format('orc').save(tmp_dir)

# show dir
display(dbutils.fs.ls(tmp_dir))

Again, the list command was used to display the files in the orc sub-directory. The one file starting with part-00000 represents the partitioned dataframe saved in ORC format. Please see the image below for details.

(Image 8: ORC files in the /tmp/online_retail/orc directory)

Now that we have test files for each format, we can finally test our generic read_data function.

Test File Reading

The pattern for reading files is quite similar. That is why we created a generic read function that can be parameter-driven. The code below reads an AVRO file. It only requires a file path and a file type since AVRO is a strong file type!

#
# Read 1 - avro file(s)
#

# the folder
var_path = '/tmp/online_retail/avro'

# the type
var_type = 'AVRO'

# read files & infer schema
df = read_data(var_path, var_type)

# show data
display(df)

The image below shows the online retail dataframe displayed in a scrollable grid. Note: The AVRO and ORC file tests use the same dataset.

(Image 9: online retail dataframe read from AVRO)

The code below reads in CSV data files on bike sharing.

#
# Read 2 - csv file(s)
#

# the folder
var_path = '/databricks-datasets/bikeSharing/data-001/'

# the type
var_type = 'CSV'

# read files & infer schema
df = read_data(var_path, var_type)

# show data
display(df)

The image below shows the bike sharing dataframe displayed in a scrollable grid.

(Image 10: bike sharing dataframe read from CSV)

The code below reads in the DELTA files for people's salaries.

#
# Read 3 - delta file(s)
#

# the folder
var_path = '/databricks-datasets/learning-spark-v2/people/people-10m.delta'

# the type
var_type = 'DELTA'

# read files & infer schema
df = read_data(var_path, var_type)

# show data
display(df)

The image below shows the people's salary dataframe displayed in a scrollable grid.

(Image 11: people's salary dataframe read from DELTA)

The code below reads in JSON data files for IoT devices.

#
# Read 4 - json file(s)
#

# the folder
var_path = '/databricks-datasets/iot/'

# the type
var_type = 'JSON'

# read files & infer schema
df = read_data(var_path, var_type)

# show data
display(df)

The image below shows the Internet of Things (IoT) dataframe displayed in a scrollable grid.

(Image 12: IoT dataframe read from JSON)

The code below reads in the ORC data file for online retail data.

#
# Read 5 - orc file(s)
#

# the folder
var_path = '/tmp/online_retail/orc'

# the type
var_type = 'orc'

# read files & infer schema
df = read_data(var_path, var_type)

# show data
display(df)

The image below shows the online retail dataframe displayed in a scrollable grid.

(Image 13: online retail dataframe read from ORC)

The code below reads in the PARQUET data files for Amazon sales data.

#
# Read 6 - parquet file(s)
#

# the folder
var_path = '/databricks-datasets/amazon/data20K/'

# the type
var_type = 'PARQUET'

# read files & infer schema
df = read_data(var_path, var_type)

# show data
display(df)

The image below shows the Amazon sales dataframe displayed in a scrollable grid.

(Image 14: Amazon sales dataframe read from PARQUET)

The code below reads in TEXT data files for power plant readings.

That has been a tedious exercise, but testing is almost complete. One thing I did not disclose is that we have been passing parameters by position. Also, the read_data function has default values for missing arguments. The only required function arguments are the file path and the file type. To pass a parameter that is out of order, I suggest passing the parameters by name.

#
# Read 7 - text file(s)
#

# the folder
var_path = '/databricks-datasets/power-plant/data/'

# the type (CR - '\r' or LF - '\n')
var_type = 'TEXT'
var_del = '\n'

# read files & infer schema
df = read_data(file_path=var_path, file_type=var_type, file_delimiter=var_del)

# show data
display(df)

Passing a different delimiter to the read_data function results in different-looking dataframes. The Azure Databricks system runs on a cluster of Linux boxes. When we use the line feed character (LF or \n) as the delimiter, we read all five files a line at a time. There are thousands of lines or strings.

(Image 15: power plant text read with the LF delimiter - one row per line)

The power plant data file contains no carriage returns (CR or \r). If we choose this character as the delimiter, we can see that the dataframe has five rows. Each row is the complete file as a single string.
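For reference, here is a minimal sketch of that second test. It assumes the var_path and var_type values from the previous cell and only swaps the delimiter to the carriage return character.

#
# Read 7b - text file(s) with a carriage return delimiter
#

# same folder and type as above - only the delimiter changes
var_del = '\r'

# read files - each file becomes a single row
df = read_data(file_path=var_path, file_type=var_type, file_delimiter=var_del)

# show data
display(df)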

(Image 16: power plant text read with the CR delimiter - one row per file)

To recap, we have painstakingly tested all the file formats supported by the read_data function.

Abstracting File Writing

The Python Spark library can write the following file formats: AVRO, CSV, DELTA, JSON, ORC, PARQUET, and TEXT. We want to provide our generic write_data function with enough parameters to write any supported file type. The return value from the function is a numeric representing failure (-1) or success (0). It is up to the developer to set the partition count or column of the input dataframe. While some file formats support appending data, the only file format that truly supports INSERT, UPDATE, and DELETE commands is the DELTA format. Today, we are going to focus on the full data load pattern. Thus, the file folder is deleted before data is written.

#
# write_data - given dataframe, write files
#

def write_data(input_df, file_path, file_type = 'PARQUET', file_header = None, file_schema = None, file_delimiter = None):

    # convert to upper case
    file_type = file_type.upper()

    # valid file types
    file_types = ['AVRO', 'CSV', 'DELTA', 'JSON', 'ORC', 'PARQUET', 'TEXT']

    # unsupported file type
    if (file_type not in file_types):
        return -1

    # remove dir if exists
    try:
        dbutils.fs.rm(file_path, recurse=True)
    except:
        pass

    # write avro
    if (file_type == 'AVRO'):
        input_df.write.format("avro").save(file_path)
        return 0

    # write delta - supports ACID properties
    if (file_type == 'DELTA'):
        input_df.write.format("delta").save(file_path)
        return 0

    # write orc
    if (file_type == 'ORC'):
        input_df.write.format("orc").save(file_path)
        return 0

    # write parquet
    if (file_type == 'PARQUET'):
        input_df.write.format("parquet").save(file_path)
        return 0

    # write json
    if (file_type == 'JSON'):
        input_df.write.format("json").save(file_path)
        return 0

    # do we have a header line?
    if (file_header is None or file_header.upper() == 'TRUE'):
        header_flag = "true"
    else:
        header_flag = "false"

    # do we have a delimiter?
    if (file_delimiter is None and file_type == 'CSV'):
        sep_char = ","
    elif (file_delimiter is None and file_type == 'TEXT'):
        sep_char = "\r\n"
    else:
        sep_char = file_delimiter

    # write csv
    if (file_type == 'CSV'):
        input_df.write.format("csv").option("header", header_flag).option("delimiter", sep_char).save(file_path)
        return 0

    # write text
    if (file_type == 'TEXT'):
        input_df.write.format("text").option("lineSep", sep_char).save(file_path)
        return 0

Now that we have our user-defined write function, we need to test each file type.

Test File Writing

The pattern for writing files is quite similar. That is why we created a generic write function that can be parameter-driven. All files will be written to a test folder in the data lake. We will use the online retail dataframe as the input for all write tests.

The code below writes an AVRO file.

#
# Write 1 - avro file format
#

# the folder
var_path = '/mnt/advwrks/datalake/test/avro'

# the type
var_type = 'AVRO'

# write out retail data
ret = write_data(input_df=df0, file_path=var_path, file_type=var_type)

# show dir
display(dbutils.fs.ls(var_path))

The image below shows the AVRO directory with the files listed in a scrollable grid.

(Image 17: files written to the test AVRO directory)

The code below writes a CSV file.

#
# Write 2 - csv file format
#

# the folder
var_path = '/mnt/advwrks/datalake/test/csv'

# the type
var_type = 'CSV'

# write out retail data
ret = write_data(input_df=df0, file_path=var_path, file_type=var_type)

# show dir
display(dbutils.fs.ls(var_path))

The image below shows the CSV directory with the files listed in a scrollable grid.

(Image 18: files written to the test CSV directory)

The code below writes a DELTA file.

#
# Write 3 - delta file format
#

# the folder
var_path = '/mnt/advwrks/datalake/test/delta'

# the type
var_type = 'DELTA'

# write out retail data
ret = write_data(input_df=df0, file_path=var_path, file_type=var_type)

# show dir
display(dbutils.fs.ls(var_path))

The image below shows the DELTA directory with the files listed in a scrollable grid.

(Image 19: files written to the test DELTA directory)

The code below writes a JSON file.

#
# Write 4 - json file format
#

# the folder
var_path = '/mnt/advwrks/datalake/test/json'

# the type
var_type = 'JSON'

# write out retail data
ret = write_data(input_df=df0, file_path=var_path, file_type=var_type)

# show dir
display(dbutils.fs.ls(var_path))

The image below shows the JSON directory with the files listed in a scrollable grid.

(Image 20: files written to the test JSON directory)

The code below writes an ORC file.

#
# Write 5 - orc file format
#

# the folder
var_path = '/mnt/advwrks/datalake/test/orc'

# the type
var_type = 'ORC'

# write out retail data
ret = write_data(input_df=df0, file_path=var_path, file_type=var_type)

# show dir
display(dbutils.fs.ls(var_path))

The image below shows the ORC directory with the files listed in a scrollable grid.

(Image 21: files written to the test ORC directory)

The code below writes a PARQUET file.

#
# Write 6 - parquet file format
#

# the folder
var_path = '/mnt/advwrks/datalake/test/parquet'

# the type
var_type = 'PARQUET'

# write out retail data
ret = write_data(input_df=df0, file_path=var_path, file_type=var_type)

# show dir
display(dbutils.fs.ls(var_path))

The image below shows the PARQUET directory with the files listed in a scrollable grid.

(Image 22: files written to the test PARQUET directory)

To write a text file, we need to work on formatting the dataframe into a single column of type string. The array Spark function can be used to concatenate the fields into a single column called 'merged' of type string. Instead of df0 as the input dataframe, we will use df1, which has the correct format for the TEXT file type.

#
# Concatenate + Cast
#

# req libs
from pyspark.sql.functions import array
from pyspark.sql.types import StringType

# perform action - combine the columns into one string column named 'merged'
df1 = df0.select(array('InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country').cast(StringType()).alias('merged'))

# just one part
df1 = df1.repartition(1)

# show result
display(df1)

The image below shows the data as the string representation of an array.

(Image 23: online retail data merged into a single string column)

The code below writes a TEXT file.

#
# Write 7 - text file format
#

# the folder
var_path = '/mnt/advwrks/datalake/test/text'

# the type
var_type = 'TEXT'

# write out retail data
ret = write_data(input_df=df1, file_path=var_path, file_type=var_type)

# show dir
display(dbutils.fs.ls(var_path))

The image below shows the TEXT directory with the files listed in a scrollable grid.

(Image 24: files written to the test TEXT directory)

In a nutshell, we have painstakingly tested all the file formats supported by the write_data function.

Summary

Today, we spent a lot of time creating generic functions for logging events, reading various file formats, and writing multiple formats. This time was well spent since this toolbox will be pivotal to the full load and incremental load parameter-driven notebooks we will craft in the future.

Why write audit logs to Azure Log Analytics? A complete data platform system might have events occurring in Azure Data Factory, Azure Databricks, and/or Azure SQL Database. Each service is completely separate and has different default data retention settings. By using Azure Log Analytics, we have a centralized logging solution. Even on-premises tools like SSIS can add audit logs using the REST API.

Reading and writing data using the Apache Spark library is straightforward. However, abstracting the actual function calls is very powerful. A single function can be called with different parameters to read or write various file formats. These generic functions will be used to write future parameter-driven notebooks.

Not covered in this article is reading from or writing to relational databases. Since we are focused on creating a Delta Lakehouse, this functionality is unnecessary. Are the supplied functions complete? There is always room for improvement. For instance, many formats support compression. A TEXT file can be saved using the GZIP algorithm. I leave enhancements to the functions for you to complete. The current version of the functions is adequate to demonstrate both full and incremental loading using a parameter-driven notebook.
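As an example of such an enhancement, here is a minimal sketch of writing the text output with GZIP compression. It assumes the df1 dataframe from the text write test above, and the destination path is a placeholder.

#
# Sketch - write gzip compressed text files
#

# assumed destination folder for the compressed output
gzip_path = '/mnt/advwrks/datalake/test/text_gzip'

# the spark text writer accepts a compression option
df1.write.format("text").option("lineSep", "\r\n").option("compression", "gzip").save(gzip_path)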

Enclosed are the files used in this article. Next time, we will focus on creating a full-load data engineering notebook and scheduling a complete set of files for our Adventure Works dimensional model.

Next Steps
  • Full file loading with Delta Tables
  • Incremental file loading with Delta Tables
  • Read these additional Databricks articles




About the author

John Miner is a Data Architect at Insight Digital Innovation helping corporations solve their business needs with various data platform solutions.

This author pledges the content of this article is based on professional experience and not AI generated.
