Streamlining Data Quality in Apache Iceberg with write-audit-publish & branching
Data quality is a pivotal aspect of any data engineering workflow, as it directly impacts the downstream analytical workloads such as business intelligence and machine learning. For instance, you may have an ETL job that extracts some customer data from an operational source and loads it into your warehouse. What if the source contains inconsistent data or duplicates, and those inconsistencies are not validated before pushing it to the production tables? These production tables might be used for building BI reports, and any discrepancies can lead to wrong insights, which can further impact your organization’s decision-making abilities. It is, therefore, critical to ensure that data is validated and audited at each step of the data pipeline before pushing it to the production environment.
Although data quality can mean different things to different stakeholders, its overall goal is to ensure that data is accurate, consistent, and reliable to use. From a data engineering perspective, data quality can be quite challenging, especially at scale, when dealing with many data sources and multiple data pipelines writing to various target destinations. Hence, an effective mechanism to validate data at defined checkpoints is critical. This is what brings us to the write-audit-publish (WAP) pattern.
What is Write-Audit-Publish (WAP)?
Write-Audit-Publish (WAP) is a data quality pattern commonly used in the data engineering workflow that helps us validate datasets by allowing us to write data to a non-production environment and fix any issues before finally committing the data to the production tables. Let’s look at the three stages of this pattern in a bit of detail.
- Write: This stage ensures that we can safely write data to non-prod tables, i.e., in isolation from the current production workloads. Usually, data is written to a staging or audit table, which acts as an intermediary between source and target systems. After data validation, the staging tables are promoted (or the data is moved) to production for further consumption by workloads.
- Audit: This stage enables us to understand the quality of our newly inserted data. During this stage, we can run various data quality checks such as inspecting NULL values, identifying duplicated data, running referential integrity tests, etc. By performing these checks, we can examine the health of our data and fix any warranted issues.
- Publish: The publishing stage allows data to be made available to the production tables for the consumption of the downstream analytical applications. Data is expected to be of a certain quality at this stage, so the insights derived from the data can be helpful to an organization. The publishing stage should be atomic so anyone using the tables sees all of the changes or none of them.
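The control flow of the three stages can be sketched in a few lines of plain Python. This is only an illustration of the pattern, not Iceberg code: the `run_wap` helper, the in-memory `production` list, and the `no_nulls` audit are hypothetical names introduced here.

```python
def run_wap(production, new_rows, audits):
    """Stage new_rows, audit them, and publish only if every audit passes.

    Returns (rows readers should see afterwards, names of failed audits).
    """
    # Write: stage the new rows next to, not inside, the production data.
    staged = production + new_rows

    # Audit: run every check against the staged data.
    failures = [check.__name__ for check in audits if not check(staged)]
    if failures:
        # Nothing is published; readers keep seeing the old data.
        return production, failures

    # Publish: hand over the staged data in one step (all or nothing).
    return staged, []


def no_nulls(rows):
    """Hypothetical audit: every field of every row must be populated."""
    return all(value is not None for row in rows for value in row.values())


production = [{"id": 1, "amount": 10.0}]
bad_batch = [{"id": 2, "amount": None}]
good_batch = [{"id": 2, "amount": 25.0}]

after_bad, bad_failures = run_wap(production, bad_batch, [no_nulls])
after_good, good_failures = run_wap(production, good_batch, [no_nulls])
print(len(after_bad), bad_failures)
print(len(after_good), good_failures)
```

The point of the sketch is that a failed audit leaves the published data exactly as it was, which is the guarantee the rest of this post relies on.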
Now that we have a general idea of the write-audit-publish pattern, let us try to understand how we can implement such a data quality framework using the Apache Iceberg table format.
Write-Audit-Publish (WAP) using Apache Iceberg
It is essential to understand that Apache Iceberg, as a data lake table format, provides the necessary APIs and semantics to implement the write-audit-publish pattern. However, the responsibility of implementing such a feature and supporting the applications (use cases) lies on the compute engine side. At the time of writing this article, Apache Spark is the only compute engine that fully supports this specific pattern. Therefore, all our following discussions around the implementation of WAP will be with respect to Apache Iceberg and Spark.
Until version 1.2.0, there was only one way to achieve a WAP pattern in Apache Iceberg, i.e., using the wap.id property. However, the latest release brings in a branching capability for WAP using a branch property called wap.branch. Implementing WAP with branching is the recommended best practice, as it streamlines the pattern and makes it easy to validate and support multiple changes. For instance, we can use the VERSION AS OF branch_name syntax to inspect the state of any branch by name. However, if you are on an older version of Iceberg, you can take the wap.id route. In this blog, we will look at both techniques for write-audit-publish in Iceberg. The code used in these two methods can be found here.
Method 1: Using Branch Property (wap.branch)
Leveraging Apache Iceberg’s native branching capability is the recommended way to implement write-audit-publish in Iceberg. If you are unaware of what branching is, here is a detailed blog, but if you come from the Git world, you might already know how branches work. So essentially, in the data world, you will have a main branch with production data and an isolated branch for experimental data work.
In Apache Iceberg, branches are named references for snapshots (i.e., state of the Iceberg table at a certain point) and are mutable, meaning you can write data to a branch.
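To make the idea of a branch as a named, movable reference concrete, here is a deliberately simplified toy model in plain Python. It is not Iceberg's actual metadata layout (which involves manifests and manifest lists); the `snapshots` and `refs` dictionaries and both helper functions are assumptions made purely for illustration.

```python
# Toy model only: Iceberg's real metadata is considerably richer.
snapshots = {1: ["file_a.parquet"]}  # snapshot ID -> data files visible in it
refs = {"main": 1}                   # branch name -> snapshot ID it points to


def create_branch(name, from_ref="main"):
    """A new branch is just another name for an existing snapshot."""
    refs[name] = refs[from_ref]


def commit(branch, new_files):
    """Writing to a branch adds a snapshot and moves only that branch's pointer."""
    snapshot_id = max(snapshots) + 1
    snapshots[snapshot_id] = snapshots[refs[branch]] + new_files
    refs[branch] = snapshot_id
    return snapshot_id


create_branch("ETL_0305")
commit("ETL_0305", ["file_b.parquet"])

main_files = snapshots[refs["main"]]
branch_files = snapshots[refs["ETL_0305"]]
print(main_files)    # main still sees only the original file
print(branch_files)  # the branch sees the original file plus the new one
```

In this model, writing to the branch never changes what main points to, which is the isolation property the branch-based approach depends on.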
In this approach, we will first create a new branch from our existing Iceberg table, stage the newly written changes (made by an ETL job) to this branch, make the necessary validations, and based on the quality checks, publish the new data to the main branch or just decide to drop the branch. The publish operation is facilitated by Iceberg’s cherry-pick procedure, which basically creates a new snapshot from an existing snapshot without altering or removing the original one.
This is what the process looks like visually.
Let’s see this approach in action. For the sake of this particular demonstration, we will use a different table.
First, let’s create a new branch so we can start writing new data to our table.
spark.sql("ALTER TABLE glue.test.salesnew CREATE BRANCH ETL_0305")
As of now, our table has 500 records on this branch.
To start writing new data using the WAP pattern, we first need to set the Iceberg table property write.wap.enabled=true for our table. This configures the table for the WAP pattern.
Next, we need to assign the branch identifier (ETL_0305 in our case) to the Spark session configuration property spark.wap.branch. This ensures that all operations (reads and writes) in this session happen against this branch.
# Enable WAP on the table
spark.sql(
    """ALTER TABLE glue.test.salesnew
    SET TBLPROPERTIES ('write.wap.enabled'='true')"""
)
# Point this Spark session at the branch
spark.conf.set('spark.wap.branch', 'ETL_0305')
Now let’s write some new data to this new branch.
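# Expose the CSV file produced by the ETL job as a temporary view
spark.sql(
    """CREATE OR REPLACE TEMPORARY VIEW salesetl
    USING csv
    OPTIONS (path "sales_ETL.csv", header true)"""
)
# Insert the new rows; with spark.wap.branch set, they land on the branch
spark.sql("INSERT INTO glue.test.salesnew SELECT * FROM salesetl")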
If we now query the table, we should see the newly made changes.
Note that because we set the spark.wap.branch property for this Spark session, our records were ingested into the branch ETL_0305 only, and we could read the data using just the table name salesnew instead of providing the branch name explicitly. You can also query the specific branch using the query below.
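A minimal sketch of such a query, reusing the VERSION AS OF syntax that appears elsewhere in this post. The `branch_query` helper is a hypothetical convenience, not part of Iceberg or Spark; it only builds the SQL string.

```python
def branch_query(table, branch):
    """Return a SELECT that reads `table` as of the named branch."""
    return f"SELECT * FROM {table} VERSION AS OF '{branch}'"


query = branch_query("glue.test.salesnew", "ETL_0305")
print(query)
# With a live Spark session, this would be run as:
#   spark.sql(query).toPandas()
```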
Let’s also make sure there was no impact on the main branch of our table.
spark.sql("SELECT * FROM glue.test.salesnew VERSION AS OF 'main'").toPandas()
As we can see, there are still 500 records in the main branch, which means the write happened in isolation, i.e., only on our branch.
The next step is to run some checks on the dataset to ensure it adheres to the quality measures defined within an organization. In the audit stage, we can integrate any third-party tools that run data quality checks as part of the workflow.
But for this specific example, we will use some Python code to check whether there are any NULL values in the data.
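One possible way to express such a check is as a per-column NULL count in SQL, sketched below. This is not necessarily the check used for the results in this post. The `null_count_sql` helper and the toy `sales` table with its `order_id` and `amount` columns are assumptions for illustration, and SQLite stands in for Spark only so that the sketch runs without a cluster.

```python
import sqlite3


def null_count_sql(table, columns):
    """Build a query returning one row with the NULL count of each column."""
    checks = ", ".join(
        f"SUM(CASE WHEN {c} IS NULL THEN 1 ELSE 0 END) AS {c}_nulls"
        for c in columns
    )
    return f"SELECT {checks} FROM {table}"


# Toy data in an in-memory SQLite database (hypothetical columns).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (order_id INTEGER, amount REAL)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [(1, 10.0), (2, None), (None, None)],
)

null_counts = conn.execute(null_count_sql("sales", ["order_id", "amount"])).fetchone()
print(null_counts)
# Against the Iceberg table, the same statement could be passed to Spark, e.g.:
#   spark.sql(null_count_sql("glue.test.salesnew", df.columns)).show()
```

Because the Spark session is still pointed at the branch, a query against the plain table name would audit the staged data rather than main.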
It looks like 54 records in this table contain NULL values. At this point, we could do a couple of things.
- We could investigate the source data to understand where these NULL values originated.
- If this is due to an issue in our ETL job, we could look into that and add some robust error-handling steps.
- We can alert the downstream teams and applications about this issue, and depending on their use case, they can decide what they would like to do.
- We could apply some imputation methods, or fix or delete the records, after discussing them with the stakeholders.
Additionally, if we decide not to publish this new load of data at all, we can just drop the branch.
spark.sql("ALTER TABLE glue.test.salesnew DROP BRANCH ETL_0305")
For our demonstration, we will do a quick median imputation of the NULL value column and overwrite the imputed values to our branch ETL_0305. That way we can take care of the missing values.
from pyspark.sql.functions import col, coalesce, percentile_approx, lit

# Calculate the (approximate) median value for each column, collected once
medians = df_qc_sales.agg(
    *[percentile_approx(col(c), 0.5).alias(c) for c in df_qc_sales.columns]
).first()

# Replace null values with the calculated median (kept as a string literal)
df_imputed = df_qc_sales.select(
    [coalesce(col(c), lit(str(medians[c]))).alias(c) for c in df_qc_sales.columns]
)

# Show the imputed DataFrame
df_imputed.show()

# Overwrite the imputed data to our branch ETL_0305
# (spark.wap.branch is still set, so the write targets the branch)
df_imputed.write.format("iceberg").mode("overwrite").save("glue.test.salesnew")
If we run our data quality check code again, we should see no NULL values.
It is important to note that all these changes that we made are specific to our branch ETL_0305 and aren't available yet on the main branch. Now let’s go ahead and publish the data to make it available to the downstream consumers.
We will use the cherry-pick technique to make the data available to our production table. Cherry-picking allows us to select a particular snapshot (in our case, the one that the branch ETL_0305 points to) and create a new snapshot based on it. Note that cherry-pick is a metadata-only operation, which means that the actual data files (such as Parquet, Avro, or ORC) aren’t touched as part of this operation. Therefore, cherry-picking allows us to publish the data to production without moving the data files. Additionally, cherry-picking only works for one commit.
Other than cherry-picking, there is another way: the Java API method ManageSnapshots.fastForwardBranch(), which supports fast-forward merging of a branch into main.
Note that the Apache Iceberg community has proposed a new Spark procedure, fast_forward(), that can update the main branch to a particular state (xyz) or update another branch to a specific state (xyz). So, we should see some progress there soon.
Since, in our case, we want to use Spark as the compute engine to publish data, we will go with the cherry-picking method. This procedure expects us to pass a snapshot ID parameter. So, to find the snapshot ID associated with our specific branch, we will query the refs (references) metadata table.
spark.sql("SELECT * FROM glue.test.salesnew.refs").toPandas()
This gives us the snapshot ID for our branch ETL_0305, which is 5073925010883267751. Let’s go ahead and run the procedure.
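As a sketch of what this step could look like, the helpers below build the two statements involved: a lookup of the branch's snapshot ID in the refs metadata table and the call to the cherrypick_snapshot procedure (the same procedure used in Method 2 of this post). The helper names `branch_snapshot_query` and `cherrypick_call` are hypothetical; they only produce SQL strings and do not talk to Spark.

```python
def branch_snapshot_query(table, branch):
    """Build a lookup of the snapshot ID that a branch points to (refs metadata table)."""
    return f"SELECT snapshot_id FROM {table}.refs WHERE name = '{branch}'"


def cherrypick_call(catalog, table, snapshot_id):
    """Build the CALL statement for the cherrypick_snapshot Spark procedure."""
    return f"CALL {catalog}.system.cherrypick_snapshot('{table}', {snapshot_id})"


lookup_stmt = branch_snapshot_query("glue.test.salesnew", "ETL_0305")
publish_stmt = cherrypick_call("glue", "test.salesnew", 5073925010883267751)
print(lookup_stmt)
print(publish_stmt)
# With a live Spark session, the publish step would be:
#   spark.sql(publish_stmt).show()
```

Keeping the snapshot ID as a parameter avoids hard-coding it, since each run of the ETL job produces a different ID.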
So, now our branch’s snapshot is the current state of the table salesnew. Now, if we query our main branch, we should see the new records. Great!
To end this specific WAP session tied to the branch, we will unset the Spark config property.
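One way to make sure the spark.wap.branch setting is always cleared is to wrap the WAP session in a context manager. The sketch below assumes a SparkSession-like object whose `conf` exposes `set` and `unset`, as PySpark's runtime configuration does; the `wap_branch_session` helper itself is a hypothetical name introduced here.

```python
from contextlib import contextmanager


@contextmanager
def wap_branch_session(spark, branch):
    """Route the session's reads and writes to `branch`, then restore the default."""
    spark.conf.set("spark.wap.branch", branch)
    try:
        yield spark
    finally:
        # Runs even if the ETL or audit code raises, so later queries in the
        # same session are not silently sent to the branch.
        spark.conf.unset("spark.wap.branch")


# Usage with a live SparkSession named `spark`:
#   with wap_branch_session(spark, "ETL_0305"):
#       spark.sql("INSERT INTO glue.test.salesnew SELECT * FROM salesetl")
```

Without a helper like this, the equivalent single step is calling `spark.conf.unset('spark.wap.branch')` once publishing is done.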
Method 2: Using Table Property (wap.id)
The second method to implement write-audit-publish (for Iceberg versions lower than 1.2.0) is the wap.id approach. In this method, Apache Iceberg allows writing data to production tables in an unpublished state and running time-travel queries on the unpublished data (i.e., the snapshot) to validate it. If the data passes the quality checks, the unpublished data can then be committed to the production tables. Visually, this is what the entire process looks like.
The 3 stages are elaborated below.
As described in the previous method, we first need to set the Iceberg table property write.wap.enabled=true for our table. This ensures that the new data gets written in ‘stage’ mode only.
As of now, our table churn has 935 rows as seen below.
# Enable WAP on the table
spark.sql(
    """ALTER TABLE glue.test.churn
    SET TBLPROPERTIES ('write.wap.enabled'='true')"""
)
The next step is to assign a specific session ID to the Spark session configuration property spark.wap.id. This session ID will then be used in the Audit stage to run data quality checks.
import uuid

# Generate a unique ID for this WAP session and register it with Spark
session_id = uuid.uuid4().hex
spark.conf.set('spark.wap.id', session_id)
Once we configure these two things, we are good to run our ETL job to insert new data into the table.
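# Expose the CSV file produced by the ETL job as a temporary view
spark.sql(
    """CREATE OR REPLACE TEMPORARY VIEW chview
    USING csv
    OPTIONS (path "churn_etl.csv", header true)"""
)
# Insert the new rows; with spark.wap.id set, they are staged, not published
spark.sql("INSERT INTO glue.test.churn SELECT * FROM chview")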
Again, these new writes will only happen in ‘stage’ mode, so they won’t be directly committed to the actual production table. Let’s verify that by getting a count of the records.
Now in the Audit step, our goal is to:
First, find the snapshot ID that was generated by the new write operation (the ETL job) based on the specific session ID that we set in the Write stage.
spark.sql("SELECT snapshot_id FROM glue.test.churn.snapshots WHERE summary['wap.id'] = '4462ae45b5664c2cb8a95032ec910892'").toPandas()
Upon passing the session_id value retrieved from the Write stage to the query above, we get the snapshot ID, 7869769243560997710.
Now, if we run a time-travel query to see what our ‘unpublished’ records look like, we can see that our ETL job inserted some new records.
spark.sql("SELECT * FROM glue.test.churn VERSION AS OF 7869769243560997710").toPandas()
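The two audit lookups above hard-code the session ID and the snapshot ID. A small sketch of how they could be parameterized is shown below; the helper names `wap_snapshot_query` and `time_travel_query` are hypothetical, and they only build the same SQL strings used in this post.

```python
def wap_snapshot_query(table, wap_id):
    """Build the lookup of the staged snapshot written under a given WAP session ID."""
    return (
        f"SELECT snapshot_id FROM {table}.snapshots "
        f"WHERE summary['wap.id'] = '{wap_id}'"
    )


def time_travel_query(table, snapshot_id):
    """Build a SELECT that reads the table as of a specific snapshot ID."""
    return f"SELECT * FROM {table} VERSION AS OF {snapshot_id}"


lookup_stmt = wap_snapshot_query("glue.test.churn", "4462ae45b5664c2cb8a95032ec910892")
audit_stmt = time_travel_query("glue.test.churn", 7869769243560997710)
print(lookup_stmt)
print(audit_stmt)
# With a live Spark session, the session ID would come from the Write stage:
#   spark.sql(wap_snapshot_query("glue.test.churn", session_id)).toPandas()
```

Since the values are interpolated directly into the SQL text, this sketch assumes they come from trusted pipeline code rather than user input.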
Now that we have verified that the data exists in our churn table and is unpublished, we can run any data quality checks we would like. For this example, we will check a few columns for NULL values using Python code similar to that in the first method. Again, you can also incorporate any third-party tool at this stage.
Since we are good with our data quality validation, our next step is to make the new data available to the production table.
In our first approach, we discussed using the cherry picking technique to publish our data to the production table. Cherry-picking is a metadata-only operation and can be run by calling a Spark procedure, as shown below.
spark.sql("CALL glue.system.cherrypick_snapshot('test.churn', 7869769243560997710)").show()
And if we now verify our records, we can see that our churn table in production is updated with the new data.
Like with any technical implementation, a couple of factors could be considered when implementing the write-audit-publish pattern for your data pipelines.
- Data latency requirements: One main factor to consider is whether some of your downstream consumers operate in real time. In that case, you must adjust your pipelines (depending on the volume, etc.) to adhere to the defined SLAs. The same goes for any batch pipelines that ingest new data into the lakehouse or warehouse.
- Integration with other tools: If you want to integrate a specific data quality tool into your data pipeline as part of the WAP pattern, you must factor in how that tool integrates with the compute engine. For a widely known compute engine like Spark, some integration likely already exists. E.g., Great Expectations provides an expectation for validating null values in Spark DataFrames.
- Scalability: Your organization’s data needs will grow over time. Therefore there should be some general idea about how well your infrastructure can scale so you can design your WAP pattern accordingly. This will help bring in other tools or engines to the infrastructure.
Write-Audit-Publish can be a great data quality framework in a data engineering workflow, especially when dealing with data at scale. In this blog, we went through two approaches to implementing this pattern in the Apache Iceberg table format using Spark as the engine. Both of these approaches help us improve the data quality of our Iceberg tables by allowing us to do isolated quality checks and validations without impacting the production workloads. Iceberg’s new branching capability adds much value to the write-audit-publish pattern by facilitating easy validation and supporting multiple changes.