October 3, 2024
A Guide to Change Data Capture (CDC) with Apache Iceberg
Change Data Capture (CDC) is a design pattern used in databases and data processing to track and capture data changes—such as insertions, updates, and deletions—in real-time. Instead of periodically extracting entire datasets, CDC focuses on capturing only the data that has changed since the last update. This approach is crucial in modern data architectures, where timely and efficient data processing is paramount.
Businesses generate and consume vast amounts of data. Processing and reacting to this data in real time can provide a significant competitive advantage. CDC enables organizations to maintain up-to-date data across distributed systems, ensuring consistency and enabling real-time analytics and decision-making.
Common CDC Use Cases and Benefits
Common Use Cases:
- Data Synchronization: Keeping multiple databases or data warehouses in sync without full data reloads.
- Real-Time Analytics: Feeding real-time data into analytics platforms for immediate insights.
- Microservices Communication: Propagating changes across microservices architectures efficiently.
- Event Streaming: Publishing data changes as events to messaging systems like Apache Kafka.
- Audit Trails: Maintaining a history of data changes for compliance and auditing purposes.
Benefits:
- Reduced Latency: Immediate propagation of data changes minimizes the time lag between data generation and availability.
- Efficiency: Only changed data is processed, reducing the computational overhead and network bandwidth.
- Data Consistency: Ensures that all systems reflect the most recent data state, improving reliability.
- Scalability: Efficiently handles high volumes of data changes, making it suitable for large-scale systems.
Introduction to Apache Iceberg
Apache Iceberg is an open-source, high-performance table format for petabyte-scale analytic datasets. Developed to address the limitations of traditional big data file formats, Iceberg provides a more robust and flexible framework for managing large datasets.
Key Features of Apache Iceberg:
- Schema Evolution: Supports changes to the table schema without requiring costly rewrites or complex migration processes.
- Hidden Partitioning: Automatically manages partitioning to optimize query performance without exposing partition details to users.
- ACID Compliance: Provides atomicity, consistency, isolation, and durability for operations, ensuring data integrity.
- Time Travel Queries: Allows querying historical versions of data, facilitating debugging and auditing.
- Performance Optimization: Enhances query planning and execution through metadata tracking and efficient file pruning.
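To ground these features, here is a brief Spark SQL sketch (table and column names are hypothetical) showing hidden partitioning, schema evolution, and a time-travel query against an Iceberg table:

-- Hidden partitioning: data is partitioned by day without exposing a partition column to queries
CREATE TABLE demo.db.events (
  id BIGINT,
  name STRING,
  event_ts TIMESTAMP
) USING iceberg
PARTITIONED BY (days(event_ts));

-- Schema evolution: add a column without rewriting existing data files
ALTER TABLE demo.db.events ADD COLUMNS (attendee_count INT);

-- Time travel: query the table as of an earlier snapshot or point in time
SELECT * FROM demo.db.events VERSION AS OF 5390529835796506035;
SELECT * FROM demo.db.events TIMESTAMP AS OF '2024-10-01 00:00:00';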
How Iceberg Supports CDC
Apache Iceberg's architecture inherently supports CDC by enabling efficient handling of data mutations—insertions, updates, and deletions—at scale. Here's how Iceberg enhances CDC processes:
- Metadata Management: Iceberg maintains detailed metadata about data files and their contents, allowing for precise tracking of data changes.
- Snapshot Isolation: Each change to the dataset results in a new immutable snapshot, enabling consistent reads and writes without conflicts.
- Efficient Upserts and Deletes: Supports row-level updates and deletions without needing to rewrite entire partitions or datasets.
- Integration with Streaming Engines: Seamlessly integrates with streaming processing frameworks like Apache Flink and Apache Spark Structured Streaming, facilitating real-time data ingestion and CDC workflows.
As we'll see, Iceberg's metadata lets us derive table changes efficiently, and its transaction support and broad tool integration let us process those changes effectively. There are several distinct CDC scenarios, though, so let's walk through each of them.
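Before diving into the scenarios, here is a minimal Spark SQL sketch of the row-level mutations Iceberg handles (the table, staging view, and column names are hypothetical). Each such commit produces a new snapshot, which is exactly what the CDC techniques below read from or write to:

-- Apply a batch of staged changes to an Iceberg table with a single atomic commit
MERGE INTO catalog.marketing.events AS t
USING staged_event_changes AS s   -- hypothetical view of incoming inserts, updates, and deletes
ON t.id = s.id
WHEN MATCHED AND s.is_deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET name = s.name, attendee_count = s.attendee_count
WHEN NOT MATCHED THEN INSERT (id, name, attendee_count) VALUES (s.id, s.name, s.attendee_count);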
Scenario #1: Capturing Changes in an Iceberg Table
In this scenario, we'll explore how to capture and manage data changes within an Apache Iceberg table using its built-in changelog procedures. This approach is essential for tracking data mutations over time, enabling real-time analytics, auditing, and synchronization across systems.
Understanding Iceberg's Changelog Procedure
Introduction to create_changelog_view
Apache Iceberg provides the create_changelog_view procedure, a powerful tool for generating a view that contains the changes made to a table over a specified period or between specific snapshots. This view captures inserts, updates, and deletes, allowing users to query and analyze data modifications efficiently.
Key Features of create_changelog_view:
- Time Travel Capability: Enables querying changes between snapshots or timestamps.
- Flexible Configuration: Allows specifying options like net changes, update computations, and identifier columns.
- CDC Metadata Columns: Includes special columns to provide context about each change (e.g., _change_type, _change_ordinal).
Procedure Arguments and Options
When using create_changelog_view, several arguments and options can be specified to tailor the changelog view to your needs:
- Required Arguments:
  - table: The name of the source Iceberg table from which to capture changes.
- Optional Arguments:
  - changelog_view: The name to assign to the created changelog view.
  - options: A map of options to define the range of snapshots or timestamps.
  - net_changes (boolean): When set to true, outputs only the net changes, filtering out intermediate changes.
  - compute_updates (boolean): Computes pre/post-update images for updates.
  - identifier_columns (array): Specifies the columns that uniquely identify rows for update computations.
Commonly Used Options:
- start-snapshot-id: Exclusive start snapshot ID (reads from the first snapshot if not provided).
- end-snapshot-id: Inclusive end snapshot ID (defaults to the current snapshot).
- start-timestamp: Exclusive start timestamp in milliseconds.
- end-timestamp: Inclusive end timestamp in milliseconds.
Working with Changelog Views
Creating a Changelog View: Step-by-Step Guide
Example 1: Capturing Changes Between Snapshot IDs
To create a changelog view that captures changes between two snapshot IDs:
CALL catalog.system.create_changelog_view(
  table => 'marketing.events',
  options => map('start-snapshot-id', '1', 'end-snapshot-id', '2')
);
Example 2: Capturing Changes Between Timestamps
To capture changes within a specific time range:
CALL catalog.system.create_changelog_view(
  table => 'marketing.events',
  options => map('start-timestamp', '1678335750489', 'end-timestamp', '1678992105265'),
  changelog_view => 'events_cdc'
);
Example 3: Computing Updates with Identifier Columns
When you want to compute pre/post-update images using specific identifier columns:
CALL catalog.system.create_changelog_view(
  table => 'marketing.events',
  options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'),
  identifier_columns => array('id', 'name', 'attendee_count')
);
Querying the Changelog View
Once the changelog view is created, you can query it like any other view:
SELECT * FROM events_cdc;
To filter specific change types or records:
SELECT * FROM events_cdc WHERE _change_type = 'INSERT' AND id = 3 ORDER BY _change_ordinal;
Interpreting Results
The changelog view includes special CDC metadata columns that provide context about each change:
- _change_type: Indicates the type of change (INSERT, DELETE, UPDATE_BEFORE, UPDATE_AFTER).
- _change_ordinal: Represents the order of changes.
- _commit_snapshot_id: The snapshot ID where the change occurred.
Sample Result Set:
| id | name | attendee_count | _change_type | _change_ordinal | _commit_snapshot_id |
|---|---|---|---|---|---|
| 1 | event1 | 100 | INSERT | 0 | 5390529835796506035 |
| 2 | event2 | 200 | INSERT | 0 | 5390529835796506035 |
| 1 | event1 | 100 | DELETE | 1 | 8764748981452218370 |
This table shows that two records were inserted in the first snapshot and one record was deleted in the second snapshot.
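Because the changelog view is a regular view, you can also aggregate its metadata columns to sanity-check a CDC window. A minimal sketch, assuming the events_cdc view created above:

-- Count how many rows of each change type each commit produced
SELECT
  _commit_snapshot_id,
  _change_type,
  COUNT(*) AS change_count
FROM events_cdc
GROUP BY _commit_snapshot_id, _change_type
ORDER BY _commit_snapshot_id, _change_type;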
Advanced Topics
Net Changes
Net changes refer to the final state of data after consolidating all intermediate changes within the specified range. By setting net_changes => true, the procedure outputs only the net effect, eliminating transient changes.
Example: Creating a Changelog View with Net Changes
CALL spark_catalog.system.create_changelog_view(
  table => 'marketing.events',
  options => map('end-snapshot-id', '8764748981452218370'),
  net_changes => true
);
Expected Result:
Only records that represent the net change will appear. For instance, if a record was inserted and then deleted within the snapshot range, it will not appear in the net changes view.
Carry-over Rows
Explanation
Carry-over rows occur in copy-on-write operations, such as MERGE, UPDATE, or DELETE, where unchanged rows are rewritten to new files. These rows might appear as deleted and then re-inserted, even though their data hasn't changed.
Viewing Carry-over Rows
To include carry-over rows in your analysis, query the SparkChangelogTable directly:
SELECT * FROM catalog.marketing.events.changes;
Example of Carry-over Rows:
| id | name | attendee_count | _change_type |
|---|---|---|---|
| 1 | event1 | 100 | DELETE |
| 1 | event1 | 100 | INSERT |
Pre/Post Update Images
How Iceberg Computes Update Images
When a row is updated, Iceberg can capture both the before and after states of the row. This is achieved by converting pairs of delete and insert operations into UPDATE_BEFORE and UPDATE_AFTER records.
Importance of identifier_columns
Identifier columns are crucial for matching delete and insert pairs that represent the same logical row. These columns typically form a primary key or unique identifier for the records.
Enabling Pre/Post Update Images
You can compute pre/post update images by setting compute_updates to true and specifying identifier_columns:
CALL spark_catalog.system.create_changelog_view(
  table => 'marketing.events',
  options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'),
  compute_updates => true,
  identifier_columns => array('id')
);
Example Result:
| id | name | attendee_count | _change_type |
|---|---|---|---|
| 3 | event3 | 100 | UPDATE_BEFORE |
| 3 | event3 | 200 | UPDATE_AFTER |
In this example, the row with id = 3 was updated from 100 attendees to 200 attendees. Iceberg captures both the before and after states, providing a complete picture of the change.
By leveraging Apache Iceberg's create_changelog_view procedure, you can efficiently track and analyze data changes within your tables. This capability is essential for applications requiring audit trails, real-time analytics, and synchronization across distributed systems. Iceberg's approach to CDC simplifies the complexity of managing data mutations, enabling you to focus on deriving insights and value from your data.
Scenario #2: Capturing Changes from a Source System
In this scenario, we'll explore how to capture and ingest data changes from an external source system into an Apache Iceberg table. This is a common requirement when dealing with databases or applications that need to synchronize their data with analytics platforms in real-time. We will use Debezium to stream changes into Apache Kafka and then ingest those changes into Iceberg tables using either Kafka Connect or Apache Flink.
Streaming Changes into Iceberg Using Debezium and Kafka
Introduction to Debezium
Debezium is an open-source distributed platform that captures row-level changes in databases so that applications can see and respond to those changes. It uses CDC (Change Data Capture) to monitor databases such as MySQL, PostgreSQL, MongoDB, and more, streaming every change event to Kafka topics.
Key Features of Debezium:
- Real-Time Data Streaming: Captures changes as they happen with minimal latency.
- Consistency and Reliability: Guarantees exactly-once semantics and preserves the order of events.
- Schema Evolution Handling: Automatically detects and propagates schema changes.
- Ease of Integration: Works seamlessly with Kafka and other stream processing platforms.
How Debezium Streams Changes into Kafka
Debezium connects to source databases and monitors the transaction logs to detect data changes. Each change event is then converted into a structured message and published to a Kafka topic. This approach decouples the source database from downstream systems, allowing multiple consumers to process the data changes independently.
Workflow Overview:
- Debezium Connector: Set up a Debezium connector for your source database.
- Change Detection: Debezium monitors the database's transaction logs for changes.
- Event Streaming: Detected changes are published to Kafka topics in real time.
- Data Consumption: Downstream consumers (e.g., Kafka Connect or Flink) consume these topics to process or store the data.
Integrating Kafka with Iceberg
Once changes are available in Kafka topics, the next step is to ingest this data into Iceberg tables. There are two primary methods to achieve this:
- Using Kafka Connect with the Apache Iceberg Sink Connector.
- Using Apache Flink for streaming ingestion into Iceberg tables.
Using Kafka Connect
Apache Iceberg Sink Connector
The Apache Iceberg Sink Connector is a Kafka Connect sink connector designed to write data from Kafka topics into Iceberg tables.
Features and Capabilities:
- Commit Coordination: Centralized management of Iceberg commits to ensure consistency.
- Exactly-Once Delivery Semantics: Guarantees that each record is processed exactly once.
- Multi-Table Fan-Out: Supports writing to multiple Iceberg tables based on record content.
- Automatic Table Creation and Schema Evolution: Can create tables on the fly and handle schema changes.
- Field Name Mapping: Utilizes Iceberg's column mapping for flexible schema handling.
Installation and Setup
Requirements:
- Kafka Version: The sink relies on KIP-447 for exactly-once semantics, requiring Kafka 2.5 or later.
- Iceberg Build: Build the Iceberg project to obtain the connector distribution.
./gradlew -x test -x integrationTest clean build
- Connector Deployment: Copy the distribution archive to the Kafka Connect plugins directory on all worker nodes.
Configuration
Core Configuration Properties:
- iceberg.tables: Comma-separated list of destination table names.
- iceberg.tables.dynamic-enabled: Enables dynamic routing to tables based on record fields.
- iceberg.tables.route-field: The field name used to route records to different tables.
- iceberg.tables.auto-create-enabled: Automatically create tables if they do not exist.
- iceberg.tables.evolve-schema-enabled: Enable automatic schema evolution.
- iceberg.catalog.*: Configuration properties for connecting to the Iceberg catalog (e.g., Hive, REST, Hadoop).
- iceberg.hadoop.*: Properties passed to the Hadoop configuration for HDFS or Hive.
Catalog Configuration Examples:
- REST Catalog:
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog-service",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse>"
- Hive Catalog:
"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://hive:9083",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
- Glue Catalog:
"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
Handling Schemas and Auto-Evolution
The sink connector can automatically create tables and evolve schemas based on incoming records. Key properties to control this behavior are:
- iceberg.tables.auto-create-enabled: Set to true to enable automatic table creation.
- iceberg.tables.evolve-schema-enabled: Set to true to allow the connector to add missing fields to the table schema.
- iceberg.tables.schema-force-optional: When true, all columns are set as optional during table creation and evolution.
Note: Care should be taken when enabling schema evolution to ensure it aligns with your data governance policies.
Examples: Integrating Debezium with Kafka Connect and Apache Iceberg
In this section, we'll focus on how to use Kafka Connect to receive events from Debezium and write those changes into Apache Iceberg tables. This approach leverages Debezium's ability to capture database changes and Kafka Connect's integration capabilities to seamlessly ingest CDC data into Iceberg for analytics and processing.
Single Destination Table
Objective: Write all change events from a Debezium-monitored database table into a single Iceberg table.
Workflow Overview:
- Debezium Source Connector: Captures changes from the source database and publishes them to a Kafka topic.
- Kafka Topic: Receives CDC events from Debezium (e.g., a topic named dbserver1.inventory.customers).
- Kafka Connect Iceberg Sink Connector: Consumes the CDC events from the Kafka topic and writes them into an Iceberg table.
Debezium Source Connector Configuration:
First, set up a Debezium connector to capture changes from the source database:
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<mysql-host>", "database.port": "3306", "database.user": "<user>", "database.password": "<password>", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "table.include.list": "inventory.customers", "database.history.kafka.bootstrap.servers": "<kafka-bootstrap-servers>", "database.history.kafka.topic": "schema-changes.inventory" } }
Iceberg Sink Connector Configuration:
Configure the Iceberg Sink Connector to consume the Debezium topic and write to the Iceberg table:
{ "name": "customers-iceberg-sink", "config": { "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector", "tasks.max": "2", "topics": "dbserver1.inventory.customers", "iceberg.tables": "default.customers", "iceberg.tables.auto-create-enabled": "true", "iceberg.tables.evolve-schema-enabled": "true", "iceberg.catalog.type": "hive", "iceberg.catalog.uri": "thrift://hive-metastore:9083", "iceberg.catalog.warehouse": "hdfs://namenode:8020/warehouse", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "<schema-registry-url>" } }
Explanation:
- topics: Set to the Debezium topic where CDC events are published.
- iceberg.tables: Specifies the Iceberg table to write to.
- iceberg.tables.auto-create-enabled: Allows the connector to automatically create the Iceberg table if it doesn't exist.
- iceberg.tables.evolve-schema-enabled: Enables automatic schema evolution based on incoming records.
- Converters: Ensure that the key and value converters are compatible with the data format used by Debezium (e.g., Avro).
By using Kafka Connect to consume Debezium change events, you can leverage existing open-source tools to build efficient and reliable CDC pipelines that feed into Apache Iceberg, unlocking powerful analytics capabilities on your real-time data.
Using Apache Flink to Consume Debezium Data from Kafka and Update an Iceberg Table
In this section, we'll focus on how to use Apache Flink to consume change data capture (CDC) events published by Debezium to a Kafka topic and then use Flink to process and write these changes into an Apache Iceberg table. This approach enables real-time data synchronization between your source databases and Iceberg tables, facilitating up-to-date analytics and data processing.
Overview
- Debezium: Captures CDC events from source databases and publishes them to Kafka topics.
- Apache Flink: Consumes CDC events from Kafka, processes them, and writes changes to Iceberg tables.
- Apache Iceberg: Stores the processed data efficiently, supporting schema evolution and providing strong consistency guarantees.
Workflow Steps
- Debezium Source Connector: Captures changes from the source database and writes them to a Kafka topic.
- Flink Streaming Job: Consumes the Debezium Kafka topic, processes CDC events, and writes changes to an Iceberg table.
- Iceberg Table: Stores the data changes with support for upserts and schema evolution.
Consuming Debezium Data with Flink
Understanding Debezium's Data Format
Debezium publishes CDC events to Kafka topics in a structured format, typically using Avro or JSON with an envelope that includes:
- before: The state of the record before the change (for updates and deletes).
- after: The state of the record after the change (for inserts and updates).
- op: The operation type (c for create/insert, u for update, d for delete).
- ts_ms: The timestamp of the change event.
- source: Metadata about the source database and table.
Example Debezium Message (JSON):
{ "before": { "id": 1001, "first_name": "Sally", "last_name": "Thomas", "email": "[email protected]" }, "after": { "id": 1001, "first_name": "Sally", "last_name": "Johnson", "email": "[email protected]" }, "source": { "version": "1.5.0.Final", "connector": "mysql", "name": "dbserver1", "ts_ms": 1623436890000, "db": "inventory", "table": "customers", "server_id": 223344, "file": "mysql-bin.000003", "pos": 154, "row": 0, "thread": null, "query": null }, "op": "u", "ts_ms": 1623436891000, "transaction": null }
Setting Up Flink to Consume Debezium CDC Events
Prerequisites:
- Flink Environment: Apache Flink 1.11 or later.
- Flink Kafka Connector: To consume data from Kafka topics.
- Flink Iceberg Connector: To write data to Iceberg tables.
- Schema Registry (Optional): If using Avro or other schema-based serialization.
Steps:
- Create a Flink Streaming Job:
- Use Flink's DataStream API or Table API/SQL to define the streaming job.
- Configure the Kafka consumer to read from the Debezium topic.
- Deserialize the CDC events.
- Process the CDC events to extract insert, update, and delete operations.
- Configure the Iceberg sink to write data to the target table.
- Handle Debezium's CDC Format:
- Parse the op field to determine the type of operation.
- Use the before and after fields to get the old and new states of the record.
- Map Debezium operations to Iceberg write actions (INSERT, UPDATE, DELETE).
Writing Data to Iceberg with Flink
Using Flink SQL
Flink SQL provides high-level abstractions to process streaming data. You can define tables over Kafka topics and Iceberg tables, then write SQL queries to process and insert data.
Step 1: Define a Kafka Source Table
CREATE TABLE debezium_source (
  `before` ROW<id INT, first_name STRING, last_name STRING, email STRING>,
  `after` ROW<id INT, first_name STRING, last_name STRING, email STRING>,
  `op` STRING,
  `ts_ms` BIGINT,
  `source` MAP<STRING, STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.customers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',  -- raw JSON keeps the Debezium envelope (before/after/op) visible as columns
  'scan.startup.mode' = 'earliest-offset'
);
Step 2: Define an Iceberg Sink Table
CREATE TABLE iceberg_sink (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'hive_catalog',
  'catalog-database' = 'default',
  'catalog-type' = 'hive',
  'uri' = 'thrift://hive-metastore:9083',
  'warehouse' = 'hdfs://namenode:8020/warehouse',
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);
Step 3: Write a Flink SQL Query to Process and Insert Data
INSERT INTO iceberg_sink
SELECT
  COALESCE(`after`.id, `before`.id) AS id,
  COALESCE(`after`.first_name, `before`.first_name) AS first_name,
  COALESCE(`after`.last_name, `before`.last_name) AS last_name,
  COALESCE(`after`.email, `before`.email) AS email
FROM debezium_source
WHERE op IN ('c', 'u', 'd');
Explanation:
- COALESCE: Used to handle null values in the before and after fields depending on the operation.
- Filter Operations: You may need to handle the different operations (c, u, d) differently.
Handling Deletes:
The query above treats every Debezium event as a row to insert, so a plain INSERT INTO statement does not apply delete operations to the sink. To handle deletes, you can:
- Use Flink's SQL DML Support (Flink 1.13+): Enable the table.exec.source.cdc-events-duplicate configuration to support CDC events.
- Use Flink's DataStream API: For more complex CDC handling.
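An alternative to parsing the raw envelope yourself is to let Flink interpret it. The sketch below is illustrative and assumes the same Kafka topic and the iceberg_sink table defined earlier: declaring the source with the debezium-json format exposes only the payload columns and produces a changelog stream, so deletes arrive as retractions that the upsert-enabled Iceberg sink applies without any manual op handling.

CREATE TABLE customers_cdc_source (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.customers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json',  -- Flink unwraps before/after/op into +I/-U/+U/-D change rows
  'scan.startup.mode' = 'earliest-offset'
);

-- Inserts, updates, and deletes all flow through this single statement
INSERT INTO iceberg_sink
SELECT id, first_name, last_name, email
FROM customers_cdc_source;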
Using Flink DataStream API
For more control over the processing logic, use the DataStream API.
Step 1: Set Up the Flink Execution Environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Step 2: Create a Kafka Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-debezium-consumer");

FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
    "dbserver1.inventory.customers",
    new SimpleStringSchema(),
    properties
);
Step 3: Consume and Deserialize Debezium CDC Events
DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

DataStream<CustomerChangeEvent> cdcStream = kafkaStream.map(jsonString -> {
    ObjectMapper objectMapper = new ObjectMapper();
    return objectMapper.readValue(jsonString, CustomerChangeEvent.class);
});
Define the CustomerChangeEvent Class:
public class CustomerChangeEvent {
    public Map<String, Object> before;
    public Map<String, Object> after;
    public String op;
    public Long ts_ms;
    public Map<String, Object> source;

    // Getters and setters
}
Step 4: Process CDC Events and Prepare for Iceberg Sink
DataStream<RowData> rowDataStream = cdcStream.flatMap(new FlatMapFunction<CustomerChangeEvent, RowData>() {
    @Override
    public void flatMap(CustomerChangeEvent event, Collector<RowData> out) throws Exception {
        // Convert the Debezium event into Flink's internal RowData representation
        if ("c".equals(event.op) || "u".equals(event.op)) {
            // Handle inserts and updates using the "after" image
            Map<String, Object> after = event.after;
            GenericRowData rowData = new GenericRowData(4);
            rowData.setField(0, ((Number) after.get("id")).intValue());
            // String columns must be stored as StringData, not java.lang.String
            rowData.setField(1, StringData.fromString((String) after.get("first_name")));
            rowData.setField(2, StringData.fromString((String) after.get("last_name")));
            rowData.setField(3, StringData.fromString((String) after.get("email")));
            out.collect(rowData);
        } else if ("d".equals(event.op)) {
            // Handle deletes (Iceberg requires row-level delete support)
            // Implement delete handling logic here
        }
    }
});
Step 5: Configure the Iceberg Sink
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable(
    "hdfs://namenode:8020/warehouse/default/customers", hadoopConf);

FlinkSink.forRowData(rowDataStream)
    .tableLoader(tableLoader)
    .upsert(true) // Enable upsert mode
    .append();
Important Notes:
- Upsert Mode: Enable upsert mode to handle inserts and updates based on primary keys.
- Delete Handling: Implement logic to handle delete events. Iceberg supports row-level deletes starting from format version 2.
- Schema Evolution: Ensure the Iceberg table schema matches the Debezium record schema. Enable schema evolution if necessary.
Advanced Writing Techniques
Handling Deletes in Flink
To handle delete events, you need to:
- Enable Row-Level Deletes in Iceberg:
- Use Iceberg table format version 2.
- Ensure the table is configured to accept deletes.
- Implement Delete Logic in Flink:
if ("d".equals(event.op)) { Map<String, Object> before = event.before; // Create RowData representing the key of the record to delete GenericRowData deleteKey = new GenericRowData(1); deleteKey.setField(0, before.get("id")); // Use a custom sink or modify the existing sink to handle deletes }
Note: Handling deletes may require extending the Iceberg sink or using Flink's Table API with CDC support.
Using Flink CDC Connectors
Flink provides CDC connectors that can read Debezium CDC events directly and produce normalized change streams.
Example:
CREATE TABLE customers (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'user',
  'password' = 'password',
  'database-name' = 'inventory',
  'table-name' = 'customers'
);
Insert into Iceberg Table:
INSERT INTO iceberg_customers SELECT * FROM customers;
Note: Using Flink's CDC connectors allows you to bypass Kafka, directly consuming changes from the database.
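The INSERT above assumes an iceberg_customers table already exists. A minimal sketch of such a table, mirroring the iceberg_sink definition used earlier (the catalog names and locations are assumptions):

CREATE TABLE iceberg_customers (
  id INT,
  first_name STRING,
  last_name STRING,
  email STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'hive_catalog',
  'catalog-database' = 'default',
  'catalog-type' = 'hive',
  'uri' = 'thrift://hive-metastore:9083',
  'warehouse' = 'hdfs://namenode:8020/warehouse',
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);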
Write Options and Distribution Modes
When writing to Iceberg tables, you can configure various write options and distribution modes to optimize performance and resource utilization.
Configuring Write Options
Set write options in the FlinkSink builder or via SQL hints.
Example in Java:
FlinkSink.forRowData(rowDataStream)
    .tableLoader(tableLoader)
    .upsert(true)
    .writeParallelism(4)
    .set("write.format.default", "parquet")
    .append();
Example in SQL:
INSERT INTO iceberg_customers /*+ OPTIONS('write.upsert.enabled'='true', 'write.format.default'='parquet') */ SELECT * FROM customers;
Best Practices and Considerations
- Schema Management:
- Ensure that the schemas of the Debezium data and the Iceberg table are compatible.
- Enable schema evolution in Iceberg if the source schema may change.
- Exactly-Once Processing:
- Configure Flink checkpoints and enable exactly-once delivery guarantees (see the configuration sketch after this list).
- Use Kafka transactional topics if needed.
- Error Handling:
- Implement exception handling and logging for deserialization and processing errors.
- Use dead-letter queues or side outputs to capture problematic records.
- Resource Management:
- Monitor the resource usage of your Flink job and adjust parallelism accordingly.
- Optimize the performance by tuning Flink and Kafka consumer configurations.
- Testing:
- Test the end-to-end pipeline in a staging environment before deploying to production.
- Simulate various CDC events, including inserts, updates, and deletes.
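For the exactly-once point above, here is a minimal sketch of the relevant settings when running the SQL jobs through the Flink SQL client (the values are illustrative, not prescriptive):

-- Enable periodic checkpoints with exactly-once semantics before submitting the INSERT statements
SET 'execution.checkpointing.interval' = '60 s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';

-- Optionally deduplicate CDC events if the source may deliver duplicates
-- (requires a primary key on the source table)
SET 'table.exec.source.cdc-events-duplicate' = 'true';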
By using Apache Flink to consume Debezium CDC data from Kafka and update Iceberg tables, you can build a robust and scalable pipeline that ensures your analytical datasets are always up-to-date with the latest changes from your operational databases. This integration leverages the strengths of each component:
- Debezium: Efficiently captures and streams database changes.
- Kafka: Provides a durable and scalable messaging platform.
- Flink: Offers powerful stream processing capabilities to handle complex transformations and stateful computations.
- Iceberg: Ensures data is stored efficiently with support for ACID transactions and schema evolution.
Benefits of This Approach:
- Real-Time Data Synchronization: Minimal latency between source data changes and updates in the Iceberg table.
- Flexibility: Ability to implement complex processing logic in Flink.
- Scalability: Handle high throughput and large datasets efficiently.
- Data Consistency: Maintain strong consistency and integrity in your data lake.
Scenario #3: Using Managed Services for Streaming Data into Apache Iceberg
In this scenario, we'll explore how managed services like Upsolver and Estuary simplify the process of streaming data from various sources into Apache Iceberg tables, leveraging Change Data Capture (CDC) under the hood. These platforms abstract the complexities involved in building and maintaining data pipelines, allowing organizations to focus on deriving insights rather than handling infrastructure and code.
Introduction to Managed CDC Services
Overview of Upsolver and Estuary
Upsolver and Estuary are cloud-based data integration platforms that enable real-time data ingestion, transformation, and delivery to various destinations, including Apache Iceberg. They are designed to handle large-scale data processing with minimal operational overhead.
- Upsolver:
- Offers a no-code/low-code platform for building streaming data pipelines.
- Specializes in processing streaming data from various sources and writing to data lakes.
- Provides built-in connectors for popular data sources and sinks.
- Handles complex data transformations and supports schema evolution.
- Estuary:
- Focuses on real-time, event-driven data pipelines.
- Provides tools to create and manage data flows with minimal coding.
- Supports a wide range of connectors for data ingestion and delivery.
- Emphasizes ease of use and rapid deployment.
How They Simplify CDC Implementation
- Ease of Use: Both platforms provide user-friendly interfaces to design and manage data pipelines without deep technical expertise.
- Managed Infrastructure: They handle the underlying infrastructure, including scaling, monitoring, and fault tolerance.
- Automated CDC Handling: Built-in support for capturing and processing CDC events from source systems.
- Schema Evolution Support: Automatically detect and adapt to changes in data schemas.
- Integration Capabilities: Seamless integration with various data sources (databases, message queues) and destinations (Iceberg, data warehouses).
Streaming Data into Iceberg with Managed Services
Using Upsolver
Features and Benefits
- No-Code Data Pipelines: Design data flows using a visual interface, reducing the need for custom code.
- Real-Time Data Processing: Ingest and process streaming data with low latency.
- Scalability: Automatically scales to handle increasing data volumes.
- Data Transformation: Offers advanced transformation capabilities, including filtering, aggregations, and enrichment.
- Schema Evolution: Automatically handles changes in source schemas without disrupting data pipelines.
Integration with Iceberg
How Upsolver Leverages CDC Under the Hood
Upsolver captures CDC events from source systems like databases or message queues and processes them in real time. It then writes the transformed data into Apache Iceberg tables, ensuring that the data lake reflects the current state of the source systems.
Steps to Set Up Streaming into Iceberg Tables Using Upsolver:
- Connect to Data Sources:
- Use Upsolver's connectors to ingest data from databases (using CDC), Kafka topics, or other sources.
- For databases, Upsolver connects to the transaction logs to capture real-time changes.
- Define Data Transformations:
- Use Upsolver's SQL-based interface to define how data should be transformed.
- Apply filters, enrich data, and handle data type conversions as needed.
- Configure the Iceberg Output:
- Select Apache Iceberg as the output destination.
- Specify the target table, partitioning scheme, and other table properties.
- Upsolver handles data formatting, partitioning, and efficient file writes optimized for Iceberg.
- Deploy and Monitor the Pipeline:
- Deploy the data pipeline with a few clicks.
- Monitor data flow, performance metrics, and error handling through Upsolver's dashboard.
Advantages of Using Upsolver with Iceberg:
- Simplified Pipeline Management: Reduces the complexity of building and maintaining CDC pipelines.
- Optimized Performance: Upsolver optimizes data writes for Iceberg, improving query performance.
- Flexibility: Easily adjust pipelines to accommodate new data sources or changes in requirements.
Using Estuary
Features and Benefits
- Real-Time Data Integration: Focuses on streaming data integration with event-driven architectures.
- Low-Code Development: Enables users to build data pipelines with minimal coding.
- Schema Management: Automatically manages schema detection and evolution.
- Operational Simplicity: Cloud-native service that abstracts operational complexities.
Integration with Iceberg
How Estuary Streams Data into Iceberg Using CDC
Estuary captures CDC events from various sources and streams them into Apache Iceberg tables. It handles the ingestion, transformation, and loading processes, ensuring that data is consistently and accurately reflected in the Iceberg tables.
Steps to Set Up Streaming into Iceberg Tables Using Estuary:
- Set Up Source Connectors:
- Choose from Estuary's connectors to ingest data from databases, APIs, or message queues.
- Configure CDC connectors to capture data changes in real time.
- Define Dataflows:
- Create dataflows that map source data to destinations.
- Apply transformations, filters, or enrichments as necessary.
- Configure Iceberg as the Destination:
- Select Apache Iceberg as the target for the dataflow.
- Provide connection details for the Iceberg catalog and specify the table to write to.
- Handle Schema and Data Mapping:
- Estuary automatically detects the schema from the source data.
- Maps source fields to the Iceberg table schema, handling any necessary conversions.
- Deploy and Monitor:
- Launch the dataflow and monitor its performance through Estuary's interface.
- Use built-in alerts and metrics to ensure the pipeline runs smoothly.
Best Practices When Using Estuary with Iceberg:
- Ensure Schema Compatibility: Verify that the source and destination schemas are compatible, making adjustments if necessary.
- Partitioning Strategy: Define appropriate partitioning in Iceberg to optimize query performance.
- Data Quality Checks: Implement validation rules within Estuary to maintain data integrity.
- Security Considerations: Leverage Estuary's security features to protect sensitive data during transit and at rest.
Comparing Managed Services with Open-Source Tools
Pros and Cons
Managed Services (Upsolver and Estuary):
Pros:
- User-Friendly: Simplifies the creation and management of data pipelines.
- Reduced Operational Overhead: No need to manage servers or infrastructure.
- Quick Time-to-Value: Faster deployment of data solutions.
- Expert Support: Access to vendor support and resources.
Cons:
- Cost: Ongoing subscription fees can be higher than self-managed solutions over time.
- Less Customization: Limited ability to modify underlying systems or customize beyond provided features.
- Vendor Dependency: Reliance on the service provider's availability and support.
Open-Source Tools (Flink, Kafka Connect, Debezium):
Pros:
- Flexibility and Control: Full access to configure and customize the pipeline.
- Community and Ecosystem: Large community support and a wide range of plugins and integrations.
- Cost Efficiency: Potentially lower costs, especially if existing infrastructure is utilized.
Cons:
- Complexity: Requires specialized knowledge to set up and maintain.
- Maintenance Effort: Responsibility for updates, scaling, and troubleshooting lies with the user.
- Longer Deployment Time: May take more time to develop and deploy compared to managed solutions.
When to Choose Managed Services
- Limited Technical Resources: Organizations without dedicated data engineering teams benefit from managed services.
- Need for Rapid Deployment: When speed is critical, managed services offer quicker setup.
- Focus on Core Business: Allows teams to concentrate on business goals rather than infrastructure.
- Scalability Requirements: Managed services can automatically scale with data volumes without additional configuration.
Managed services like Upsolver and Estuary offer powerful solutions for streaming data into Apache Iceberg tables with minimal effort. They abstract the complexities involved in CDC implementations, providing a streamlined experience for data ingestion and processing.
By leveraging these platforms, organizations can:
- Accelerate Data Projects: Quickly build and deploy data pipelines to meet business needs.
- Reduce Operational Complexity: Offload infrastructure management and focus on data analytics.
- Maintain Data Freshness: Ensure that Iceberg tables are updated in real time, supporting timely decision-making.
- Adapt to Changes: Easily handle schema evolution and new data sources without significant rework.
Scenario #4: Efficient Acceleration with Dremio's Incremental and Live Reflections
In this scenario, we'll explore how Dremio enhances query performance on Apache Iceberg tables through its Reflections feature, specifically focusing on Incremental Reflections and Live Reflections. These capabilities leverage Change Data Capture (CDC) to provide real-time, optimized query acceleration without additional overhead for end-users.
Introduction to Dremio Reflections
Dremio Reflections are a powerful optimization feature that creates optimized representations of datasets (tables or views) within Dremio. These representations are designed to accelerate query performance transparently. Here's how they work:
- Custom Optimization: Reflections can have custom sorting, partitioning, and aggregation strategies tailored to your query patterns.
- Transparent Substitution: Dremio automatically substitutes Reflections in place of the original datasets during query execution when beneficial, without user intervention.
- Periodic Refreshes: Reflections are periodically refreshed to ensure they stay up-to-date with the source data. Traditionally, this involves full refreshes, rebuilding the Reflection entirely from the source data.
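For orientation, Reflections can be defined in Dremio SQL as well as in the UI. The rough sketch below defines a raw reflection on an Iceberg table; the dataset path, reflection name, and columns are hypothetical, and the exact clause set may vary by Dremio version, so treat it as a sketch rather than a reference:

-- Define a raw reflection partitioned and sorted for common query patterns
ALTER DATASET catalog.marketing.events
  CREATE RAW REFLECTION events_raw_reflection
  USING DISPLAY (id, name, attendee_count)
  PARTITION BY (name)
  LOCALSORT BY (id);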
Enhancing Reflections with Apache Iceberg and CDC
When using Apache Iceberg as the source data format, Dremio Reflections gain significant enhancements powered by CDC. Iceberg's rich metadata and support for CDC allow Dremio to optimize Reflections in two key ways:
- Incremental Reflections
- Live Reflections
These features enable faster, more efficient Reflection refreshes, ensuring that query acceleration is both timely and resource-efficient.
Incremental Reflections
What are Incremental Reflections?
Incremental Reflections are Reflections that can be updated incrementally rather than being fully rebuilt each time the source data changes. Instead of reprocessing the entire dataset, Dremio processes only the data that has changed since the last refresh.
How Incremental Reflections Work with Iceberg
- Change Tracking: Apache Iceberg tracks data changes at the file and row levels using snapshots and metadata.
- Efficient Data Processing: Dremio leverages Iceberg's metadata to identify new, updated, or deleted data since the last Reflection refresh.
- Partial Refreshes: Only the affected partitions or data files are processed, significantly reducing the time and compute resources required for refreshes.
Benefits of Incremental Reflections
- Faster Refresh Times: By processing only changed data, refreshes are completed much more quickly.
- Reduced Resource Consumption: Less compute power and I/O are needed, lowering operational costs.
- Timely Data Availability: Ensures that Reflections are up-to-date with minimal latency, enhancing query accuracy and performance.
Live Reflections
What are Live Reflections?
Live Reflections are Reflections that are refreshed automatically whenever the underlying Iceberg table changes. This means that any data modifications—such as inserts, updates, or deletes—trigger an immediate refresh of the associated Reflections.
How Live Reflections Work with Iceberg
- Event-Driven Refreshes: Dremio listens for changes in the Iceberg table's metadata.
- Automatic Triggering: Dremio initiates a Reflection refresh without manual intervention upon detecting a change.
- Seamless Integration: The process is transparent to end-users; queries automatically benefit from the refreshed Reflection.
Benefits of Live Reflections
- Real-Time Acceleration: Queries benefit from the latest data almost immediately after it becomes available.
- Elimination of Stale Data: Reduces the risk of querying outdated data, improving decision-making accuracy.
- Operational Simplicity: Removes the need to schedule Reflection refreshes or manage refresh intervals manually.
Practical Implications for Your Data Platform
By utilizing Dremio's Incremental and Live Reflections on Iceberg tables:
- Optimized Performance: Achieve near real-time query performance even as data volumes grow.
- Scalability: Handle large datasets efficiently, thanks to Iceberg's and Dremio's scalable architectures.
- Simplicity: Minimize the operational burden on data engineers and analysts, allowing them to focus on deriving insights.
- Flexibility: Customize Reflections to suit specific query patterns and workloads without compromising on freshness or performance.
Conclusion
This blog explored four scenarios demonstrating how Apache Iceberg, combined with Change Data Capture (CDC) techniques, empowers organizations to handle data changes efficiently and reliably.
- Capturing Changes Within Iceberg Tables: Utilizing Iceberg's create_changelog_view procedure allows you to track and analyze data modifications directly within Iceberg tables. This approach is ideal when:
- You need to audit data changes for compliance or debugging purposes.
- You want to synchronize internal datasets without relying on external systems.
- Ingesting External Changes into Iceberg: Integrating tools like Debezium with Kafka Connect or Apache Flink enables streaming CDC events from external databases into Iceberg tables. This scenario matters when:
- Keeping your data lake synchronized with operational databases in real time.
- Supporting real-time analytics across diverse data sources.
- Handling high-throughput data ingestion with robustness and scalability.
- Leveraging Managed CDC Services: Services like Upsolver and Estuary simplify CDC implementation by providing managed pipelines that stream data into Iceberg tables. This option is beneficial when:
- Rapid deployment is critical, and you want to avoid the overhead of building pipelines from scratch.
- Technical resources are limited, and ease of use is a priority.
- Focusing on insights over infrastructure, allowing teams to concentrate on core business objectives.
- Enhancing Query Performance with Dremio Reflections: Dremio's Incremental and Live Reflections offer optimized representations of Iceberg tables that automatically refresh upon data changes. This scenario is crucial when:
- High-performance query acceleration is needed without manual intervention.
- Ensuring up-to-date analytics by providing users with the freshest data possible.
- Reducing operational costs through efficient, incremental updates instead of full data reloads.
Key Takeaways:
- Flexibility and Choice: Apache Iceberg's compatibility with various tools and platforms provides multiple pathways to implement CDC based on your organization's needs and existing infrastructure.
- Efficiency and Performance: Leveraging CDC with Iceberg enhances data freshness and query performance while reducing resource consumption and operational overhead.
- Simplification of Complex Processes: Managed services and advanced features like Dremio Reflections abstract the complexities involved in CDC, making real-time data processing more accessible.
- Real-Time Insights: Implementing these scenarios empowers organizations to react swiftly to data changes, enabling timely decision-making and maintaining a competitive edge.
When They Matter:
- Scenario 1 is ideal for internal data change tracking within Iceberg-managed datasets, particularly when auditing and historical data analysis are required.
- Scenario 2 is suited for organizations that need to ingest and process changes from external source systems into their data lakes in real time.
- Scenario 3 benefits teams seeking a managed, low-code solution to implement CDC without the complexity of building and maintaining custom pipelines.
- Scenario 4 is essential for those aiming to enhance query performance and provide end-users with immediate access to the latest data without additional operational efforts.
By understanding these scenarios and their applications, you can choose the most appropriate strategy to optimize your data architecture. Whether it's through leveraging built-in features of Apache Iceberg, integrating with open-source tools, or adopting managed services, these approaches help ensure your data infrastructure is robust, efficient, and capable of meeting modern analytics demands.