Subsurface LIVE Winter 2021
Deep Dive into Iceberg SQL Extensions
Apache Iceberg is an open table format that allows data engineers and data scientists to build reliable and efficient data lakes with features that are normally present only in data warehouses. The project allows companies to substantially simplify their current data lake use cases as well as to unlock fundamentally new ones.
This talk will focus on the Iceberg SQL extensions, a recent development in the Iceberg community to efficiently manage tables through SQL. In particular, this session will cover how to snapshot/migrate an existing Hive or Spark table, perform table maintenance, and optimize metadata and data to fully benefit from Iceberg’s rich feature set. In addition, the presentation will cover common pitfalls of running and managing Iceberg tables with tens of millions of files in production and how they can be addressed using SQL extensions.
Anton Okolnychyi, Apache Iceberg Committer & PMC Member
Anton Okolnychyi is a committer and PMC member of Apache Iceberg as well as an Apache Spark contributor at Apple. He has been dealing with internals of various big data systems for the last five years. At Apple, Anton is working on data lakes and an elastic, on-demand, secure and fully managed Spark as a service. Prior to joining Apple, he optimized and extended a proprietary Spark distribution at SAP. Anton holds a master’s degree in computer science from RWTH Aachen University.
Welcome everyone. Thank you for joining our session this afternoon. My name is Emily, I’m from Dremio. I’ll be moderating this presentation today and I’m glad to welcome Anton, who will present Deep Dive into Iceberg SQL Extensions. If you have any questions during the presentation, please type them into the chat at any time. We will address the questions live during the Q&A session at the [00:00:30] end of today’s presentation. And now I’ll turn it over to our speaker, Anton.
Welcome everyone. Let me start sharing my screen. Today, we are going to talk about Iceberg SQL extensions, a recent addition to the project that allows data engineers to manage and maintain their Iceberg tables using plain SQL.
Let me start by introducing [00:01:00] myself. I am an Iceberg PMC member and Spark contributor. Right now, my focus is on making data lakes reliable and efficient, even when a single table contains tens of millions of data files. Before that I was optimizing a proprietary distribution of Spark, so I know and used to work a lot on the internals of that query engine. In general, I’m into everything that’s related to distributed systems. And I’m also a big fan of open source. I truly believe [00:01:30] that the only way to build an industry standard is within a healthy open source community driven by common sense. And I’m really excited to see how Iceberg succeeds in that.
This talk will be technical like most of my talks. I will summarize what Iceberg is, but I will not deep dive into how it works and what features it provides. There are separate talks for this, and I would encourage you to check those out. The primary [00:02:00] focus for me will be on the SQL extensions that landed in 0.11, and I will finish with some future work around this topic. And a small disclaimer before we start: today I’m going to be sharing my personal views, not necessarily the views of my current employer or the project that I represent.
So let’s start with what Iceberg is. Some of you already know that Iceberg is a table format for huge analytical datasets whose goal is to become an industry standard [00:02:30] for storing your data in any distributed file system or object storage. Iceberg brings our data lakes to the next level by improving the reliability and performance of existing data lake use cases, as well as unlocking fundamentally new ones. It has been a top-level project at the Apache Software Foundation since May 2020. It’s fully open, it has a spec, and it is driven by common sense and by companies like Netflix, Apple, LinkedIn, [00:03:00] Adobe, Dremio, Cloudera, Amazon, and others. So I definitely encourage you to check out other online resources about what Iceberg is and how it works, but my focus will be on SQL extensions today.
And the community has decided to introduce SQL extensions for two reasons. First, we wanted to provide data engineers with a high-level and easy-to-use API to manage and maintain their tables. [00:03:30] Some of you may know that Iceberg has the Table and Actions APIs for doing this, but it requires writing Java code, which is not very convenient if you’re working in a notebook, especially a Python notebook. That’s why offering a SQL API that’s easy to use sounds like a good idea. And second, we wanted to ensure that people who are using Iceberg will have a consistent user experience across query engines. It is very common to access [00:04:00] the same Iceberg table using multiple query engines. And if you want to, for example, expire snapshots, then the API for doing that should be exactly the same in both query engines. Also, it was important for us to stay within the SQL standard. That’s why we decided to leverage stored procedures for this. It’s a widely adopted concept in relational databases and it’s also present in some query engines like Presto, which is now called Trino.
[00:04:30] A command to call a stored procedure is very simple. It starts with the CALL keyword followed by the procedure identifier, and then a list of arguments. The procedure identifier itself has multiple parts. Usually it starts with the catalog name. In this example, it’s iceberg, and it may refer to any Iceberg catalog under the hood. It all depends on your configuration and can be a Hive catalog, a Glue catalog, or [00:05:00] a file-based catalog as well. The second part is the namespace. In the case of built-in Iceberg procedures, it will always be the system namespace. And the third part of the procedure identifier is the procedure name, which actually defines which action we would like to execute on a given table. And then at the end you have the list of arguments. Some of them are required, some of them are optional with default values, and you can [00:05:30] pass them using names like in this example or using positions.
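As a sketch, the call syntax described above might look like this in Spark SQL, using one of the built-in procedures covered later in the talk (the table name and snapshot ID here are made up for illustration):

```sql
-- Named arguments: order does not matter.
CALL iceberg.system.rollback_to_snapshot(
  table => 'db.sample',
  snapshot_id => 5781947118336215154
);

-- Equivalent positional form: arguments must follow the declared order.
CALL iceberg.system.rollback_to_snapshot('db.sample', 5781947118336215154);
```

Here `iceberg` is the catalog name, `system` is the namespace reserved for built-in procedures, and `rollback_to_snapshot` is the procedure name.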
In general, one can divide all Iceberg stored procedures into four groups. The first group is responsible for different aspects of migrating your existing tables to Iceberg without any ETL jobs; the migration is done fully in place. The second group is for snapshot management. [00:06:00] It’s basically for things like rolling back to a previous timestamp or rolling back to a previous snapshot. The third group covers metadata management: things like expiring old snapshots, re-clustering your metadata for better filtering, and so on. And finally you have data compaction, which is still in progress because we as a community want to get that right, and we need a bit more time, but it will come in 0.12. Let’s consider [00:06:30] each group separately, and we will start with migrating existing datasets.
While assisting different customers in migrating to Iceberg, I realized that there are two needs that are really common. First, users would like to try Iceberg reads and writes for their existing datasets without any impact on the original table. In most cases, the original table is a production dataset, which you would like to leave untouched. [00:07:00] Of course you could take a part of that table and ingest it into a fresh Iceberg table and do your tests there. However, any reasonable testing would require moving a large part of that table to Iceberg, which will be expensive and inconvenient. Especially if you’re talking about a petabyte-scale table, just moving a large part of that data isn’t going to be easy. Iceberg can do way better than this, and I’ll explain how in a second.
[00:07:30] And the second use case that we wanted to address is full migration to Iceberg. Usually that happens after the testing phase is done and you would like to move to Iceberg completely, so that each subsequent read and write goes through Iceberg. That’s why Iceberg provides two different procedures. The first one is called snapshot. Under the hood, it will create an independent Iceberg table whose metadata [00:08:00] will point to your existing data files, giving you the ability to read existing data through Iceberg. And it also provides you a safe playground to try your reads and writes.
There are very few arguments to this procedure. The first one is the source table identifier. This is the table that you are snapshotting; in this example, it’s a Parquet table that contains some data. The second argument [00:08:30] is the identifier for the Iceberg table you’re creating. This is the one that you will refer to in order to read your Iceberg data. Then you have an optional location. This is the location of the Iceberg table; if not set, it will be defaulted based on your catalog configuration. And eventually you have a map of table properties for your Iceberg table. It is really essential to get this right. [00:09:00] Do spend some time reading about the table properties that are supported by Iceberg. There are even talks about how to optimize them. And make sure that you tune them, because this will greatly impact the user experience you will get.
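Putting those arguments together, a snapshot call might look like this (the table names, location, and property value are hypothetical placeholders):

```sql
CALL iceberg.system.snapshot(
  source_table => 'db.orders',                     -- existing Hive/Spark table, left untouched
  table => 'db.orders_iceberg',                    -- new, independent Iceberg table to create
  location => 's3://bucket/tmp/orders_iceberg',    -- optional; defaults from catalog config
  properties => map('write.format.default', 'parquet')
);
```

After this, reads and writes against `db.orders_iceberg` go through Iceberg while `db.orders` stays untouched.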
Now, let’s see what actually happens in the file system when we do a snapshot. In this case, we have a small table with two partitions and a couple of files. And once you do a snapshot, [00:09:30] Iceberg creates a new Iceberg table with metadata pointing to the existing data files. At this point you can go ahead and read your Iceberg table, and you will get the same results as you would get from querying the original table. But you can try out some writes as well. And whenever you write new data, it will go into the Iceberg location, which is totally independent from the original location. So whatever we do with the Iceberg [00:10:00] table will not be visible in the original table. It’s also true the other way around: files that are added to the original table after the snapshot is created will not be visible.
The next procedure is called migrate. It is similar to snapshot, but instead of creating an independent table, it migrates the table in place. So it has only two arguments. It has the identifier [00:10:30] of the table it migrates; once the migration is done, this identifier will refer to the Iceberg table. And it also accepts table properties, just like the snapshot command. Let’s compare it to snapshot. So we have the same table with two partitions and a couple of files. Once you run migrate, the metadata will be generated in the same location as your original table, indicating that this is now an Iceberg [00:11:00] table. And once you do more writes, by default, the data will go into the data subfolder.
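A migrate call is simpler, since the same identifier keeps referring to the (now Iceberg) table afterwards. As a sketch, with a made-up table name:

```sql
CALL iceberg.system.migrate(
  table => 'db.orders',
  properties => map('compatibility.snapshot-id-inheritance.enabled', 'true')
);
```

The snapshot ID inheritance property shown here is one the speaker recommends below; enabling it lets the migration prepare metadata in a distributed fashion.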
This is done intentionally because that way you have an easy way to distinguish files that were imported into Iceberg from files that were written by Iceberg. But you have full control over which layout you would like to keep, because the metadata and data locations are controlled by table properties. So you can keep the old layout if you want to. [00:11:30] There are a couple of things I would like to mention here. Firstly, you really need to know the details about the table you migrate. For example, you have to know which schema evolution rules you followed.
Iceberg uses column IDs for reliable schema evolution. But those column IDs are not present in the data files that were written without Iceberg. That’s why Iceberg assigns a default name mapping, which should cover 99 [00:12:00] out of 100 use cases. But if you’re doing some special schema evolution, you would need to double-check how this maps onto Iceberg’s schema evolution rules, and you may need to tweak the default name mapping, but that should work pretty easily for you. Also, you have to be aware of what data types are used in the original table.
For example, Spark used to represent Parquet timestamps as int96 inside files, but that’s [00:12:30] not actually part of the Parquet spec, and Iceberg doesn’t do that. It has fallback mechanisms to read such data, but new data will be written with the proper types. So you have to be aware of those differences if they matter to you. Also, it is really critical that you stop ingestion while you do the full migration, because the migration under the hood happens by listing the partitions to find out which files are inside your storage. [00:13:00] And if you add more files to partitions that were already listed, then potentially you will lose those files.
Also, it is really critical that you configure your Iceberg tables properly. There are a lot of table properties that you can tune, but at least consider these ones. First is snapshot ID inheritance. If you enable this, then Iceberg will be able to prepare the metadata in a distributed fashion and do a lightweight commit on the driver. [00:13:30] If you don’t, then the migration will consume more memory on the driver and it will also take more time.
The next two properties actually allow you to control the metadata and data locations, giving you the flexibility to have whatever layout you want. And finally, you have table properties for your metrics, which I will talk a bit [inaudible 00:13:56] . [00:14:00] Iceberg maintains and keeps min-max statistics for your columns inside its own metadata. That way we can not only filter partitions, but also find which files match within those partitions. This allows you to query petabyte-scale tables really quickly. We’ve seen examples where a two-petabyte table was queried in five seconds overall, because we were able to locate just a couple of files that potentially had matches [00:14:30] using that metadata.
If you have a table with a lot of columns, it doesn’t make sense to keep metrics for all of them. For example, if you have a table with 200 columns, it doesn’t make sense to keep min-max statistics for every single column. Instead you need to find which columns participate in your queries most of the time. Ideally that will be your sort key in the table. And [00:15:00] you should keep min-max statistics only for those. This will reduce the pressure on the metadata and the metadata size. Also, once you’ve done the migration, take a look at the metadata tables. There are a lot of metadata tables that allow you to find out a lot about the state of your table, and they are really helpful for debugging purposes, and just to understand the number of files you have in the table and how the metadata [00:15:30] is clustered.
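The configuration advice from the last few paragraphs could be sketched as follows. The table and column names are hypothetical; the property keys are the Iceberg table properties for snapshot ID inheritance and column-level metrics, and the last two queries use Iceberg’s metadata tables:

```sql
ALTER TABLE db.orders_iceberg SET TBLPROPERTIES (
  'compatibility.snapshot-id-inheritance.enabled' = 'true',
  -- Keep only counts by default, full min-max stats for the sort key column.
  'write.metadata.metrics.default' = 'counts',
  'write.metadata.metrics.column.event_ts' = 'full'
);

-- After migration, inspect the table state through the metadata tables:
SELECT * FROM db.orders_iceberg.files;
SELECT * FROM db.orders_iceberg.snapshots;
```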
The second section will be about managing snapshots. And the first operation I’m going to talk about is rollback to snapshot. If you’re doing some operations and you realize that you made a mistake, you can go back. If you know the specific snapshot ID you’d like to go back to, then you can use the rollback_to_snapshot procedure. Similarly, if you know a timestamp [00:16:00] to which you would like to roll back, then you can use rollback_to_timestamp. In this case, Iceberg will figure out which snapshot was valid at that point in time and it will set the current snapshot accordingly.
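The two rollback variants might be called like this (the table name, snapshot ID, and timestamp are made up for illustration):

```sql
-- Roll back to a known snapshot ID.
CALL iceberg.system.rollback_to_snapshot('db.orders_iceberg', 5781947118336215154);

-- Or roll back to whatever snapshot was current at a given point in time.
CALL iceberg.system.rollback_to_timestamp(
  'db.orders_iceberg',
  TIMESTAMP '2021-01-10 12:00:00'
);
```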
The third procedure is called cherrypick_snapshot. It’s oriented toward more advanced users. In short, Iceberg allows you to create new snapshots without making them available for reads immediately. [00:16:30] This gives you more time to run extra validation jobs, to make sure the data is valid before it’s available for downstream consumption. Once you’ve done the validation, you’re confident that the data is valid, and you’re ready to promote a particular snapshot to the current table state, this is the operation you’re going to use. It will load the snapshot by ID, figure out what it did, and apply those changes to the current table state. [00:17:00] The third section will be about metadata management. This one is especially important if you run at any reasonable scale.
Each data operation in Iceberg produces a new snapshot, and we have to keep the old snapshots as well for snapshot isolation and time travel. Of course, the snapshot list cannot grow indefinitely: a huge list of snapshots will impact the job planning time [00:17:30] and the commit time. So eventually you would like to get rid of the old snapshots. In addition, keeping them prohibits the removal of old data and metadata files. For example, if you compacted your table, then you may end up storing twice as much data as you did before. And at some point you would like to get rid of the old data files that you no longer need.
To do this you can use the expire_snapshots [00:18:00] procedure. This procedure will not do any list operations; Iceberg is smart enough to figure out what has to be removed and what has to be deleted without any listing. There are a few interesting arguments you should be aware of. The first one is the older_than timestamp: snapshots older than this timestamp will be expired, and files that are no longer needed by the current table state will [00:18:30] be physically removed from your storage. You also have a way to pass the minimum number of snapshots to keep. For example, if you stopped writing to your table, it’s still probably a good idea to keep at least a hundred of the most recent snapshots, no matter what.
So I would advise expiring snapshots every day, because it’s a relatively cheap operation [00:19:00] and it will make sure that you have a clean table state. I would also advise keeping enough history so you can roll back if needed. The default thresholds for the retention can be configured via table properties, and by default it’s five days and 100 snapshots. And if you realize that this procedure doesn’t perform as you would expect, then try to change the number of shuffle partitions in the query engine, because most [00:19:30] of the work is done in a distributed fashion.
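A daily expiry job along the lines described above might look like this. The table name and timestamp are placeholders; the two property keys set the retention defaults the speaker mentions:

```sql
CALL iceberg.system.expire_snapshots(
  table => 'db.orders_iceberg',
  older_than => TIMESTAMP '2021-01-05 00:00:00',  -- expire snapshots older than this
  retain_last => 100                               -- but always keep the 100 most recent
);

-- The retention defaults can also be configured as table properties:
ALTER TABLE db.orders_iceberg SET TBLPROPERTIES (
  'history.expire.max-snapshot-age-ms' = '432000000',  -- 5 days
  'history.expire.min-snapshots-to-keep' = '100'
);
```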
The second point is about orphan data files and orphan metadata files. If your executor dies, or if the driver dies, then there may be some files left orphaned. They will not be visible to your readers, but they will be physically present, and at some point you would like to remove them. The same also applies to files that you didn’t manage to [00:20:00] delete while expiring snapshots, because the storage could be under pressure and some of the delete requests could fail. In order to remove orphaned files, you have a dedicated procedure. This is probably the most expensive procedure that is available, because it is the only one that does a list operation: we need to figure out whatever files you have in your storage and compare them to the data and [00:20:30] metadata files referenced by any snapshot of the table. The difference tells us which files are considered orphans.
It also has an older_than timestamp argument, and files created after this timestamp will be ignored in this analysis. This is a safety measure, because while we are doing this operation, there may be concurrent commits going into the table. [00:21:00] Those new files will not be part of your metadata yet, but they will be there, and they are not orphans. You don’t want to remove them, because this would potentially corrupt your table’s data. That’s why you have this older_than timestamp; make sure it’s at least a day or two so it doesn’t affect any of the concurrent operations going on.
Also, you have the location argument that tells us where to look for orphans. By default, it [00:21:30] will scan the root table location, covering both metadata and data, but feel free to point it to any other location as well. For example, you can point it to the metadata folder, or to a specific partition in your table. And finally you have a dry run flag. If you set it, then the procedure will just scan for orphan files; it will not physically remove them. And you will have the ability [00:22:00] to double-check that those files are actually orphan files.
In general, I wouldn’t recommend running this procedure frequently, because we don’t expect to have a lot of orphan files. Once a month is probably a good default. And also make sure you have a safe orphan file interval of a day or maybe two. What I’ve also seen people [00:22:30] do quite a lot is use dry_run set to true, capture the output from the procedure with the list of files that are considered orphans, and validate them. If they actually look like valid candidates, they go ahead and remove them.
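The dry-run-then-delete pattern just described might be sketched like this, with a hypothetical table name and a timestamp safely in the past:

```sql
-- Step 1: dry run, which only lists candidate orphans without deleting anything.
CALL iceberg.system.remove_orphan_files(
  table => 'db.orders_iceberg',
  older_than => TIMESTAMP '2021-01-13 00:00:00',  -- at least a day or two old
  dry_run => true
);

-- Step 2: after validating the candidates, run it for real.
CALL iceberg.system.remove_orphan_files(
  table => 'db.orders_iceberg',
  older_than => TIMESTAMP '2021-01-13 00:00:00'
);
```

An optional `location` argument can scope the scan to, for example, just the metadata folder or a single partition.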
The third problem is unoptimized metadata. Over time you may accumulate too many small manifests, or metadata for one partition may be spread across multiple [00:23:00] manifests. This will slow down job planning performance, and it will not let Iceberg filter on columns using min-max statistics. So you have the rewrite_manifests procedure, which will re-cluster the metadata in your table and make sure it’s aligned with your partition values, and this should speed up job planning.
[00:23:30] I usually recommend disabling manifest compaction on commit, which is controlled by the table property you see on the slide, unless you know what you’re doing and your writes are perfectly aligned with your partitions. In most cases, what you would want to do is to leverage the procedure that I’ve just described to rewrite the metadata on demand once your job planning becomes slow. [00:24:00] And also make sure you have snapshot ID inheritance enabled; this will speed up the overall execution by a lot. I’ve seen cases where it dropped the execution time from hours to just a minute. So it is a big deal here.
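Combining the two recommendations above, a sketch with a hypothetical table name could look like this; the two property keys control manifest merging on commit and snapshot ID inheritance:

```sql
-- Disable automatic manifest merging on commit; rewrite on demand instead.
ALTER TABLE db.orders_iceberg SET TBLPROPERTIES (
  'commit.manifest-merge.enabled' = 'false',
  'compatibility.snapshot-id-inheritance.enabled' = 'true'
);

-- Run on demand once job planning becomes slow.
CALL iceberg.system.rewrite_manifests('db.orders_iceberg');
```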
Also, a few words about future work. We are tackling the problem of small data files, and of data that is not properly clustered, because if data [00:24:30] is not properly clustered, Iceberg will not be able to leverage the min-max statistics in its metadata to speed up job planning. There will be multiple compaction strategies, starting from a simple bin-pack without any shuffles to restore the target file size, up to more sophisticated sort-based compactions with ways to analyze how well the data is clustered within a partition according to the sort spec of the table. [00:25:00] I would like to finish with a quick summary.
Iceberg is a really powerful tool. It has a number of unique features and it brings your data lakes to the next level. But in order to truly benefit from it, you have to maintain your tables, and SQL extensions are probably the best way to do this, because it’s a high-level API that is designed for data engineers specifically. Then you will be able to focus on what makes the most sense for you. And also, [00:25:30] do join the community. It’s a great community, driven by a lot of strong people who are committers and PMC members on other Apache projects, and just a great place to be in and to learn. Thanks everyone.
Great. Thank you so much, Anton. That was fantastic. We’ve got a lot of good questions coming in the chat. Before we jump into those, two announcements. Before you leave today’s session, if you could go into the tab that’s called Slido. It’s right above the chat. There’s a three-question [00:26:00] survey, so if you could take a few seconds to do that. And then additionally, if we don’t get to your question, or if you have additional questions, Anton will be in the Subsurface Slack community afterwards for about an hour. So you can find him there, just search his name. All right. First question. Does Spark support the call statement for stored procedures, or is this only for Presto/Trino?
[00:26:30] Yeah. So you would have to enable support for this. Actually, the only full implementation right now is for Spark. Presto has a couple of them, but not all of those I just described. Spark doesn’t have stored procedures as such, but you can enable them using the Iceberg SQL extensions, which are part of the Iceberg runtime jar itself. If you enable them, then you will be able to call them, and there will be [00:27:00] documentation explaining which properties you would have to set. But what I just described is available only for Spark; for Trino, it will come a bit later. And we are also proposing a stored procedure API in Spark [inaudible 00:27:17] .
Great. Gerard asks, “Can Iceberg completely replace a database? For example, upserts?”
I think [00:27:30] there are multiple phases of Iceberg. First, we wanted to make sure we covered data lake use cases. And if you look at the roadmap of Iceberg and the features that are being contributed, they are more and more from the data warehouse domain. We’ve been working on row-level updates using differential files. It’s a well-known [00:28:00] idea in the industry and in academia as well. And if you compare it to other solutions like Hudi, for example, then I think our implementation is by far the most flexible, because we support updates by an arbitrary key, not just a predefined one. We will also be able to support position deletes. And we also support copy-on-write as of today.
Great. Stefan asked, “Does Iceberg support multi-table transactions?”
[00:28:30] It doesn’t. So you’re probably interested in project Nessie, which is driven by Dremio in close collaboration with the Iceberg community. There will be extensions in Iceberg for doing that, but it’s not possible as of now.
Is time travel configurable? For example, how far back can we go?
You can go back to any previous snapshot that’s [00:29:00] currently present in your metadata. So it’s up to you how you configure it. If you never expire snapshots, you will always be able to go back to the first point you started with. But if you run at scale and produce, let’s say, a snapshot every hour, then you won’t be able to keep a history for five years, right? Especially if you do compaction, because you want to get rid of the old files. But by default, you will be able to go back [00:29:30] for five days. That’s the default value.
Chris asks, “What is the latest on the record update changes? Is there a release date for these?”
Right now, the core part of it is done. So the Iceberg part of it is done, and the Flink integration for change data capture actually uses that spec already. There is experimental support for it. The part that’s still missing [00:30:00] is the query engine part, the one that would execute a distributed join and find out which records have to be updated. That’s what we will be working on in the SQL extensions to start with, and that’s basically the next thing we are going to work on. We’ve just delivered copy-on-write for DELETE and MERGE statements in Spark, and we are going to do merge-on-read next.
Great. “Any benchmark done with [00:30:30] Iceberg format,” Nita asks.
Yeah, definitely. I mean, at my company we are using Iceberg really extensively, so we’ve done a lot of benchmarks before this. And one I referred to earlier is: if you have a petabyte-scale table, then you can query it really fast if your layout is great. So you can actually replace some of those Elasticsearch use [00:31:00] cases if your table has a sort key defined. For example, we did a benchmark where we looked within three partitions for a specific set of keys, and we got around six seconds overall run time. That was a table with millions of files, basically. And if you compare it to, let’s say, the Spark file source, then that would give you, I think, a couple of minutes on the same set of resources.
Great. [00:31:30] We have a few more questions. We won’t be able to get to all of them, so this is the last one. Again, you can head over to the Slack community; Anton will be there answering any of your questions. But in closing, what is the easiest way to get set up with Iceberg for someone that’s never used it?
I think the extensions are a really good starting point. The snapshot one allows you to try out Iceberg on an existing dataset. In general, the website has a couple of links as well. There are great [00:32:00] tutorials online as well, and I hope to write more on this too. Yeah, so I would try to use the extensions that are available [inaudible 00:32:10] . It should get you started pretty quickly.
And it looks like Ryan just dropped a link in the chat as well to get started quickly. Yeah, he’s on top of it. So, all right. Thank you so much, Anton. We really appreciate it; this was fantastic. Thank you to everybody for participating and for the wonderful set of questions. We really appreciate it, and enjoy the rest of your day. [00:32:30] Thanks everyone.
Thanks, bye. Bye.