Delta Lake for Machine Learning Pipelines in AWS

Machine learning pipelines begin with data extraction, whether for training or inference.

After all, we need a dataset to begin any ML workflow.

Most of us begin by querying OLTP/OLAP tables from an on-premises relational database, such as SQL Server. When our query completes, we save the results locally as CSV and then upload the file manually to S3.

From there, we load the [relatively small] dataset into a Pandas DataFrame within a Jupyter notebook in SageMaker Studio. This approach is manual, inefficient, and does not scale past a certain dataset size.

Machine learning is a big reason why many organizations have migrated their databases to AWS using the AWS Database Migration Service (DMS). This is typically part of a larger plan to create a data lake in S3, where the data can be queried efficiently by big data and analytics tools such as EMR, Glue, Athena, Redshift, and more.

Database migration is just the beginning, though. When S3 is selected as the DMS target, the migrated data falls into two major categories:

  1. Initial full load (all the records in a given table at the time of migration)
  2. Change data capture (new records/transactions as they are generated by applications)
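
In S3, that typically looks something like the listing below (the exact file names and prefixes depend on how the DMS task is configured; the bucket and prefix here are placeholders). Each CDC row also carries an operation flag indicating whether it is an insert, update, or delete.

s3://your_s3_bucket/dms/your_schema/your_table/LOAD00000001.csv        <- initial full load
s3://your_s3_bucket/dms/your_schema/your_table/20210601-101530123.csv  <- CDC batch
s3://your_s3_bucket/dms/your_schema/your_table/20210601-101630456.csv  <- CDC batch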

The challenge is that these migrated tables are not in a queryable state.

Why?

Because we still need to combine the DMS full load with the CDC records to obtain the “live” version of a given table, at any given time.

Do we do this every time we need to run a query, for every table we need, before we can even SELECT * FROM small_table?

It’s not sustainable: CDC records keep accumulating, written to new S3 partitions every minute, so this “combine-every-time” job becomes more and more expensive in compute, memory, and time.

We need an efficient way to “upsert” CDC records (updates, inserts, and deletes) into a live version of our tables, automatically as new records are created. We call the collection of these live tables the delta lake, and just like a data lake, it’s typically built on S3.

One approach to building a delta lake on top of our data lake is to set up an SQS queue that receives CDC event notifications from S3 for the tables of interest. Each SQS message then triggers a Lambda function, which submits an “upsert job” (a PySpark script) to an EMR cluster. That script performs the upsert (using Delta Lake’s MERGE command) to incrementally update the given table, as sketched below.
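
At the heart of that upsert job is a Delta Lake MERGE. Here is a minimal sketch of what the PySpark code might look like; the paths, the record_id key column, and the Op change-flag column are assumptions that depend on your schema and DMS task settings:

from delta.tables import DeltaTable

# Latest batch of CDC records written by DMS (path, format, and schema are
# placeholders; adjust to your DMS target settings)
cdc_df = (spark.read.format("parquet")
          .load("s3a://your_s3_bucket/dms/your_schema/your_table/cdc/"))

# Open the live Delta table that the full load was originally written to
live_table = DeltaTable.forPath(
    spark, "s3a://your_s3_bucket/your_prefix/your_table")

# Merge (upsert) the changes. A production job would also de-duplicate
# multiple changes per key and drop DMS metadata columns before merging.
(live_table.alias("t")
    .merge(cdc_df.alias("c"), "t.record_id = c.record_id")  # primary-key join (assumed column)
    .whenMatchedDelete(condition="c.Op = 'D'")               # deletes
    .whenMatchedUpdateAll()                                  # updates
    .whenNotMatchedInsertAll()                               # inserts
    .execute())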

This delta lake approach allows us to avoid combining DMS full load and CDC records every time we need to run a query on migrated tables in our S3 data lake. We always have live tables already combined, with additional functionality to query based on “what has changed recently” (for example, incremental feature engineering; hence the name “delta” lake).
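
For the “what has changed recently” part, Delta Lake’s time travel is one simple option: read an older snapshot of the table and diff it against the current one. A sketch, with placeholder path and timestamp:

table_path = "s3a://your_s3_bucket/your_prefix/your_table"

# Current state of the live table
current_df = spark.read.format("delta").load(table_path)

# Snapshot of the same table as of an earlier point in time (time travel)
previous_df = (spark.read.format("delta")
               .option("timestampAsOf", "2021-06-01 00:00:00")
               .load(table_path))

# Records that are new or changed since that snapshot
changed_df = current_df.subtract(previous_df)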

All you need in your PySpark script to extract a given table is the following:

from delta.tables import *
table_path = f"s3a://your_s3_bucket/your_prefix/{your_table}"
df = spark.read.format("delta").load(table_path)
df.createOrReplaceTempView(str(your_table))  # Available to query using Spark SQL
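
With the temp view registered, the rest of the extraction step can be plain Spark SQL. For example (the label filter and output path are just illustrations):

# Build the training set from the registered view and stage it for the next pipeline step
training_df = spark.sql(f"SELECT * FROM {your_table} WHERE label IS NOT NULL")
training_df.write.mode("overwrite").parquet("s3a://your_s3_bucket/training-data/")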

Sample Glue job definition for your CloudFormation template:

ExtractorGlueJob:
  Type: AWS::Glue::Job
  Properties:
    Command:
      Name: glueetl
      PythonVersion: "3"
      ScriptLocation: "local_folder/script.py"
    Connections:
      Connections:
        - "VPC"
    DefaultArguments:
      "--extra-py-files": "s3://your_s3_bucket/spark-scripts/delta-core_2.11-0.6.1.jar"
      "--extra-jars": "s3://your_s3_bucket/emr-scripts/delta-core_2.11-0.6.1.jar"
      "--conf": "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
      "--enable-metrics": "true"
    Description: "Extractor component of ML training pipeline"
    ExecutionProperty:
      MaxConcurrentRuns: 1
    GlueVersion: "2.0"
    MaxRetries: 1
    Name: "Glue-Dataset-Extractor"
    NumberOfWorkers: 10
    Role: !Ref YourRoleArn
    Timeout: 2880
    WorkerType: "G.1X"

The DefaultArguments make the Delta Lake jar and Spark configuration available to the job, which is what allows us to import delta.tables and read from our delta lake. With this capability, our ML pipelines can proceed all the way through to model training and deployment at scale.
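
For reference, the top of the Glue script itself only needs the usual Glue boilerplate to obtain the Spark session; the Delta jar and the --conf settings from DefaultArguments above are picked up when the job starts. A minimal sketch:

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from delta.tables import *

# Standard Glue job setup; Glue passes JOB_NAME (and our DefaultArguments) at runtime
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
spark = GlueContext(SparkContext.getOrCreate()).spark_session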

Where are you in your data engineering infrastructure journey? Comment below! Also, let me know in the comments if you found this useful, or if you have a different approach to achieve delta lake capabilities for your ML pipelines.

If you need help implementing AWS Well-Architected production machine learning solutions, training/inference pipelines, MLOps, or if you would like us to review your solution architecture and provide feedback, contact us or send me a message and we will be happy to help you.

Written by Carlos Lara, Director of Data Science & Machine Learning Engineering

Follow Carlos on LinkedIn: https://www.linkedin.com/in/carloslaraai/
