By: Maria Zakourdaev | Updated: 2022-11-18
Problem
Data Engineers' main challenge is building data processing pipelines that handle a considerable amount of ever-growing data. Modern architectures embrace polyglot persistence, where data is scattered among different data processing systems. There is a constant need to migrate data between sources, fix data types, remove duplicates or empty rows, and merge and aggregate datasets.
The more transformation logic we incorporate into the flow, the more complex the pipeline is to maintain. Instead of focusing on the data, we spend a tremendous amount of time on tooling: retry logic, change tracking, and making the flow incremental.
All data starts dirty, regardless of the source. Data cleanliness is a matter of requirements, and requirements tend to change all the time. Requirement changes trigger the need to backfill and rerun the data flows. In addition, Data Engineers have to track data movements and make sure tables are updated in the correct order. They would rather focus on data and queries and avoid the operational complexity of the pipelines.
Solution
Most ETL/ELT frameworks perform procedural transformations, working row by row when applying data cleanup or enrichment logic. Procedural data processing means we explicitly state how to get the data and how to process it, breaking the work down into the smallest technical steps.
Declarative data processing works differently. We neither define the order in which the transformation logic executes nor spell out every execution step. With a declarative approach, execution time and pipeline complexity are significantly reduced, and modern data processing engines ship with intelligent optimization algorithms that get smarter with every new version.
Delta Live Tables
This tip will introduce you to an innovative Databricks framework called Delta Live Tables. It is a dynamic data transformation tool, similar to materialized views.
Delta Live Tables are simplified pipelines that use declarative development in a "data-as-code" style. Databricks takes care of finding the best execution plan and managing the cluster resources; we only need to define the data transformations. Pipelines are "stateful": they read each data row only once, and they can track data quality and alert when the percentage of bad records grows.
Delta Live Tables are built around the concept of table chaining and reuse. When a source table's definition or data changes, the changes automatically propagate to the chained tables.
The Delta Live Tables pipeline visualizes the linked data sources and target datasets. The framework automatically creates and maintains the tables in Delta format and ensures the data is updated according to the pipeline's transformation definitions. We can also define "expectations": constraints that validate data correctness.
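To give a feel for the declarative style before building the real pipeline, a chained table with one expectation could look roughly like the sketch below (the table and column names are hypothetical and used only for illustration):
-- Hypothetical sketch: a cleaned table chained to a raw LIVE table.
-- Rows that fail the expectation are dropped and counted in the pipeline's quality metrics.
CREATE LIVE TABLE clean_orders (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "Cleaned orders, recomputed from LIVE.raw_orders on every pipeline update"
AS SELECT * FROM LIVE.raw_orders;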
Delta Live Tables fit well into the medallion architecture, which segments data lake tables into three categories: raw (dirty) data, cleaned data, and aggregated data.
Creating Delta Live Tables
Note: Delta Live Tables are not available in the Standard pricing tier, so make sure your Databricks account is on the Premium pricing tier.
Delta Live Tables notebooks are special: the best practice is to leave them unattached to a cluster and use a secondary notebook for development and debugging commands.
In the development notebook, I will create the database where I want to put Delta Live Tables:
CREATE DATABASE delta_live_lake;
I will also prepare a mount point to help read the data from Azure Data Lake:
dbutils.fs.mount(
  source = "wasbs://container_name@storage_account_name.blob.core.windows.net",
  mount_point = "/mnt/data/",
  extra_configs = {"fs.azure.account.key.storage_account_name.blob.core.windows.net": "account_key"}
)
The Delta Live Tables notebook will contain one SQL command that creates the Delta Live table. Since Delta tables are deeply integrated with the Spark Structured Streaming engine, you can load a Delta table as a stream, and the Spark engine will take care of running the load incrementally and continuously as data arrives. This can reduce the cost of processing new data and lower data latency. However, such a table can only process append operations; merges and deletes are not supported.
CREATE STREAMING LIVE TABLE yellow_trips
COMMENT "The raw NY yellow taxi trips table"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT *
   FROM cloud_files("/mnt/data/yellow-taxi-csv/", "csv", map("cloudFiles.inferColumnTypes", "true"));
We can now create a pipeline to build and maintain the above Delta Live Table. As seen below, choose the Workflows tab, then go to Delta Live Tables, a separate type of Databricks job.
To create a new pipeline, you need to choose the proper configuration settings. Most settings can be changed at any time during pipeline development, except for the "Storage location" setting, which can be set only during pipeline creation. Below are explanations of each configuration setting:
Product Edition
- Core – Most basic product edition. Best suited for workloads that do not require advanced features, like data quality constraints, called expectations (to be discussed later).
- Pro – Supports all core features and includes a feature that supports updating tables when base data changes (Change Data Capture).
- Advanced – Supports Core and Pro features as well as data quality constraints (expectations).
Notebook Libraries
Despite the name, this is where you put the notebook that contains the table creation code. If your code requires additional Python libraries, you can add them after the pipeline is created using the "Settings" button.
Storage Location
Cloud storage location is where the tables and metadata will be stored in subfolders. If not specified, the system will use "dbfs:/pipelines/". Note: This setting cannot be changed after the pipeline is created.
Target
The table definitions inside the notebook cannot specify the database where the tables should be created; this setting chooses the target database where the pipeline publishes them.
Pipeline Execution Mode
- Triggered – More cost-effective. Tables get updated on a schedule or manually without keeping cluster resources online.
- Continuous – Tables get updated as soon as data gets updated, which requires the cluster to run all the time.
Cluster Mode
- Legacy Autoscaling – Autoscaling dynamically allocates workers based on the workload and can reduce costs by deallocating cluster resources when they are not needed. This is a backward compatibility mode; in most cases, you should use "enhanced autoscaling" mode.
- Enhanced Autoscaling – Also allocates workers dynamically and adds further optimizations that improve cluster utilization and reduce costs.
- Fixed-size – Cluster resources are static.
Photon Acceleration
This new query engine speeds up the processing of large datasets (100GB+) using robust scan performance and advanced join strategies on tables with many columns and/or many small files. Usually, this will not improve the performance of small datasets.
Channel
This setting lets you test an upcoming (preview) version of the Delta Live Tables runtime before it becomes the current release.
Execution Modes
- Development – The pipeline reuses the same cluster across runs to avoid long waits for resources and disables automatic retries, which makes iterating and debugging faster.
- Production – The pipeline runs on the chosen schedule and tries to repair the execution by restarting the cluster when specific errors are encountered.
After a successful pipeline execution, we can click on a table in the visualization to see the created live table's schema and how many rows were loaded and how many failed. Since no data quality constraints were defined for this demonstration, all table data was loaded successfully, as seen below.
I will add another table to the data pipeline, a taxi zone lookup table, and rerun the flow.
CREATE STREAMING LIVE TABLE taxi_zones_lookup
COMMENT "The NY yellow taxi zone lookup"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT *
   FROM cloud_files("/mnt/data/ny_taxi_lookups/", "csv", map("cloudFiles.inferColumnTypes", "true"));
Now I can join both tables in a development notebook to analyze the data:
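A query along these lines should work, assuming the pipeline's Target setting points to the delta_live_lake database created earlier and the files carry the standard NY taxi column names (PULocationID, LocationID, Borough, and so on):
-- Ad-hoc analysis in the development notebook against the tables published by the pipeline
SELECT t.tpep_pickup_datetime,
       z.Borough AS pickup_borough,
       t.passenger_count,
       t.total_amount
FROM delta_live_lake.yellow_trips t
JOIN delta_live_lake.taxi_zones_lookup z
  ON t.PULocationID = z.LocationID
LIMIT 10;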
Let's define two data quality constraints (expectations) on the yellow_trips table; a sketch of the updated table definition follows the list:
- Always have a pickup location. If the pickup location is empty, drop the row.
- Remove the trips where the passenger count = 0.
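In Delta Live Tables SQL, expectations are declared as CONSTRAINT clauses inside the table definition. With these two rules, the yellow_trips definition could look roughly like this sketch (the column names PULocationID and passenger_count are assumed from the NY taxi dataset):
CREATE STREAMING LIVE TABLE yellow_trips (
  -- Drop rows that have no pickup location
  CONSTRAINT valid_pickup_location EXPECT (PULocationID IS NOT NULL) ON VIOLATION DROP ROW,
  -- Drop rows that do not have at least one passenger
  CONSTRAINT valid_passenger_count EXPECT (passenger_count > 0) ON VIOLATION DROP ROW
)
COMMENT "The raw NY yellow taxi trips table"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT *
   FROM cloud_files("/mnt/data/yellow-taxi-csv/", "csv", map("cloudFiles.inferColumnTypes", "true"));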
In addition, let's create a live table with an aggregation that reflects, per pickup date and taxi zone, the average number of passengers and the average and maximum trip costs:
CREATE LIVE TABLE yellow_trips_metrics
COMMENT "Aggregation by pickup location"
TBLPROPERTIES ("quality" = "silver")
AS SELECT date(t.tpep_pickup_datetime) AS pickup_date,
          l1.Borough AS pickup_location,
          l2.Borough AS dropoff_location,
          avg(t.passenger_count) AS passenger_count_avg,
          avg(t.total_amount) AS total_amount_avg,
          max(t.total_amount) AS total_amount_max
   FROM LIVE.yellow_trips t
   JOIN LIVE.taxi_zones_lookup l1 ON t.PULocationID = l1.LocationID
   JOIN LIVE.taxi_zones_lookup l2 ON t.DOLocationID = l2.LocationID
   GROUP BY date(t.tpep_pickup_datetime), l1.Borough, l2.Borough;
Here is my final notebook code, which combines the two streaming table definitions (with the expectations added to yellow_trips) and the yellow_trips_metrics aggregation:
The image below shows the table chaining visualization and data quality results after a Delta pipeline execution:
Best Practices
- Store the expectations outside the notebook in a format that can be easily edited, such as a JSON file in the Data Lake, to ensure the code is portable and easy to maintain.
- Save invalid data instead of dropping the rows, so you can analyze it later (see the sketch after this list).
- Define cluster policies to limit cluster resources.
- Keep in mind that the PIVOT clause is not supported as a Delta Live Tables transformation.
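To follow the recommendation of saving invalid data, one possible pattern is a separate quarantine table that keeps only the rows violating the rules, instead of discarding them with ON VIOLATION DROP ROW. A sketch, reusing the source path and column names from this tip:
-- Possible quarantine pattern: keep the rows that fail the quality rules in their
-- own live table so they can be analyzed later instead of being silently dropped.
CREATE STREAMING LIVE TABLE yellow_trips_quarantine
COMMENT "Trips that fail the data quality rules"
TBLPROPERTIES ("quality" = "bronze")
AS SELECT *
   FROM cloud_files("/mnt/data/yellow-taxi-csv/", "csv", map("cloudFiles.inferColumnTypes", "true"))
   WHERE PULocationID IS NULL OR passenger_count = 0;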
Next Steps
- Databricks SQL documentation
- Delta Live tables GitHub repo
- Delta Live tables Hugging Face Sentiment Analysis demo
About the author
This author pledges the content of this article is based on professional experience and not AI generated.