March 1, 2023

10:45 am - 11:15 am PST

Tame the Small Files Problem and Optimize Data Layout for Streaming Ingestion to Iceberg

In modern data architectures, stream processing engines such as Apache Flink are used to ingest continuous streams of data into data lakes such as Apache Iceberg. Streaming ingestion to Iceberg tables can suffer from two problems: the small files problem that can hurt read performance, and poor data clustering that can make file pruning less effective.

In this session, we will discuss how data teams can address those problems by adding a shuffling stage to the Flink Iceberg streaming writer to intelligently group data via bin packing or range partitioning, reduce the number of concurrent files that every task writes, and improve data clustering. We will explain the motivations in detail and dive into the design of the shuffling stage. We will also share the evaluation results that demonstrate the effectiveness of smart shuffling.

Topics Covered

Open Source

Sign up to watch all Subsurface 2023 sessions


Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.

Steven Wu:

Hi, thank you everyone for coming to our talk today. I’m Steven. This is Gang. This presentation is based on joint work with our colleague Haizhou, who is not on stage today.

How do Flink and Iceberg Work Together to Read and Write Data?

Most data is born in a continuous stream. It typically lands in a message queue like Kafka first. Then, a stream processing engine like Flink can read data from Kafka and commit it to the Iceberg data lake. In the streaming ingestion path, data can be committed to Iceberg every few minutes. Let’s zoom in a little bit to see how the Flink Iceberg sink works internally. First, the parallel writers process the records and write them into data files in a format like Parquet. When the Flink checkpoint comes, the writers flush the data files and upload them to the distributed file system.

Then the writers send the metadata about the files, like the location of each file, to the committer operator. The committer operator always runs with parallelism one. When the Flink checkpoint finishes successfully, the committer operator commits the list of collected data files to Iceberg. This is how the Flink Iceberg sink works, and it works well for many streaming ingestion use cases. So what is the problem we are trying to solve here today? Many Iceberg tables are partitioned by time, like hourly or daily. Time-partitioned tables can support time range queries effectively using partition-based pruning. For some use cases, this time needs to be event time. By event time, we refer to the time that the events are generated, probably on the device side.

With event time, data can come in late: hours, days, weeks, or even months late. There are multiple reasons for the delay. It could be buffering on the device side. It could also be a delay in the backend pipeline. If we plot the traffic volume distribution across event hours, let’s say R0 is the current hour and R1 is the previous hour. Typically, R0 contains the most data, and R1 contains the second most. As we go further back in time, there is less and less data. That’s why it’s called the long tail pattern.

In Iceberg, a data file can only contain records from the same partition. That’s how partition-based file pruning can be efficient. That means a data file can only contain records from the same event hour. With that, let’s do some calculation to see how many data files we’re going to generate every hour. Let’s assume the table is partitioned hourly, and we also assume the event time range is capped at 10 days. If the event time range is longer, then we have an even worse small files problem. So that means every writer can process records from 240 partitions, or 240 hours, and because every partition needs its own data file, every writer can open 240 files. We assume the writer parallelism is 500. That means in every Flink checkpoint cycle the committer can collect 120,000 files.

How Many Data Files Can Be Produced In An Hour?

So if we assume the Flink checkpoint interval is five minutes, that means every hour we have 12 Flink checkpoints and 12 commits to the Iceberg table. So we’re talking about 1.4 million data files every hour. That’s a lot of files for one hour of data. And because the long-tail hours have little traffic, this also leads to the small files problem. Here we show the data file size histogram for one of the datasets. You can see 75% of files are less than 77 kilobytes. Those are very, very small files in the big data world. But what are the implications of too many small files? First, cloud storage is typically optimized for throughput, right? It can stream the bytes out very well, but it’s not optimized for low latency. The cost to open a file stream and retrieve the first byte is typically pretty high. So if we have a lot of small files, it can hurt the throughput. Second, a lot of cloud storage systems have an API request limit per storage shard. If the Iceberg writers create too many small files, the requests can get throttled.
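The file-count estimate above is simple multiplication, and it can be reproduced in a few lines. This is only a back-of-the-envelope sketch using the numbers quoted in the talk (10 days of hourly partitions, 500 writers, a five-minute checkpoint interval); the variable names are illustrative, and it assumes the worst case where every writer sees every partition in every checkpoint cycle.

```python
# Worst-case estimate of data files per hour, using the numbers from the talk.
event_time_range_days = 10          # event time range is capped at 10 days
partitions = event_time_range_days * 24   # hourly partitions
writer_parallelism = 500            # number of parallel writer tasks
checkpoint_interval_min = 5         # one Iceberg commit per Flink checkpoint

# Every writer may hold one open file per partition in each checkpoint cycle.
files_per_checkpoint = partitions * writer_parallelism
checkpoints_per_hour = 60 // checkpoint_interval_min
files_per_hour = files_per_checkpoint * checkpoints_per_hour

print(partitions)            # 240
print(files_per_checkpoint)  # 120000
print(files_per_hour)        # 1440000
```

The result, 1,440,000, is the "1.4 million data files every hour" quoted above.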

Third, if we use a columnar file format like Parquet, the writer typically has a pretty significant memory footprint. For example, Parquet can buffer one row group’s worth of data records in memory. A row group could be 128 megabytes. If every writer task needs to open hundreds of Parquet files, that can lead to significant memory pressure and possibly out-of-memory errors. Fourth, when the Flink checkpoint comes, the writers need to flush the records and upload the files to the distributed storage. If a writer needs to flush hundreds of files, even though they’re small files, that can still take quite a bit of time. That can increase the Flink checkpoint duration, and this is the synchronous part of the Flink checkpoint, which means that during this part the pipeline is actually paused, not processing records. So that can also hurt the throughput. Last, if Iceberg needs to track over 1 million data files for one hour of data, that’s a lot of metadata. It’s definitely stressing the metadata system in Iceberg. So we are encountering all those problems in the event-time-partitioned tables.

Downsides of Using Flink keyBy to Shuffle Data?

So you may wonder why we don’t use the Flink keyBy. Using keyBy in Flink, you can do a hash shuffle. We can do a keyBy on the event hour, which hash-shuffles the data by event hour. This will definitely reduce the number of data files because all the records for the same event hour are processed by a single writer task, right? That’s great. But here are the problems. First, as we mentioned earlier, traffic is not evenly distributed across event hours. It follows a long tail pattern. Second, even if we assume the traffic is evenly distributed across event hours, a keyBy on a low-cardinality column can still be unbalanced. By low cardinality, we mean the number of unique keys is not much higher than the writer parallelism. The GitHub PR discussion linked below demonstrates that when we do a keyBy on the Iceberg bucketing column, it results in uneven distribution of the traffic. The takeaway is that we need a smarter shuffle than a simple keyBy hash shuffle. Similarly, we can also use the shuffle to improve data clustering for non-partition columns. For example, we can improve the data clustering so that data files contain tight value ranges. That means when we use the min/max column-level statistics from Iceberg to do file pruning, it can be very effective compared to when the data files contain wide value ranges.
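Both keyBy problems described above can be seen in a small simulation. This is a minimal sketch, not Flink code: `hash_task` is a hypothetical stand-in for a hash shuffle (Flink's real keyBy goes through key groups), and the traffic weights and bucket names are made up for illustration.

```python
import hashlib

def hash_task(key: str, parallelism: int) -> int:
    """Stand-in for a hash shuffle: deterministic hash of the key, modulo parallelism."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % parallelism

def key_by_load(weights: dict, parallelism: int) -> list:
    """Sum the traffic weight that lands on each writer task under a hash shuffle."""
    loads = [0.0] * parallelism
    for key, weight in weights.items():
        loads[hash_task(key, parallelism)] += weight
    return loads

# Problem 1: long-tail traffic. Hypothetical weights where each older hour
# carries half the traffic of the hour after it.
long_tail = {f"hour-{h}": 0.5 ** h for h in range(24)}
loads = key_by_load(long_tail, parallelism=8)
ideal = sum(long_tail.values()) / 8
print(f"busiest writer carries {max(loads) / ideal:.1f}x the ideal share")

# Problem 2: evenly weighted keys, but only as many keys as writers.
buckets = {f"bucket-{i}": 1.0 for i in range(60)}
bucket_loads = key_by_load(buckets, parallelism=60)
print(f"idle writers: {bucket_loads.count(0.0)}, busiest writer: {max(bucket_loads)} keys")
```

In the first case, whichever writer receives the current hour is overloaded no matter how the hash falls. In the second, hash collisions leave some writers idle while others receive several keys.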

Yeah, due to the time concern, we’re not going to dive into the non-partition columns. If you come from the big data world, you may be familiar with the maintenance jobs that compact small files or sort data files for better data clustering.

The Cost of Remediation Over Prevention

If we use the analogy of remediation versus prevention, those maintenance jobs are a remediation approach. Remediation tends to be more expensive: in this case, you have to download the data files, read them, compact or sort them, write new data files out, and then upload them to the distributed storage. The solution we describe here is a more preventive approach, which adds a streaming shuffle stage before the Iceberg writer to improve the data clustering. Second, those maintenance jobs won’t solve the problems we talked about in the upstream streaming ingestion path to Iceberg. For example, for the issues on the write side, doing the compaction afterward won’t help. Compaction can also be a little bit tricky for event-time-partitioned tables because, as mentioned earlier, data can come in late by days, weeks, or months. So we may need to recompact a partition over and over again because the data keeps coming, and managing those compactions can get a little more complicated. With that, I hope you have a good understanding of the problem we’re trying to solve. Now we’ll go into some of the high-level design points.

The key idea is to introduce a smart shuffling stage before the Iceberg writer. There are two steps. First, we’re going to calculate the traffic statistics. Each shuffle task calculates its local statistics. The local statistics could be, let’s say, for every event hour, how many records it has seen, right? It’s a traffic distribution. Then the shuffle tasks send those local statistics to the shuffle coordinator. The shuffle coordinator runs on the Flink JobManager, which is like the driver in Spark. The shuffle coordinator can then do the global aggregation. The benefit of the global aggregation is that different shuffle tasks may have different views of the data. If they have skewed views, you don’t want them to make different shuffle decisions. Once the global statistics are aggregated, the shuffle coordinator broadcasts them to all shuffle tasks. Now all the shuffle tasks can make the same shuffle decision based on the same global statistics. The second step is the actual shuffle. How do we actually shuffle the data based on the traffic statistics we just calculated? Those statistics are going to guide our shuffle decision.
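The statistics step can be sketched as a local count per shuffle task followed by a merge on the coordinator. This is a simplified illustration, not the actual Flink operator code: the function names, the record layout, and the sample data are assumptions made for the example.

```python
from collections import Counter

def local_statistics(records):
    """Shuffle-task side: count the records seen per event hour."""
    return Counter(record["event_hour"] for record in records)

def aggregate(local_stats):
    """Coordinator side: merge the per-task counts into one global view."""
    global_stats = Counter()
    for stats in local_stats:
        global_stats.update(stats)
    return global_stats

# Two shuffle tasks with different (skewed) local views of the traffic.
task_0 = [{"event_hour": "R0"}] * 8 + [{"event_hour": "R1"}] * 2
task_1 = [{"event_hour": "R0"}] * 3 + [{"event_hour": "R1"}] * 5 + [{"event_hour": "R2"}] * 2

global_stats = aggregate([local_statistics(task_0), local_statistics(task_1)])

# The coordinator would broadcast this same result to every shuffle task,
# so that all tasks make identical shuffle decisions.
print(dict(global_stats))  # {'R0': 11, 'R1': 7, 'R2': 2}
```

Task 0 alone would believe R2 has no traffic at all, which is why the decision is made from the merged view rather than from any single task's counts.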

How To Add a Custom Partitioner to Shuffle Data

So this is a code snippet of the Flink code. You can see that you can add a custom partitioner after the shuffle operator to actually shuffle the data. The Partitioner interface in Flink says: given a key and the number of partitions, where the number of partitions is essentially the number of downstream subtasks, select the downstream channel to send this record to. That is basically the Partitioner interface.

We can use range partitioning to split the sorted values. Let’s say the sorted values are R0, R1, R2, up to whatever limit we have. We split them into different ranges with the goal that each range has roughly the same amount of weight, right? For example, in this case, R0 has a lot of data. It gets assigned to tasks T0, T1, and T2, and the tail end also goes to T3. So in the end, what we are trying to achieve is that each task only handles a slice of the sorted range. And here we balance the weight distribution with contiguous ranges. That’s why each task only processes R0, or R0 and R1, or R1 and R2, and so on. It works well for both partition and non-partition columns. As we mentioned earlier, the long-tail hours probably have very little data. So if we only look at the weight in terms of traffic volume, like byte rate or record rate, we may assign hundreds of the long-tail hours to a single writer task.
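The range split described above can be sketched as a single pass over the sorted keys, filling one task at a time up to an equal share of the total weight. This is a minimal illustration under assumptions, not the implementation from the talk: the weights are invented, and `select_task` is a hypothetical Python analogue of a custom partitioner that takes a key and returns a downstream task index.

```python
import random

def range_assignment(weights, num_tasks):
    """Split keys (already in sort order) into contiguous ranges of roughly equal weight.

    weights: list of (key, weight) pairs sorted by key.
    Returns {key: [(task, weight_share), ...]}; a heavy key may span several tasks.
    """
    target = sum(w for _, w in weights) / num_tasks
    assignment = {}
    task, room = 0, target
    for key, weight in weights:
        remaining, shares = weight, []
        while remaining > 1e-9:
            last = task == num_tasks - 1
            # The last task absorbs whatever is left, to avoid rounding gaps.
            take = remaining if last else min(remaining, room)
            shares.append((task, take))
            remaining -= take
            room -= take
            if room <= 1e-9 and not last:
                task, room = task + 1, target
        assignment[key] = shares
    return assignment

def select_task(key, assignment, rng=random):
    """Pick a downstream task for one record, in proportion to the key's shares."""
    tasks, shares = zip(*assignment[key])
    return rng.choices(tasks, weights=shares, k=1)[0]

# Hypothetical long-tail weights for four event hours, split across six tasks.
assignment = range_assignment([("R0", 70), ("R1", 30), ("R2", 12), ("R3", 8)], num_tasks=6)
for hour, shares in assignment.items():
    print(hour, shares)
```

With these weights, R0 is spread over T0, T1, T2, and part of T3, R1 takes the rest of T3 plus T4, and the two small hours share T5, so every task handles a contiguous slice of the sorted range.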

If a writer task needs to handle hundreds of data files, that can become the bottleneck, because it takes time to flush all those data files to the distributed storage, which can increase the checkpoint duration and so on, as we talked about earlier. If you are familiar with the open file cost in some engines like Spark, it is there to compensate for the fact that there is a relatively high cost to open a file stream, I mean to retrieve the first byte. So they introduce the open file cost so that they can avoid assigning too many small files to a single reader. Similarly, on the writer path, we can introduce a close file cost to compensate for the relatively high cost to flush even a small file to the distributed storage. This way we can avoid assigning too many small files to a single writer.

By file count skew, I’m referring to the difference between the numbers of data files that the subtasks process. Let’s say one subtask processes one file and another subtask processes 100 files; then the skew is the difference between one and a hundred. We can tune this close file cost to balance between the file count skew and the byte rate skew. For example, as we increase the close file cost, we are going to see a smaller file count skew because we are avoiding assigning too many files to a single writer. But we may see a much higher byte rate skew because not every data file contains the same amount of traffic. Some files can be very big, and some files can be very tiny. We can tune this parameter to find the sweet spot where both skews are acceptable. With that, I’m going to hand it over to my colleague Gang, who will share some of the results from our initial implementation. Thank you.
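The trade-off between file count skew and byte rate skew can be illustrated by adding a fixed cost to every key's weight before doing the range split. This is a simplified sketch under stated assumptions: the traffic numbers are made up, and the way the cost is charged (once per key, with traffic split in proportion when a key spans tasks) is a simplification chosen for the example, not necessarily how the real implementation accounts for it.

```python
def assign(weights, num_tasks, close_file_cost=0.0):
    """Contiguous range split on (traffic weight + close_file_cost) per key.

    Returns (files_per_task, traffic_per_task).
    """
    effective = [w + close_file_cost for w in weights]
    target = sum(effective) / num_tasks
    files = [0] * num_tasks
    traffic = [0.0] * num_tasks
    task, room = 0, target
    for weight, eff in zip(weights, effective):
        remaining = eff
        while remaining > 1e-9:
            last = task == num_tasks - 1
            take = remaining if last else min(remaining, room)
            files[task] += 1                      # this task writes one file for this key
            traffic[task] += weight * take / eff  # proportional share of the real traffic
            remaining -= take
            room -= take
            if room <= 1e-9 and not last:
                task, room = task + 1, target
    return files, traffic

# Hypothetical traffic: one hot hour, one warm hour, and 100 tiny long-tail hours.
weights = [700, 200] + [1] * 100

for cost in (0.0, 10.0):
    files, traffic = assign(weights, num_tasks=4, close_file_cost=cost)
    print(f"cost={cost}: max files per task={max(files)}, "
          f"max traffic per task={max(traffic):.0f}")
```

With no close file cost, traffic is perfectly balanced but one task ends up writing all 100 long-tail files. With a cost of 10, the long-tail hours are spread over more tasks, so the largest file count drops, while the task holding most of the hot hour now carries noticeably more traffic than the others.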

Using Shuffle Operators to Reduce File Sizes

Gang Ye:

We set up two ingestion apps to do an A/B test. In test A, the app consumes from Kafka and then writes to Iceberg. The source operator is chained with the writer operator. Job parallelism is 60 and the checkpoint interval is 10 minutes. In test B, as a comparison, we added the shuffle operator between the source operator and the writer operator. The job parallelism and the checkpoint interval are the same as in test A. For the rest of the test configuration, the sink Iceberg table is partitioned hourly by event time. The benchmark traffic volume is around 250 MB per second and the event time range is eight days, which is 192 hours. If the time range is bigger, then the small files problem will be more severe. We measure the results from five perspectives. First, how many data files are written in one checkpoint cycle. Second, the data file size distribution. Third, how long it takes to finish one checkpoint. Fourth, the CPU utilization of the ingestion app. And the last one is the traffic skew among all the Iceberg writers. From the flushed files metric here, you can see that smart shuffling reduced the number of files by almost 20 times, from 10K to 500, which is about 2.5 times the total number of partitions.

Is It Possible to Avoid Small Data Files?

And we should keep in mind that recent hours have a lot of data, so they cannot be handled by a single writer task. We need multiple writers to handle the recent partitions, or hours, which contain most of the data.

We observe improvements in the file size at all the percentile points. We cannot completely avoid small files since the long-tail hours don’t have much data. Even in a perfect world, the data file size for the long-tail hours will still be small. We did a further analysis of the file size distribution using the histogram. It shows that smart shuffling reduced the small files significantly. On the left side, you can see that without shuffling, more than 6,000 files are less than 100 KB, but with shuffling, there are only 83 such files. Even for the large files, whose size is more than 100 MB, on the right side, smart shuffling almost doubles the number of files, from 60 to 110. During the checkpoint, if every writer needs to write many partitions or hours concurrently, then it also needs to upload many files. The more files it handles, the longer it takes. So we use the checkpoint duration to measure the difference between test A and test B.

Using Shuffling to Reduce Flink Checkpoint Time

From the flush duration metric here, you can see that the purple line shows that without shuffling, the checkpoint takes 64 seconds on average, but with shuffling, it only takes eight seconds, which is eight times faster. The shorter checkpoint duration also benefits the throughput, since no data is being processed during the flush. We’re going to see the impact of the pause later. In test A, the source operator is chained with the writer operator, so the job can spend most of its resources on consuming data from Kafka and writing to Iceberg. The data handover between the source operator and the writer is a simple method call, so there is not much overhead for test A. In test B, the network shuffle between the shuffle operator and the writer operator has the additional cost of data serialization, network I/O for sending and receiving data, and deserialization. We want to measure how much overhead that amounts to. We use the CPU utilization of the job to measure the overhead.

Increasing CPU Utilization With Shuffling

Without shuffling, the average CPU utilization is 35%, but with shuffling the CPU utilization is 57%. Here we trade more resources for larger file sizes and better data clustering. Because data is usually written once and read many times, it is usually a good trade-off to spend more resources on the write path, which also benefits the read performance. And we should keep in mind that these two tests use the simplest streaming job, consuming from Kafka and then writing to Iceberg. If the job has much more complex logic, then the relative CPU increase will likely be much smaller than 62%. You may have already noticed that without shuffling, the CPU utilization has some spikes. As we discussed earlier, without shuffling, the job needs to flush and upload more files, which increases the pause period. So the big trough here, which is followed by the big catch-up spike, is caused by that. We don’t see such a spike for the job with shuffling.

After introducing shuffling, every writer processes data only for its assigned partitions. We want to measure how much traffic skew that added. We use the writer record rate to measure the traffic skew. Without shuffling, the max record rate is close to the minimum one. With shuffling, the max record rate is 59% higher than the minimum record rate. This is caused by the current greedy algorithm, which tries to distribute the partitions evenly to the writers based on the weight. We’re going to have a better algorithm to solve this skewness problem. For future work: first, right now, to collect the data statistics, we use an in-memory map to count the records by key, but if the key has high cardinality, then the in-memory map won’t have enough space to store that. In this case, sketch statistics will be helpful. Second, we’re going to contribute this smart shuffling implementation to open source, and we have already opened the project on that. This is our design doc. If you want to know more details about it, feel free to take a look and leave comments and feedback. That’s all for today. Yeah, thanks. We’re open for questions.
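To illustrate why sketch statistics help with high-cardinality keys, here is a small Count-Min sketch. The talk does not say which kind of sketch is planned, so Count-Min is chosen here purely as one well-known example; the class, its parameters, and the sample keys are all assumptions for illustration. The point is that memory stays fixed at `width * depth` counters regardless of how many distinct keys arrive, at the price of estimates that can overcount but never undercount.

```python
import hashlib

class CountMinSketch:
    """Fixed-size approximate counter: estimates are never below the true count."""

    def __init__(self, width=256, depth=4):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, key, row):
        # One independent-looking hash per row, derived by salting with the row number.
        digest = hashlib.md5(f"{row}:{key}".encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % self.width

    def add(self, key, count=1):
        for row in range(self.depth):
            self.table[row][self._index(key, row)] += count

    def estimate(self, key):
        return min(self.table[row][self._index(key, row)] for row in range(self.depth))

    def merge(self, other):
        """Element-wise sum, so per-task sketches can be aggregated globally."""
        for row in range(self.depth):
            for col in range(self.width):
                self.table[row][col] += other.table[row][col]

# Two tasks count a hot key plus many distinct cold keys, then the sketches are merged.
task_a, task_b = CountMinSketch(), CountMinSketch()
for i in range(5000):
    task_a.add("hot-key" if i % 5 == 0 else f"key-a-{i}")
    task_b.add("hot-key" if i % 5 == 0 else f"key-b-{i}")
task_a.merge(task_b)

print(task_a.estimate("hot-key"))  # at least 2000, the true count
```

Because sketches of the same shape merge by simple addition, they fit the same local-count-then-aggregate flow that an exact in-memory map would use.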


Any questions in the audience? So we have one back there.

Are There Any Differences Across Resourcing Memory?

Audience Member:

This is great. I’m super excited to see this get into open source. This is fantastic work. But you mentioned the memory difference between the two tests. Any other differences across resources, like memory, disk, or network?

Steven Wu:

Yeah, that’s a great question. I think obviously, because of the shuffle, the network usage will be higher, because without shuffling you’re not sending any network traffic. In terms of memory, this will help to reduce the memory consumption because we reduce the number of data files each writer task needs to open. When you do the shuffle, you definitely avoid the out-of-memory issue we talked about earlier. And yeah, the CPU will definitely be higher because of the serialization and the network I/O. So yeah.

Why Is This Better Than Partitioning By The Logarithm?


Great. Thank you. We have a couple of questions, some from folks in here and some from the internet. One of the questions from earlier in the presentation was basically: how much better is your approach than just partitioning by, like, the logarithm of the hour field?

Steven Wu:

By partitioning by log?


Yeah, I think this is what they’re asking: given the spike with the long tail, would partitioning by the logarithm of the hour be a better approach, and have you considered that approach?

Steven Wu:

Okay. Yeah. I think that’s because the business requires the partition to be by hour. They want to query, hey, I want to query hour 10 of yesterday by event time. Yeah, the log of that value is not really meaningful.

Does Flink Iceberg Support Merge Queries?


Great. Thanks. We had another one here. Couple of these, I think one of your colleagues might also be in the chat answering some of the questions. One of the questions is does Flink Iceberg support merge queries to handle partial column updates?

Steven Wu:

Today the Flink Iceberg sink does not support partial column writes to an Iceberg table, because when you say, I want to update just one column, it is actually required to read the existing rows, merge them, and write them out. So it’s not supported today in Flink Iceberg.

How Long Can The Event Time Be?

Audience Member:

This question is about your testing strategy. You said, okay, you’re using eight days’ worth of events, right? Is there any reasoning for picking eight days, like any relation to the use case that is sending the data? Maybe you can expand on that.

Steven Wu:

That’s a great question. I think typically the event time range can be much longer. In production it can be months, right? We capped it at eight days for one of the business use cases, where we don’t have to process data older than that. And for this demonstration purpose, we just capped it, so that the comparison of the data is a little simpler. Yeah. But if the time range is wider, the small files problem will be more severe. Actually, this can help even more.

Does Shuffling Benefit From the Law of Averages?

Audience Member:

Okay. The next one is a silly question on the smart shuffling algorithm. Assuming, okay, all these events are coming in different sizes, right? Generally a randomized algorithm should follow the law of averages and probably give you the benefit, right? Why do you have to create this statistics-oriented approach here, right? With the global coordinator, I see a complex design there, right? So did you try a randomized approach, and how did it compare?

Steven Wu:

Basically those statistics can tell you the traffic distribution for every event hour so that you know how to split the hours in a balanced way and don’t assign too much traffic to a single writer task. So that basically guides the decision so that we can limit the number of files while keeping the traffic relatively balanced across writer tasks. We have to understand the distribution before we can make those kinds of informed decisions.

Gang Ye:

Yeah, and another advantage is that we calculate the data statistics dynamically. So if the distribution changes, the statistics will reflect that change, and the partitions will be reassigned to the writers accordingly.


Great. Oh yeah, we got another question here.

How is Smart Shuffling Different From Range Partitioning?

Audience Member:

I might be repeating that question, but how is your smart shuffling different from normal range partitioning? You use the event hour and then add some ID?

Steven Wu:

Well, this is in the streaming ingestion path to Iceberg. So we have streaming data coming from Kafka, and then we ingest the stream data into Iceberg. This is trying to solve the problem in that segment. So yes, in Spark you can also do sorting, and that is probably a similar shuffling approach.