Table Formats Apache Iceberg Subsurface LIVE Sessions

Session Abstract

This talk will introduce the use cases for Apache Iceberg tables that we didn’t expect when we created Iceberg, and will explain the details so you can use Iceberg for similar cases.

Video Transcript

Ryan Blue:    Well, hi everyone. I’m Ryan Blue. I’m the co-creator, along with my co-founder Dan Weeks, of Apache Iceberg. Today, we’re going to go into a couple of case studies and learn, as in-depth as we can given the short amount of time, about a couple of uses for Apache Iceberg that we didn’t see coming ahead of time, and hopefully we’ll [00:00:30] point out some of the advantages of how Iceberg works that allowed us to do these things. So we’re just going to dive straight in and I’m not going to talk a lot about Iceberg itself because number one, I think people already know. And number two, we have very, very little time because I was pretty ambitious choosing fairly complicated or dense case studies for this. So, one quick note before we start: most [00:01:00] of my slides, I actually tested in Spark.

So Spark SQL with the Iceberg SQL extensions. Those SQL extensions give us the ability to do a lot more things, like call stored procedures or extended DDL statements in Spark. I know that there are alternative ways to do this directly through the API. So in a couple of places I have marked how you would do this through the Iceberg API, if you’re not using Spark or [00:01:30] you’re using primarily Dremio or Trino or any of the other engines. Since Iceberg is enabling this multi-engine space, I think it’s important to say which engine we’re primarily looking at here, but also have examples where you can do it across engines and not just in any single one. Because the advantage here is being able to tie all of these things together, pretty seamlessly.

So with that, let’s dive [00:02:00] into the two case studies that we’re going to cover today. Now the first case study is fast and targeted queries at scale. So at Netflix we had this problem where we had customer service representatives that needed to debug some issue with a user.
And so we’ve got a lot of log information coming in about what’s going on, maybe a problem on the account, and the question is, how do we figure [00:02:30] out very quickly, within say five minutes of some event happening, how do we get those logs really fast when it could be about any user or device watching TV? So it’s a very large dataset and we want both very fast access to the data in terms of the turnaround time from when it arrives, as well as really quick introspection to pull that up, because someone’s essentially waiting on the [00:03:00] phone line.

So that’s the first case study that we’re going to take a look at. The next case study that we’re going to take a look at after that has to do with the new compliance landscape, or basically how we handle GDPR. And this is a little different. It’s not that we’re designing a new table in order to satisfy queries really quickly. What we’re doing is we’re looking at tables that have maybe been converted to Iceberg and have a couple of years’ worth of data, but where these tables [00:03:30] were laid out and originally designed for query efficiency, for some dimension that is not keeping track of users by Id so that we can tell users, “Here’s all the information we have on you,” or “I’m going to go delete a certain user because we have compliance requirements.” So GDPR and the new California law as well bring in new compliance requirements for us, and we need to go back and update tables.

So the question is, how do we actually do that? [00:04:00] And Iceberg is super useful here because it actually allows you to change your partition layout on the fly. So we’re going to dive into number one first. So we’re going to take a look at fast targeted queries at basically internet scale. So again, the situation here is that we have a huge source of data. We’re putting it into an Iceberg table very, very quickly, like commits on a five minute timeframe.
[00:04:30] And committing from Flink, ingesting data, is not all that difficult. So we’re not going to cover that. What we’re going to cover is how to actually get that data into a form and design a table so that we can query it really quickly. Now we could use some other system for this, like Elasticsearch or an in-memory database. But those clusters are very expensive to run because you need to keep them up all the time.

So what we’re going to do is [00:05:00] we’re going to try and keep all the data at rest in S3 and still have queries that bring that data back within a couple of seconds. So our queries look like this: SELECT * FROM logs, where device Id equals something and event time is between t1 and t2. Now, we have two main problems with these fast targeted queries. Number one is we’re probably dealing with a multi-petabyte table. There are millions of [00:05:30] devices operating all the time, and we just have way too much data to brute force and go through. So we have to first of all prepare the data itself for skipping. And we have two things that allow us to do that. Number one is partitioning and number two is sorting. So we’re going to go into how partitioning and sorting can be used to basically solve the problem of too much data.

Problem number two is where Iceberg actually really [00:06:00] shines here. Because once we’ve partitioned the data and we’ve sorted the data, we have the ability… we’ve clustered the data so that what we’re looking for is in very small chunks. But then we have to keep track of the metadata and basically position the metadata so it doesn’t take longer than the query itself just to figure out where the data we need lives in the table. So, number two… sorry, problem number two, is that there’s just too much metadata, if we have a multi-petabyte [00:06:30] table with millions of partitions. And so we’re going to go into how to solve that. So in our toolbox, I mentioned partitioning and sorting.
And these address the problem of too much data. So the idea behind partitioning is that we just divide records into groups and skip most of those groups. Now in Iceberg, partitions are actually indexed at multiple layers in the metadata.

So data files are tracked by manifests, and those [00:07:00] manifests, for each file, have a tuple of information that encodes the partition that the file belongs to. And then we also index the manifests themselves, or the partition ranges within manifests, at the manifest list level. That’s going to come in handy later when we’re trying to skip our metadata. So for our use case here, we have a whole bunch of log information coming in by time. So what we’re going to do is first group it by hour-sized chunks [00:07:30] of time. So that when someone calls our helpline, we can say, “Okay, when did the event happen?” and narrow down that way. That also really aligns with the write pattern. So when we write a whole bunch of information in five or 10 minute chunks coming in from Kafka, that’s going to create some files and we’ll just update certain partitions with that.

But if we were to just organize the data by time, we wouldn’t really be [00:08:00] cutting the data down that much. Because we’re talking about all users in the entire system. If we’re slogging through an entire hour, then that’s not going to help much. So what we also want to do is cut that down by a factor of say 500. And to do that, we’re going to hash each device Id and mod that by 500 to produce a bucket. That gives us a very predictable transformation from any device Id that we’re looking for to a specific bucket Id [00:08:30] or partition to look into. So to do this, we’re just going to create a table, and here’s the create table statement: CREATE TABLE logs, with device Id, event time, and other info, partitioned by a bucket with 500 buckets on device Id, and then hours of event time.
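[Editor’s sketch, not from the talk: the partitioning described here, hashing the device Id, taking it mod 500, and pairing that with the hour of the event time, can be illustrated in a few lines of Python. The SHA-256 hash below is an assumption chosen only to keep the sketch dependency-free; Iceberg’s own bucket transform uses a 32-bit Murmur3 hash, so real bucket numbers will differ. The names `bucket`, `hour`, `partition_key`, and `device-123` are hypothetical.]

```python
import hashlib
from datetime import datetime, timezone
from typing import Tuple

NUM_BUCKETS = 500  # bucket count used in the talk


def bucket(device_id: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Hash the id, then mod by the bucket count.

    Assumption: SHA-256 stands in for the hash so the sketch needs no
    third-party library. Iceberg's bucket transform uses 32-bit Murmur3.
    """
    digest = hashlib.sha256(device_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_buckets


def hour(event_time: datetime) -> int:
    """Whole hours since the Unix epoch for a timezone-aware timestamp."""
    return int(event_time.timestamp()) // 3600


def partition_key(device_id: str, event_time: datetime) -> Tuple[int, int]:
    """The (bucket, hour) pair that decides which partition a row lands in."""
    return bucket(device_id), hour(event_time)


# Two events from the same hypothetical device in the same hour share a partition.
t1 = datetime(2021, 7, 1, 12, 5, tzinfo=timezone.utc)
t2 = datetime(2021, 7, 1, 12, 55, tzinfo=timezone.utc)
same_partition = partition_key("device-123", t1) == partition_key("device-123", t2)
```

[Because the mapping is deterministic, a query for one device Id can compute its bucket up front and ignore the other 499.]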
That is going to tell Iceberg, “Don’t create any files that cross the bucket or hour boundaries,” and so we’re going to divide it into nice little chunks of data that we can look [00:09:00] at.

Next we’re going to use sorting. And with sorting, we have two options here. We can either sort locally within files, and that’s going to cluster events within some file. Or we can globally sort across files within some partition. And so what we want to do in this case is cluster across files so that we don’t have to read every single file in a partition. So the idea here [00:09:30] is that we’re going to first order by the bucket and then the event time, and then the actual device Id, which I screwed up here, it should be device Id, not customer Id. So if we sort within each bucket and hour by the device Id, then we’re going to essentially keep all of the logs… sorry, all of the log events for a particular device in just one [00:10:00] file. Now Iceberg helps us when we do that, by keeping track of file ranges… oh sorry, the column ranges in each data file.

So when we cluster across files, Iceberg is going to be able to use the min-max stats for device Id to pick just the file, at job planning time, that we need to read. So that’s going to help us… partitioning helps us ignore all of the buckets that we don’t need, and all of the hours that we [00:10:30] don’t need. On the other side, sorting helps us ignore individual files that we don’t need in order to satisfy a query. So most queries in this case are going to have to read one or maybe two files to get the records for a particular user.
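[Editor’s sketch, not from the talk: a toy model of why sorting across files lets min-max stats pick out a single file. Each file records only the lowest and highest device Id it holds; a planner keeps a file only if the requested Id falls inside that range. The `DataFile` class, the file names, and the `dev-NNNN` Ids are hypothetical, and real Iceberg stats cover more than one column.]

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class DataFile:
    path: str
    min_device_id: str  # lower bound recorded for the device id column
    max_device_id: str  # upper bound recorded for the device id column


def write_files(device_ids: List[str], rows_per_file: int) -> List[DataFile]:
    """Cut rows into files in arrival order, recording min/max per file."""
    files: List[DataFile] = []
    for start in range(0, len(device_ids), rows_per_file):
        chunk = device_ids[start:start + rows_per_file]
        files.append(DataFile(f"file-{len(files)}.parquet", min(chunk), max(chunk)))
    return files


def files_to_read(files: List[DataFile], device_id: str) -> List[DataFile]:
    """Keep only files whose [min, max] range could contain the id."""
    return [f for f in files if f.min_device_id <= device_id <= f.max_device_id]


# 1000 zero-padded ids within one partition, arriving interleaved.
ids = [f"dev-{i:04d}" for i in range(1000)]
interleaved = [ids[j] for offset in range(10) for j in range(offset, 1000, 10)]

unsorted_files = write_files(interleaved, rows_per_file=100)
sorted_files = write_files(sorted(interleaved), rows_per_file=100)

unsorted_hits = files_to_read(unsorted_files, "dev-0500")  # every file overlaps
sorted_hits = files_to_read(sorted_files, "dev-0500")      # a single file overlaps
```

[In the unsorted layout every file’s range spans nearly all Ids, so the stats cannot rule anything out; after a global sort the ranges are disjoint and the lookup touches one file.]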
Which is a massive improvement.

I should also mention here that in Spark, which is where I tested all of these examples, you actually have to define the bucket function that we use in this sort on the last [00:11:00] page. So, order by Iceberg bucket 500, and event time: well, Iceberg bucket 500 actually needs to be defined in the latest versions of Spark, and there’s a helper method called registerBucketUDF that allows you to do that and name your bucket UDF. We are working on just exposing these natively in Spark through a function catalog, but we recently added the function catalog in Spark. So if you’re trying to do this, just be aware that you’re going to have to define that UDF. Okay, so now that [00:11:30] we’ve prepared our data by partitioning and sorting, what we’ve done is we’ve actually created a situation where we can read data really quickly once we know which data files to read, but the metadata might be all over the place.

And that’s because metadata in Iceberg is clustered by write. So if you write data for a particular hour, then we’re going to keep those data files together in Iceberg metadata, and the next hour will be together in Iceberg [00:12:00] metadata and so on. We cluster by write. And the manifests that track data files, we merge them sequentially, so we don’t screw up that initial write clustering. That works out most of the time really, really well. Because you’re typically… as is the case here, you’re writing data by time, and then you’re reading data by time. Or basically the clustering of your data on write matches the clustering of your data that’s desired on read. [00:12:30] But in this particular case, we also care a lot about how data is clustered by bucket. So we don’t just care about hour, we care about bucket as well.

And in this case, because we’re writing by hour, each manifest file is going to contain all of the buckets for that hour.
And so if we’re looking for a particular bucket, we’re going to read essentially 500 times the metadata that we need, because [00:13:00] in each manifest we’re going to have to slog through all of the files for different buckets. So what if we could change how we wrote out the metadata in the table, so that we’re grouping the data files by bucket, and then using that to eliminate even reading manifest files that track data files? And that is the secret to Iceberg here, making these queries really, really fast. So what we’re going to do [00:13:30] is pivot the metadata, using a very simple stored procedure or using a table operation that is part of the Iceberg API. So for simplicity, all you would have to say here is CALL system.rewrite_manifests on our logs table.

You have a lot more flexibility if you’re calling the actual table operation through the API, because you can specify your cluster key instead of just automatically clustering by partition [00:14:00] values. But both of these will achieve the same thing. So in the end, we will get a situation where the data is clustered by the bucket and then by the time, instead of by time and then by bucket. And that is going to make it so that when we go to plan a query, to find just which data files we need, we’re going to be able to take advantage of that Iceberg metadata structure, where the manifest list [00:14:30] keeps the partition ranges that we need to read. Because we’ve structured it so individual manifests have just small partition ranges in terms of the bucket space, we will be able to read very few manifests to satisfy a query.

And so our whole job planning time is going to be much, much faster, and it won’t actually take longer to plan the query than to read the data. And that is really something that is I think unique to Iceberg, because [00:15:00] Iceberg maintains a metadata tree, rather than just accumulating changes to some list of files.
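[Editor’s sketch, not from the talk: a simplified model of the metadata pivot. Each manifest tracks data files by their (bucket, hour) partition, and the manifest list keeps only the bucket and hour ranges of each manifest. Grouping the same entries by bucket first, instead of by hour first, changes how many manifests a single-bucket query has to open. The `Manifest` class, the 500-entries-per-manifest size, and the 24-hour window are assumptions for illustration; this is not the Iceberg API.]

```python
from dataclasses import dataclass
from typing import Callable, List, Tuple

Partition = Tuple[int, int]  # (bucket, hour) of one tracked data file


@dataclass
class Manifest:
    entries: List[Partition]

    def bucket_range(self) -> Tuple[int, int]:
        buckets = [b for b, _ in self.entries]
        return min(buckets), max(buckets)

    def hour_range(self) -> Tuple[int, int]:
        hours = [h for _, h in self.entries]
        return min(hours), max(hours)


def build_manifests(files: List[Partition],
                    cluster_key: Callable[[Partition], Tuple[int, int]],
                    entries_per_manifest: int) -> List[Manifest]:
    """Order the file entries by the cluster key, then cut them into manifests."""
    ordered = sorted(files, key=cluster_key)
    return [Manifest(ordered[i:i + entries_per_manifest])
            for i in range(0, len(ordered), entries_per_manifest)]


def manifests_to_open(manifests: List[Manifest], bucket: int,
                      first_hour: int, last_hour: int) -> List[Manifest]:
    """Use only the per-manifest ranges to decide which manifests to read."""
    hits = []
    for m in manifests:
        b_lo, b_hi = m.bucket_range()
        h_lo, h_hi = m.hour_range()
        if b_lo <= bucket <= b_hi and h_lo <= last_hour and first_hour <= h_hi:
            hits.append(m)
    return hits


# One data file per (bucket, hour): 500 buckets over 24 hours.
files = [(b, h) for h in range(24) for b in range(500)]

by_write = build_manifests(files, lambda p: (p[1], p[0]), 500)  # hour, then bucket
pivoted = build_manifests(files, lambda p: (p[0], p[1]), 500)   # bucket, then hour

opened_before = manifests_to_open(by_write, bucket=42, first_hour=0, last_hour=23)
opened_after = manifests_to_open(pivoted, bucket=42, first_hour=0, last_hour=23)
```

[With write-order clustering every manifest spans all 500 buckets, so all 24 are opened; after the pivot only the one manifest whose bucket range contains bucket 42 is opened.]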
So the tree and the indexing in Iceberg are really what make this possible, because we can have fast data… sorry, fast planning operations. And that makes it possible to use something like Presto and data at rest in S3, rather than needing to keep an online system.

So, [00:15:30] one more thing I should point out here is that in order to do this, we’re going to turn off automatic manifest merging, and that’s just because we don’t want manifest merging and rewriting manifests to conflict with one another. Merging manifests is probably going to undo some of the rewrites that we’re doing. So just to keep things simple, you can turn off manifest merging by setting a table property. Okay, now we’re going to… [00:16:00] like I said, we’re going to really blow through this content because there’s not a ton of time. So we’re going to move on to the new compliance landscape and the GDPR case study, and what we found is really super useful about Iceberg for GDPR and the new California laws.

So to review, this case is where we have an older table. It has a couple of years’ worth of [00:16:30] data, either because you’ve been using Iceberg for that long (I think Netflix has had tables in production for that long), or you converted a Hive table to Iceberg and you simply need to do something. Now, what happens here is, the table was written for query efficiency. So let’s pretend that that is some event time. So we have events, we were writing by event time, we were largely grouping and reading by event time, [00:17:00] and everything was happy. But now we have these new requirements that, number one, we need to be able to delete user events if a user requests, which is very, very important. And number two, we also need to be able to select individual users’ events and return those values to the user once requested.

So unfortunately the time-based data means that we didn’t do anything to actually cluster our data or [00:17:30] make it accessible by user Id.
So how do we update the table, so that that is now part of the design for efficiency of these newer queries? We just don’t want to go through and do a full table scan every time we want to select a user’s events to show it to them. We also don’t want to do a full table scan to find where all the user’s information is, in order to delete it. So in order to do this, we have two problems. [00:18:00] Number one is that there’s a table layout mismatch. The solution here is actually pretty simple. Iceberg 0.11 [00:18:08], which I think was released about six months ago, included the ability to change the layout, using either DDL or an API operation to change the partitioning of a table.

So we’re going to go into a little bit of that and how you want to use it. Number two, we have existing data in this table. [00:18:30] So we have say two years of old data that we need to move into the new partitioning spec. And that’s because everything is lazy in Iceberg. So we’re not going to change partitioning and actually go rewrite two years’ worth of data before completing that operation. That would be a little crazy, so we want to make it so that we can incrementally migrate the table into the new layout, and then use DELETE FROM or SELECT for more efficient queries. So jumping into that [00:19:00] first one, what we want to do here is add a partition field. And we’re going to use the same bucketing that we were talking about in the first example. So if we bucket by user Id, that’s going to take down the amount of data that we have to scan through by a factor of 100.

So that’s really, really nice. So we’re going to bucket by user Id using that hash distribution. And that way, when we go to find any particular user, we will hopefully just go to a series [00:19:30] of buckets for that, and then be able to return the user’s data.
Now, we’re not going to pivot the metadata in this case, because we don’t actually care about faster queries; remember, our table was already designed for fast queries for most things. So we’re going to assume that we don’t want to pivot the metadata by bucket in this case. Now, when you’re updating the partitioning of a table, I have a couple of caveats and just warnings for you. So, first of all, [00:20:00] a lot of writes in the Hadoop space, so like Hive’s INSERT OVERWRITE, or I think Spark has the exact same behavior here, do an implicit overwrite.

So it overwrites some entire partition. Whether that is an hour or a day depends on how the table is configured. So implicit overwrites are dangerous if you change what the definition of a partition is. So if you change from daily partitioning [00:20:30] to hourly partitioning, all of a sudden your overwrites may not be overwriting what you think. Especially if you’re going from hourly to daily, because now your overwrites actually overwrite a day. So in this case, we really do want to be careful… sorry, not in this case, but in most cases you want to be very careful. If you’re adding a bucketing field though, we expect most writes to almost certainly hit every single bucket that we’re adding. And so if we’re doing an overwrite in this table, it’s almost certainly going to have more than a hundred user Ids and most of the time hit every single bucket.

[00:21:00] So it’s probably not going to affect the write too much, but it is something to be aware of. And I would recommend in the future always moving to explicit overwrites. So that is overwriting a specific part of the table using a filter, saying delete this day and add this data, or using [00:21:30] the newer plans like MERGE INTO, which we’re going to cover in just a second.
Okay, so in order to actually do the partition evolution, we have two options. One is the Spark extensions’ new DDL, which is very, very simple: ALTER TABLE events ADD PARTITION FIELD bucket(100, user_id). On the other hand, if you don’t use Spark and still want to take advantage of this, you totally can. So the second option, again, is to fall back to the Iceberg API: load a table through your catalog [00:22:00] and say updateSpec, addField with bucket on user Id and 100, and then commit that change.

All of a sudden, now you have this partition spec that is used by default for the next set of writes, which will do this correctly. Again, make sure your writes are aligned with the partitioning of your table, but overall, this should work out. The next problem that arises is, remember, we were just defining [00:22:30] a new partition spec. So we actually still need to incrementally move all of the data from the old partitioning by hour into the new partitioning by hour and bucket. To do this, I recommend using MERGE INTO. So MERGE INTO is a newer command; in Spark, it’s going to have Copy on Write semantics. So any affected data file is going to get deleted, and all of the records from that data file are going to be rewritten back into the table. [00:23:00] Now the nice thing about this is Spark is by default going to write with the newest strategy… sorry, the newest partitioning spec.

And so this is a great way to rewrite data from an old partition spec into a new partition spec and have everything work just fine without that implicit overwrite problem. In order to do this, what we’re going to do is we’re going to merge events into itself. So [00:23:30] MERGE INTO target table events e USING source table events s. And then we’re going to match on user Id equals user Id or something like that. This is whatever makes the event unique; this might have to be event Id. It depends on how many events there are for your users.
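[Editor’s sketch, not from the talk: one way to drive the incremental self-merge is to generate one MERGE INTO statement per time slice, so each run rewrites only a small part of the table. The helper names `windows` and `self_merge_sql`, the column names `event_id` and `event_time`, and the use of half-open ranges (so adjacent slices never overlap) are assumptions for illustration. The code only builds the SQL text; it does not run it.]

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def windows(start: datetime, end: datetime,
            step: timedelta) -> Iterator[Tuple[datetime, datetime]]:
    """Yield consecutive half-open [t1, t2) slices covering [start, end)."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    cursor = start
    while cursor < end:
        upper = min(cursor + step, end)
        yield cursor, upper
        cursor = upper


def self_merge_sql(table: str, key: str, t1: datetime, t2: datetime) -> str:
    """Build a no-op self-merge limited to one time slice.

    Assumption: `key` uniquely identifies an event, and the table has
    `event_time` and `user_id` columns (hypothetical names).
    """
    lo = t1.strftime("%Y-%m-%d %H:%M:%S")
    hi = t2.strftime("%Y-%m-%d %H:%M:%S")
    return (
        f"MERGE INTO {table} e USING {table} s\n"
        f"ON e.{key} = s.{key}\n"
        f"  AND e.event_time >= TIMESTAMP '{lo}' AND e.event_time < TIMESTAMP '{hi}'\n"
        f"  AND s.event_time >= TIMESTAMP '{lo}' AND s.event_time < TIMESTAMP '{hi}'\n"
        f"WHEN MATCHED THEN UPDATE SET e.user_id = s.user_id"
    )


# Three hourly slices of a hypothetical `events` table.
start = datetime(2021, 1, 1)
statements = [
    self_merge_sql("events", "event_id", t1, t2)
    for t1, t2 in windows(start, start + timedelta(hours=3), timedelta(hours=1))
]
```

[Each statement would then be submitted through a Spark session that has the Iceberg SQL extensions enabled, one slice at a time, until the whole table has been rewritten under the new spec.]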
And then crucially, for at least Spark, we want to add to the ON clause so that we don’t select the entire table [00:24:00] for either the target or the source. So we’re going to add event time between t1 and t2 for both the source records that we’re merging and the target table.

So that’s going to select just the partition that we want to move. And then the update operation is just some no-op updates. So set the user Id to the user Id; we know that they’re already going to match. And so [00:24:30] this is just going to be a no-op, basically a copy from the older partitioning to the newer partitioning. After that, you can change the t1 and t2, and incrementally move over data in your table. So MERGE INTO is not only super useful for row-level updates and expressing exactly what you want to do to your data in a very compact form. You can also use it for this in-place migration. After we do that, we can use [00:25:00] DELETE FROM for efficient deletes. Also in Spark, this is going to use a Copy on Write strategy. So it’ll determine which data files you actually need to read. Now, in this case, after we’ve moved everything over, it’s going to bucket by user Id. And so we’re going to get very efficient queries that only need to rewrite parts of the table. And it’s going to have more efficient planning as well as more efficient runtime and execution. So this is actually a really cool thing where you can [00:25:30] just update your tables, and now you’re good to go for GDPR. And with that, we are going to move on to questions. Sorry, I’m a little into the question period, but hopefully we’re good.

Speaker 2:    That’s okay. Nice job, Ryan. All right. So let’s go ahead and open it up for Q&A. Folks, if you have a question, use the button in the upper right side to share your audio and video, and you’ll automatically be placed into a queue. And if for [00:26:00] some reason you have trouble sharing your audio, you can ask the question in the chat.
Let’s go take a look over the questions. It looks like we’ve got a few questions here. The first one is from Matthew [Tubble 00:26:09]. What options are there for writing Iceberg tables, other than in Spark? Are there plans for more options, languages, libraries to be created?

Ryan Blue:    Yeah, absolutely. So I think I alluded to this earlier, but one of the main things that I think is beneficial about Iceberg is that there are so many engines [00:26:30] that support it. So I talked about the utility of the MERGE INTO operation. Well, Dremio supports MERGE INTO for Iceberg tables. I was just looking through their documentation the other day, and I was like, “Oh, well, that’s awesome.” I know that there are plans to add that to Trino and Athena. Spark is obviously a good one. Flink, it’s a different use case, so I don’t know that MERGE INTO makes a ton of sense there, but the important thing is that we’re tying all of these engines together, [00:27:00] and that they can work… different groups can work in their preferred engine for their particular use case.

So there are definitely other engines that support this sort of thing, and that’s why I try to be as agnostic as possible and stay purely within either the Iceberg API or DDL statements that will probably be implemented in those other engines, and strict SQL. Right? So the DELETE FROM and MERGE INTO, those are standard SQL plans that I think [00:27:30] every warehousing engine can implement.

Speaker 2:    Okay, great. And we’ve got one more question here from Sam: when we use metadata rewriting as a table option, is it always eagerly merging on write? Or is it writing metadata that will allow for future Merge on Read behaviors?

Ryan Blue:    So rewriting table metadata doesn’t actually trigger a Merge on Read or Copy on Write operation. We, [00:28:00] I mean, you can think of it as Copy on Write, but it’s not Copy on Write with the data. It doesn’t change the data at all.
So what we did in the first case study is we partitioned and then we sorted the files so that the files were clustering the data as we needed. Then what we did is we took all of the metadata for those files, which was already written out, and we just said, “Okay, if we group the metadata differently into manifests to track the data files, then we’re going to get more efficient query planning [00:28:30] because we’ll be able to eliminate whole manifests that we don’t need to read instead of reading every single manifest.” So that was the key to that operation. It was that we were able to skip a whole lot more manifests, because the manifest list knows the partition ranges in each manifest, and so we can skip a whole bunch of them and only scan the ones that we need to for a particular query or bucket.

Speaker 2:    Okay. Real quick, because I don’t know how much more time we have. Are MERGE and DELETE operations available for the Spark Scala [00:29:00] DataFrame API, or is it just for Spark SQL?

Ryan Blue:    Right now, it’s just for Spark SQL. We are working on getting the implementation upstream. So right now Spark is relying on our SQL extensions. That’s basically how we were building it out, but now we’re working on getting that upstream because we’ve generalized it and have a proposal in the Spark community. So we welcome anyone in the Spark community that wants [00:29:30] to join us and help us get that capability in, and from there, I imagine that we’ll probably add the same API that Databricks has in… I think they added it, strangely enough, in Delta, but they added the actual SQL syntax in Spark. So I’m not sure what their plan is there, but I imagine we’ll get it all in upstream Spark eventually.

Speaker 2:    [00:30:00] Okay. So folks, that’s all the questions we have time for in this session.
If we didn’t get to your question, you’ll have the opportunity to ask Ryan in his Subsurface Slack channel. Before you leave, we’d appreciate it if you’d fill out the super short Slido session survey on the right side. So with that, thank you very much, Ryan. I really enjoyed your talk.

Ryan Blue:    Thanks everyone, talk to you in Slack.

Speaker 2:    Yeah, head over to Slack, folks. [00:30:30] Okay. Folks are heading over there. I think maybe we should just jump over there. I know we’re still being recorded, but they’ll cut this out, and it looks like you got a question there about roadmap, feature developments, and I’ll answer the one about the slides being posted. So we’ll see you over there in Slack.

Ryan Blue:    Yep, can I still update the slides? Because I found a couple things to replace this morning. [00:31:00] I’ll just update the final presentation in that Google area.

Speaker 2:    Yeah, definitely do that. Of course, the video will still have that, but that’s okay.

Ryan Blue:    Yeah. Yeah, no worries.

Speaker 2:    Okay, cool, I’ll see you over there.

Ryan Blue:    Bye.

Speaker 2:    Bye-bye.