Subsurface LIVE Winter 2021
Apache Iceberg: What's New
Iceberg is a cloud-native table format that eliminates unpleasant surprises that cost you time. After tackling atomic commits, table evolution and hidden partitioning, the Iceberg community has been building features that save both data engineers' time and processing time. This talk will cover what's new in Iceberg and why we are excited about it, including row-based updates and new use cases.
Ryan Blue, Apache Iceberg PMC Chair, Software Engineer, Data Platform, Netflix
Ryan Blue works at Netflix on open source processing infrastructure like Apache Iceberg and Spark.
Hello everybody and thank you for joining us for this session. My name is Louise and I will be your moderator. I just wanted to go through a few housekeeping items before we start. First, we will have a live Q&A after the presentation. We do recommend activating your microphone and your [00:00:30] camera for the Q&A portion of the session. You simply need to use the button in the upper right of your screen to share your audio and video, and then you’ll automatically be put in a queue and can ask your questions there. With that, I’d like to welcome our first speaker in the technical breakouts, Ryan Blue, who’s the Apache Iceberg PMC Chair, as well as a data platform software engineer at Netflix. Ryan, over to you.
[00:01:00] Hi everyone. I’m Ryan Blue, thanks for the intro. Nice to have everyone here. I see the little number clicking up on my screen, so it’s great to have so many attendees, as well as my live attendee, which is my daughter’s teddy bear. Hopefully he enjoys the presentation as well. So I’m here today to talk about what’s new in Apache Iceberg. Hopefully everyone knows what [00:01:30] Iceberg is, because I’m not going to go too far into the details of what it is. I really want to focus today on where we’re heading and what’s exciting about the project. All right, so to get things started, why does Iceberg exist and why are we here talking about it? Well, Iceberg exists because Netflix slowly realized that we needed a new table format. It turns out, [00:02:00] I think with the benefit of hindsight, that table formats are actually more important than file formats for overall performance, usability and all sorts of goals that you want from your data platform.
So Iceberg exists to fill the gap that we saw, primarily with Hive tables, but when we decided to build Iceberg, we also realized that there wasn’t really anything out there that met [00:02:30] all of our goals for how a table should behave. So really quickly, let’s talk about those goals. The first goal, which Netflix is particularly sensitive to, although I think everyone should be, is correctness. Put simply, tables shouldn’t lie to you when you query them. It’s a really simple thing, but we survived for a very, very long time in a world where, if tables were being updated [00:03:00] or your file system was, say, S3 and didn’t provide consistent listing, your tables could easily lie to you. We were, I think, early to realize we needed to fix this, because we’re on S3 and we had so many problems: latencies were higher, we didn’t have as many guarantees, and that sort of thing. So correctness was always a really huge driver for us.
[00:03:30] The next thing was that, although we had solved some of the correctness challenges, the way we had to solve them for Hive tables bled over into performance challenges and usability challenges. So what we wanted to do with Iceberg was build something where we could have all three. First, atomic transactions, so tables never lie to you: your operations either completely finish and are completely there, or [00:04:00] something failed and no changes were made to the table. We also wanted performance, so we needed to be able to do operations at a finer grain than whole partitions. With Hive tables, the partition was basically the only granularity at which you could have correctness, but we needed to be able to rewrite data at the file level in order to do more efficient writes. We wanted appends that could add data to multiple partitions at the same time. And [00:04:30] we also had a lot of performance needs on the query side.
There’s the need to filter down to just the partitions that you need, but we also wanted to be able to filter down to the individual files that are needed for a particular query. And lastly, the state of the art, at the time, was pretty terrible in terms of usability. Our correctness changes made it so that we could only overwrite [00:05:00] entire partitions. That was a performance challenge, but it was also just terrible for our data engineers and users. We also wanted to fix things where our data engineers kept getting hung up, like the way that schemas evolved: you could evolve by name or you could evolve by position, and you got different behaviors depending on which underlying file format you had. We wanted to build something where all of those [00:05:30] problems were solved and you didn’t have to worry about whether the system identifies columns by name, and if so, whether you can’t delete a column and then add one with the same name, because you’ll get the old data back.
Those are just things that tables should not do, and those concerns were costing our data engineers quite a bit of time. We also wanted to be able to evolve tables in place so that, if you need to go from daily [00:06:00] partitioning to hourly partitioning, you can do that without creating an entirely new table, copying data from one table to the other, and, most importantly, rewriting every single query that uses that table. That was a huge cost and could seriously take a month of someone’s time just to slightly change the layout of a table. So we wanted in-place table evolution. There’s a lot more to go into, but I don’t [00:06:30] want to talk about where we’ve been; I want to talk about where we are and where we’re headed.
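The daily-to-hourly example works because of hidden partitioning: queries filter on a data column such as an event timestamp, and each data file records the partition spec it was written with. The sketch below is a toy model of that idea, not Iceberg's code; the spec IDs, file names and the `plan` helper are all hypothetical. Old daily files and new hourly files coexist, and the same timestamp filter prunes both without any query being rewritten.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def utc(*parts):
    """Shorthand for a UTC timestamp."""
    return datetime(*parts, tzinfo=timezone.utc)


def day(ts):
    """Partition transform: whole days since the Unix epoch."""
    return (ts - EPOCH).days


def hour(ts):
    """Partition transform: whole hours since the Unix epoch."""
    return int((ts - EPOCH).total_seconds() // 3600)


# Hypothetical spec history: spec 0 was daily; after evolving in place, spec 1 is hourly.
TRANSFORMS = {0: day, 1: hour}

# Each data file keeps the spec it was written with and its partition value.
# Old files are not rewritten when the spec changes.
FILES = [
    ("day-2021-01-10.parquet", 0, day(utc(2021, 1, 10))),
    ("day-2021-01-11.parquet", 0, day(utc(2021, 1, 11))),
    ("hour-2021-01-12-03.parquet", 1, hour(utc(2021, 1, 12, 3))),
    ("hour-2021-01-12-15.parquet", 1, hour(utc(2021, 1, 12, 15))),
]


def plan(start, end):
    """Return files that may hold rows with start <= ts <= end.

    The query filters on the timestamp column only. Each file's own spec
    maps that range onto partition values, so the query stays the same
    before and after the partitioning changes.
    """
    selected = []
    for path, spec_id, value in FILES:
        transform = TRANSFORMS[spec_id]
        # Both transforms are monotonic, so range bounds map to partition bounds.
        if transform(start) <= value <= transform(end):
            selected.append(path)
    return selected
```

For example, `plan(utc(2021, 1, 11), utc(2021, 1, 12, 5))` selects one daily file and one hourly file, skipping the rest.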
So currently we’ve solved, I think, those three challenges from the last slide. We have high-performance tables that operate at tens of petabytes, and possibly hundreds of petabytes; we simply haven’t gone up that far yet. And we’re able to do job planning much more quickly and job execution much more quickly, because we’re reading far [00:07:00] less data and we have vectorization. We really have achieved, I think, high-performance tables at scale, with the performance and usability that we set out to deliver. So that’s really exciting. As far as completing phase one of where we were going with Iceberg, I’m really happy with where we are right now. We’re also in, I think, a better position than we set out to reach. One [00:07:30] thing that has been really amazing is the adoption and support that’s been coming in from different companies for major open source processing engines.
So we’ve really formed a great community around this project. We’re seeing Trino integration, Spark integration, Flink integration, Hive integration. Someone just opened a PR for Beam integration. We’re seeing a lot of other [00:08:00] uses, and other people contributing those integrations back to the project. That’s really fantastic, not only for us, because we might want to test Apache Beam, but also for anyone else who wants to come to this. Being able to choose between doing everything in Spark or using Flink or Hive, or really integrating this with your existing data platform, [00:08:30] is super important. The other thing we’re seeing, which was somewhat surprising but along the same lines as our original goals, is that the way to move forward, or at least the way that we are moving forward, is building services that actually coordinate through our Iceberg tables.
So we’re building services that are really good at one thing, like knowing when to delete data and clean it up [00:09:00] to address our compliance concerns, or compacting data, or doing some other maintenance. And all of those disparate services and processing engines are able to coordinate entirely through Iceberg tables. Now, I really do mean coordinate, because we were doing basically the same thing with Hive tables: many different services and engines were using Hive tables, but the problem was we didn’t [00:09:30] have that correctness guarantee. We didn’t have atomic transactions, and sometimes changes from one system caused another system to get the wrong data. That issue caused us to avoid these services and avoid making concurrent changes to our tables, just to be safe.
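What makes this coordination safe is that every change is an atomic commit. Iceberg commits by atomically swapping the table's current-metadata pointer in the catalog; if another writer committed first, the swap fails and the writer retries against the newer version. The sketch below is a toy, single-process model of that optimistic pattern. `TableVersion`, `Catalog` and `commit_append` are hypothetical names, and a `threading.Lock` stands in for whatever atomic operation a real catalog provides.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class TableVersion:
    """An immutable table version: every data file it contains."""
    number: int
    files: tuple


class CommitConflict(Exception):
    """Raised when another writer committed first."""


class Catalog:
    """Hypothetical catalog that stores one pointer to the current version."""

    def __init__(self):
        self._current = TableVersion(0, ())
        self._lock = threading.Lock()  # stands in for the catalog's atomic swap

    def current(self):
        return self._current

    def swap(self, expected, new):
        """Compare-and-swap: succeed only if no one committed since `expected`."""
        with self._lock:
            if self._current is not expected:
                raise CommitConflict(f"base version {expected.number} is stale")
            self._current = new


def commit_append(catalog, new_files, retries=3):
    """Optimistically append files: readers see all of them or none of them."""
    for _ in range(retries):
        base = catalog.current()
        candidate = TableVersion(base.number + 1, base.files + tuple(new_files))
        try:
            catalog.swap(base, candidate)
            return candidate
        except CommitConflict:
            continue  # another writer won; re-read the latest version and retry
    raise CommitConflict(f"gave up after {retries} attempts")
```

A service that tries to commit on top of a stale version gets a `CommitConflict` instead of silently overwriting another service's change, which is exactly the failure mode described above for Hive tables.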
So now we’re actually at a milestone where we can have more services. Now where I want to go next, or where I think the community is going next, is just building [00:10:00] on those successes that we’ve had. Where I think we’re doing well is heading towards being an open standard for data at rest, for data stored in tables that is changed by online services that coordinate through that table. We see a lot of that coordination happening today and I think that the next steps are really driving towards being an open standard that’s a true successor to Hive tables. Because the most successful thing about Hive tables was that [00:10:30] everything supported it. It was so easy to store data using one system and read and write it using another. Now we have something where you can do the same thing, but you have correctness guarantees and you also have better performance and better usability.
That, I think, is really going to pave the way for data services: decoupled services that run in our infrastructure and are really, really good at a single thing. [00:11:00] That’s where I’m really excited about going, because other projects can get super good at doing one thing and then provide that service as an open source project. I would love to see an ecosystem of data services come up here. We’re also working on getting... sorry, that was my time call. [00:11:30] We’re also working on new features in the format to provide better usability, better performance and those sorts of things.
So I’m going to talk about those a little bit today as well. First, though, I think that’s the big picture of where we’re headed as a project. I also want to talk about the 0.11 release, which just came out this week. It’s very, very exciting; we’ve got a lot of great stuff in there, and I want to highlight those things. [00:12:00] While we’re talking about 0.11, I’d also like people to think about how you would build or structure a data platform as data services using the features that I’m talking about, because that’s where I think we’re really headed. So the first thing I want to highlight in the 0.11 release is basically drop-in support for engines. Here, I have a command for Spark SQL that adds the latest [00:12:30] Iceberg support, creates a catalog that talks to Hive and stores tables there, and is pretty seamless in terms of integration with the rest of Spark.
It’s now very, very easy to get up and running with Iceberg in Spark SQL, whether you have a Hive catalog, want to use a file-based catalog, or even want one of the newer catalogs that I’ll talk about in a minute. With the runtime jars that we’ve built, [00:13:00] you basically just drop them into the classpath. You just need to define a few catalogs to say where the tables are tracked, and then you’re up and running. That’s true for Spark, Flink, and Hive, and we’re also built right into the latest Trino, which is the new name for PrestoSQL. So all of those releases have out-of-the-box or very easy support.
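The slide's exact command is not in the transcript, so the sketch below only assembles a comparable `spark-sql` invocation. The property names (`spark.sql.catalog.<name>`, `.type`, `.uri`, `.warehouse`, `spark.sql.extensions`) follow Iceberg's Spark configuration docs; the Spark 3 runtime artifact name, the catalog names and the paths are assumptions to check against the Iceberg and Spark versions you actually run. The helper `spark_sql_command` is hypothetical.

```python
def spark_sql_command(catalog, warehouse=None, metastore_uri=None,
                      iceberg_version="0.11.0"):
    """Build a spark-sql command line that registers one Iceberg catalog.

    Uses a Hive Metastore when `metastore_uri` is given, otherwise a
    file-system-only Hadoop catalog rooted at `warehouse`.
    """
    if metastore_uri is None and warehouse is None:
        raise ValueError("need a metastore URI or a warehouse path")
    prefix = f"spark.sql.catalog.{catalog}"
    confs = [
        # Enables Iceberg's Spark SQL extensions (CALL procedures, extra DDL).
        "spark.sql.extensions="
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        f"{prefix}=org.apache.iceberg.spark.SparkCatalog",
    ]
    if metastore_uri is not None:
        confs += [f"{prefix}.type=hive", f"{prefix}.uri={metastore_uri}"]
    else:
        confs += [f"{prefix}.type=hadoop", f"{prefix}.warehouse={warehouse}"]
    # Assumed artifact for the Spark 3 runtime jar; verify for your versions.
    cmd = ["spark-sql", "--packages",
           f"org.apache.iceberg:iceberg-spark3-runtime:{iceberg_version}"]
    for conf in confs:
        cmd += ["--conf", conf]
    return cmd


if __name__ == "__main__":
    # A local, file-system-only catalog, suitable for trying things out.
    print(" ".join(spark_sql_command("local", warehouse="/tmp/warehouse")))
```

Switching to a Hive catalog only changes two properties (`type=hive` plus the metastore `uri`), which is the "define a few catalogs" step described above.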
Next, [00:13:30] we have new Metastore options, so not just Hive or tables that are stored in a file system. AWS has contributed a Glue module for tracking tables that uses DynamoDB to ensure coordination when updating. And there’s a really exciting project that actually has a talk at this conference later: Ryan Murray is going to talk about Nessie, which is a new Metastore model that’s based on Git, [00:14:00] or inspired by Git, and adds branching, tagging and, hopefully, multi-table transactions. So those are some really exciting things in the 0.11 release. Next, getting back to data services and what we can build on top of Iceberg: Iceberg comes with new table maintenance procedures. We’ve added SQL extensions to Spark, and there’s a [00:14:30] talk later by Anton on SQL extensions if you want more of a deep dive. Those SQL extensions provide, first of all, stored procedure support using the CALL syntax that you can see here.
We’ve built a lot of actions that you can use directly through the Java or Scala API to do table maintenance, convert an existing Spark table to Iceberg, and things like that. [00:15:00] But they weren’t very easy to call or use, because you had to drop back down to a Java API. With table maintenance procedures, you can call some of those actions through SQL. So here we have metadata optimization: you can rewrite the manifests in your table in parallel to reorganize them, basically for better performance on individual queries. We’ve used something very much like this to [00:15:30] get rid of an Elasticsearch cluster. We wanted to bucket the data very finely, but the data comes in by day and hour and is basically written in time order, so we needed to pivot the metadata so that it’s organized by bucket. That way we can really easily look up individual IDs.
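Why regrouping manifests helps point lookups can be shown with a toy model. In Iceberg's Spark extensions this maintenance is exposed as a stored procedure (per the procedures documentation, `CALL <catalog>.system.rewrite_manifests('db.table')`), but the sketch below does not use Iceberg at all. It assumes hypothetical entries with an ingestion hour and a bucket, and a per-manifest summary of the buckets it covers. Grouped by time, every manifest covers every bucket, so a lookup must open all of them; grouped by bucket, it opens one.

```python
from collections import defaultdict

NUM_BUCKETS = 4  # hypothetical bucket count


def bucket(record_id):
    """Toy bucket function: the ID modulo the bucket count."""
    return record_id % NUM_BUCKETS


def build_manifests(entries, key):
    """Group data-file entries into manifests; each notes which buckets it covers."""
    groups = defaultdict(list)
    for entry in entries:
        groups[key(entry)].append(entry)
    return [
        {"entries": files, "buckets": {e["bucket"] for e in files}}
        for files in groups.values()
    ]


def manifests_for_lookup(manifests, record_id):
    """Only manifests whose bucket summary matches need to be opened."""
    b = bucket(record_id)
    return [m for m in manifests if b in m["buckets"]]


# Hypothetical table: 24 hours of ingestion, one file per bucket per hour.
ENTRIES = [
    {"file": f"h{h:02d}-b{b}.parquet", "hour": h, "bucket": b}
    for h in range(24)
    for b in range(NUM_BUCKETS)
]

by_time = build_manifests(ENTRIES, key=lambda e: e["hour"])      # as written
by_bucket = build_manifests(ENTRIES, key=lambda e: e["bucket"])  # after rewriting
```

Here `manifests_for_lookup(by_time, 42)` returns all 24 manifests, while `manifests_for_lookup(by_bucket, 42)` returns just one. The data files themselves never move; only the metadata is pivoted.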
We used rewrite manifests to do that, and now it’s available through SQL, which is really exciting. The next thing I want to highlight is that operations [00:16:00] like migrating an existing table, setting the current snapshot of your table, and rolling back to a different version are also now available in SQL. So that’s a really great thing, and we’re excited to see what people use these for. The next thing is Spark DDL. I mentioned the SQL extensions that Anton’s going to talk about later; we’ve also added [00:16:30] a couple of new DDL statements to control a couple of things. First of all, we want the write order of your table to be table configuration. So we’ve added the ALTER TABLE ... WRITE ORDERED BY statement, which you can use to set a desired sort order for your table. That’s respected by some of our newer plans, like MERGE INTO, and eventually by inserts in Spark 2.3. What we want to do here is move more and more things into [00:17:00] table configuration.
We’ve done this in the past with things like file format, tuning settings, compression, and so on. Moving those things into table configuration makes them something that we can build a service to tune automatically. So doing the same thing with write order is a great step. As a quick example, we had a table that was tens of petabytes, and we [00:17:30] reduced the size of that table by 70%, resulting in several million dollars in savings per year, even at cheap S3 rates, simply by changing the sort order and the compression. Some of these settings we’re able to figure out and tune automatically. So being able to have a service that figures out what those settings should be and then applies them to a table is really huge.
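Why sort order affects table size can be shown with the standard library alone. This is a rough sketch, not a model of Parquet: it writes hypothetical rows column by column and compresses them with `zlib`, which stands in for Parquet's encodings and codecs. Sorting by the low-cardinality columns turns them into long runs that compress to almost nothing. The 70% figure from the talk comes from a real table and is not reproduced here.

```python
import random
import zlib


def compressed_size(rows, sort_key=None):
    """Bytes after writing rows column by column and compressing with zlib."""
    if sort_key is not None:
        rows = sorted(rows, key=sort_key)
    columns = zip(*rows)  # crude stand-in for a columnar layout
    payload = b"\n".join(",".join(map(str, col)).encode() for col in columns)
    return len(zlib.compress(payload, 6))


# Hypothetical event rows: (country, device, minutes watched).
rng = random.Random(42)
COUNTRIES = ["US", "BR", "IN", "DE", "JP", "FR", "MX", "KR"]
DEVICES = ["tv", "phone", "tablet", "browser"]
ROWS = [
    (rng.choice(COUNTRIES), rng.choice(DEVICES), rng.randint(0, 120))
    for _ in range(20_000)
]

unsorted_size = compressed_size(ROWS)
sorted_size = compressed_size(ROWS, sort_key=lambda r: (r[0], r[1]))
print(f"unsorted: {unsorted_size} bytes, sorted: {sorted_size} bytes")
```

The rows are identical in both cases; only their order changes. That is why a sort order is a good candidate for table configuration that a service can choose and apply automatically.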
[00:18:00] The next thing is, we’ve also added DDL for in-place evolution of table layout, in addition to the schema evolution that we already support. So now you can add new partition fields, remove partition fields and so on, and really manage, in place, the layout of the data in your table. [00:18:30] Next, I just want to highlight, really quickly, row-level plans. One of the things about the way Hive tables were used is that usually everything was overwritten using an implicit overwrite. So you would say, “Insert into this table,” sorry, “Insert overwrite into this table,” and what was overwritten wasn’t exactly clear. It actually depended on your partitioning.
Well, if we can change partitioning, [00:19:00] then that actually changes what a dynamic or implicit overwrite replaces in your table. And that’s not great; it would change the meaning of some insert plans. So we’ve added a couple of plans that are more targeted at the row level, so they’re easier for people to use, and they also operate at the logical level rather than implicitly depending on the table’s [00:19:30] physical structure. The first of the two I want to highlight here is DELETE FROM, where you can efficiently delete individual rows. This will go out and find all the files that have matching rows and replace just those files with rewritten versions. Then there’s one that is really exciting for data engineering in particular, and that’s MERGE INTO, where you can take a set of changes, merge it into a target table, and have very fine-grained control over how to update [00:20:00] existing rows or insert new rows based on those changes.
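The semantics of these two plans can be sketched in plain Python. This is a toy model, not Iceberg's Spark implementation: a table is a hypothetical dict mapping data file paths to lists of rows, the `.rewritten` file naming is made up, and `merge_into` covers only the simplest MATCHED/NOT MATCHED case. `delete_where` follows the behavior described above, rewriting only the files that contain matching rows and carrying every other file over untouched.

```python
def delete_where(files, predicate):
    """Copy-on-write DELETE FROM: rewrite only files that contain matching rows.

    `files` maps a data file path to its rows. Returns the new file map and
    the list of files that were replaced.
    """
    new_files, replaced = {}, []
    for path, rows in files.items():
        if any(predicate(row) for row in rows):
            replaced.append(path)
            kept = [row for row in rows if not predicate(row)]
            if kept:  # a file whose rows are all deleted is simply dropped
                new_files[f"{path}.rewritten"] = kept  # hypothetical naming
        else:
            new_files[path] = rows  # untouched files are carried over as-is
    return new_files, replaced


def merge_into(target, changes, key="id"):
    """MERGE INTO: update rows whose key matches a change, insert the rest."""
    merged = {row[key]: row for row in target}
    for change in changes:
        if change[key] in merged:
            # WHEN MATCHED THEN UPDATE: overlay the changed columns
            merged[change[key]] = {**merged[change[key]], **change}
        else:
            # WHEN NOT MATCHED THEN INSERT
            merged[change[key]] = change
    return list(merged.values())
```

Neither function looks at partitions, which is the point made above: the result depends only on the predicate or the change set, not on how the table happens to be laid out.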
Those plans are really, really useful for data engineers, and DELETE FROM, here, is going to be a game changer, I think, for GDPR use cases and compliance. The last thing I want to highlight (and there are a lot more things in the 0.11 release, like multi-catalog support for Spark 2.4, that I [00:20:30] just can’t get into here) is Flink streams. Got it, that’s my time check. We have support for reading from Iceberg tables in both batch and streaming modes, and for writing to Iceberg tables, which is pretty huge. We’re going to be able to have jobs that run entirely in Flink, consuming data from Iceberg tables and producing [00:21:00] updates to Iceberg tables.
We’ve also added experimental support for CDC streams that write to Iceberg V2 tables. And that leads me to where the format is going. I haven’t mentioned Iceberg V2 yet, but we’re basically building row-level deletes into Iceberg. V2 is unfinished; we haven’t made the call to say, “This is everything that you must do to be V2 compliant,” [00:21:30] and therefore we don’t have the forward compatibility guarantee yet. That guarantee is really critical: once you know that some version supports V2, it will always be able to read V2 tables correctly. So V2 is still unfinished, but we’re implementing a lot of it. This is going to allow us to do those merge plans and row-level plans in Spark using deltas instead of rewriting entire [00:22:00] data files, so they’ll be a lot more targeted.
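The difference between rewriting files and writing deltas can be sketched as merge-on-read. The format V2 spec as later finalized defines position delete files and equality delete files; the toy below models only the idea behind position deletes, recording `(file, position)` pairs instead of rewriting data files and applying them at read time. The function names and in-memory layout are hypothetical and say nothing about the on-disk format.

```python
def delete_as_delta(data_files, predicate):
    """Record (file, position) pairs for matching rows instead of rewriting files."""
    return [
        (path, pos)
        for path, rows in data_files.items()
        for pos, row in enumerate(rows)
        if predicate(row)
    ]


def read_merged(data_files, position_deletes):
    """Merge-on-read: apply the delete deltas while scanning the data files."""
    deleted = set(position_deletes)
    return [
        row
        for path, rows in data_files.items()
        for pos, row in enumerate(rows)
        if (path, pos) not in deleted
    ]
```

A delete now writes a small list of positions and leaves the data files untouched; the cost moves to readers, which is why a compaction service can later fold accumulated deltas back into rewritten data files.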
It’s also going to support those streaming CDC events from Flink, so that you can land a CDC stream and then read it as a materialized table. So really exciting stuff. There’s also work going on elsewhere. I want to highlight the talk later by Paula from IBM on secondary indexing; we’re hoping to get some of those features into Iceberg. There are also the Adobe talks. I don’t know if they’re talking about [00:22:30] secondary indexes, but [Godson 00:22:31] and Andre are talking about their experience with Iceberg at Adobe. We’re also looking at adding partition sketches and partition-level metadata for better joins, and maybe to avoid doing as much work for sorted writes into tables. And we’re also looking into multi-table transactions. So there’s a lot of great stuff coming down the pipe in the format itself.
[00:23:00] And this is my last slide. I just want to bring us back to data services and how we no longer have to rely on our data engineers to do the right thing all the time. We were putting so much work on data engineers and data analysts, or anyone writing data. We were telling them that they had to manage [00:23:30] the small files problem, that they needed to think about sort order and how to tune Parquet settings, and all of these other responsibilities. What Iceberg allows us to do is pull those responsibilities apart and put a lot more of them on the data platform, using data services. Take configuration: I talked earlier about changing the sort order and compression settings for a table by figuring out what they should be using [00:24:00] an automated process, and then changing those settings on the table.
Compaction and maintenance can be another data service. We’re building one called Auto Optimize. It currently supports just merging, but in the future it could support rewriting and re-sorting data, or anything else. So I’m really excited about the potential in this area for data services, and I really want to see what everyone out there decides to build. Indexing, metadata changes, view materialization, [00:24:30] all of these things are now on the table. I’m really excited to see what the community produces. Hopefully we’ll have an ecosystem of services that coordinate through Iceberg tables and really do great things. So that’s it for me. Hopefully that gives you an idea of where the project is going and what’s new, and I’m happy to take some questions. So thank you, everyone.
All right, thanks Ryan. [00:25:00] Let’s go ahead and open it up for Q&A. If you have a question, you just need to use the button in the upper right corner to share your audio and video, and you’ll automatically be put in the queue. If for some reason you have trouble sharing your audio when we click on you, you can go ahead and ask the question in the chat. All right, so let’s go ahead and queue up some questions here. It looks like the first one, and I apologize if I’m mispronouncing anybody’s names, is coming from Shubu. Let’s see, [00:25:30] there we go. It doesn’t look like he’s in the room anymore; the next one’s from Rohit. I will keep clicking until these folks are able to join.
It’s okay, I’m not scary.
All right. Unfortunately, some of the questions are from folks who left the room, so I’m going to ask you a quick question that came in through the chat. Is there [00:26:00] any guide on how to get started with Iceberg with Spark? Is Hive Metastore a must-have?
So Hive Metastore is not a must-have, and there is a getting started guide. If you go to iceberg.apache.org, there’s a Spark dropdown at the top, and there’s a Getting Started page. Go click on that; it has a couple of examples. One of the examples creates a Hadoop catalog, which is a file-system-only catalog. [00:26:30] We would recommend that for testing and things like that. You would probably want a more feature-rich catalog eventually, but it’s a good way to get started. I also mentioned Nessie, Glue, and Hive, and we’re working on a JDBC catalog as well. So you should be able to swap in Metastores that work with Iceberg really easily.
Okay, great. Thanks, Ryan. [00:27:00] It looks like we do have an audio question, from Richard [Haim 00:27:04]. I think you might be on mute, but go ahead and ask your question.
Hi Ryan, first off, thanks for sharing with us. I wanted to get your thoughts on Iceberg versus Delta Lake. I realize you’re coming from the Iceberg side, but what do you see as the advantages of Iceberg over Delta Lake?
Oh, okay, yeah. So first of all, I think that [00:27:30] the community aspect is really important. So we want Iceberg to be [crosstalk 00:27:37].
Sorry, go ahead.
Chris, do you want to mute yourself? So, where was I? It’s very important for this to be an open format with support across engines. I don’t really see that happening [00:28:00] in the Delta space, though I don’t follow it all that closely. I know that it’s primarily focused on Databricks offerings and Spark. It doesn’t have nearly the kind of support that Iceberg does. Then there are also additional challenges, I think. Go back to the first slide, where we’ve been, and our initial goals for Iceberg. Iceberg really provides in-place table evolution, [00:28:30] and I think that Delta provides correctness in terms of changing [crosstalk 00:28:36], but you might have to rewrite all of your data files. Whereas I think that schema evolution should be a metadata operation …