Welcome, everyone. Thank you for joining this session. My name is Emily and I’m from Dremio. I’ll be moderating this presentation today, and I’m glad to welcome Gautam, who will present “Iceberg at Adobe: Challenges, Lessons, and Achievements.” If you have any questions during the presentation, please type them into the chat at any time. We will address the questions live during the Q&A session at the end of today’s presentation. I’ll turn it over to our speaker, Gautam. [00:00:30] Go ahead.
Thanks, Emily. My name is Gautam Kowshik, and I hope everyone is staying safe and healthy. Today, I’ll be going over our journey with Iceberg at Adobe, some of the challenges we faced along the way, some [00:01:00] of the ways we circumvented those challenges, and some of the stories along the way.
A little bit about myself: my name is Gautam Kowshik. I’ve been working with large-scale distributed systems for about 10 years now. I currently work with the Experience Query Service team at Adobe, which is responsible for building a petabyte-scale SQL engine that provides access to data on the Adobe Experience Platform (AEP). [00:01:30] I’ve also been contributing to the Apache Iceberg project since about January 2019.
A little bit about Adobe Experience Platform: it is an open system for driving personalized experiences. It is a platform that allows our customers, partners, and solutions to get a 360-degree view of their own data in a centralized way.
It is a platform to run targeted campaigns, classify profiles, and leverage advanced analytics. [00:02:00] Our upstream and downstream consumers are typically Adobe’s customers, Adobe’s data partners, and other Adobe solutions, including some internal teams that use the platform to power their products. Adobe Experience Platform is built to process and scale to exabytes of data.
A little bit about how the flow of data works. Our producers, which typically include Adobe’s external customers, Adobe’s [00:02:30] data partners, and Adobe solutions, send data in near real time to a streaming pipeline, which is the [inaudible 00:02:39], and the streaming pipeline serves data directly to our consumers. Our consumers can be the Unified Profile Service or the Unified Identity Service, which provide a 360-degree view of consumer profiles and identities on the data lake and power advanced [00:03:00] analytics and multi-channel marketing products. This data also powers the Experience Query Service.
Like I mentioned, the Experience Query Service is a petabyte-scale SQL engine that powers post hoc and ad hoc analysis of data on the platform. The data also feeds the Data Science Workbench, which Sensei, Adobe’s machine learning engine, uses for machine learning workloads, as well as our Adobe solutions, including internal teams [00:03:30] that access the data directly to power their products. The streaming pipeline also periodically writes batches to the data lake, every 15 minutes or so, and at the center of it all is the data lake, which is powered by Microsoft Azure Data Lake Store. Producers can also write data in batch directly to the data lake using our bulk ingest APIs, and [00:04:00] that data is then accessible to our consumers, in batch fashion, directly off the data lake.
So the data lake is the source of truth. That’s where all our data goes, and all producers and consumers access the data through it. Our typical daily scale is about a million batches. A batch is like a transaction ID, which keeps track of all the data that comes into the system. That is on the order of 10 terabytes [00:04:30] and 32 billion events per day, across tens of thousands of datasets. At the dataset level, we’re seeing about 100,000 files and on the order of 250 GB of uncompressed data per dataset. This is expected to grow by multiple orders of magnitude this year, and that is what our platform is built for.
To give you an idea of the foundational architecture, [00:05:00] first there is the metadata layer: the catalog, which keeps our dataset metadata, and the schema registry, which we call the XDM registry.
It’s an open system. It has an open API for our consumers to access their schemas and write new schemas as well. Our data ingest layer includes platform services, data connectors, streaming APIs, and batch bulk-ingest APIs that interface [00:05:30] with the data lake to write data using a centralized platform SDK, which is our central API for producing data. Then there is the storage layer, Microsoft ADLS, where all the data goes, and the data access layer, where consumers access their data. Consumers can use the platform services API, the data access API, and the Experience Query Service API. These enable various forms [00:06:00] of access to the data lake and are, again, integrated with our platform SDK, which is the central API for accessing data.
The other layer is data management. Our data management layer handles out-of-order processing and housekeeping activities like garbage collection, keeping track of soft deletes, getting rid of old data, keeping data laid out optimally through compaction, doing backfills, and also [00:06:30] enforcing privacy guarantees for regulations like GDPR.
Data and metadata flow into the data lake. Most of the metadata used to be kept in the catalog, and the metadata and data are served through the data access API. Before I get into why we chose Iceberg in particular, let me go over some of the challenges we had before that [inaudible 00:06:57] why we needed something like Iceberg. One of [00:07:00] the first challenges was data reliability. Because we have such a dynamic schema, enforcing that schema was very difficult, and we couldn’t guarantee 100% tight consistency or the absence of data corruption. This led to some issues in production [inaudible 00:07:20], where we saw inconsistencies and data corruption. In a highly concurrent environment, we had Spark jobs that could fail, [00:07:30] and because we wrote data directly, a failed job would sometimes leave partial results behind and expose readers to partial data. This left readers in an inconsistent state.
In that same concurrent environment, readers and writers also ran into conflicts. Sometimes we would have large backfills that tried to rewrite a huge amount of data along with some of the metadata, and [00:08:00] because our metadata and data were kept separately, this would lead to conflicts and, in some cases, data loss as well.
The other issue was read performance. Just to do query planning, we needed to list the directories and files to understand the effort needed to query the data, and, as you know, listing is O(n) in the number of files. As datasets and their metadata grew, our queries got slower. [00:08:30] And even when we tried to optimize the metadata, listing was something we couldn’t get away from.
Planning wasn’t very efficient either. By this I mean that even when our data was partitioned in fairly optimal ways, split planning would map far more files than necessary, so our compute would end up scanning far more data than necessary.
Scalability was the other issue. Our catalog [00:09:00] is a separate metadata store that we depended on heavily. Because the metadata and the data were separate, any latency or other problems with the catalog would cause issues reading or even accessing the system, so in a way it was a single point of failure.
The metadata also needed to be evicted regularly: because of the size and scale of the metadata kept in the catalog, we had to keep it optimal [00:09:30] by rewriting it very often.
So why Iceberg? What about Iceberg appealed to us? As I mentioned, we use Microsoft ADLS, which is an eventually consistent cloud object store, and we needed a way to do ACID-compliant transactions on it. Iceberg is built for these kinds of cloud object stores and lets you do ACID-compliant transactions [00:10:00] at the dataset level. There are now also other projects that provide consistency across multiple datasets, and we’re looking into that as well.
Iceberg was fairly easy to integrate. It doesn’t have any long-running processes, and bootstrapping it was fairly easy, so we could integrate it into our data management layer and our SDKs with little effort and no operational overhead. In fact, it [00:10:30] gelled really well with our processes. Since Iceberg keeps all of its metadata on the same data lake, alongside the data, and also controls all of the reads and writes, the metadata is always consistent with the data.
Because Iceberg is integrated with many compute engines, such as Spark, MapReduce, Flink, and Presto, with more coming, it scales [00:11:00] really well with the compute engine. Things like split planning can be scaled: the metadata is laid out so that you can do query planning in fairly scalable ways, and filtering and scans are also fairly scalable.
Iceberg maintains its schema mapping using identifiers within the schema. It does not depend on field names; instead, it uses field IDs. What [00:11:30] this allows us to do is have very expressive schema evolution semantics: you can add and delete fields, rename fields, or even reorder them without having to worry about any kind of data issues. Iceberg doesn’t really care about the order in which the fields are kept, because it keeps an ID mapping.
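To make the ID-based mapping concrete, here is a toy sketch, not Iceberg’s actual reader: the `Field` class and `read_row` function are hypothetical names, and a stored row is modeled as a dict keyed by field ID. Because columns are resolved by ID rather than by name or position, a rename or reorder changes only the schema, never the stored data.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Field:
    field_id: int  # stable identifier assigned once, never reused
    name: str      # display name; free to change on rename

def read_row(stored: dict, schema: list) -> dict:
    """Project a stored row (keyed by field ID) onto the current schema.

    Columns are resolved by ID, not by name or position, so renames and
    reorders need no data rewrite. A field added after the row was written
    reads as None; a dropped field is simply not projected.
    """
    return {f.name: stored.get(f.field_id) for f in schema}

# Row written under the original schema: id 1 was "user", id 2 was "ts".
row_on_disk = {1: "alice", 2: 1700000000}

# Evolved schema: both fields renamed and reordered, plus a new field (id 3).
evolved = [Field(2, "event_time"), Field(1, "user_name"), Field(3, "country")]

print(read_row(row_on_disk, evolved))
# {'event_time': 1700000000, 'user_name': 'alice', 'country': None}
```

A name-based mapping would have lost both columns after the rename; the ID lookup keeps them intact.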
Iceberg also has a fairly open interface for metadata. All of its metadata is accessible through the Spark DataFrame API, [00:12:00] so you can process and inspect Iceberg metadata much like any other dataset. You can run Spark jobs, MapReduce jobs, or whatever you like on the metadata itself, which lets you expose it through other APIs specific to your business logic. Iceberg also has great SQL extensions. There’s an excellent talk by Anton from yesterday, which you can watch the recording of, that covers a lot of the SQL extensions that have gone [00:12:30] into Iceberg. We are planning to leverage a lot of these SQL extensions, and already are as we speak. Iceberg also has a really open catalog API, which allows you to integrate your existing metadata store or existing catalog with the Spark catalog.
So how did we integrate Iceberg into our system? Like I mentioned before, we had the metadata layer and our data layer. The first point of integration [00:13:00] was our data management processes. This meant adding Iceberg as a dependency to some of our processes and making sure all of our data management processes went through the Iceberg APIs to read and write data. And because we had a centralized platform SDK, which makes sure all access to the data lake goes through a central API, all we had to do was integrate Iceberg into that one SDK, and it enabled Iceberg [00:13:30] for everyone transparently. Most of our services aren’t even aware that they’re using Iceberg: because the SDK is a Spark data source, they just access their data like any other Spark data source, and we get the benefits of Iceberg.
The change now is that only dataset-level metadata is kept in the catalog. This lightened the load on the catalog and reduced our dependence on it quite a bit. Most of the table-level [00:14:00] metadata and the underlying partition and file statistics are kept with the metadata in Iceberg. This separated the catalog from a lot of the processing we depend on, removing many of the single-point-of-failure issues we used to have.
With that, next I would like to go over some of the challenges we faced and how we went about circumventing them. One of the first was guaranteeing exactly-once [00:14:30] semantics. Like I mentioned, we ingest data in batches, which you can think of as transactions, and multiple writers can write the same batch because of retries or for some other reason. Because we used to commit data directly to Iceberg, there wasn’t a way for us to guarantee exactly-once semantics, so we sometimes had the same batch coming in twice.
We [00:15:00] also had pre-flight validation checks that we used to run in our legacy system before we mastered our data. After we processed the data and wrote it to the data lake, we had to run pre-flight validation checks and audit checks to make sure the data was compliant for consumption.
When we found data bugs at a later point, after ingesting the data, we needed ways to roll back and continue processing without any downtime on our [00:15:30] dataset.
To get around this, we built a feature called the Write-Audit-Publish flow, which is now available in Iceberg. It allows you to stage your commit. Every time you write data in Iceberg, you commit the data at the end to mark the end of the [inaudible 00:15:52]. The Write-Audit-Publish flow lets you stage that commit, so Iceberg is aware of the commit, [00:16:00] but it’s not yet visible to active readers. That gives us a chance to look at the staged commit, inspect it, run our pre-flight validation checks, audit it, and then publish it. So it’s a two-phase commit of sorts.
This allowed us to run a lot of these pre-flight validation checks. It also allowed us to add exactly-once guarantees: in our pre-flight validation check, we make sure that no two transaction IDs are the same. When a new transaction [00:16:30] ID comes in, we check whether that ID was already ingested, which gives us exactly-once semantics.
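The write, audit, and publish phases with a duplicate-batch check can be sketched as a small in-memory model. This is a single-threaded toy, not Iceberg’s API: `Table`, `stage`, `audit`, and `publish` are hypothetical names, and the non-empty check stands in for whatever pre-flight validation a real pipeline would run.

```python
from dataclasses import dataclass, field

@dataclass
class Snapshot:
    snapshot_id: int
    batch_id: str
    files: list

@dataclass
class Table:
    published: list = field(default_factory=list)  # visible to readers
    staged: dict = field(default_factory=dict)     # committed but not visible
    next_id: int = 1

    def stage(self, batch_id: str, files: list) -> int:
        """Write phase: record a snapshot that readers cannot see yet."""
        snap = Snapshot(self.next_id, batch_id, files)
        self.staged[snap.snapshot_id] = snap
        self.next_id += 1
        return snap.snapshot_id

    def visible_files(self) -> list:
        return [f for s in self.published for f in s.files]

def audit(table: Table, snapshot_id: int) -> bool:
    """Audit phase: reject a batch ID that was already published
    (exactly-once) and, as an example pre-flight check, empty batches."""
    snap = table.staged[snapshot_id]
    already_ingested = any(s.batch_id == snap.batch_id for s in table.published)
    return not already_ingested and len(snap.files) > 0

def publish(table: Table, snapshot_id: int) -> bool:
    """Publish phase: expose the staged snapshot only if the audit passes."""
    if not audit(table, snapshot_id):
        del table.staged[snapshot_id]  # discard; readers never saw it
        return False
    table.published.append(table.staged.pop(snapshot_id))
    return True

t = Table()
first = t.stage("batch-42", ["part-0.parquet"])
retry = t.stage("batch-42", ["part-0-retry.parquet"])  # same batch, retried writer
print(publish(t, first), publish(t, retry), t.visible_files())
# True False ['part-0.parquet']
```

The key property is that a rejected duplicate is dropped before it ever becomes visible, so readers never have to filter it out.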
The other issue we faced was high-frequency commits causing contention and failed commits. Because of the rate at which data was coming in, we were initially writing it into Iceberg as and when it arrived. Iceberg is great for slowly changing data, but it is not really great at high-frequency commits, so [00:17:00] this was one of the challenges we faced.
The other was that Iceberg keeps a centralized Version-hint file. The Version-hint file tells you which version of the table’s metadata is the latest, and readers rely on it: the first thing a reader does in Iceberg is look at the Version-hint file, and that’s how it knows which metadata is the latest and which metadata it needs to read from.
The problem was that Microsoft Azure Data Lake Store [00:17:30] does not provide the overwrite semantics we needed. Every time a new commit [inaudible 00:17:34], the Version-hint file gets overwritten, and overwrites in this cloud object store are not [inaudible 00:17:41]. So for a small amount of time, the Version-hint file would disappear for readers, [inaudible 00:17:46] it was being overwritten. This led to read failures whenever we had a fair number of commits coming in.
We did a couple of things about this. One was batch buffering. We built a service [00:18:00] called the Consolidation Service, which collates the incoming transactions, or batches, and then does one single Iceberg commit. So even though multiple batches come in, they translate to one Iceberg commit, which helped us stem the flow of commits into Iceberg.
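The batch-buffering idea can be illustrated with a minimal sketch, assuming a count-based flush threshold. `ConsolidationBuffer` and `commit_fn` are hypothetical names and not Adobe’s Consolidation Service; a production service would also flush on a timer so that a quiet stream does not hold data back indefinitely.

```python
from dataclasses import dataclass

@dataclass
class Batch:
    batch_id: str
    files: list

class ConsolidationBuffer:
    """Collect incoming batches and flush them as ONE table commit
    once `max_batches` have accumulated (toy model of batch buffering)."""

    def __init__(self, commit_fn, max_batches: int = 100):
        self.commit_fn = commit_fn  # callable that performs a single commit
        self.max_batches = max_batches
        self.pending = []

    def add(self, batch: Batch) -> None:
        self.pending.append(batch)
        if len(self.pending) >= self.max_batches:
            self.flush()

    def flush(self) -> None:
        if not self.pending:
            return
        batch_ids = [b.batch_id for b in self.pending]
        files = [f for b in self.pending for f in b.files]
        self.commit_fn(batch_ids, files)  # many batches -> one commit
        self.pending = []

# Usage: 7 incoming batches with a threshold of 3 become 3 commits.
commits = []
buf = ConsolidationBuffer(lambda ids, files: commits.append((ids, files)), max_batches=3)
for i in range(7):
    buf.add(Batch(f"b{i}", [f"f{i}.parquet"]))
buf.flush()  # flush the remainder, e.g. when a timer fires
print(len(commits))  # 3
```

Fewer, larger commits mean fewer optimistic-concurrency conflicts, and the batch IDs carried in each commit remain available for the exactly-once check.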
The other was the Version-hint issue, which was a tricky problem. We looked at multiple ways to solve it: we [00:18:30] tried version directory listing and several other things. But the power of open source is that you can ask questions, and somebody else might have a better solution. The community came up with a smarter approach. If you look at what the Version-hint file is, it is just a hint, not the source of truth. So the Iceberg community added a feature where the reader looks at the Version-hint file, and if it doesn’t find it, it falls back: [00:19:00] it sorts the metadata files and picks the latest one. This meant readers could keep reading even when the Version-hint file wasn’t there.
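The “hint, not source of truth” fallback can be sketched against a local directory. The file names `version-hint.text` and `vN.metadata.json` follow the convention used by Iceberg’s filesystem-based (Hadoop) tables, but `latest_version` is a hypothetical, simplified function and not Iceberg’s code. It also ignores compressed metadata file names.

```python
import re
import tempfile
from pathlib import Path
from typing import Optional

VERSION_RE = re.compile(r"^v(\d+)\.metadata\.json$")

def latest_version(metadata_dir: Path) -> Optional[int]:
    """Return the current table version, treating version-hint.text as a hint."""
    hint = metadata_dir / "version-hint.text"
    try:
        # Fast path: one small read, no directory listing.
        version = int(hint.read_text().strip())
    except (FileNotFoundError, ValueError):
        # Hint missing (e.g. mid-overwrite) or unreadable: list the metadata
        # files and take the highest version, compared numerically.
        versions = [int(m.group(1)) for p in metadata_dir.iterdir()
                    if (m := VERSION_RE.match(p.name))]
        return max(versions, default=None)
    # The hint may be stale: probe forward for newer metadata files.
    while (metadata_dir / f"v{version + 1}.metadata.json").exists():
        version += 1
    return version

with tempfile.TemporaryDirectory() as d:
    meta = Path(d)
    for v in (1, 2, 3):
        (meta / f"v{v}.metadata.json").write_text("{}")
    (meta / "version-hint.text").write_text("3")
    print(latest_version(meta))            # 3, from the hint
    (meta / "version-hint.text").unlink()  # simulate the overwrite window
    print(latest_version(meta))            # 3, from the listing fallback
```

The listing only happens on the rare path where the hint is missing or corrupt, so normal reads keep their cheap single-file lookup.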
There’s an excellent talk by Andrea and Sean from yesterday, the recording of which goes into detail on both these things that I just spoke of.
Another issue was that, at the time of integration, Iceberg only had file-level [00:19:30] delete support. We needed ways to enforce equality deletes. By this I mean that if somebody says, “delete data where some filter,” then whatever data matches that filter needs to be deleted: typical GDPR-style deletes. We didn’t have a way to do this out of the box with Iceberg.
For this, we built an internal [00:20:00] implementation of what we call a tombstoning mechanism. Because we track our data by batch IDs, every time somebody deleted data, we would find the batches it matched and then record them as tombstones in Iceberg snapshots, so writers could express soft deletes using batch IDs. This allowed us to do [00:20:30] a metadata-only delete in Iceberg. On the read side, when reading from an Iceberg table, we would look at the tombstone information stored in the snapshot summary and use it to do an anti-join. This let us enforce a soft-delete mechanism while ensuring readers always see the correct data.
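The read-side half of the tombstoning mechanism is an anti-join between data rows and the set of tombstoned batch IDs. Below is a minimal sketch. The `tombstones` summary key and its comma-separated format are hypothetical, since the talk does not specify how Adobe encodes them, and plain Python lists stand in for what would be a `left_anti` join in Spark.

```python
from dataclasses import dataclass

@dataclass
class Row:
    batch_id: str
    user: str

def tombstones_from_summaries(summaries: list) -> set:
    """Collect soft-deleted batch IDs recorded in snapshot summaries.
    The 'tombstones' key and comma-separated format are hypothetical."""
    ids = set()
    for summary in summaries:
        raw = summary.get("tombstones", "")
        ids.update(b for b in raw.split(",") if b)
    return ids

def read_with_tombstones(rows: list, summaries: list) -> list:
    """Anti-join: keep only rows whose batch ID is NOT tombstoned."""
    dead = tombstones_from_summaries(summaries)
    return [r for r in rows if r.batch_id not in dead]

rows = [Row("b1", "alice"), Row("b2", "bob"), Row("b3", "carol")]
summaries = [{}, {"tombstones": "b2"}]  # a later snapshot soft-deletes batch b2
print([r.user for r in read_with_tombstones(rows, summaries)])
# ['alice', 'carol']
```

Deleting this way is cheap at write time, since only metadata changes, but every read pays for the anti-join until the tombstoned data is physically rewritten.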
So we are excited to move to the Iceberg V2 table format. The Iceberg community has a V2 [00:21:00] table format now, which does equality-based deletes in a very performant way and is being built right now. We’re testing it out and also helping build it.
The other issue we faced was slow scan performance. Back in January 2019, we did a performance comparison with our legacy reader (our data is kept in Parquet). When we compared scan performance [00:21:30] between Iceberg’s reader and our legacy reader, Iceberg’s wasn’t as performant, so we couldn’t go directly to production with Iceberg as it was.
The other issue was query planning. Although Iceberg does a very good job of keeping its manifests and metadata in optimal ways, depending on the ingest pattern, as you ingest data your partition [00:22:00] metadata can end up spread across more snapshots and many more manifests than needed. Query planning and manifests have a very strong relationship, so if your partitions are scattered across many manifests, query planning will suffer.
To get around these issues, we introduced an Apache [00:22:30] Arrow-based reader for Iceberg. Apache Arrow is an open source project that defines an in-memory columnar format. We helped introduce a feature where Iceberg supports vectorized reading of Parquet using Apache Arrow in memory. The good thing about this approach is that it enables vectorized reading for any compute engine or any [00:23:00] data format, so it isn’t strongly tied to one compute engine, whereas the legacy system we had was very strongly tied to the compute engine and the Parquet format.
Snapshot expiry: every commit in Iceberg translates to a snapshot, so as you commit data, the number of snapshots grows and can bloat the metadata. This can cause a lot of issues on the read side [00:23:30] when you’re reading or loading tables, depending on how you’re using them. We were seeing issues in our data maintenance and data management layers, and even in some readers, where table load times were being affected.
What we did was use the expire snapshots action. Iceberg has a feature where you can expire your snapshots (think of it as housekeeping for snapshots), [00:24:00] and there is a table API in Iceberg that lets you do this. It allows you to express a filter, such as a time-based filter, and expire the matching snapshot metadata.
This keeps all the current data intact and just makes sure that snapshots that are no longer actively used get expired. By default, we keep about 30 days’ worth of snapshots, and also [inaudible 00:24:24] that to lower numbers depending on the dataset.
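The time-based retention rule can be sketched as a pure selection function. `snapshots_to_expire` and its `min_keep` safety floor are hypothetical. Iceberg’s own expiration is invoked through its table API, for example `expireSnapshots()` with `expireOlderThan(...)` in Java, and it also handles deleting files that no retained snapshot references. This sketch only decides which snapshot IDs fall outside a 30-day window.

```python
from datetime import datetime, timedelta

def snapshots_to_expire(snapshots: dict, current_id: int, now: datetime,
                        retention: timedelta = timedelta(days=30),
                        min_keep: int = 1) -> list:
    """Pick snapshot IDs (keys of `snapshots`, values are commit times)
    older than the retention window.

    The current snapshot is never expired, and the newest `min_keep`
    snapshots are always retained as a safety floor.
    """
    cutoff = now - retention
    newest_first = sorted(snapshots, key=snapshots.get, reverse=True)
    protected = set(newest_first[:min_keep]) | {current_id}
    return sorted(sid for sid, ts in snapshots.items()
                  if ts < cutoff and sid not in protected)

now = datetime(2021, 5, 1)
snaps = {
    1: now - timedelta(days=90),
    2: now - timedelta(days=45),
    3: now - timedelta(days=10),
    4: now,
}
print(snapshots_to_expire(snaps, current_id=4, now=now))  # [1, 2]
```

A shorter `retention` for high-churn datasets corresponds to tuning the 30-day default down per dataset, as described above.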
Manifest rewrite is another [00:24:30] big one. Like I said, manifests have a strong relationship with query planning, and Iceberg also has an API for rewriting your manifests. The manifest rewrite collates all of your manifest metadata, sorts it, optimizes it, and rewrites it in ways that are aligned with your partition spec. Depending on your read pattern, you can also configure how your [00:25:00] manifest rewrite produces its output. We just used the default settings for the manifest rewrite operation. We also enabled a health check service, which looks at datasets and their health and then triggers a manifest rewrite whenever queries on that dataset [inaudible 00:25:16].
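Why scattered partitions hurt planning, and why sorting entries by partition helps, can be shown with a toy model. Each manifest is a list of `(partition, file)` entries, and a planner can skip any manifest whose partition range excludes the queried partition. Both functions are hypothetical illustrations, not Iceberg’s rewrite algorithm.

```python
def manifests_scanned(manifests: list, partition: str) -> int:
    """Count manifests whose partition range [min, max] could contain
    `partition`; the others can be skipped using range summaries."""
    return sum(min(p for p, _ in m) <= partition <= max(p for p, _ in m)
               for m in manifests if m)

def rewrite_manifests(manifests: list, entries_per_manifest: int) -> list:
    """Collate all entries, sort by partition, and re-chunk them so each
    new manifest covers a narrow, contiguous partition range."""
    entries = sorted(e for m in manifests for e in m)
    return [entries[i:i + entries_per_manifest]
            for i in range(0, len(entries), entries_per_manifest)]

days = [f"2021-01-{d:02d}" for d in range(1, 9)]
# 8 commits that each wrote one file into every day: every manifest spans all days.
scattered = [[(day, f"c{c}-{day}.parquet") for day in days] for c in range(8)]
print(manifests_scanned(scattered, "2021-01-05"))  # 8

compact = rewrite_manifests(scattered, entries_per_manifest=8)
print(manifests_scanned(compact, "2021-01-05"))  # 1
```

The data files are untouched; only the metadata is regrouped, which is why a manifest rewrite is cheap enough to trigger from a health check.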
The other big one is our vacuum data compaction process. We suffer from the small-file problem [00:25:30] as well, so we have a process that looks at historical data and rewrites it in more optimal ways. We found a fairly good sweet spot with a target file size of about 1 GB, so the vacuum data operation tries to rewrite the data into Parquet output files of about 1 GB each.
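A common way to plan this kind of compaction is bin packing, which groups small files into rewrite tasks of about the target size. The sketch below uses first-fit decreasing with a 1 GiB target. `plan_compaction` is a hypothetical helper; the talk does not say how the vacuum process groups files. It also plans by input size, whereas real Parquet output sizes depend on compression.

```python
GiB = 1024 ** 3
MiB = 1024 ** 2

def plan_compaction(file_sizes: list, target: int = GiB) -> list:
    """First-fit-decreasing bin packing: group files into rewrite tasks
    whose combined input size stays at or below `target`. A file larger
    than `target` gets a task of its own."""
    bins = []
    totals = []
    for size in sorted(file_sizes, reverse=True):
        for i, total in enumerate(totals):
            if total + size <= target:
                bins[i].append(size)
                totals[i] += size
                break
        else:  # no existing task has room: start a new one
            bins.append([size])
            totals.append(size)
    return bins

small = [100 * MiB] * 25  # 25 small files of 100 MiB each
plan = plan_compaction(small)
print(len(plan), [sum(b) // MiB for b in plan])  # 3 [1000, 1000, 500]
```

Twenty-five small files become three rewrite tasks, each producing roughly one output file close to the target size.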
With that, I want to give you an idea of where we are with benchmarks. When we started integrating [00:26:00] with Iceberg back in January, we did an initial benchmark comparison. We took a dataset that is representative of our production data, about one month’s worth and roughly 5 terabytes, with assigned CDs, partitioned by day, and ran some queries that represent our query engine workloads. In this chart, the blue bars show Iceberg as of January 2019, and the green bars [00:26:30] show our legacy reader. As you can see, Iceberg was nowhere close to our legacy reader’s performance then, due to a mix of the issues we faced. This gave us an idea of where we were and the work we had ahead of us.
After all the improvements and all the work the community has done to make Iceberg better, we ran another benchmark as of August last year, with the same queries on the same dataset. Now the yellow bars are Iceberg [00:27:00] and the red bars are our legacy reader. As you can see, Iceberg does a lot better in almost all of the queries. Most of the improvements came from the vectorized reader, and a lot of them came from query planning: our legacy Parquet reader has to do a file listing, so it suffered a lot depending on how many files were in the system.
We still have some work to do on point-in-time queries. [00:27:30] We are still looking to optimize our query planning, some of the scan performance, and data filtering. But we are in a much better position than we were back in January 2019.
To give you an idea of where we are with migration, about 80% of customers have been fully migrated. We’re hoping to finish the full migration by the end of this quarter, and any new customer we enable on the data lake [00:28:00] writes into Iceberg by default.
We’re looking forward to integrating more Iceberg features. Some of these are the time travel SQL extensions. We have use cases where we want to enable reading data as of a certain snapshot ID, which is like a snapshot offset. This lets data workflows with multiple readers at different points in time make sure they are reading the data [00:28:30] as of a certain time. The other is incremental reads: we want to read between two snapshots so that we only read the data that came in between them, and Iceberg lets you look at the data that came in between two snapshot offsets.
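Time travel and incremental reads can both be modeled over an append-only snapshot log. In Iceberg’s Spark integration, these correspond to read options such as `snapshot-id` and the `start-snapshot-id`/`end-snapshot-id` pair. The functions below, `files_as_of` and `files_between`, are hypothetical, in-memory illustrations of the semantics, not the library’s implementation.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    added: tuple  # data files appended by this commit (append-only model)

def files_as_of(log: list, snapshot_id: int) -> list:
    """Time travel: every file visible at `snapshot_id`."""
    files = []
    for snap in log:
        files.extend(snap.added)
        if snap.snapshot_id == snapshot_id:
            return files
    raise KeyError(f"unknown snapshot {snapshot_id}")

def files_between(log: list, start_exclusive: int, end_inclusive: int) -> list:
    """Incremental read: files appended after `start_exclusive`,
    up to and including `end_inclusive`."""
    before = set(files_as_of(log, start_exclusive))
    return [f for f in files_as_of(log, end_inclusive) if f not in before]

log = [
    Snapshot(101, ("a.parquet",)),
    Snapshot(102, ("b.parquet", "c.parquet")),
    Snapshot(103, ("d.parquet",)),
]
print(files_as_of(log, 102))         # ['a.parquet', 'b.parquet', 'c.parquet']
print(files_between(log, 101, 103))  # ['b.parquet', 'c.parquet', 'd.parquet']
```

Pinning a reader to a snapshot ID gives it a stable view while writers keep committing, and the incremental form returns only what arrived since the last snapshot a consumer processed.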
We’re also building a statistics service. Because Iceberg already keeps accurate statistics at the file level, we use those statistics to build a statistics service, which we use for smarter cost-based [00:29:00] optimization.
We’re looking to build cost-based optimization into our SQL engine, and we are also considering things like approximate query planning, where we can look at our statistics and make smarter decisions, such as using stratified samples to power our queries and produce insights. We’re also looking at secondary index support on top of Iceberg. There is a proposal in the open source community, which we are going to talk about, for building secondary indexes on top of Iceberg. [00:29:30] We’re looking at Bloom filters for data skipping in particular. We’re also testing Microsoft Hyperspace, an indexing system that has strong integration with Spark.
Another big one is the Change Data Capture feature, which we are building for a lot of our products. Many of our teams are interested in exposing the edits coming into a dataset so they can build features on top of them. We [00:30:00] are actively working with the community to build some of these features into Iceberg.
I obviously skimmed over a lot of details; there was a lot to cover. For a deeper dive, see the blog series we’ve started. We’ve already published three posts in the series, so you’re welcome to dive deeper into those.
With that I’ll take some questions.
Great. Thank [00:30:30] you so much, Gautam. Before I get into the questions, two quick announcements. Before you leave today, please go to the Slido tab above your chat, where there’s a three-question survey we’d love for you to answer. Additionally, Gautam will be available for more questions in the Slack community for about an hour after today’s session. It looks like we have a lot of questions, so I’m not sure we’ll get to all of them. First off, Rafael asks, “Did you consider Delta Lake instead of Iceberg? [00:31:00] What were the reasons?”
Right, that’s a good question. In January 2019, Delta Lake barely existed, so at the time we didn’t have Delta Lake to compare against, but Delta Lake has quickly made a lot of strides. We are now looking at Delta Lake as well, particularly for the Change Data Capture feature, but [00:31:30] we haven’t fully arrived at a decision for some of these use cases. I would say the biggest thing with Delta Lake is that it is proprietary. A lot of the features, like Z-order indexing and many of the performance features, are closed and proprietary, although there is a community version. Some of the core metadata APIs are also sometimes hidden. So it’s a conversation we keep having, but to answer your question, we went with Iceberg because [00:32:00] it was the only viable option at the time.
Great. How do you integrate with Databricks Spark 3 catalog? Is it not Hive Metastore anymore?
They have support for it: the Spark catalog API is open, so you can integrate your own catalog as well. We don’t currently. Databricks is our Spark vendor, so we use it for our Spark workloads, but we [00:32:30] don’t integrate with the Spark catalog API. We have our own integrations into Spark through our own processes, but we don’t use the Databricks Spark catalog API.
Chass asks, “How do you make sure your user-facing catalog is consistent with the Iceberg catalog? Assuming you are using Azure to store Iceberg metadata. From your blog, they’re persisted by two separate paths.”
[00:33:00] Right. Like I said, what is maintained in the catalog is only dataset-level information, by which I mean things like the location of the data or some dataset-level tags. All of the metadata beneath that, like the batch information or transaction IDs for every incoming commit, is kept in Iceberg. So the catalog is already aware [00:33:30] of this metadata, and we don’t really need to keep anything in sync. All the catalog usually needs is whether the dataset exists or not, and that’s it. There is some metadata kept about batches, but we’re moving away from that: going forward, we are trying to remove all dependence on batches from the catalog.
Okay. I’m just looking [00:34:00] through; Andre is answering some of these. “Why did you need the Write-Audit-Publish staged commit in Iceberg, given that your ingestion pipeline already has batched writes in the data lake buffer zone?”
Right, that’s another good question. There were two different issues. The data buffering was to stem the flow of commits to Iceberg, because there is a limit to the rate at which Iceberg can accept commits. [00:34:30] The buffering handles that in a performant way, so we don’t waste a lot of compute rewriting our data every time a commit fails. Write-Audit-Publish solved a different problem: even after we did an Iceberg commit, multiple consolidated batches [00:35:00] could end up writing the same data, so we needed a centralized way to check whether a batch ID had already been ingested.
Right? So if we have a unique identifier for the data coming in, we use Iceberg’s Write-Audit-Publish flow to check things like whether this batch ID has already arrived; if it has, we don’t ingest it. The pre-flight validation checks also cover things [00:35:30] like keeping certain metrics about Iceberg commits. So they’re two different problems that we found different solutions to, and we had to take a two-pronged approach. I hope that answered your question.
Great. We’re at time, so this is the last question, but a reminder to head over to the Slack community to ask Gautam any more questions. Kenneth asks, “Can a large RDBMS table be converted to Iceberg files for reading by Dremio? If so, [00:36:00] is that a replacement for Parquet?”
I can’t speak to Dremio as such, but if you are trying to use data at rest, like building Iceberg metadata on top of your existing storage engine, I don’t think that exists yet. You will need to write or export the data into Iceberg, because Iceberg likes to make sure that the data is written by Iceberg. [00:36:30] That doesn’t stop us from building features that create metadata from existing storage engines, but as of now, if you want to get data into Iceberg, you need to rewrite your data from your legacy RDBMS.
Great. Thank you so much, Gautam. Thank you, everyone, for participating. Again, head over to the Slack community, and enjoy the rest of your sessions. Thanks, everyone!
Yep. Thank you.