March 2, 2023

9:35 am - 10:05 am PST

Subsurface Live 2023: Iceberg File Management

Everything was going great: your data was in your data lake, queries were fast, and the SREs were happy. But then things started to slow down. Queries took longer; even specific queries that used to be fast now take a long time. The culprit? Small and unorganized files. The solution? Apache Iceberg’s RewriteDataFiles action. This talk will dive into how RewriteDataFiles can 1) right-size your files, merging small files and splitting large ones, ensuring that no time is wasted in query planning or in opening files; and 2) reorganize the data within your files, supporting hierarchical sort and multidimensional z-ordering algorithms, enabling you to make sure your data is optimally laid out for your queries. With these two capabilities, any table can be kept at peak performance regardless of ingestion patterns and table size.

Topics Covered

Enterprises
Iceberg
Open Source


Transcript

Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.

Russell Spitzer:

Well, hello everybody. Wait. It’s going up there. It’s all right. Hello everyone. Welcome to Managing Data Files on Apache Iceberg. I’m Russell Spitzer, I’m an Iceberg PMC. I also work at Apple. And today I wanted to talk a lot about performance in Iceberg and how that relates to small files basically. So, what we’re going to talk about today is basically how we can fix those kinds of issues with our Iceberg tables. We’re going to start out by going into the problem really at a high level and say, why do we even care about small files? Why are small files an issue with Iceberg tables? Then we’re going to talk a little bit about, well, why am I getting lots of small files? Obviously, we don’t want them. Why are they appearing? And then we’ll talk about how we can actually fix those issues at write time.

Mostly, I’m going to talk about Spark and how Spark is implemented for its writers. And then finally, how can we fix the issue of small files and other kinds of file issues after the fact. Once we’ve already had that issue in our table, what maintenance operations can we do to fix it up? And then to wrap up, I’ll just tell you about some of our future plans and the ideas we have going forward with how we’re going to be working with Iceberg tables and managing files in the future. 

Why Care About Small Files

So let’s just start right off by talking about why we care about small files. Now, the biggest thing to know is basically that the fewer files you have, the better your performance is going to be. Now, there are some caveats to this, but basically we want to have big files. We want them to have hundreds of megabytes of data if possible.

Now, there are a bunch of reasons for this, and we’ll go into some of them at a high level. We’ll start with planning costs. When we’re reading an Iceberg table, when we’re querying an Iceberg table, we basically have to plan based on how many files there are. The more files we have to plan over, the longer that planning takes, the more metadata is required, and the more overhead there is in actually starting our query. Beyond that, in most file systems it’s usually more expensive to open a file than it is to continue reading a file. So in most cases, we want to minimize the number of file openings we’re doing and increase the amount of data streaming that we’re doing. And then of course, in different engines, the number of files is sometimes directly related to the number of tasks. Now, this is engine dependent, but you’ll find that you usually want the number of files to be relatively well matched with the parallelism of the job you’re doing.

Now, queries aside, the maintenance complexity of dealing with a table with many, many files usually scales with the number of files: it’s more and more difficult to list those files and more and more difficult to delete them. And we’ve ended up running up against different kinds of rate limits on different sorts of file systems based on the number of files we have. You know, a table with one-megabyte files is always going to be much more difficult to maintain than a table with 1,000 one-gigabyte files. And finally, Iceberg metadata itself has some constraints here. Every single file that we have to keep track of in an Iceberg table is going to be a row in our metadata, which means we’re going to end up with more and more metadata describing very small amounts of actual data.

We want to make sure that that ratio is actually tiny amounts of metadata to large amounts of data, so that we don’t spend all of our time in a query looking at metadata; we want to spend most of our time in a query looking at data. And of course some file systems have quotas around this. If you’re using HDFS, you might be running along and thinking that everything’s going fine, and then your sysadmin is going to call you sometime and be like, well, did you know that you’re using several million files and name node performance is decreasing? I’m going to cut you off. So you can end up in situations where some file systems just can’t handle having tons of files. So in general, we want to fix this issue. So what is a small file?

What Is A Small File?

What really makes a small file too small? To talk about this, let’s look at how Parquet files are laid out in the first place. A Parquet file, in my mind, is really built around a metadata block at the end, as well as several independent row group blocks. Each row group block represents the minimal amount of data that can be read in a single task. Now, the reason for this is that within a row group we have compression and encryption, which means that you can never really read part of a row group. You have to decrypt that whole block, uncompress that whole block, and read that whole block before you start reading rows out of it. Which means that the write.parquet.row-group-size-bytes parameter, which is an Iceberg table property, sets the minimum amount of data that an engine task can read.

So you will never be able to read less data than a single row group in a Parquet file. Now, that only sets the minimum bound for a row group. The entire file is going to be made up of multiple row groups, and that’s what write.target-file-size-bytes is. Note again that this is the maximum size of the file being written. Now, as we talked about before, the smallest unit we can plan is a row group, so we probably want our max file size to be a multiple of our row group size. That controls the write side of things: we’re going to be writing these packages, basically, and inside each package are several row groups, and those row groups are basically independent. When we read them, there are two parameters that Iceberg uses and exposes to engines.

One is read.split.target-size. This is the max amount of data in an entire read task, a read task being whatever the engine does to actually read data out of Parquet files. This again should be a multiple of the row group size because, like I said before, we can never read smaller than a row group. So setting this to anything smaller than a row group size is basically just going to be wasting time. Now, there’s one additional parameter which is related to the cost of opening multiple files, because you might imagine that we have several small files, but our target split size is larger than all of those files combined. Well, in a standard read task, we’re going to read all of those files in serial. So we’ll open one, read its row group, then open the next, read its row group, open the next, read its row group, and on and on.

Each time we do that, we have to open a new file. So we account for that cost with something called read.split.open-file-cost. When Iceberg is planning a query, it basically takes into account how painful it is to open a new file. So all of this together shapes how Iceberg both reads and writes files. Now, as I’ve noted, small files are still a big problem. We get tons and tons of questions about this on the open source Apache Iceberg Slack, and I’m constantly running into it with a lot of users. You know, people are doing things as normal and then all of a sudden they have tons and tons of small files. As I mentioned before, we have these parameters that set row group size and target file size. So why are we still making small files?
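
For reference, here is a minimal sketch of how these four knobs can be set as Iceberg table properties from PySpark. The catalog name demo, the table db.events, and the byte values are illustrative placeholders, and a SparkSession already configured with the Iceberg runtime is assumed.

```python
from pyspark.sql import SparkSession

# Assumes the Iceberg runtime jar and a catalog named "demo" are already configured.
spark = SparkSession.builder.getOrCreate()

# Illustrative sizes: 128 MB row groups, 512 MB target files,
# 128 MB read splits, and a 4 MB cost charged for each extra file opened.
spark.sql("""
    ALTER TABLE demo.db.events SET TBLPROPERTIES (
        'write.parquet.row-group-size-bytes' = '134217728',
        'write.target-file-size-bytes'       = '536870912',
        'read.split.target-size'             = '134217728',
        'read.split.open-file-cost'          = '4194304'
    )
""")
```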

Why Are We Still Making Small Files?

Well, the reason is that those are, as I mentioned, just maximum values. We can always write smaller row groups and we can always write smaller files. So why are there small files when we write using Spark? The reason is that currently, although this has just changed in a very recent commit, the Iceberg Spark write path works basically like this. We start out with a data set built up of Spark tasks, and each Spark task has a set of rows in it. That task is then sent to an Iceberg writer. The Iceberg writer opens up the Spark task and says, okay, where’s this first row going? This first row belongs to partition one, so I’m going to open a new file in partition one and start writing rows to it, and it will just keep writing row after row after row until it sees a different partition value.

Say it sees a new row and that row has partition two as its value. Well, it’s going to close the file in partition one, open a new file in partition two, and start writing more rows. Now the problem is, of course, if we have a separate Spark task over here that goes to a different Spark writer, it’s going to do the exact same thing. So if it sees values for partition one, it’s also going to open a Parquet file in partition one, and it will also open a file in partition two. So we end up opening a lot of files. Now, this is just the beginning of the issue, though. The second issue is that our Spark task is basically limited in how big a file it can make by the amount of data inside of it. I can’t really ask the question to the crowd right now because no one’s out here.

But the question would be, if I’m writing a Spark task that has 10 megabytes of shuffle output data, how big of a Parquet file will it write? The only wrong answer here really is 10 megabytes or above, because the problem is that when we write data from Spark into Parquet, we’re switching the representation of that data dramatically. We are going from an in-memory, unserialized, row-based object format into an on-disk, columnar, compressed format, which means that whatever amount of memory we are using to hold that data in memory, it’s going to be way smaller when we actually write a file. So even if we have a Spark task with 10 megabytes of data in it, we may end up writing a file that is considerably smaller, and I put five megabytes in here as an example. But you can imagine there are certain use cases where you might be repeating values over and over and over again.

The in-memory size will count every single one of those objects, while the on-disk size is the compressed size, which will probably represent all of those duplicate values in a few bytes rather than once per element. The only time the max target file size is going to be invoked when we’re writing is when we have way more data in a Spark task than our Parquet target file size. So if, for example, we had one gigabyte of in-memory data here, we might start writing to one file, but the moment we hit the target size (the default is 512 megabytes), that writer will roll over to a new file and then write all of the additional records to that new file. So basically, the only time our max file size comes into play is when our task is much larger than the target file size. In all other cases, the largest file we could ever possibly write is the size of the task after it’s been compressed.

Okay, so I mentioned before that this is really bad. Why is this really bad? Because in our worst-case scenario, the number of files we create in a write, if we have randomly distributed data, is the number of Iceberg partitions we’re writing into times the number of tasks being created by the engine. So a quick example: say each task has one gigabyte or so of data, and pretend that each one writes exactly a one-megabyte Parquet file per partition. If we have 1,000 partitions in our Iceberg table and a thousand of these one-gigabyte tasks, we will end up writing 1 million files. This is obviously not ideal. We would much rather write, you know, 2,000 512-megabyte files or something along those lines. So what we really want instead is to align those tasks, to align the row information in those tasks with the Iceberg partitions that we’re writing to.

Write Distribution Modes

So how do we make this happen? Well, we can fix this at write time using a feature called write distribution modes, which have actually been inside of Iceberg for a while. But as I mentioned before, our current default, since these have existed, has been to not impose this on the user and to require a user to explicitly set it. We’re going to be changing that in the future; as I said, the patch has already been merged to switch the default to hash. Now that we have these distribution modes, you actually have several options. The write distribution mode comes in three flavors right now. The first, which is the current default, None, basically says: Spark, just write the data as is. You get those tasks, and we’ll just start writing files wherever we see them.

This is the cheapest thing you can do from a write perspective, but can end up generating lots of files if your data isn’t already aligned. Then we have two other modes, Hash and Range, which both request that Spark perform a shuffle before doing the write to organize the data so that it is partition aligned. And I’ll talk a little bit more about how each of these modes work in a second. 
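
As a rough sketch (the catalog and table names are placeholders, and a SparkSession with the Iceberg runtime configured is assumed), the distribution mode is just another table property:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg runtime and catalog "demo" assumed

# Valid values are 'none', 'hash', and 'range'. Hash asks Spark to shuffle
# rows by partition key before the write so each task owns whole partitions.
spark.sql("""
    ALTER TABLE demo.db.events
    SET TBLPROPERTIES ('write.distribution-mode' = 'hash')
""")
```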

Default Mode

So let’s start again by talking about that default mode. As I said before, in the default mode, whatever is in those tasks is going to be written out to as many partitions as appear in each task. So you only want to use the None mode when your data is already aligned with your Iceberg partitions or you’re writing to a single Iceberg partition. There are lots of cases where, when you’re writing data, you have actually already aligned it with the Iceberg partitioning.

Like if you came from a source that was similarly partitioned. But if you are not in this use case, which is the vast majority of folks, you want to do something different. So again, what the distribution modes do is say: Spark, right before you do this write, perform some kind of repartitioning, and then do the write.

Hash Mode

So the hash mode, which I talked about a little bit before, is the cheaper of these two shuffles. It performs a hash-based repartitioning. You can imagine that if I have data for four different partitions in two Spark tasks, the hash shuffle will basically say: take the hash of all of those partition values, group them together, and then make Spark write tasks that match those hash values. So if we’re hashing with a modulo of two here, we take all of the records whose hash is equal to one modulo two and put them in one task, and all of the ones whose hash is equal to zero modulo two and put them in the other task.

And then when we write our Parquet files, we have one task that has all of the values for partition A=1 and all of the values for partition A=4, and we have another task that has all of the values for A=2 and A=3, which means we will never have two Spark tasks writing to the same Iceberg partition. So we basically get around our issue there. Now, this is a relatively cheap thing to do. There are some issues at the end of this, because we are trying to group all of the data by hash, and this is where something called Adaptive Query Execution in Spark is very important for determining how big these outputs are. And I’ll talk about that in a sec. But before we get to that, let’s quickly talk about what range does.

What Range Does

Range is a similar idea, but it’s actually much more expensive because it does a total ordering. This is what happens when you set a sort order on your table in Iceberg. If you’re trying to write all of your Parquet files so that they’re ordered by some additional columns or things like that, this is the distribution mode that you use. It’s actually the default if you haven’t set a distribution mode but you have set a sort order. Basically, what it does is take a sampling of the data first. This is a separate, expensive stage that goes through and says: okay, based on the incoming task data, we think the outgoing data should have one task with values one to two, and the other task should have values three to four. It then performs the exchange, putting the values into the correct Spark tasks. And then, similarly, each Spark task will only have values for its own range of partitions. For example, this one will only have the values for partitions one and two, and this one will only have the values for partitions three and four.
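
As a hedged sketch, here is one way a table-level sort order might be declared with Iceberg’s Spark SQL extensions, which is what makes range the effective distribution behavior. The column names are made up for illustration, and the extensions are assumed to be enabled on the session.

```python
from pyspark.sql import SparkSession

# Assumes spark.sql.extensions includes IcebergSparkSessionExtensions.
spark = SparkSession.builder.getOrCreate()

# Declare a sort order; subsequent writes will range-shuffle and sort rows
# by event_date, then event_id, before producing Parquet files.
spark.sql("""
    ALTER TABLE demo.db.events
    WRITE ORDERED BY event_date, event_id
""")
```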

Additionally, you’ll see that we’re also getting an ordering here. So if we had other columns, like if we were sorting on other columns beyond our partitioning, those would be included in this range as well, and then our Parquet files would similarly be written in an ordered way. So this all sounds pretty great, except I’ve kind of skimmed over the question of how we make sure that these tasks on the far end aren’t super skewed, aren’t exploding, and that all of the values aren’t ending up in one particular writer for a single partition. This is where Adaptive Query Execution comes into play.

Adaptive Query Execution

Adaptive Query Execution in Spark, which is available in 3.2 and above and is enabled by default along with all of these parameters I’m listing here, looks at the output of a given shuffle operation and says: okay, given this output, am I making tasks that are too large or too small?

Now, for tasks that are too small, coalesce partitions will try to combine them; basically, it will just take adjacent output tasks and combine them into a single partition. Skew join handling and optimize-skews-in-rebalance-partitions basically do the opposite: if the output comes out very large, we’re going to take that output and break it up into several sections. The size that triggers all of these operations is something called spark.sql.adaptive.advisoryPartitionSizeInBytes. Now you might say, okay, advisory partition size in bytes, I’ll just set that to my write target file size. But we have to remember back to what I was mentioning earlier, which is that unfortunately it doesn’t quite work that way, because the advisory size is an in-memory size. So we have a few kinds of weaknesses here.

One is that the advisory size is the in-memory size, and secondly, the advisory partition size is going to apply to all joins that are performed within your Spark job. So this can be a big problem. If, for example, our advisory partition size is much smaller than our read split size, Spark might start doing certain things like double reading data sets, because it knows that it only wants a certain amount of data in each partition. We saw this, for example, in an internal use case where someone set the split size to a larger value than the advisory partition size. We saw some joins reading the same dataset basically twice, because Spark would look for any tasks that were producing too many input rows and end up running the same task two times in order to divvy up the work.

So you have to be careful that this advisory partition size is balanced with your read split size. Now, this only happens with certain joins. Obviously, it would be ideal if we could have this advisory partition size apply only to the data source write, but currently that’s not part of the Spark configuration. And then the last thing is that after we’ve performed our shuffle, with our skew and coalesce handling, there may still be some additional in-memory operations. For example, if a local sort is being applied afterwards, which is something we can’t always control in Spark (but that’s again another future direction), all of the data that we have in memory coming out of the shuffle will be locally sorted. So we sometimes end up with a very memory-intensive operation. In general, with these distribution modes, your advisory partition size is going to directly control the size of your output files. You generally want it to be around your write target file size, but it will probably have to be larger. Unfortunately, we don’t know how well the data will compress when we set these parameters, so it’s something that you kind of have to tease out for your specific use case.
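
To make that concrete, here is a sketch of the Spark AQE settings mentioned above. The values are illustrative guesses and would need tuning against how well your data actually compresses.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# AQE and these sub-features are already on by default in Spark 3.2+;
# shown explicitly here just to name the knobs involved.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")

# Illustrative: aim for roughly 1 GB of in-memory shuffle output per task,
# hoping it compresses down to somewhere near a 512 MB Parquet file.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1g")
```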

Managing Existing Files

These are all the things you can do when you’re writing, but there are many cases where you still end up writing small files because you simply don’t have a lot of data for the partitions you’re writing to. So you may only write a hundred megabytes across 10 different partitions, and that’s totally fine, but you may eventually want to combine those data files. For that, Iceberg comes with a bunch of actions for maintaining existing tables. We start with an operation called Rewrite Data Files. This is basically the solution to small files that already exist in your table. It comes in a bunch of different flavors, which are what we call strategies: BinPack, Sort, and ZOrder. These are the ones currently available, and I’ve ranked them from cheapest to most expensive.

BinPack

Now, BinPack doesn’t look at what the data is at all. It just reads files that are smaller than a target and combines them, and it takes files that are larger than a target and splits them. That’s all it does: it just reads them in and writes them back out. So it’s pretty cheap and very fast to do, with no shuffle. But of course you get no ordering benefits here, and there’s nothing to worry about with partitions, because all of these rewrites happen within partitions.
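
Here is a minimal sketch of a BinPack rewrite via the rewrite_data_files Spark procedure; the catalog demo, the table db.events, and the 512 MB target are placeholder choices.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg SQL extensions assumed enabled

# BinPack is the default strategy: coalesce undersized files and split
# oversized ones toward the target size, partition by partition.
spark.sql("""
    CALL demo.system.rewrite_data_files(
        table => 'db.events',
        options => map('target-file-size-bytes', '536870912')
    )
""").show()
```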

Sort

The next most expensive option is Sort. Sort is about actually applying one of those range-based shuffles to your data. Given some other columns that you want to order your data on, it will order all of the rows in the output files it’s writing by that ordering. It ends up being really beneficial if you do a lot of queries that filter on a certain column and a range of values within that column. So this is an optimization you can make for reads, but it is more expensive to maintain on the rewrite side.
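
A sketch of the same procedure with the sort strategy; the sort columns are hypothetical and would be whatever your queries actually filter on.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg SQL extensions assumed enabled

# Sort strategy: range-shuffle and order rows so the rewritten files are
# clustered by the listed columns.
spark.sql("""
    CALL demo.system.rewrite_data_files(
        table => 'db.events',
        strategy => 'sort',
        sort_order => 'event_date ASC NULLS LAST, event_id ASC NULLS LAST'
    )
""").show()
```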

ZOrder

And then last there is ZOrdering, which is the most expensive option and is basically a multi-column ordering. It lets you cluster data on multiple axes, which allows you to have effective query pruning on multiple columns. This is way more expensive than doing the normal sort and is just one of several different space-filling curves being worked on in Iceberg. The main thing to know about this is that your data must have some intrinsic clusterability for this to have benefit, and you still need a lot of data in each partition for it to have benefit, because this only helps in eliminating files from a read scan, not in eliminating row groups or something like that. And most important of all, there is no benefit on a single column: if you ZOrder a single column, you might as well be doing a sort; it will be more efficient and give you the same benefit.
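
And a sketch of the ZOrder flavor, expressed as a zorder() sort order on the same procedure; again, the two columns are made-up examples of columns you query together.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg SQL extensions assumed enabled

# Z-ordering interleaves multiple columns' sort keys so files can be pruned
# on either column; only worthwhile with two or more columns.
spark.sql("""
    CALL demo.system.rewrite_data_files(
        table => 'db.events',
        strategy => 'sort',
        sort_order => 'zorder(event_date, device_id)'
    )
""").show()
```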

Expire Snapshots

Now, once you’ve done these rewrite operations, you probably want to get rid of the old physical files. To do that, we have something called Expire Snapshots. There are a few things I want to tell you about Expire Snapshots. The most important thing is that it is non-destructive: it will not delete any live data. The only thing Expire Snapshots can do is basically erase the ability to time travel to the past. If you know a little bit about Iceberg, you know that Iceberg allows you to do time travel, and you can only travel back to snapshots that have not been expired.

When you expire snapshots through this method, Iceberg basically asks: okay, what files were exclusively used by those snapshots? Those files are no longer reachable through time travel, because we’ve deleted the snapshots, so it will physically delete them. This is pretty cheap to do because we use the metadata to determine which files are eligible for deletion, and no file system listing is really required. So it’s generally very cheap, and this is something you should be doing regularly to get rid of old snapshots.
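
A sketch of the expire_snapshots procedure; the timestamp and retention count are placeholders you would tune to your own time-travel requirements.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg SQL extensions assumed enabled

# Expire snapshots older than the given timestamp, but always keep the
# most recent 10 so some time-travel history remains.
spark.sql("""
    CALL demo.system.expire_snapshots(
        table => 'db.events',
        older_than => TIMESTAMP '2023-02-01 00:00:00',
        retain_last => 10
    )
""").show()
```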

Remove Orphan Files

Now, more expensive than this, and the last action I wanted to talk about, is Remove Orphan Files. I want to emphasize that this is the most heavy-duty cleanup operation you can do. It’s extremely expensive for two main reasons. The first is that we use the file system list operation to locate files to delete. You may be familiar with the fact that a lot of file systems, like S3, do not have a very fast list operation, so when we’re doing these lists it’s pretty expensive.

And then more than that, once we’ve figured out what files exist in a certain directory, the way we check whether a file is orphaned is by seeing whether or not it exists within the current metadata of the table. So this is really only used when we’re looking for files that are in the table location but have never been referenced, or are no longer referenceable, by the table. We should only see orphan files when things have failed or broken. It’s very expensive to do, and you should only be running it pretty infrequently. One thing you can do to check whether Remove Orphan Files will help you is look at the amount of space used by your current table location and compare that to the total size of all of the files in the table metadata; you can just look at the files metadata table for that. The difference between that total file size and the real physical size on disk is the amount of space you can save by running Remove Orphan Files. So just a quick tip on that.
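
A sketch of both the dry-run check and the size comparison described above; the catalog and table names are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg SQL extensions assumed enabled

# Dry run first: list candidate orphan files without deleting anything.
spark.sql("""
    CALL demo.system.remove_orphan_files(
        table => 'db.events',
        dry_run => true
    )
""").show(truncate=False)

# Quick tip from above: total bytes tracked in table metadata, to compare
# against the physical size of the table location on disk.
spark.sql("""
    SELECT sum(file_size_in_bytes) AS tracked_bytes
    FROM demo.db.events.files
""").show()
```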

Future Plans

Now, all of these maintenance operations I think are pretty fun, but we have a lot of future plans for how we’re going to improve them and improve how we’re writing files. As I mentioned before, one of the things that we’re pretty excited about is changing a lot of these options to be more intelligent about which files they select for optimization: basically, having better plans for how Sort and ZOrdering work so they only rewrite files that require rewriting, as opposed to just looking at files based on size. And then of course, for the write distribution modes, we’re very excited about looking into ways to make them less user configurable and more automatic based on your table and write sizes. So we’ve got some plans to do this, but of course Iceberg is an open source project, and the best way to make these things happen and to improve the project is to join us. So as I thank everybody for listening to this talk, I urge you to please check out iceberg.apache.org and its community page, and join our Slack and talk to folks and help out with the project, because we can make something really great for everybody.
