Unlocking MERGE, UPDATE, and DELETE Operations in Structured Streaming on Databricks

In today’s data-driven world, the ability to process and analyze data in real-time is crucial for businesses to stay competitive. At Sigma Software, we always strive to remain at the edge of technology, looking for the best approaches and tools to help our customers harness the power of data. One of the tools we use is Databricks, and Sigma Software is a registered partner of Databricks.

Databricks is a unified data analytics platform that simplifies big data processing by providing a seamless environment for working with Apache Spark™, Delta Lake, and other data technologies. Built to handle large-scale data processing, Databricks enables organizations to build robust data pipelines, perform advanced analytics, and leverage machine learning with ease.

One of the standout features of Databricks is Structured Streaming, a scalable and fault-tolerant stream processing engine built on top of Apache Spark. With Structured Streaming, users can process massive data streams in real time with exactly-once processing guarantees, automatic failure recovery, and seamless integration with multiple data sources and sinks.

Key benefits of using Structured Streaming with large-scale data include:

  1. Scalability: Structured Streaming can handle millions of events per second, making it ideal for processing large volumes of streaming data.
  2. Fault Tolerance: Built on the distributed nature of Apache Spark, Structured Streaming automatically handles failures and ensures data integrity through its exactly-once processing semantics.
  3. Ease of Use: With its high-level API, developers can write concise and expressive streaming queries, reducing the complexity of building and maintaining streaming applications.
  4. Seamless Integration: Structured Streaming integrates natively with Delta Lake, providing support for ACID transactions and schema enforcement, which are crucial for maintaining data quality in large-scale pipelines.

However, while Structured Streaming excels at processing data streams in real-time, it has a notable limitation: it does not natively support MERGE, UPDATE, and DELETE operations on source tables. These operations are essential in many real-world scenarios where the underlying data can change over time.

In this article, we will explore why these operations are restricted in Structured Streaming and how you can overcome this limitation using Change Data Capture (CDC) and the enableChangeDataFeed option in Databricks. This article explains how to unlock the full potential of Structured Streaming for dynamic data processing.

Understanding the Limitation

Structured Streaming is designed around append-only streaming sources, where data flows continuously in a single direction: into the system. This design choice simplifies the management of stateful streaming operations and supports high throughput and low latency. However, it also introduces a significant limitation: no built-in support for MERGE, UPDATE, or DELETE operations on the source table. By default, a streaming read of a Delta table fails when a commit modifies or removes existing rows.

These operations are critical in many real-world applications where the data in the source tables can change over time. For example, consider a customer database where records are frequently updated or deleted. Without support for these operations, you would either fail to capture these changes in your streaming application or be forced to adopt complex workarounds.

Diving Deeper into Change Data Capture (CDC)

To address the challenge of capturing and processing data changes such as INSERTs, UPDATEs, and DELETEs, the data engineering community employs a technique known as Change Data Capture (CDC). CDC is a set of software design patterns used to detect and track data changes in a source system, ensuring that downstream systems receive and process these changes accurately and efficiently.

Key Aspects of CDC:

  1. Real-time Data Synchronization: CDC enables real-time or near-real-time synchronization of data between systems. This is crucial for applications like real-time analytics, monitoring, and ETL processes.
  2. Efficiency: Instead of transferring data in bulk, CDC captures only the changes (deltas), reducing the amount of data that needs to be processed and transferred.
  3. Data Consistency and Integrity: By accurately capturing and applying changes, CDC ensures that target systems remain consistent with the source, maintaining data integrity across platforms.

Common CDC Techniques:

  • Log-Based CDC: This method reads the database transaction logs to identify changes. It’s efficient and has minimal impact on the performance of source systems.
  • Trigger-Based CDC: Database triggers are set up on source tables to capture changes. While this approach is straightforward, it can introduce overhead and impact performance.
  • Timestamp-Based CDC: Changes are identified based on timestamp columns in the tables. While simpler, it can miss some changes if not implemented carefully.
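
To make the caveat about timestamp-based CDC concrete, here is a minimal plain-Python sketch. The rows, the `updated_at` column, and the `poll_changes` helper are all hypothetical and only illustrate the polling pattern; they are not part of any Databricks API. The sketch shows that a hard delete is invisible to a poller that only looks at timestamps.

```python
from datetime import datetime

# Hypothetical source rows; `updated_at` is the timestamp column the poller relies on.
source_rows = [
    {"id": 1, "name": "alice", "updated_at": datetime(2024, 1, 1, 10, 0)},
    {"id": 2, "name": "bob", "updated_at": datetime(2024, 1, 1, 11, 0)},
    {"id": 3, "name": "carol", "updated_at": datetime(2024, 1, 1, 12, 0)},
]

def poll_changes(rows, last_seen):
    """Return rows modified after `last_seen` and the new high-water mark."""
    changed = [r for r in rows if r["updated_at"] > last_seen]
    new_mark = max((r["updated_at"] for r in changed), default=last_seen)
    return changed, new_mark

# First poll: picks up every row touched after 10:30.
changed, watermark = poll_changes(source_rows, datetime(2024, 1, 1, 10, 30))
changed_ids = [r["id"] for r in changed]

# A hard delete leaves no row behind, so the next poll reports nothing for id 2.
source_rows = [r for r in source_rows if r["id"] != 2]
changed_after_delete, watermark = poll_changes(source_rows, watermark)
```

The second poll returns an empty list even though a record was removed, which is one reason log-based approaches are often preferred when deletes must be propagated.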

CDC in the Context of Databricks:

Databricks integrates CDC capabilities through the Change Data Feed (CDF) feature available in Delta Lake tables. By enabling CDF, Delta Lake captures changes to tables, making it easier to propagate those changes downstream.

The Solution: Using CDC and Change Data Feed

To overcome the limitation of not supporting MERGE, UPDATE, and DELETE operations in Structured Streaming, Databricks provides a powerful feature called Change Data Feed (CDF). When enabled, CDF captures all changes to a Delta Lake table, including INSERTs, UPDATEs, and DELETEs, and exposes them in a consumable format. This integration allows streaming applications to process not only new data but also changes to existing data.

Step 1: Enable Change Data Feed on the Source Table

To start using CDC with CDF, you must first enable the Change Data Feed on your source Delta Lake table. This is achieved by setting the delta.enableChangeDataFeed table property to true. You can do this either during table creation or by altering an existing table.

Enabling CDF on Table Creation:

CREATE TABLE my_source_table (
id LONG,
name STRING,
value DOUBLE
) USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true);

Enabling CDF on an Existing Table:

ALTER TABLE my_source_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

Once enabled, Delta Lake begins recording all data changes, which can then be read and processed by downstream applications.

Step 2: Reading the Change Data Stream

With CDF enabled, you can read the stream of changes from the source table. The changes are exposed as a DataFrame, with each record containing metadata columns indicating the type of operation and other relevant details.

Reading Changes Using Structured Streaming:

from pyspark.sql.functions import expr

# Define the starting point for reading changes
starting_version = 0  # You can also pass "latest", or use the startingTimestamp option instead

# Read the change data stream
cdc_df = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")  # documented option name for reading the change feed
    .option("startingVersion", starting_version)
    .table("my_source_table")
    # Map the CDF change types to the operation labels used downstream
    .withColumn(
        "operation",
        expr(
            """
            CASE
                WHEN _change_type = 'insert' THEN 'INSERT'
                WHEN _change_type = 'update_preimage' THEN 'UPDATE_BEFORE'
                WHEN _change_type = 'update_postimage' THEN 'UPDATE_AFTER'
                WHEN _change_type = 'delete' THEN 'DELETE'
            END
            """
        ),
    )
)

Understanding Metadata Columns:

  • _change_type: Indicates the type of change. Possible values:
      • insert: Represents new rows inserted into the table.
      • update_preimage: The state of the row before the update.
      • update_postimage: The state of the row after the update.
      • delete: Rows that have been deleted.
  • _commit_version and _commit_timestamp: Provide the version of the table at which the change was committed and the corresponding timestamp.

By processing these metadata columns, you can determine how to apply the changes to your target systems.
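
As an illustration of how these columns drive the downstream logic, the following plain-Python sketch replays a handful of change rows against an in-memory dictionary keyed by id. The sample rows and the `apply_changes` helper are hypothetical; they only mimic the shape of Change Data Feed output and do not call Spark.

```python
# Hypothetical change rows, shaped like Change Data Feed output for a table
# with columns (id, name, value).
changes = [
    {"id": 1, "name": "a", "value": 1.0, "_change_type": "insert", "_commit_version": 1},
    {"id": 2, "name": "b", "value": 2.0, "_change_type": "insert", "_commit_version": 1},
    {"id": 1, "name": "a", "value": 1.0, "_change_type": "update_preimage", "_commit_version": 2},
    {"id": 1, "name": "a", "value": 9.5, "_change_type": "update_postimage", "_commit_version": 2},
    {"id": 2, "name": "b", "value": 2.0, "_change_type": "delete", "_commit_version": 3},
]

def apply_changes(target, change_rows):
    """Replay change rows in commit order against a dict keyed by id."""
    for row in sorted(change_rows, key=lambda r: r["_commit_version"]):
        kind = row["_change_type"]
        if kind in ("insert", "update_postimage"):
            # Keep only the data columns, not the CDF metadata columns.
            target[row["id"]] = {k: v for k, v in row.items() if not k.startswith("_")}
        elif kind == "delete":
            target.pop(row["id"], None)
        # update_preimage carries the old values and needs no action here.
    return target

target = apply_changes({}, changes)
```

After the replay, only id 1 remains, with its updated value. Step 3 follows the same idea, with a Delta table in place of the dictionary.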

Step 3: Applying Changes to the Target Table

The final step is to apply the captured changes to the target table to ensure it remains in sync with the source. Depending on the type of operation, different actions are required.

Defining the Upsert Logic:

from delta.tables import DeltaTable

def upsert_to_target_table(microbatch_df, batch_id):
  # Load the target table as a DeltaTable
  target_table = DeltaTable.forName(spark, "target_table")

  # INSERT and UPDATE_AFTER rows both carry the new state of a record
  inserts_and_updates = microbatch_df.filter(microbatch_df.operation.isin("INSERT", "UPDATE_AFTER"))

  # DELETE rows identify records to remove; UPDATE_BEFORE rows are not needed here
  deletes = microbatch_df.filter(microbatch_df.operation == "DELETE")

  # Apply INSERTs and UPDATEs with a single MERGE keyed on id
  if not inserts_and_updates.isEmpty():
    (
      target_table.alias("t")
      .merge(
        inserts_and_updates.alias("s"),
        "t.id = s.id"
      )
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .execute()
    )

  # Apply DELETEs; the ids are collected to the driver, which assumes a
  # modest number of numeric ids per micro-batch
  if not deletes.isEmpty():
    ids_to_delete = [row.id for row in deletes.select("id").collect()]
    (
      target_table
      .delete(condition=f"id IN ({', '.join(map(str, ids_to_delete))})")
    )

Starting the Structured Streaming Query:

(
  cdc_df.writeStream
  .format("delta")
  .foreachBatch(upsert_to_target_table)
  .outputMode("update")
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .start()
)

Key Points in the Upsert Logic:

  • Merge Operations: Using Delta Lake’s MERGE functionality, the logic updates existing records and inserts new records based on incoming changes.
  • Handling Deletes: Delete operations are applied by specifying the IDs of records to be removed from the target table.
  • Batch Processing: The foreachBatch mechanism hands each micro-batch to the function as a regular DataFrame, so batch-only operations such as MERGE can be used. Each MERGE and DELETE call commits as its own Delta transaction.
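
One practical caveat: a micro-batch may contain several changes for the same id, for example when it spans more than one commit, and a MERGE can fail when multiple source rows match the same target row. A common remedy is to reduce the batch to the latest change per key before merging. The sketch below shows that reduction in plain Python. The `latest_change_per_id` helper and the sample rows are hypothetical, and in PySpark the same step would typically be expressed with a window function ordered by `_commit_version`.

```python
def latest_change_per_id(change_rows):
    """Keep one row per id: the non-preimage change with the highest commit version."""
    latest = {}
    for row in change_rows:
        if row["_change_type"] == "update_preimage":
            continue  # old values are not needed to sync the target
        current = latest.get(row["id"])
        if current is None or row["_commit_version"] >= current["_commit_version"]:
            latest[row["id"]] = row
    return list(latest.values())

# Hypothetical micro-batch spanning commits 4 to 6.
micro_batch = [
    {"id": 7, "value": 1.0, "_change_type": "insert", "_commit_version": 4},
    {"id": 7, "value": 1.0, "_change_type": "update_preimage", "_commit_version": 5},
    {"id": 7, "value": 2.0, "_change_type": "update_postimage", "_commit_version": 5},
    {"id": 8, "value": 3.0, "_change_type": "insert", "_commit_version": 4},
    {"id": 8, "value": 3.0, "_change_type": "delete", "_commit_version": 6},
]

deduplicated = latest_change_per_id(micro_batch)
```

Here id 7 is reduced to its post-update state and id 8 to its delete, so each key reaches the target with exactly one operation.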

Conclusion

While the design of Structured Streaming does not support MERGE, UPDATE, and DELETE operations out of the box, we can still leverage its key benefits with data sources that undergo such operations. The combination of Change Data Capture (CDC) and Change Data Feed (CDF) in Databricks provides a powerful and flexible solution. By leveraging these features, you can capture changes to your source tables and apply them downstream in near real time, keeping your streaming pipelines up to date with the latest data.

This approach not only maintains data consistency across systems but also optimizes resource utilization by processing only changes rather than entire datasets. As data systems continue to evolve, the integration of CDC patterns with streaming architectures will be pivotal in building responsive, reliable, and efficient data applications.

In addition, this approach makes it possible to maintain aggregations that correctly reflect deletions and updates.
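
As a rough illustration of that last point, the plain-Python sketch below keeps per-name running totals in step with a change feed: new states are added and old states are retracted. The `update_totals` helper and the sample rows are hypothetical and stand in for whatever aggregation your pipeline maintains.

```python
from collections import defaultdict

def update_totals(totals, change_rows):
    """Adjust per-name running totals of `value` using change rows."""
    for row in change_rows:
        kind = row["_change_type"]
        if kind in ("insert", "update_postimage"):
            totals[row["name"]] += row["value"]  # the new state contributes
        elif kind in ("delete", "update_preimage"):
            totals[row["name"]] -= row["value"]  # the old state is retracted
    return totals

totals = update_totals(defaultdict(float), [
    {"id": 1, "name": "a", "value": 10.0, "_change_type": "insert"},
    {"id": 2, "name": "a", "value": 5.0, "_change_type": "insert"},
    {"id": 3, "name": "b", "value": 7.0, "_change_type": "insert"},
    {"id": 1, "name": "a", "value": 10.0, "_change_type": "update_preimage"},
    {"id": 1, "name": "a", "value": 4.0, "_change_type": "update_postimage"},
    {"id": 3, "name": "b", "value": 7.0, "_change_type": "delete"},
])
```

This is where the update_preimage rows earn their keep: without the old value, the aggregate could not be corrected without rescanning the source.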
