Subsurface LIVE Winter 2021
High Frequency Small Files vs. Slow Moving Datasets
Before implementing Apache Iceberg, we had a small file problem at Adobe. In Adobe Experience Platform's (AEP) data lake, one of our internal solutions was to replicate small files to use at a very high frequency of 50K files per day for a single dataset. A streaming service we called Valve processed those requests in parallel, writing them to our data lake and asynchronously triggering a compaction process. This worked for some time but had two major drawbacks. First, if the compaction process was unable to keep up, queries on the data lake would suffer due to expensive file listings. Second, with our journey with Iceberg underway, we quickly realized that creating thousands of snapshots per day for a single dataset would not scale. We needed an upstream solution to consolidate data prior to writing to Iceberg.
In response, a service called Flux was created to solve the problem of small files being pushed into a slow-moving tabular dataset (aka Iceberg v1). In this presentation, we will review the design of Flux, its place in AEP's data lake, the challenges we had in operationalizing it and the final results.
Andrei Ionescu, Sr. Software Engineer, Adobe
Andrei Ionescu is a Senior Software Engineer with Adobe Systems specializing in big data with Scala, Java, Apache Spark and Apache Kafka.
Shone Sadler, Adobe Systems, Principal Scientist, Experience Data Platform, Adobe
Shone Sadler is a Principal Data Scientist at Adobe Systems working on the Adobe experience platform.
Hello everybody and thank you for joining us for this session. I just want to go through a bit of housekeeping before we start. So first, we will have a live Q and A after the presentation. We recommend activating your microphone and camera for the Q and A portion of the session. You simply click the button on the upper right of the black part of the screen to share your audio and video, and that will automatically put you in the queue. [00:00:30] So with that, I’m delighted to welcome our next speakers. Andrei Ionescu, senior software engineer at Adobe, and Shone Sadler who’s a principal scientist for the Data Experience Platform at Adobe. Shone, please go ahead.
Thank you. Hello, everyone. My name is Shone Sadler. As mentioned, I’m a principal scientist for Adobe systems. I’m working on the Adobe experience platform. Today here with [00:01:00] Andrei Ionescu, a senior engineer working on big data problems.
So the discussion we have today is about the well-known small file problem. So a small file for us is anything that’s less than 256 megabytes and we see millions of those frequently. So this turned out to be a critical blocker for us to adopt Apache Iceberg, a table format designed for slow moving data sets. So let’s get into that.
So in this presentation, we’ll give you an introduction to AEP. We’ll talk about what it does [00:01:30] and who is it for. Next, we’ll talk about the small file problem and the scale we’re operating at today. We’ll talk about Apache Iceberg and describe why we’re adopting it and what problems does it solve. And that brings us to being blocked. Iceberg itself does not deal well with small files and we’ll explain why. Andrei will then proceed with a deep dive into our solution, which is called buffered writes. He’ll touch on the architecture, the data flow, the internals of the fluxors and WAP, that’s Iceberg’s write, audio, and [00:02:00] publish feature that enable us to provide exactly once guarantees. And finally, we’ll open up for Q and A.
So AEP is a data platform providing real-time personalized experiences. Producers, including our customers, partners, and internal solutions send data through either a batch and or streaming interface. The type of data sent to us includes profile data such as CRM system data, dimensional data such as product catalogs, and time series data such as click stream data for your website. [00:02:30] When data is sent in via batch through a bulk ingest API or connectors, it’s validated, transformed, partitioned, compressed. Before being written out to the data lake. Downstream consumers will be notified when data is ripe, and they can then query that data from the lake. Then it can also be sent over streaming, using our pipeline service. That data will be validated, transformed, before being made available downstream, and also being forked off into the data lake.
So now let’s take a look at the consumers. First, we have [00:03:00] the unified profile, and unified identity services. These two combined enable the merging and stitching of user profiles over an identity graph generated from various data sources to provide a 360 degree view of those profiles for our customers. Next is the experience query service, which is a high performance distributed SQL engine that enables customers to query their data and integrate with external BI tools. Next is the data science workbench, enables data scientists to build intelligent [00:03:30] capabilities into their user experiences using Adobe Sensei. And finally, we have customer journey analytics, which provides out of the box slicing and dicing of behavioral data.
The AP, we have hundreds of customers and thousands of datasets to manage and we’re growing quickly. We are processing about 10 terabytes per day today and the more interesting data points are these two. You have an [00:04:00] average file size of less than 10 megabytes, we have a max throughput per dataset per day at about a 100,000 files per dataset. That’s around 70 to 140 files per minute for a single dataset, which isn’t very compatible with many of the big data systems today. Small files like these will wreak havoc on your spark cluster, infrastructure and HDFS backend.
Interestingly, we’ve solved this problem already. So let me show you how. This diagram shows a high-level view of how ingest [00:04:30] works in APs data lake today. Step one, a client sends us JSON, CSV, parquet B or bulk ingest API. Step two, said data’s written into a staging location on the data lake. And step three and step four an ingest worker, which is a long running sparked job or application listening to that stage near receives a signal again, reading the data from stage applies our transformations validation rules before compressing, partitioning and [inaudible 00:05:00] that [00:05:00] data back out to the main storage [inaudible 00:05:02] as parquet files. The same ingest worker updates metadata in our internal handle-on, notifying downstream consumers that data hasn’t arrived. So this is a common pattern for us. So now what happens if we get a spike of small files? How would this change?
So first you would see it, a number of ingest workers spawn up based on the incoming traffic. Each node has a pool of end number of threads that can execute the transformations [00:05:30] discussed for a given file or batch of data. No, but thousands of files results in thousands of connects to the backend storage. So that mere act of discovering those files would break most consumers. As a result, we need one more component here. A compaction process that will be to watch the main storage and optimize small files as they [inaudible 00:05:51] into a preferably one gig layout.
So that brings us to Iceberg. So we had things working, but we still needed this thing [00:06:00] called Iceberg. What is it? For those not familiar with it, Iceberg is a table format. We define a table as a collection of homogenous rows of data that fit into a similar schema. That data, of any respectable size will be spread across N number of partitions and data files. A table format gives us metadata about all those partitions and files that make up a given table. So what makes iceberg so powerful for us is three thing. First is data reliability. Through its use [00:06:30] of multi version concurrency control, Iceberg enables concurrent readers and writers to operate with snapshot isolation.
Iceberg further adds asset support allowing us for the first time to safely restate data in the data lake for use cases such as GDPR. In terms of reads, Iceberg’s design allows for query planning to be done on a single process at a constant time [inaudible 00:06:52], while interacting with the file system rather than the linear cost, [inaudible 00:06:56], that we’re typically familiar with. Iceberg also [00:07:00] tracks statistics, sorry, and on the partitions files and columns in your data, enabling several data skipping optimizations. And finally we have scalability. Iceberg is a lightweight library that allows us to store our metadata co-located next to the data. There’s no need for an additional metadata service that can act as a single point of failure.
Which brings us to being blocked. So the blocker for us is high-frequency commits. [00:07:30] When we first POC Iceberg, we ran a benchmark to get the net throughput on top of ADLS, that’s Azure’s HPFS instance. So that’s the number of commits without any data processing that we could successfully complete within a minute for a single data set. And we found the sweet spot to be about 15 commits per minute. However, if you recall, based on our current architecture, we required 10X that, around 150 commits per minute. [inaudible 00:07:59] we found [00:08:00] that with optimal concurrency control and Iceberg power writers easily led to non-deterministic reprocessing of data, causing spikes in our costs. So to illustrate this, we’ll show you an example of two processes interacting with an Iceberg table.
On the top we have the reader, which is simply a process that’s issuing a query at time T1, T4 and T6. Below that we have the writer on the other hand, which performs a read at time three, but processes [00:08:30] data and then rewrites it back at T5. That can make those things successfully because there’s no conflicts, after the commit the version number of the tables incubated and a new snapshot idea is generated. But what happens if we had another writer to the mix, writer two.
In this case writer two reads at T2 and begins processing, overlapping with the processing being done by writer one and when one or two attempts to commit, results in a commit failed exception. [00:09:00] Since the data process is now out of date, what can writer two do? Well, it can retry the transaction by reading from the table again at T8, and we process it incubating again at T9. Now, imagine if we added say a hundred or a thousand more of these writers right into the same dataset as our current architecture does. We would have hundreds, if not thousands of commit failed exceptions. So with that, I’ll pass it over to Andrei to talk about the solution.
Now [00:09:30] that we’ve seen how Iceberg is using Adobe experience platform and the challenge that we encountered when ingesting [inaudible 00:09:56] data, let’s dive into the solution that [00:10:00] we’ve put together to overcome this blocker. It is called buffered writes solution, and these are support services that offers a buffering point where data can stay for a while. And based on the metadata, it will determine when and how to package and move the data from the buffer point to the main storage in the data, making it available to customers.
There are some benefits besides fixing the blocker itself. The first benefit is optimizing writes and we do that by [00:10:30] packaging more data writing less files doing less commits. The next benefit is that data is optimized for reading too, because having less files, both of data and metadata type means less five scanning, less files to open and less files to load. The third benefit is auto-scaling, both horizontally and vertically because we do use separate jobs to process and move the data. These benefit comes from the fact [00:11:00] that buffered writes solution has the component that orchestrates it.
Now let’s look at the architecture diagram and see how buffered writes solution fits into the landscape. It is the same diagram that Shone explained before, but with some noticeable changes. We have the same producers and consumers, and as we’ve already seen, customer data is consumed through the data consumption service, which gives access to the main storage and [00:11:30] the catalog. Catalog is our higher level metadata storage that keeps information about the customer’s data.
Two new things can be seen at this point. The first one is the staging zone is replaced by the buffer zone. And the second is that we use the pipeline service of Kafka back messaging system available in Adobe experience platform, through which we send different control messages between our services. [00:12:00] Data or the files are ingested to bulk ingest as you’ve seen before, but instead of writing the data into this staging zone, data is written into the buffer zone. When [inaudible 00:12:15] writing is complete in the buffer zone o notification is sent that new data has meant that it needs ready for processing. Now, the first component of buffered writes comes into image.
[00:12:30] Flux. Flux is consuming those messages that contain metadata and metrics about the files that have landed. Groups them, aggregate over those metrics and based on those composed of conditions, it decides to trigger the consolidate action at the right moment inside the compute service. Compute services is our distributed processing engine based on Apache Spark.
When a consolidation action is [00:13:00] triggered, a new consolidation worker job is started, reading [inaudible 00:13:04] data from the buffer zone, processing it and working it as Iceberg in the main storage inside the data lake. When this step is complete, we notify the next component and that component is called data record and listens for these notifications and updates the catalog. Now the data it’s ready and available to customers to query it, to consume it, to [00:13:30] use it. These architecture diagram with flux and consolidation worker has the two components that helps us to achieve a higher throughput that we wanted. So next we look at how the data flow has changed.
Let’s see a comparison between how the data was ingested before and how data is flowing now with Flux and consolidation worker. [00:14:00] On the left the ingestion without buffering, on the right is the ingestion with buffering. You can see the timeline and the different files coming in. The colors show the partitions that data belongs to. As you can see each file that lands is almost immediately loaded then written in the main storage one by one.
It doesn’t matter if the files are part of the same partition and then at the same time, [00:14:30] they get written one by one as separate files and separate commits. With buffering, files are coming in but instead of directly writing them in the main storage, we do place the data in the buffer zone and the metadata about the files to Flux, where they get buffered. So read files arrive and they wait in the red buffer, nothing being written yet. Green filings [00:15:00] is buffered too, the same with the other files. Last orange lands in its orange buffer too. Nothing as you can see, nothing gets written in the main story yet. Data files are kept in the buffer zone and metadata is kept in Flux, because not enough data is gathered, not enough time has passed. Nothing is written until now. After 15 minutes passes for each buffer, consolidation [00:15:30] job will be triggered and we’ll start writing data in the main storage.
For the case of the orange buffer, we have a different waiting time rule set at 25 minutes. And due to that, we wait a little bit more for data to accumulate and after 25 minutes, we will write the data. Comparing the two, we can see that instead of having 15 files and 15 commits, [00:16:00] with buffering we have four files and four commits with the same amount of data.
There is another difference between the two flow [inaudible 00:16:10] and that is the latency. On the left, the latency is somewhere around five minutes, while with buffering whites, the latest it can be set through rules and conditions as the ingestion pattern requires it. We start from 15 minutes up to three days, depending on the data and the injection pattern.
[00:16:30] Now let’s talk a bit about Flux. Flux is a spark stateful streaming application that reads metadata messages from the Kafka and triggers short jobs in our compute service. Flux groups the messages and aggregates to receive metadata in matrix and triggers consolidation jobs to write the data. The metrics that we use to aggregate are the following. Number of files, byte size, [00:17:00] time of arrival, partitions. These aggregated values compared against triggering rule [inaudible 00:17:07] out of conditions, determines when to start consolidation job. These conditions are mainly of two types, time-based conditions. For example, if accumulated data is older than 15 minutes, we should write the data. And another type it’s min/max condition. For example, if accumulated file size is over [00:17:30] the maximum of one gigabyte. The rules and conditions can be combined and customized as needed.
Now the other part of buffered writes solution that it’s equally important is the consolidation worker that writes the real data in the main storage. It is a short lived process in order of minutes up to a few hours. Reading parquet data from buffer zone and writing it as Iceberg [00:18:00] format in the main storage. As we’ve already been mentioning, it works real data, not metadata and have access to the data lake.
One important thing is that in the consolidation worker we make use of the Write Audit Published Flow. That’s a feature we’ve been contributing to Iceberg, and I will explain how this helps us in the next section. Write Audit Published Flow, called [00:18:30] WAP, it’s a functionality in Iceberg that helps us eliminate data duplication and reduce the cost of [inaudible 00:18:38] by minimizing the data processing through checkpointing. Due to mid-process errors, the consolidation worker job can fail and when this happened, after a while the full consolidation jobs get retried to not lose data.
When we retry the job, without using this work flow, the same [00:19:00] data it’s reprocessed and gets written once more in the Iceberg table, thus duplicating data. Now let’s see how, Write Audit Publish Flow helps us here. WAP Flow needs an external unique ID to mark the commit. If the ID is already present in the table, whenever a new commit with the same ID tries to get written, it will fail stopping that commit. And with these we’ve stopped the data duplication when jobs [00:19:30] are retried. Regarding minimizing the data processing, let me introduce another core functionality of the workflow. WAP has a staging place inside Iceberg table, where data is staged as a separate commit with the specified ID attached to it. We usually call it a stage commit.
These stage commit is then cherry-picked by that ID and committed into master of the Iceberg table. Now, [00:20:00] let’s follow the diagram on the right. After the worker starts, we check if the provided unique ID is present in the master of the Iceberg table, and if so, we update the catalog as this seems to be just a retry job that have been failing before completing the last step of updating catalog. So now, if the ID is not presenting the master, we check it to see if it’s in the stage and if found, we cherry-pick it from there [00:20:30] without having to reload and reprocess the data. If neither here is present, that means the data is totally new and we load the [inaudible 00:20:40] files. We process them. We save the data as a stage commit, then we cherry-pick it and finally update catalog.
As you can see, we have three points where we check point and by doing it we can resume the processing in any case of failure, [00:21:00] minimizing the cost of reprocessing the data multiple times.
So with all these added to Adobe experience platform, we’ve managed to get beyond our target. We are now being able to ingest more than 3000 files per minute, comparing it with our previous limitation of 15 files per minute this is a tremendous improvement. Now we have horizontal and [00:21:30] vertical scaling available, and this is because of Flux. When more data is coming, it means that more and bigger consolidation jobs are triggered by Flux, but limitation is now our compute setup and the resources available there for us.
We have optimized the data for reading too, using less resources and having faster response times when reading the data. [00:22:00] In the end, here are some resources that go into more detail about our journey and what we are doing with Iceberg in Adobe experience platform. There is a series of article on Adobe Tech Blog on medium that I would like to suggest for reading. Tomorrow there will be another session about Iceberg at Adobe that gets into more detail about the journey of adopting Iceberg in Adobe experience platform. Session will be presented by [inaudible 00:22:29]. [00:22:30] Regarding us, you can find more about us on LinkedIn. We usually post articles related to big data, Apache Spark, distributed processing frameworks, open source, and many other related topics. This conclude the presentation and we are open for Q and A. Thank you.
[00:23:00] Hi there guys. Sorry about the delay. I had a little bit of a technical difficulty on my end. All right, so as they said, we’re going to go ahead and open it up for Q and A. As a reminder, if you want to ask a question, the easiest way to do it is just to click on the button in the top right hand side of the box screen and just share your video and audio. It looks like Shone’s been busy while Andrei’s talking, asking you a good amount of questions. Shone, are [00:23:30] there any questions that you want to elaborate on a bit more or want to share with you other attendees?
Well, I guess maybe everyone else can see those questions. We had one about Delta Lake about whether it solves a problem that we’re dealing with right here. From we’ve seen Delta versus Iceberg, it doesn’t appear to solve it. We’ve done some benchmarking against the two and so when we take a look at throwing a lot of a high throughput small files at Delta, it seems to [00:24:00] behave pretty much the same. A different type of exception we get. The only thing I was going to add to that.
Okay. There’s another question that came in for you guys. Doesn’t buffering interfere with time travel features in Iceberg.
No, so the buffer that we have, actually that all happens before getting to Iceberg. That’s actually the whole lot [00:24:30] of reason for it because sitting in small files, just wouldn’t scale well for us, for a single table within Iceberg. As we do buffer before doing the commit, but then time travel works as it would otherwise after that.
Yeah, so just to add to that. First, the data lands as parquet, with buffer like parquet and the metadata while those file are separating in Flux, but when we write it, we write it as Iceberg as one, as one commit. So the time travel is available there, [00:25:00] as a default feature.
Okay, great. Do we have any more questions? Go ahead and either click on the video button or you can ask in the chat, we’ll just skip a minute here. Great thing about having two speakers of course, is you get your Q and A answered on the fly, so it’s fantastic. Lets give it another minute.
Let me add something to [inaudible 00:25:25]. We have [00:25:30] two kinds of buffering, as you can see, one it’s buffering the data itself, the other one is buffering the metadata about that data. So yeah, it’s a storage layer where we kept the real data, but the metadata is kept in the Flux that they call streaming application and that keeps it memory when running queries over it for aggregation. And to be reliable, it saves [00:26:00] it into checkpointing on our storage tool. So yeah, I hope that answers your questions.
All right, great. Thanks for the additional detail. All right, so I think that’s all the questions we have for now. I think people are getting a bit tired at the end of the day. I do want to remind you that we will have an opportunity to ask Andrei and Shone questions in their Slack channel. Just go ahead and search in the subsurface Slack [00:26:30] channel for both of their names and you’ll find them there. Please do go ahead and at tag them if you don’t get a chance to ask the question during the time allotted when they’re actually available, just so they can see it in another time.
So before you leave, we would appreciate it. If you could please fill out the super short Slido session survey at the top right of your screen as that little tab that says Slido, S-L-I-D-O. And just want to say a big thank you to our presenters [00:27:00] Shone and Andrei. Thank you very much, I think this is an excellent presentation. I’m sure we’ll get a lot of interest after that conference as well. Please do remember that the expo hall is open. There’s a lot of great demos and giveaways still available there, and I want to thank everyone for attending and enjoy the rest of the conference.
Thanks [inaudible 00:27:27].