Serverless Delta Lake Compaction in AWS

Setting up a transactional data lake that continuously “upserts” (updates/inserts) change data capture (CDC) records from relational databases into table snapshots is only the beginning.

You may have noticed that the number of Parquet files (partitions) in S3 keeps increasing over time for each table. This happens regardless of whether you use Apache Hudi with EMR or Delta Lake with Glue.

Each of these files is relatively small, usually in the KB-to-MB range. An entire table, however, can be GBs, TBs, or more in size. Here is a serverless solution architecture my team implemented:

[Architecture diagram]


If you are using Glue to create derived views from these raw OLTP tables, you will notice that Glue job latency increases gradually, typically over a scale of days.

This happens because downstream Spark jobs load full table snapshots in preparation for JOINs to produce various views. As the number of Parquet files increases per source table, more worker node tasks are required to execute the DAGs. This leads to growing latency, unless we are willing to increase the number of DPUs allocated per Glue job.
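
To make that concrete, below is a minimal sketch of such a downstream job. The bucket, database, table, and join-key names are placeholders for illustration only, not our actual pipeline:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("silver-orders-view").getOrCreate()

bucket = "delta-lake-bucket"   # placeholder
database = "sales"             # placeholder

# Each load() scans every Parquet file in the table's latest snapshot,
# so Spark schedules roughly one task per file: more small files, more tasks.
orders = spark.read.format("delta").load(f"s3://{bucket}/bronze/{database}/orders/")
customers = spark.read.format("delta").load(f"s3://{bucket}/bronze/{database}/customers/")

# The JOIN shuffles both snapshots; scheduling and shuffle overhead grow
# with the file count unless the source tables are compacted upstream.
orders_enriched = orders.join(customers, on="customer_id", how="inner")

(orders_enriched.write
    .format("delta")
    .mode("overwrite")
    .save(f"s3://{bucket}/silver/{database}/orders_enriched/"))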

This is unacceptable when we have latency SLAs and budget constraints (Glue jobs cost $0.44 USD per DPU-hour, which adds up quickly for frequent jobs).
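
For example, a hypothetical 10-DPU Glue job that runs for 15 minutes every hour costs 10 DPUs × 0.25 hours × $0.44 ≈ $1.10 per run, or roughly $800 per month, before accounting for any growth in runtime.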

In fact, the Cost Optimization pillar of the AWS Well-Architected Framework tells us to architect solutions so that cost is actively reduced or kept to a minimum.

The problem of growing latency and cost in transactional data lakes leads to a solution called compaction.

Compaction is the process of re-partitioning a table into a smaller number of larger Parquet files (for example, going from many KB/MB-sized files to a few GB-sized files). Here is sample PySpark code for use within a Delta Lake compaction Glue job:

for table in bronze_tables:

    # delta_lake_bucket, database, and bronze_tables are defined earlier in the job
    path = f"s3://{delta_lake_bucket}/bronze/{database}/{table}/"
    numFiles = 20

    # Read the latest snapshot, repartition it into numFiles larger Parquet
    # files, and overwrite the table in place. dataChange=false tells Delta
    # Lake that the rewrite does not modify the data itself, so downstream
    # streaming readers are not disturbed.
    (spark.read
        .format("delta")
        .load(path)
        .repartition(numFiles)
        .write
        .option("dataChange", "false")
        .format("delta")
        .mode("overwrite")
        .save(path))

    print(f"Table: {table} re-partitioned")

When the Glue job completes, you can verify in S3 that the latest snapshot of each table has been re-partitioned into a smaller number of larger Parquet files. Older snapshots are left unchanged, and we recommend setting up S3 lifecycle policies to gradually move them to S3 Infrequent Access (IA) and then Glacier.
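
As a sketch of that lifecycle setup, the boto3 call below transitions bronze-zone objects to Infrequent Access after 30 days and to Glacier after 90 days; the bucket name, prefix, and thresholds are illustrative assumptions, so tune them to your own retention and time-travel requirements:

import boto3

s3 = boto3.client("s3")

# Illustrative rule: the bucket name, prefix, and day thresholds are
# placeholders, not values from our actual configuration.
s3.put_bucket_lifecycle_configuration(
    Bucket="delta-lake-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "tier-old-bronze-files",
                "Status": "Enabled",
                "Filter": {"Prefix": "bronze/"},
                "Transitions": [
                    {"Days": 30, "StorageClass": "STANDARD_IA"},
                    {"Days": 90, "StorageClass": "GLACIER"},
                ],
            }
        ]
    },
)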

We also recommend experimenting with the number of partitions to converge on the optimal number for your tables. We use “selective compaction” to re-partition groups of tables differently to achieve optimal performance across our Delta Lake zones (Bronze, Silver, and Gold).
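
One way to express that selective compaction is a simple mapping from table to target file count, reusing the same rewrite pattern as above; the table names and numbers below are purely illustrative:

# Illustrative targets: names and file counts are assumptions, not tuned values.
compaction_targets = {
    "orders":       40,   # large, frequently joined table
    "order_items":  40,
    "customers":    10,
    "ref_country":   1,   # small reference table
}

for table, num_files in compaction_targets.items():
    path = f"s3://{delta_lake_bucket}/bronze/{database}/{table}/"
    (spark.read
        .format("delta")
        .load(path)
        .repartition(num_files)
        .write
        .option("dataChange", "false")
        .format("delta")
        .mode("overwrite")
        .save(path))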

As always, monitor every aspect of your workload, particularly rolling average Glue job latency for this use case. This will help you determine the optimal EventBridge schedule to trigger your Delta Lake compaction jobs/workflows.
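
For the latency piece, one simple approach is to pull recent run durations from the Glue API and average them; the job name and window size below are placeholders:

import boto3

glue = boto3.client("glue")

def rolling_avg_latency_seconds(job_name: str, window: int = 20) -> float:
    """Average ExecutionTime (in seconds) over the most recent `window` runs."""
    runs = glue.get_job_runs(JobName=job_name, MaxResults=window)["JobRuns"]
    durations = [run["ExecutionTime"] for run in runs if run.get("ExecutionTime")]
    return sum(durations) / len(durations) if durations else 0.0

# "bronze-compaction" is a placeholder job name.
print(rolling_avg_latency_seconds("bronze-compaction"))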

Have you encountered this challenge before? If so, how did you approach the solution?

If you need help implementing cloud-native MLOps, building Well-Architected production ML software solutions, setting up training/inference pipelines, or monetizing your ML models in production, or if you have specific solution architecture questions or would simply like us to review your solution architecture and provide feedback based on your goals, contact us or send me a message and we will be happy to help you.

Subscribe to my blog at: https://gradientgroup.ai/blog/

Follow me on LinkedIn: https://linkedin.com/in/carloslaraai
