Subsurface LIVE Summer 2020
Lessons Learned From Running Apache Iceberg at Petabyte Scale
Apache Iceberg is an open table format that allows data engineers and data scientists to build efficient and reliable data lakes with features that are normally present only in data warehouses. Specifically, Iceberg enables ACID compliance on any object store or distributed system, boosts the performance of highly selective queries, provides reliable schema evolution, and offers time travel and rollback capabilities. Iceberg lets companies simplify their current architectures as well as unlock new use cases on top of data lakes.
This talk will describe how to maintain Iceberg tables in their optimal shapes while running at petabyte scale. In particular, the presentation will focus on how to efficiently perform metadata and data compaction on Iceberg tables with millions of files without any impact on concurrent readers and writers.
Anton Okolnychyi, Apache Iceberg PMC Member and Apache Spark Contributor
Anton Okolnychyi is a PMC member of Apache Iceberg and a contributor to Apache Spark at Apple. He has been working on various internals of big data systems for the last five years. Anton's recent focus is on building secure, efficient and reliable data lakes. Before joining Apple, he optimized and extended a proprietary distribution of Apache Spark at SAP. Anton holds a master’s degree in computer science from RWTH Aachen University.
So for those of you who are just joining us, Anton will be available after his session in his Slack channel in the Subsurface Community. And here, I would like to introduce your speaker, Anton. He is an Apache Iceberg PMC member and Apache Spark contributor. If you have a question for Anton, you can either put it in the chat window here in the live session, or you can raise your hand by clicking the button on the right-hand side to share your audio and video for live Q&A at the end. And if we can't get to it, we will definitely have Anton chat with you in the Subsurface Community afterwards. With that, take it away, Anton.
Cool, thanks a lot. Thanks everyone for joining. My name is Anton and I'm going to share some tips on how to run Apache Iceberg at scale. I am an Apache Iceberg committer and PMC member. From time to time I also contribute to Apache Spark. My current focus is on building reliable and efficient data lakes at scale. Before that I worked on a proprietary Spark distribution, and in general I'm highly interested in everything related to distributed systems. I also believe in the power of open source, and especially healthy open source communities.
Today's talk will be pretty technical. We will start with an introduction to Apache Iceberg, and then touch upon some of its features and why they're so essential for modern data lakes. Then we will cover the metadata layout in Iceberg, so everyone will have an idea of how it works under the hood. And finally, I will outline how you can keep your Iceberg tables optimal, even if you have a single table with millions of files and tens of petabytes of data. A small disclaimer before we start: I will be sharing my personal views, not necessarily the views of my current employer or of the projects I'm involved in.
So let's start with what Apache Iceberg is. As you have probably heard before, Iceberg is a table format. A table format defines how one should lay out individual data files and bundle them up into the concept of a table. The term table format is relatively new, but we've actually been dealing with it forever. The de facto standard table format built into query engines like Spark and Presto is the Hive table format. It usually means there is a central metastore that tracks a list of partitions, and whenever you need to find out which files are in those partitions, you perform a list operation on the underlying storage.
This has a number of problems with respect to correctness and performance, and Iceberg is designed to solve those problems and take your data lakes to the next level. I'm also happy to announce that Iceberg has been a top-level project at the Apache Software Foundation since May of this year. One of the reasons for the graduation was a very strong technical community, driven by people from Netflix, Apple, LinkedIn, Adobe, [inaudible 00:03:17], and many other companies. And a lot of those people are also involved in a bunch of other projects.
Iceberg was designed to fit into the existing ecosystem. It comes with a clear spec, which is a binding document, and multiple implementations, specifically in Java and in Python. It works with any object store or distributed file system. It supports multiple file formats, so you can pick a columnar file format like Parquet or a row-based format like Avro. It also comes with support for Presto and Spark, and right now the community is actively working on Flink, Hive and Dremio integrations as well. It is also important to mention that you can migrate to Iceberg in place. You don't have to run ETL jobs; you can simply generate Iceberg metadata for your existing data files. So let's briefly touch upon some of the features that Iceberg provides.
First of all, it provides full ACID compliance on any object store or distributed file system. There is no requirement for a consistent list or an atomic rename operation. You don't need to run solutions like S3Guard or keep part of your metadata in a consistent storage. The ACID guarantees hold across multiple clusters and multiple query engines, which means you can have a Presto cluster, a Spark notebook, and a compaction job all interacting with the same table reliably. By default, all operations are serializable, which is the strongest isolation level in databases. You can relax that to snapshot isolation in some use cases.
Iceberg relies on optimistic concurrency, meaning if there are two concurrent operations only one of them will succeed. The second will have to retry, but that retry will be on the metadata level and it will be implicit for the user. And if Iceberg detects that the second command is actually not in conflict with the first one, it will commit that as well and it will be seamless for the user. And the conflict detection and resolution is done on the file level which is even better than partition level. So if you have two operations that modify the same partition but they are not in conflict, you can actually commit both of them.
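To make the optimistic-concurrency idea concrete, here is a minimal Python sketch of a toy in-memory table. `ToyTable`, `Snapshot` and `CommitConflict` are hypothetical names, not Iceberg classes, and the model checks only one kind of conflict (removing a file that a concurrent commit already removed); Iceberg's real validation covers more cases. It shows a commit prepared against a stale snapshot being re-applied on top of the latest one when nothing conflicts.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    version: int
    files: frozenset


class CommitConflict(Exception):
    """Raised when a file this commit wants to remove is already gone."""


class ToyTable:
    """Hypothetical in-memory table, used only to illustrate optimistic commits."""

    def __init__(self):
        self.current = Snapshot(0, frozenset())

    def commit(self, base, added, removed):
        """Apply a change prepared against snapshot `base`.

        Returns (new_snapshot, retried). If other commits landed after `base`,
        the change is re-applied on top of the latest snapshot (retried=True)
        unless it removes a file that is no longer in the table.
        """
        latest = self.current
        retried = latest.version != base.version
        missing = set(removed) - latest.files
        if missing:
            raise CommitConflict(f"already removed: {sorted(missing)}")
        # A real catalog would swap the table's metadata pointer atomically
        # (compare-and-swap); a plain assignment stands in for that here.
        self.current = Snapshot(latest.version + 1, (latest.files - set(removed)) | set(added))
        return self.current, retried


table = ToyTable()
s1, _ = table.commit(table.current, {"a.parquet", "b.parquet"}, set())
# Two writers start from s1: a compaction that replaces a.parquet, and an append.
s2, _ = table.commit(s1, {"c.parquet"}, {"a.parquet"})
s3, retried = table.commit(s1, {"d.parquet"}, set())
print(s3.version, sorted(s3.files), retried)  # 3 ['b.parquet', 'c.parquet', 'd.parquet'] True
```

Because only file-level overlaps count as conflicts, the compaction and the append both succeed even though they started from the same snapshot; a third writer that also tried to remove `a.parquet` from `s1` would get `CommitConflict`.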
The next feature I want to touch on is indexing. Iceberg takes the well-known idea of small materialized aggregates to the next level: it keeps min/max statistics for your columns at the file level. This allows Iceberg to skip files without even touching them. For example, you no longer have to read the footer of a Parquet file to find out that it does not match your query. This significantly boosts performance for highly selective queries. The index is part of the metadata, so it's updated atomically, and you don't have to run a separate system. All of that allows you to query petabyte-scale tables in five seconds if your query is selective enough. The community is also working on adding other secondary indexes, and the design is still under discussion.
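The file-skipping idea can be sketched in a few lines of Python. This is a simplified model, not Iceberg's evaluator: `DataFile` and `files_to_scan` are hypothetical names, and only a single integer column `ts` with a range predicate is considered. A file is kept only if its [min, max] range can overlap the predicate, and the decision never opens the file.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataFile:
    path: str
    min_ts: int  # lower bound of column "ts" in this file
    max_ts: int  # upper bound of column "ts" in this file


def files_to_scan(files, lo, hi):
    """Keep files whose [min_ts, max_ts] range can overlap lo <= ts <= hi.

    The decision uses only file-level statistics, so skipped files are never opened.
    """
    return [f for f in files if f.max_ts >= lo and f.min_ts <= hi]


files = [DataFile("a.parquet", 0, 99), DataFile("b.parquet", 100, 199), DataFile("c.parquet", 200, 299)]
print([f.path for f in files_to_scan(files, 150, 160)])  # ['b.parquet']
```

The more selective the predicate and the better the data is clustered, the more files fall outside the range and are skipped.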
Iceberg also supports implicit partitioning. Today, data engineers have to produce physical partition values, and users must be aware of the underlying table partitioning and define specific predicates on those partition values in order to get meaningful performance. In Iceberg, a partition value can be represented as a logical transformation on top of a column. Whenever you have a predicate on that column, Iceberg will derive the partition values for you and use them to prune partitions as necessary. This has a number of benefits, including the ability to evolve partition layouts as needed in the future.
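A small Python sketch of the idea, assuming a table partitioned by the UTC day of a timestamp column `ts`. `day_transform` and `prune_partitions` are hypothetical helpers that mirror the concept of a day transform, not Iceberg's implementation (which, for example, represents days as integers). The user filters only on `ts`; the partition range is derived by applying the same transform to the predicate bounds.

```python
from datetime import date, datetime, timezone


def day_transform(ts: datetime) -> date:
    """Derive the partition value (UTC day) from a timezone-aware timestamp."""
    return ts.astimezone(timezone.utc).date()


def prune_partitions(partitions, ts_lo, ts_hi):
    """Keep partitions that may hold rows with ts_lo <= ts <= ts_hi.

    Applying the same transform to the predicate bounds is valid because the
    day transform is monotonic: earlier timestamps never map to later days.
    """
    lo_day, hi_day = day_transform(ts_lo), day_transform(ts_hi)
    return [p for p in partitions if lo_day <= p <= hi_day]


parts = [date(2020, 7, 1), date(2020, 7, 2), date(2020, 7, 3)]
lo = datetime(2020, 7, 2, 5, 0, tzinfo=timezone.utc)
hi = datetime(2020, 7, 2, 23, 0, tzinfo=timezone.utc)
print(prune_partitions(parts, lo, hi))  # [datetime.date(2020, 7, 2)]
```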
Iceberg finally resolves the problem of schema evolution by assigning a unique ID to every column and tracking it in the metadata, so you no longer have the limitations of tracking columns by position or by name. You can safely add, drop, update and reorder columns without side effects. By side effects I mean, for example: if you drop a column and a year later add a new column with the same name, you should not be able to see the data of the old column.
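Here is a minimal Python sketch of why ID-based resolution avoids those side effects. Schemas are modelled as plain dicts from column ID to name, and `project` is a hypothetical helper, not Iceberg's reader. Because values are matched by ID, a rename keeps working, and a re-added column that reuses a name (with a new ID) reads as missing from older files.

```python
def project(file_row, file_schema, table_schema):
    """Read a row written under `file_schema` using the current `table_schema`.

    Both schemas map column ID -> column name. Values are matched by ID, never
    by name or position.
    """
    values_by_id = {col_id: file_row[name] for col_id, name in file_schema.items()}
    return {name: values_by_id.get(col_id) for col_id, name in table_schema.items()}


old_schema = {1: "id", 2: "category"}
row = {"id": 7, "category": "legacy"}
# "category" (ID 2) was dropped, then a new "category" was added with ID 3,
# and "id" was renamed to "user_id".
new_schema = {1: "user_id", 3: "category"}
print(project(row, old_schema, new_schema))  # {'user_id': 7, 'category': None}
```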
The community is also working on updates using differential files. This will allow us to perform online updates on top of data lakes and reduce write amplification. This is still a work in progress, and I encourage you to check out the design and the open tasks. Right now, a lot of companies rely on copy-on-write behavior to implement their MERGE INTO statements. Finally, there are many more features that I'll skip because I simply don't have time. But one of my favorites is metadata tables, which allow you to analyze lots of aspects of your table without touching the data. You can see the table history, how it evolved over time, and how many files you actually have. You can pre-compute things like the number of records or the number of files per partition. You can see the distribution of your data, and you can use all of this in your internal systems to maintain your tables as well.
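As an illustration of pre-computing per-partition statistics from metadata rather than data, here is a small Python sketch over rows shaped loosely like entries of a files metadata table. The `partition` and `record_count` keys are assumptions modelled on that table, and `partition_summary` is a hypothetical helper; in practice you would run the equivalent aggregation in your query engine.

```python
from collections import defaultdict


def partition_summary(file_rows):
    """Aggregate file and record counts per partition from file-level metadata rows."""
    summary = defaultdict(lambda: {"files": 0, "records": 0})
    for row in file_rows:
        stats = summary[row["partition"]]
        stats["files"] += 1
        stats["records"] += row["record_count"]
    return dict(summary)


rows = [
    {"partition": "2020-07-01", "record_count": 1000},
    {"partition": "2020-07-01", "record_count": 500},
    {"partition": "2020-07-02", "record_count": 42},
]
print(partition_summary(rows))
# {'2020-07-01': {'files': 2, 'records': 1500}, '2020-07-02': {'files': 1, 'records': 42}}
```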
So let's move on to the metadata layout and how Iceberg actually works under the hood. There are two very fundamental things you need to know about Iceberg metadata. First, Iceberg metadata files are immutable: every time you modify a table, a new version of the table metadata is produced. This is what allows us to achieve snapshot isolation and time travel. Second, Iceberg keeps metadata for every single data file and persists that metadata next to your data. Those two fundamental properties allow Iceberg to build more advanced features on top.
The table state at a particular point in time is called a snapshot. A snapshot tells us which data files were part of the table at that point in time. Internally, each snapshot has a list of manifests. The manifest is a very important concept in Iceberg: it's a file that contains metadata for a group of data files. Each entry in a manifest file provides the information for one data file, including its location, file size, min/max statistics, partition tuple, and so on. Depending on how you configure your table, a single manifest can cover from two to 5,000 data files.
So in short, manifests provide metadata for data files. Then there is a manifest list, which is a separate Avro file that provides metadata for manifests: it stores the location and a partition summary for each manifest. Essentially, this allows you to navigate through the metadata efficiently during job planning [inaudible 00:10:44], because you can skip the metadata for whole groups of files if you don't need it. Whenever we need to plan a job, we first determine which snapshot we need to read. Then we filter manifests using our partition predicates. Then, within the matching manifests, we filter data files using the full query predicate.
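The two-level planning described above can be modelled in Python as follows. This is a simplified sketch, not Iceberg's planner: `ManifestSummary`, `FileEntry` and `plan` are hypothetical names, partitions are plain integers, and `read_manifest` stands in for opening a manifest file. The point is that manifests whose partition summary cannot match are skipped without being read.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ManifestSummary:
    """One manifest-list entry: where the manifest is and its partition bounds."""
    path: str
    part_lower: int
    part_upper: int


@dataclass(frozen=True)
class FileEntry:
    """One manifest entry describing a single data file."""
    path: str
    partition: int
    min_val: int
    max_val: int


def plan(manifest_list, read_manifest, part_lo, part_hi, val_lo, val_hi):
    """Prune manifests by partition summary, then data files by column stats."""
    opened, selected = [], []
    for summary in manifest_list:
        if summary.part_upper < part_lo or summary.part_lower > part_hi:
            continue  # pruned without reading the manifest
        opened.append(summary.path)
        for entry in read_manifest(summary.path):
            if (part_lo <= entry.partition <= part_hi
                    and entry.max_val >= val_lo and entry.min_val <= val_hi):
                selected.append(entry.path)
    return opened, selected


manifests = {
    "m1.avro": [FileEntry("f1", 1, 0, 10), FileEntry("f2", 1, 20, 30)],
    "m2.avro": [FileEntry("f3", 2, 0, 10), FileEntry("f4", 3, 5, 15)],
}
manifest_list = [ManifestSummary("m1.avro", 1, 1), ManifestSummary("m2.avro", 2, 3)]
print(plan(manifest_list, manifests.__getitem__, 2, 3, 12, 14))  # (['m2.avro'], ['f4'])
```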
Whenever we write to the table, there will be a new snapshot with its own new manifest list, but that new manifest list can reference old manifests as well as the new manifests added in this commit. That way you don't have to rewrite all of the metadata to produce a new table version; we can inherit most of it, and this makes the commit a lightweight operation. If you take a look at the metadata folder, you will see some JSON files there. Those are the version files of your table. They contain the table schema, the partition spec, custom table properties, and other information.
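A minimal Python sketch of why such a commit is cheap: the new snapshot's manifest list simply extends the parent's list with the newly written manifests. `Snapshot` and `append_commit` are hypothetical names, and the sequential snapshot IDs are a simplification for readability.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    manifests: tuple  # manifest paths recorded in this snapshot's manifest list


def append_commit(parent, new_manifests):
    """Produce a new snapshot that inherits all manifests of its parent.

    Only the new manifests and a new manifest list would be written; existing
    manifests are referenced as they are.
    """
    inherited = parent.manifests if parent is not None else ()
    next_id = parent.snapshot_id + 1 if parent is not None else 1
    return Snapshot(next_id, inherited + tuple(new_manifests))


s1 = append_commit(None, ["m1.avro"])
s2 = append_commit(s1, ["m2.avro"])
print(s2)  # Snapshot(snapshot_id=2, manifests=('m1.avro', 'm2.avro'))
```

The parent snapshot is left untouched, which is what keeps older versions readable for time travel.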
Each version file is prefixed with a version of the table, and that version constantly increases. If you open a version file, you will see a lot of information. One of the fields is called snapshots: it's the list of snapshots that you currently have, and each snapshot is associated with a summary. In this case, we have an event that added three data files, added three records and changed three partitions. You also see the total number of records and data files in your table at that point. All of this information is exposed through the snapshots metadata table, so you can play around with it as a normal DataFrame in Spark, for example.
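The snapshot summaries can be read with nothing more than a JSON parser. This Python sketch uses a hand-written, heavily trimmed version file; the field names (`snapshots`, `snapshot-id`, `summary`, `total-records` and so on) follow the Iceberg table spec as I understand it, but the values are invented for illustration and real files contain many more fields. `snapshot_summaries` is a hypothetical helper.

```python
import json

# Hand-written, trimmed excerpt of a version file; values are made up.
VERSION_FILE = """
{
  "format-version": 1,
  "current-snapshot-id": 2,
  "snapshots": [
    {"snapshot-id": 1, "summary": {"operation": "append", "added-data-files": "3",
      "added-records": "3", "changed-partition-count": "3",
      "total-records": "3", "total-data-files": "3"}},
    {"snapshot-id": 2, "summary": {"operation": "append", "added-data-files": "2",
      "added-records": "5", "changed-partition-count": "1",
      "total-records": "8", "total-data-files": "5"}}
  ]
}
"""


def snapshot_summaries(text):
    """Return (snapshot id, operation, total records, total data files) per snapshot."""
    metadata = json.loads(text)
    return [
        (s["snapshot-id"], s["summary"]["operation"],
         int(s["summary"]["total-records"]), int(s["summary"]["total-data-files"]))
        for s in metadata["snapshots"]
    ]


for row in snapshot_summaries(VERSION_FILE):
    print(row)
# (1, 'append', 3, 3)
# (2, 'append', 8, 5)
```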
Each snapshot has its own manifest list, which, as I mentioned before, is a separate Avro file. Each [inaudible 00:12:47] contains metadata for a manifest, so that you can prune manifests using your partition predicates. These files are prefixed with the snapshot ID in their names. Finally, we have the manifests, which are suffixed appropriately. In this particular case they also share a common prefix, which means that all three manifests were added in the same commit.
All right. So we covered what Iceberg is, what features it provides, and how it works. The final part is about how to keep your tables optimal so that you can actually benefit from all those features. The first point you should be aware of is the growing list of snapshots. As I said before, each data operation produces a new snapshot, and you have to keep snapshots around for snapshot isolation and time travel. But a huge list of snapshots can impact commit and job planning time. Apart from that, it does not let you remove old data and metadata files that you no longer need. For example, if you compact the table, there will be a new snapshot and you end up storing a copy of the data. At some point you want to get rid of that copy, because it's additional cost and additional pressure on your cluster. Without cleanup, the snapshot list will grow indefinitely.
The first solution is to use the Table API to expire old snapshots. During this process, Iceberg physically removes data and metadata files that are no longer part of your table state. It works really well for regular maintenance. It does not require a distributed query engine; it uses a [inaudible 00:14:41] to parallelize the work. And because Iceberg expires snapshots without any list operations, it's relatively cheap. The one case where it doesn't work well is when you have a lot of snapshots and a lot of manifests to expire. I've seen scenarios where a table was compacted in a very short period of time and the number of files was reduced from a couple of million to, let's say, a couple of hundred thousand. That generated a huge number of snapshots and a lot of manifests to scan, and as a result this solution was slow.
In that situation, it's better to use the Actions API instead of the Table API, in particular the expire snapshots action. It leverages a distributed query engine to parallelize the scanning of manifests, and it can expire thousands of snapshots easily. It still doesn't do any list operations, so it's a really efficient way to expire a lot of snapshots. The only downside is that it consumes more resources.
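Both approaches decide what to delete from metadata alone, without listing storage. A minimal Python sketch of that set logic, assuming each snapshot's referenced files (data files, manifests, manifest list) are already known; `files_to_delete` is a hypothetical helper, not Iceberg's implementation.

```python
def files_to_delete(snapshot_files, retained_ids):
    """Files referenced only by expired snapshots, computed from metadata alone.

    `snapshot_files` maps snapshot ID -> set of file paths it references.
    A file is deletable if some expired snapshot references it and no
    retained snapshot does. No storage listing is needed.
    """
    still_referenced = set().union(*(snapshot_files[s] for s in retained_ids))
    expired = set().union(*(files for sid, files in snapshot_files.items() if sid not in retained_ids))
    return expired - still_referenced


snapshot_files = {
    1: {"a.parquet", "m1.avro"},
    2: {"a.parquet", "b.parquet", "m2.avro"},
    3: {"b.parquet", "c.parquet", "m3.avro"},  # e.g. after compacting a.parquet away
}
print(sorted(files_to_delete(snapshot_files, {3})))  # ['a.parquet', 'm1.avro', 'm2.avro']
```

The expensive part at scale is building those per-snapshot file sets, which means reading many manifests; that is the work the distributed action parallelizes.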
Overall, I highly advise expiring snapshots regularly, ideally every day. I also recommend keeping enough history; as a default, keep seven days so that you can go back if needed. In addition, define the minimum number of snapshots you want to keep as well as the time predicate; you can specify both in the action. It's a good idea to keep at least, let's say, the last hundred snapshots, no matter how old they are. That way, even if you stop [inaudible 00:16:15] to the table, you still have enough history. And finally, use the Table API for regular maintenance and the Actions API for the other use cases.
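The retention advice combines an age cutoff with a minimum count. Here is a Python sketch of that selection rule under those assumptions; `snapshots_to_keep` is a hypothetical helper. In Iceberg's Java API, the `ExpireSnapshots` operation exposes the two knobs as `expireOlderThan` and `retainLast` (as far as I know), but this code does not call Iceberg.

```python
from datetime import datetime, timedelta


def snapshots_to_keep(commit_times, now, max_age=timedelta(days=7), min_to_keep=100):
    """Retain every snapshot newer than `max_age`, plus at least the
    `min_to_keep` most recent snapshots regardless of age.

    `commit_times` maps snapshot ID -> commit time.
    """
    newest_first = sorted(commit_times, key=commit_times.get, reverse=True)
    keep = set(newest_first[:min_to_keep])
    keep |= {sid for sid, t in commit_times.items() if now - t <= max_age}
    return keep


now = datetime(2020, 7, 30)
times = {sid: now - timedelta(days=d) for sid, d in [(1, 30), (2, 20), (3, 10), (4, 5), (5, 1)]}
print(sorted(snapshots_to_keep(times, now, min_to_keep=2)))  # [4, 5]
print(sorted(snapshots_to_keep(times, now, min_to_keep=3)))  # [3, 4, 5]
```

The minimum count is what protects a table that has stopped receiving writes: its snapshots all age past the cutoff, but the most recent ones are still kept.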
The second problem is orphaned files. If the driver or an executor dies, some files can stay orphaned. They will not affect your readers, but they will be physically present in your storage, so at some point you want to get rid of them. Also, during the expiration of snapshots, some of the delete requests can fail, because your storage can be under pressure. In those cases you end up with files that are not referenced by the metadata, and you can no longer remove them by expiring snapshots. The solution to this problem is the remove orphan files action. It uses the metadata tables to find every data and metadata file that is currently referenced, compares that to the list of files actually present in your table location, and does an anti-join to find the difference. It is an expensive operation because it lists your table location, so you should be careful with it.
I would advise not to run this action frequently; usually once a month is good enough. If you expire snapshots regularly, you shouldn't expect many orphaned files at all, and this action is not really meant for daily use. In addition, I advise specifying a safe interval for orphaned files: you want to ignore relatively new files in your analysis, because they might have been produced by concurrent writers that haven't committed yet. Those files are not orphaned, they're just not committed yet, so you don't want to remove them. It's a good idea to pass an interval of maybe one or two days to be safe. It's also a good idea to pass an empty delete function to the action. That way you get a preview of which files are considered orphaned without actually removing them, so you can do an offline analysis. If you confirm that those are orphaned files and you no longer need them, you can go ahead and physically remove them.
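The anti-join, the safety interval and the dry-run delete function fit together as in this Python sketch. It is a model of the logic, not Iceberg's action: `find_and_remove_orphans` is a hypothetical helper, the listing is a plain dict of path to last-modified time, and the two-day default only mirrors the advice above. (Iceberg's Java action exposes similar knobs, `olderThan` and `deleteWith`, as far as I know.)

```python
from datetime import datetime, timedelta


def find_and_remove_orphans(listed, referenced, now, older_than=timedelta(days=2), delete=None):
    """Anti-join a storage listing against files referenced by table metadata.

    listed:     path -> last-modified time, from listing the table location
    referenced: set of data and metadata paths reachable from any snapshot
    Files newer than `older_than` are ignored because an in-flight writer may
    not have committed them yet. With delete=None this is a dry run.
    """
    cutoff = now - older_than
    orphans = sorted(p for p, mtime in listed.items() if p not in referenced and mtime < cutoff)
    if delete is not None:
        for path in orphans:
            delete(path)
    return orphans


now = datetime(2020, 7, 30)
listed = {
    "data/a.parquet": now - timedelta(days=10),  # referenced by metadata
    "data/b.parquet": now - timedelta(days=10),  # orphan
    "data/c.parquet": now - timedelta(hours=1),  # too new to judge
}
print(find_and_remove_orphans(listed, {"data/a.parquet"}, now))  # ['data/b.parquet']
```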
Problem number three is unoptimized metadata. Over a period of time, you may accumulate too many small manifests, and the metadata for one partition can be spread across multiple manifests. As we saw before, to have efficient job planning you have to touch as few manifests as you can.
The first solution is to compact manifests during commits. The nice thing about this approach is that it's enabled by default: it's done for you implicitly and automatically every time you commit. The downside is that it works well only when your writes are aligned with partitions, for example, if you write one partition at a time. If you write to hundreds of partitions within every batch, this solution will not really work for you. In addition, it increases the commit time and makes commit retries more expensive: you may not be able to reuse the compaction work from a previous attempt in the subsequent retries, because the amount of data could have changed.
The other way is to leverage the Actions API, specifically the rewrite manifests action. This solution works well for petabyte-scale tables, even if you write to thousands of partitions within a single batch (I've actually seen such cases). The only downside is that it has to be triggered by the user or by the system that manages your tables. In general, I would advise disabling manifest compaction on commits unless you know what you're doing and your use case actually works well with it; this can be done through a table property. I would also advise rewriting metadata on demand, if you notice that your job planning becomes slow or your commit time increases. And I would highly advise enabling snapshot ID inheritance for better performance of your actions. With that, the action can rewrite metadata for millions of files in less than two minutes, for example.
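What a manifest rewrite aims for can be sketched in Python: collect entries from many small manifests, order them by partition, and cut them into fewer, fuller manifests so that one partition's files sit together. `rewrite_manifests` and `max_entries` are hypothetical; the real action works on Avro manifest files and uses its own sizing rules.

```python
from collections import defaultdict


def rewrite_manifests(manifests, max_entries):
    """Regroup manifest entries so each partition's files sit together.

    manifests: list of manifests, each a list of (partition, data_file) entries.
    Returns fewer, fuller manifests with entries ordered by partition.
    """
    by_partition = defaultdict(list)
    for manifest in manifests:
        for partition, data_file in manifest:
            by_partition[partition].append(data_file)

    rewritten, current = [], []
    for partition in sorted(by_partition):
        for data_file in by_partition[partition]:
            if len(current) == max_entries:
                rewritten.append(current)
                current = []
            current.append((partition, data_file))
    if current:
        rewritten.append(current)
    return rewritten


small = [[(1, "a"), (2, "b")], [(1, "c")], [(2, "d"), (1, "e")]]
print(rewrite_manifests(small, max_entries=3))
# [[(1, 'a'), (1, 'c'), (1, 'e')], [(2, 'b'), (2, 'd')]]
```

After the rewrite, a query on partition 1 touches one manifest instead of three.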
The final problem I want to talk about has nothing to do with Iceberg specifically, but Iceberg provides a great basis to build solutions for it: unoptimized data. Over a period of time, we may accumulate a lot of small data files, the same problem we have for metadata. In addition, after some time the data might not be clustered the way you want. If that happens, the index that Iceberg keeps becomes useless, because it won't be able to prune files when the data is randomly ordered.
The first solution is to perform bin packing. It's cheap because it doesn't shuffle the data, but at the same time it doesn't help you recover the clustering of the data. The second solution is to sort the records within the files you bin pack. It's relatively cheap because it shuffles only the data in those files, but it's not as cheap as simple bin packing because there is a shuffle. And over time, if you do this 10 or 20 times, the distribution of your data within the partition will still not be ideal: it will not be globally ordered, and the index performance will not be great.
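Bin packing here means grouping small files into rewrite tasks close to a target size. The talk doesn't specify an algorithm, so this Python sketch uses first-fit decreasing, one common choice; `bin_pack` and `target_size` are hypothetical names, and sizes are unitless.

```python
def bin_pack(file_sizes, target_size):
    """Group small files into rewrite tasks of at most `target_size`.

    First-fit decreasing: place each file, largest first, into the first
    group that still has room. A file bigger than the target gets its own group.
    """
    groups = []  # each group is [total_size, [paths]]
    for path, size in sorted(file_sizes.items(), key=lambda kv: kv[1], reverse=True):
        for group in groups:
            if group[0] + size <= target_size:
                group[0] += size
                group[1].append(path)
                break
        else:
            groups.append([size, [path]])
    return [paths for _, paths in groups]


sizes = {"a": 60, "b": 50, "c": 40, "d": 30, "e": 20}  # e.g. megabytes
print(bin_pack(sizes, target_size=100))  # [['a', 'c'], ['b', 'd', 'e']]
```

Each group is then rewritten as one larger file as-is (plain bin packing) or with its records sorted (the second option), and neither reorders data across groups.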
The most expensive option, which actually ensures proper clustering, is to fully re-sort the partition, and that's a really expensive operation. So I have a couple of tips on how to make this scalable in practice. First of all, combine different compaction strategies depending on the point in time. It's usually a good idea to start with bin packing and sorting; that way it's relatively cheap while still trying to restore the distribution. After some time, if you don't expect to write to a partition frequently, you probably want to re-sort it completely, so that you restore the distribution of the data and get the best index performance you can. Second, I would advise compacting every partition separately, because there is no reason to combine data from multiple partitions into a single compaction job; it just adds more pressure on your cluster.
I would also advise using the [inaudible 00:23:26] to parallelize this process and compact partitions in parallel. If you have a large enough cluster and small enough partitions, you will be able to finish this process quickly. If you have large partitions, maybe one or two terabytes in size, it's probably a bad idea to reshuffle them in a single job; you would need a large cluster to perform that operation. Instead, I would advise having some sort of threshold for the size you can process in one job, and splitting the compaction of the partition into multiple jobs. Thanks to the commit protocol in Iceberg, we can do that in an ACID-compliant way and replace part of a table partition.
I would also recommend having a threshold for the number of files before you trigger compaction. It probably doesn't make sense to compact if you have just four files in your partition, so you probably want to wait until you have maybe 50 files and then compact those. Otherwise, if you write regularly, you just end up rewriting a lot of data. Also, don't run metadata and data compaction at the same time: if you are rewriting metadata for files that are being replaced, this will fail.
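The scheduling tips above (one partition per job, a file-count trigger, and a size cap that splits large partitions into several jobs) can be combined into one planning function. This is a Python sketch under those assumptions; `plan_compaction` is hypothetical, and the default of 50 files and 512 GiB per job are illustrative values, not recommendations from Iceberg.

```python
def plan_compaction(partitions, min_files=50, max_bytes_per_job=512 * 1024**3):
    """Plan compaction jobs, never mixing partitions within a job.

    partitions: partition -> list of (path, size_bytes).
    Partitions with fewer than `min_files` files are skipped; large partitions
    are split into several jobs of at most `max_bytes_per_job` each (a single
    oversized file still becomes its own job).
    """
    jobs = []
    for partition, files in sorted(partitions.items()):
        if len(files) < min_files:
            continue  # not worth rewriting yet
        job, job_bytes = [], 0
        for path, size in files:
            if job and job_bytes + size > max_bytes_per_job:
                jobs.append((partition, job))
                job, job_bytes = [], 0
            job.append(path)
            job_bytes += size
        if job:
            jobs.append((partition, job))
    return jobs


parts = {"p1": [(f"f{i}", 10) for i in range(4)], "p2": [("g0", 10)]}
print(plan_compaction(parts, min_files=3, max_bytes_per_job=25))
# [('p1', ['f0', 'f1']), ('p1', ['f2', 'f3'])]
```

Each resulting job can run and commit independently, which is what relies on Iceberg being able to replace part of a partition atomically.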
There are a few more advanced tips as well. Iceberg allows you to keep track of which partitions were modified since a given point in time, and you can find that information in the metadata tables. That way you no longer have to guess which partitions you need to compact: you can find out which partitions you wrote to and then issue compaction on just those partitions. Building on top of that idea, you can use the same metadata tables to analyze the clustering of the data within a partition. You have min/max statistics for your files, so given your sort key, you can find out the number of files that overlap. That should be a pretty good estimate of whether you need to re-sort the partition or can still bin pack.
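Counting overlapping files from min/max statistics can be done with a single sweep. This Python sketch is one way to compute it, not an Iceberg feature: `overlapping_files` is a hypothetical helper, and the input would come from per-file lower and upper bounds of your sort key. A high share of overlapping files suggests a full re-sort; a low share suggests bin packing is still enough (any threshold is your choice).

```python
def overlapping_files(ranges):
    """Return files whose sort-key range overlaps at least one other file's range.

    ranges: list of (path, min_key, max_key) with inclusive bounds. Files are
    swept in order of min_key while tracking the earlier file that reaches
    furthest; a file overlaps an earlier one exactly when its min_key is at
    most that furthest max_key.
    """
    overlapping = set()
    reach_path, reach_upper = None, None
    for path, lo, hi in sorted(ranges, key=lambda r: (r[1], r[2])):
        if reach_upper is not None and lo <= reach_upper:
            overlapping.update((path, reach_path))
        if reach_upper is None or hi > reach_upper:
            reach_path, reach_upper = path, hi
    return overlapping


ranges = [("a", 0, 10), ("b", 5, 15), ("c", 20, 30), ("d", 31, 40)]
print(sorted(overlapping_files(ranges)))  # ['a', 'b']
```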
Finally, you can build a service that keeps track of all this and optimizes your tables for you, so that your users don't have to do it on their own. Let me finish with a quick summary. Iceberg really unlocks new use cases on top of data lakes as well as simplifies existing ones. But in order to truly benefit from it, you have to maintain your tables, and the Actions API is a really nice way to do scalable table maintenance. I would also encourage you to join the community, because there are a lot of interesting developments going on right now and we always value a fresh perspective. Those are the links where you can join the community and contribute to the Apache project, and I'll be excited to see you there. Thanks everyone. I'm open to questions.
Thank you very much, Anton. All right, folks, we are open for live Q&A. If you have a question for Anton, please either share your video and audio using the button in the upper right-hand corner, or type it into the chat and I can read your questions aloud.
We have Gerard, who's staying up late. Ah, okay, here's a question from [Devenda 00:27:06]: What happens if we want to repartition a Spark DataFrame on a different column, with a different number of partitions, and write it again?
Is it something that you do before you write to the Iceberg table or?
Devenda will have to answer that before I can give you...
Yeah. I would need more clarification.
All right, more clarification needed there, Devenda. And from Jacob, this is an obligatory question: How is Iceberg supported in various tools like Spark, Presto, Dremio or others? Also, how does it cope with S3's eventual consistency?
This is a very good question. Iceberg was designed to solve the S3 problem. If you are familiar with the commit protocols added in Hadoop 3.1, I believe, there were a bunch of new committers for S3 specifically, the staging one and the magic one. They help improve workloads on top of object stores like S3, but they did not really solve the problem. Iceberg is the first solution that actually solves it: there is no requirement for a consistent list or an atomic rename operation at all, and that's one of the real strengths of Iceberg. You can see it as a continuation of the work that was done on those commit protocols, because Iceberg was actually designed by the person who was working on them.
With respect to integrations: with Spark 3.0, the Spark integration is mature. We actually have full SQL support, schema evolution, and a lot of other things. Presto support is also there; right now it's mostly for queries. We have a lot of people who use both in production: they run ETL jobs with Spark and then have a Presto cluster to query the same tables for analytics. Flink support is in progress. I know that Dremio added initial support for Iceberg as well. So the community is really growing, and we added support for Hive as well.
Great. Thank you. And the question from Jam, is Iceberg based on Avro or Parquet?
It supports multiple file formats: right now ORC, Parquet and Avro. As I said in the presentation, you can actually migrate your existing Avro and ORC data sets to Iceberg, as long as they comply with the standard table format we have in Spark, Hive or Presto. So you have a choice of which file format you want to use. As for Arrow, we use Arrow for vectorization: we read Parquet files into Arrow and then expose Arrow batches as Spark batches.
Okay. And one last question here; for the rest, we will have to continue the conversation in the Slack channel. How does Iceberg compare to Delta Lake and Hudi?
That's a great question. So do we still have a couple of minutes?
We're actually at time right now. I don't know if folks will get bumped off. So we may have to go ahead and carry on in the Slack community, unfortunately. So thank you everyone for your questions. We are at time. Please join us and Anton in the Subsurface Community right after this session. Thanks again, everybody. Thank you, Anton.
I will answer this question on Slack, so [inaudible 00:30:50].
All right. Thanks, Anton. Appreciate it.
Cool. Thanks everyone.