Diving Deeper into Change Data Capture (CDC)
In today’s data-driven world, the ability to process and analyze data in real time is crucial for businesses to stay competitive. At Sigma Software, we always strive to remain at the edge of technology, looking for the best approaches and tools to help our customers harness the power of data. One of the tools we use is Databricks, and Sigma Software is a registered partner of Databricks.
Databricks is a unified data analytics platform that simplifies big data processing by providing a seamless environment for working with Apache Spark™, Delta Lake, and other data technologies. Built to handle large-scale data processing, Databricks enables organizations to build robust data pipelines, perform advanced analytics, and leverage machine learning with ease.
One of the standout features of Databricks is Structured Streaming, a scalable and fault-tolerant stream processing engine built on top of Apache Spark. With Structured Streaming, users can process massive data streams in real time with exactly-once processing guarantees, automatic failure recovery, and seamless integration with multiple data sources and sinks.
Key benefits of using Structured Streaming with large-scale data include:
- Exactly-once processing guarantees backed by checkpointing and write-ahead logs
- Automatic recovery from failures without losing or duplicating data
- Scalability to high-throughput workloads across large clusters
- Seamless integration with a wide range of sources and sinks, including Delta Lake and Apache Kafka
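As a quick, minimal illustration of a Structured Streaming query, here is a sketch using the built-in rate source and console sink (the checkpoint path is an assumption for the example):

# A self-contained sketch: stream synthetic rows from the built-in "rate" source
# to the console, with a checkpoint directory for automatic failure recovery.
stream_df = (
    spark.readStream
    .format("rate")                        # generates (timestamp, value) rows
    .option("rowsPerSecond", 10)
    .load()
)

query = (
    stream_df.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/rate_demo")  # assumed path
    .start()
)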
However, while Structured Streaming excels at processing data streams in real time, it has a notable limitation: it does not natively support MERGE, UPDATE, and DELETE operations on source tables. These operations are essential in many real-world scenarios where the underlying data can change over time.
In this article, we will explore why these operations are restricted in Structured Streaming and how you can overcome this limitation using Change Data Capture (CDC) and the enableChangeDataFeed option in Databricks, unlocking the full potential of Structured Streaming for dynamic data processing.
Structured Streaming is designed around the concept of append-only streaming sources, where data flows continuously in a single direction — into the system. This design choice simplifies the management of stateful streaming operations and ensures high throughput and low latency. However, it also introduces a significant limitation: no built-in support for MERGE, UPDATE, or DELETE operations on the source table.
These operations are critical in many real-world applications where the data in the source tables can change over time. For example, consider a customer database where records are frequently updated or deleted. Without support for these operations, you would either fail to capture these changes in your streaming application or be forced to adopt complex workarounds.
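To make this concrete, here is a hedged sketch using a hypothetical Delta table named customers: a plain Delta stream treats the source as append-only, so a commit that rewrites existing rows stops the query.

# Hypothetical Delta table "customers"; a plain streaming read assumes append-only data.
query = (
    spark.readStream
    .format("delta")
    .table("customers")
    .writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/customers_stream")  # assumed path
    .start()
)

# An in-place change to the source table...
spark.sql("UPDATE customers SET name = 'Alice' WHERE id = 1")

# ...causes the stream to fail once it reaches that commit, with an error along the lines of
# "Detected a data update in the source table. This is currently not supported.",
# pointing to options such as skipChangeCommits that simply skip the changed rows
# rather than propagate them.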
To address the challenge of capturing and processing data changes such as INSERTs, UPDATEs, and DELETEs, the data engineering community employs a technique known as Change Data Capture (CDC). CDC is a set of software design patterns used to detect and track data changes in a source system, ensuring that downstream systems receive and process these changes accurately and efficiently.
Key Aspects of CDC:
- Capturing inserts, updates, and deletes as they happen in the source system
- Preserving the order of changes so that downstream systems can apply them consistently
- Delivering changes with low latency and minimal impact on the source system
Common CDC Techniques:
- Log-based CDC, which reads changes directly from the database transaction log
- Trigger-based CDC, which records changes through triggers on the source tables
- Query- or timestamp-based CDC, which periodically compares snapshots or filters rows by a last-modified column
CDC in the Context of Databricks:
Databricks integrates CDC capabilities through the Change Data Feed (CDF) feature available in Delta Lake tables. By enabling CDF, Delta Lake captures changes to tables, making it easier to propagate those changes downstream.
To overcome the limitation of not supporting MERGE, UPDATE, and DELETE operations in Structured Streaming, Databricks provides a powerful feature called Change Data Feed (CDF). When enabled, CDF captures all changes to a Delta Lake table, including INSERTs, UPDATEs, and DELETEs, and exposes them in a consumable format. This integration allows streaming applications to process not only new data but also changes to existing data.
Step 1: Enable Change Data Feed on the Source Table
To start using CDC with CDF, you must first enable the Change Data Feed on your source Delta Lake table. This is achieved by setting the delta.enableChangeDataFeed table property to true. You can do this either during table creation or by altering an existing table.
Enabling CDF on Table Creation:
CREATE TABLE my_source_table (
  id LONG,
  name STRING,
  value DOUBLE
)
USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true);
Enabling CDF on an Existing Table:
ALTER TABLE my_source_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
Once enabled, Delta Lake begins recording all data changes, which can then be read and processed by downstream applications.
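You can quickly check that changes are being recorded by reading the change feed as a regular batch DataFrame. A minimal sketch, assuming the my_source_table created above and the readChangeFeed batch read option from the Delta Lake documentation:

# Batch read of the change feed for my_source_table, starting from table version 0.
changes_df = (
    spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("my_source_table")
)

# Each change row carries the table columns plus the CDF metadata columns.
changes_df.select(
    "id", "name", "value",
    "_change_type", "_commit_version", "_commit_timestamp"
).show(truncate=False)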
Step 2: Reading the Change Data Stream
With CDF enabled, you can read the stream of changes from the source table. The changes are exposed as a DataFrame, with each record containing metadata columns indicating the type of operation and other relevant details.
Reading Changes Using Structured Streaming:
from pyspark.sql.functions import expr
# Define the starting point for reading changes
starting_version = 0  # You can also specify a timestamp or 'latest'
# Read the change data stream
cdc_df = (
    spark.readStream
    .format("delta")
    .option("readChangeData", "true")
    .option("startingVersion", starting_version)
    .table("my_source_table")
    # Map the raw _change_type values to friendlier operation labels
    .withColumn(
        "operation",
        expr("""
            CASE
                WHEN _change_type = 'insert' THEN 'INSERT'
                WHEN _change_type = 'update_preimage' THEN 'UPDATE_BEFORE'
                WHEN _change_type = 'update_postimage' THEN 'UPDATE_AFTER'
                WHEN _change_type = 'delete' THEN 'DELETE'
            END
        """)
    )
)
Understanding Metadata Columns:
When reading the change data feed, Delta Lake adds metadata columns to every change record:
- _change_type: the kind of change ('insert', 'update_preimage', 'update_postimage', or 'delete')
- _commit_version: the Delta table version in which the change was committed
- _commit_timestamp: the timestamp of the commit that produced the change
By processing these metadata columns, you can determine how to apply the changes to your target systems.
Step 3: Applying Changes to the Target Table
The final step is to apply the captured changes to the target table to ensure it remains in sync with the source. Depending on the type of operation, different actions are required.
Defining the Upsert Logic:
from delta.tables import DeltaTable

def upsert_to_target_table(microbatch_df, batch_id):
    # Load the target table as a DeltaTable
    target_table = DeltaTable.forName(spark, "target_table")

    # Process INSERT and UPDATE_AFTER operations together
    inserts_and_updates = microbatch_df.filter(
        microbatch_df.operation.isin("INSERT", "UPDATE_AFTER")
    )

    # Process DELETE operations
    deletes = microbatch_df.filter(microbatch_df.operation == "DELETE")

    # Apply INSERTs and UPDATEs
    if not inserts_and_updates.isEmpty():
        (
            target_table.alias("t")
            .merge(inserts_and_updates.alias("s"), "t.id = s.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    # Apply DELETEs
    if not deletes.isEmpty():
        ids_to_delete = [row.id for row in deletes.select("id").collect()]
        target_table.delete(
            condition=f"id IN ({', '.join(map(str, ids_to_delete))})"
        )
Starting the Structured Streaming Query:
(
    cdc_df.writeStream
    .format("delta")
    .foreachBatch(upsert_to_target_table)
    .outputMode("update")
    .option("checkpointLocation", "/path/to/checkpoint/dir")
    .start()
)
Key Points in the Upsert Logic:
- INSERT and UPDATE_AFTER records are applied together through a single MERGE that updates matched rows and inserts new ones
- DELETE records are handled separately by removing the matching ids from the target table
- foreachBatch applies each micro-batch as a unit, and the checkpoint location allows the query to resume from where it left off after a failure
While the design of Structured Streaming does not support MERGE, UPDATE, and DELETE operations out of the box, we can still leverage its key benefits with data sources where such operations occur. The combination of Change Data Capture (CDC) and Change Data Feed (CDF) in Databricks provides this powerful and flexible solution. By leveraging these features, you can capture changes to your source tables and apply them downstream in real time, ensuring that your streaming pipelines are always up to date with the latest data.
This approach not only maintains data consistency across systems but also optimizes resource utilization by processing only changes rather than entire datasets. As data systems continue to evolve, the integration of CDC patterns with streaming architectures will be pivotal in building responsive, reliable, and efficient data applications.
In addition, this approach makes it possible to maintain aggregations that correctly account for deletions and updates, as sketched below.
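The following is a hedged sketch of that idea, using hypothetical tables: an orders table (customer_id, value) with CDF enabled and an order_totals aggregate table (customer_id, total). Change rows are turned into signed deltas and merged into the running totals inside a foreachBatch handler, in the same way upsert_to_target_table is wired up above:

# Hedged sketch with hypothetical tables: "orders" (customer_id, value, CDF enabled)
# and "order_totals" (customer_id, total). The micro-batch is assumed to come from a
# CDF stream over "orders", so every row carries the _change_type metadata column.
from pyspark.sql import functions as F
from delta.tables import DeltaTable

def update_totals(microbatch_df, batch_id):
    # Turn change rows into signed deltas: inserts and update post-images add,
    # deletes and update pre-images subtract.
    deltas = (
        microbatch_df
        .withColumn(
            "signed_value",
            F.when(
                F.col("_change_type").isin("insert", "update_postimage"),
                F.col("value"),
            ).otherwise(-F.col("value"))
        )
        .groupBy("customer_id")
        .agg(F.sum("signed_value").alias("delta"))
    )

    # Merge the net deltas into the running totals
    totals = DeltaTable.forName(spark, "order_totals")
    (
        totals.alias("t")
        .merge(deltas.alias("d"), "t.customer_id = d.customer_id")
        .whenMatchedUpdate(set={"total": F.col("t.total") + F.col("d.delta")})
        .whenNotMatchedInsert(values={
            "customer_id": F.col("d.customer_id"),
            "total": F.col("d.delta"),
        })
        .execute()
    )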
For over 17 years, Andrii has worked in Data Analytics and Data Engineering, seven of them in Data Science. He actively shares his knowledge as a trainer at Sigma Software University and a mentor at Sigma Group.