Table Formats: Apache Iceberg | Subsurface LIVE Sessions

Video Session

Netflix created Apache Iceberg to address the performance issues, inherent design issues, and usability challenges of using Apache Hive tables in large and demanding data lake environments. In this session, join Ted Gooch, Database Architect at Netflix, as he explains:
· How Iceberg was developed by Netflix to solve some of the inherent issues in the Hive table format
· How increased expressive partitioning capability allows for highly selective filters over large data
· How data marts were previously required to use data directly in a data lake, and how migrating to Iceberg allows those workloads to run with low latency
· How the flexible format of Iceberg allows for evolution of both schema and partitioning
· How the separation of logical table from physical layout allows for background optimization of storage

Video Transcript

Ted Gooch: Thanks. Welcome, everyone. This is my first virtual conference, so I’ll try to do my best here. I’ll try to speak at a nice pace because I’m a little bit over-caffeinated, but I think it will all be okay. I’m going to talk to you all a little bit about why and how Netflix created [00:00:30] and migrated to a new table format, which is Iceberg.

Just to give you a quick overview of what we’re going to talk about: I’m going to give you a brief history of Netflix’s data platform. I’m going to talk a little bit about how we got involved using Hive tables and our experience there, our experience [00:01:00] migrating off of Hive tables and some of the use cases that drove that, and then talk just a little bit about the good things that are in the pipeline and what’s coming next for us.

So in the beginning there was EDW. I think a lot of times when you talk about solutions that you’ve come up with, it helps to give some context about where you started out. So I’m going to start at the very, very beginning, at least from my [00:01:30] perspective on the Netflix data platform. I’m going to talk about our migration from an on-prem [inaudible 00:01:39] architecture to a cloud architecture, and talk a little bit about our choice of S3 versus HDFS for our Hadoop clusters and our storage.

When I started at Netflix, a very long time ago it seems like now, back in 2011, [00:02:00] we had a fully on-prem stack: Oracle RAC servers feeding an Ab Initio ETL into a Teradata data warehouse that served BI from MicroStrategy. This slide is from a presentation done in 2013.
As you can all imagine, our architecture looks quite a bit different now, and I’m going to talk a little bit about how that happened and what we learned from this.

[00:02:30] So at that time streaming was really starting to pick up, and the data volumes we were getting were just not scaling on the architecture we had on-prem. Additionally, our organization as a whole was pushing really aggressively to get 100% cloud in the U.S. So on the data platform side we decided that it was a great time to re-architect [00:03:00] our data warehouse from scratch and move to a cloud-oriented architecture. We started that migration around 2011, 2012. At that time, we started looking at the infrastructure we wanted to run on top of, and we found that S3 gave us a couple of capabilities that we thought were really important versus running on an HDFS [00:03:30] cluster itself.

First of all, it allowed us to run multiple clusters on top of the same storage, so you can effectively have a good separation of compute and storage. And the cost profile of S3 versus on-instance HDFS was significant. But despite the fact that we saw a lot of benefits, there were some trade-offs. There are a lot of operations that are easier in HDFS that might not be quite [00:04:00] so straightforward in S3. For example, at the time you could not get a consistent listing out of S3, since it was only [inaudible 00:04:08] consistent, and that introduced some issues. Because of that, there were some unintended consequences when using some of the cloud query engines; some we realized at the time, but they also gave us some bumps that weren’t fully expected.

So [00:04:30] following the cloud migration, our data platform looked something like this. This is probably a lot more familiar to what you all might be expecting our architecture to look like, although it is still quite a bit outdated because this is a slide from a 2015 presentation.
You can see we have data flowing into S3 from Kafka and Cassandra, and then being served out to Hive, Hadoop, Presto, Spark. We still [inaudible 00:05:00] at that time, as well [00:05:00] as Redshift, pushing some BI, and then some other analytic data marts on the side as well. So we felt that the cloud migration was a big success. We were able to keep up with our scaling data. So everything was solved then, right?

Well, not quite exactly. Let me tell you a little bit about Hive tables and our love/hate relationship with them. First of all, I [00:05:30] think there’s a lot of good to be said about Hive tables, and I got this from a presentation that Ryan Blue had done quite a while back: before you start getting into the problems of Hive tables, it’s useful to recognize that they’re good for a lot of things. It is a very easily understood format. Because it was so easily understood, it was able to be broadly supported across [00:06:00] a large number of engines, and it was quite flexible, supporting basically every type of data format out there. So that was good, and it was very easy to get up and running. We had schema enforced on read, and that made things simple on the write side.

However, there were some bad things. With [00:06:30] the Hive table architecture, you end up doing a lot of directory listing. In the metastore, you have a list of all the partitions, which end up just being directory locations. Each partition that you need to read has to be listed to get the list of files that are there. And in the case where your number of partitions is very high, that is a huge, huge number of listings to do, which can end up being slow on S3. [00:07:00] And then additionally, if you have a really high number of partitions, you start to get into trouble with the Hive metastore when you’re reading and writing and pulling back all those partitions for a given query.
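The planning cost Ted describes can be sketched in a few lines. This is a purely illustrative toy in Python, not real Hive or Iceberg code: it contrasts listing every partition directory at plan time with reading a precomputed manifest of data files.

```python
# Toy comparison of query planning strategies (illustrative only).

def hive_plan(partitions, list_directory):
    """Hive-style planning: one listing call per partition.

    Each call is a slow S3 LIST round trip, so cost grows with the
    number of partitions."""
    files = []
    for p in partitions:
        files.extend(list_directory(p))
    return files

def iceberg_plan(manifest):
    """Iceberg-style planning: the manifest already names every data
    file, so planning cost is independent of partition count."""
    return list(manifest)

# A table with three partitions of two files each:
layout = {"d=1": ["a", "b"], "d=2": ["c", "d"], "d=3": ["e", "f"]}

calls = []  # record every LIST the Hive-style planner issues
files = hive_plan(layout, lambda p: (calls.append(p), layout[p])[1])

# The manifest is built once at write time, not at plan time.
manifest = [f for fs in layout.values() for f in fs]
```

With three partitions, `hive_plan` issues three LIST calls; `iceberg_plan` issues none, because the writers already recorded the file list.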
Because of that, we created a federated metastore… I’m sorry, I should have said a distributed metastore, spanning multiple instances. But despite that, we still [00:07:30] struggled often with the stability of the metastore and its ability to handle large, large tables.

And probably even worse than those performance-type issues, there are some very surprising behaviors. Because it’s not a well-defined format, different engines supported things in different ways. So for example, if you were to run an alter and [00:08:00] rename a column or add a column or drop a column, depending on what query engine is accessing that data, you might get different behaviors. In Presto, or Trino now, we were seeing that the column resolution was position-based, so if you renamed a column, things still pretty much worked. But in Spark, resolution was name-based, so you were no longer able to project the renamed column. [00:08:30] And in CSV or other kinds of formats, the column might not even be found; you might get different behaviors depending on how the file is laid out.

And then there was the way we were handling consistent listing, by always writing to a new batch ID, which was added on at the end of a partition path so that each partition updated atomically. In Presto… or sorry, all appends [00:09:00] in Spark were actually overwrites, because they had to overwrite an entire partition atomically. However, in Presto, that wasn’t the case. So it just got very confusing, and we would get Slack messages very often from data engineers; even despite it being documented and explained, people just didn’t quite understand it.

So this person’s like, “Hi, I deleted some columns from my table, and apparently, it breaks Presto as there’s a mismatch between the Hive schema and Presto schema.
Is there a way [00:09:30] I can revert it back?” Or someone responding to a thread saying, “Well, if you rename a column and someone uses that column, they may get results, or they may not. In this case, it looks like you want to deprecate that column, which may work, but it has the same problem.” And on and on, where someone might even say, “Well, you can support it but then you have to use the slower code paths or accept it’s random. Spark [inaudible 00:09:58] to Hive, convert metastore parquet to false [inaudible 00:10:01] [00:10:00] if that works.” We didn’t like giving this kind of guidance to data engineers. They didn’t like having this kind of confusing setup. They just wanted to be able to create tables, rename columns, query them, and have everything work like it did back when we had a more traditional relational database. They don’t really care about the underlying mechanics of the actual data.

So that’s how we [00:10:30] came to the conclusion that what we really needed was not just improvements to the Hive table format but an entirely new table format. And there are a couple of goals that we came up with that we wanted to enforce in that new table format. First, it should be end-user friendly. Our data engineers should have a good intuition about how things work without having to know the exact details of [00:11:00] the underlying implementation. So for example, schema evolution should just work. You should be able to add columns, drop columns, and rename columns, and it shouldn’t break anything.

Partitioning should be something that a data engineer thinks about once, but then the people querying the table don’t necessarily have to know the exact details of how something’s partitioned. For example, if you’re partitioning on a timestamp column, you should [00:11:30] be able to query against that column and have those filters automatically pushed down effectively.
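That timestamp example is what Iceberg calls a hidden partition transform. Here is a minimal sketch of the idea in Python, assuming a toy `day` transform; this is illustrative only, not the Iceberg implementation.

```python
from datetime import datetime, timezone

# Toy "day" partition transform: the table is physically partitioned by
# day(event_ts), but the user only ever filters on event_ts. The planner
# derives the matching partition values automatically.

SECONDS_PER_DAY = 86400

def day_transform(ts: datetime) -> int:
    """Map a timestamp to its partition value: days since the Unix epoch."""
    return int(ts.replace(tzinfo=timezone.utc).timestamp()) // SECONDS_PER_DAY

def partitions_for_range(start: datetime, end: datetime) -> range:
    """Partition values a planner would scan for start <= event_ts <= end."""
    return range(day_transform(start), day_transform(end) + 1)

# A filter spanning 2021-01-01 23:00 to 2021-01-02 04:00 prunes the table
# down to exactly two day-partitions, with no partition column in the query.
hit = partitions_for_range(datetime(2021, 1, 1, 23), datetime(2021, 1, 2, 4))
```

The point is that the transform is part of the table's metadata, so every engine derives the same partition predicate from the same raw-column filter.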
And our goal was that, given that we are on S3, we should have native support for the cloud object stores and work in ways that are advantageous in our environment. We should just not do listings or file renames. They’re slow. They’re prone to error. So [00:12:00] come up with a spec that doesn’t require that. And then possibly we wanted to be able to inject our own abstractions for how we deal with the file system. The next goal was for it to be reliable. We wanted to have serializable isolation, we wanted to support many concurrent writers, and we wanted to have some advanced filtering techniques. I’m going to talk about each of these a little bit on its own.

So [00:12:30] schema evolution: as I mentioned, alter table should just work, and the way that we accomplish that is that instead of relying on names or positions, we decided to use an ID that’s consistent across evolutions. If you rename a column, the name changes but that underlying ID remains the same, so it can still be tracked across files. You can have files written with the old schema and still read them with the new schema, interpret them correctly, [00:13:00] and project them in the way that the user expects. Hidden partitioning, as I mentioned: users should not have to have in-depth knowledge of the actual physical layout of the table to take advantage of the partitioning and the filtering that it provides.

And furthermore, we always had problems with high-cardinality columns, where you want to partition on, or filter on, a high-cardinality column like an account ID. You’re talking hundreds of millions of account IDs. So that’s just not really feasible in a Hive schema.
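One way to tame a column like that is a bucket transform: hash each ID into a fixed number of bins with a hash function every engine computes identically. The sketch below is hedged: it uses `crc32` purely as a stand-in for Iceberg's actual cross-engine (Murmur3-based) bucket hash, and the bucket count is made up.

```python
import zlib

NUM_BUCKETS = 16  # illustrative; a real table might use far more

def bucket(account_id: int, n: int = NUM_BUCKETS) -> int:
    """Deterministically hash an ID into one of n bins.

    crc32 stands in for Iceberg's Murmur3-based transform; the property
    that matters is that every engine computes the same bucket for the
    same value."""
    return zlib.crc32(str(account_id).encode()) % n

# Planning a filter like `account_id = 123456789` now reduces to reading
# a single bucket's files rather than scanning the whole table:
target = bucket(123456789)
```

Hundreds of millions of distinct IDs collapse into a small, fixed set of partitions, yet a point lookup still prunes to one bucket.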
So we wanted to be able to support something that could transform that into lower granularity but still be able to use it to filter, and in the ID case [inaudible 00:13:46] bucketing, where, having assigned a cross-engine-compatible hash function, we can assign groups of IDs evenly across a set number of bins.

[00:14:00] Additionally, in the case of partitioning on timestamps, instead of having high granularity there, because those are a continuous set of values, the timestamp can be truncated to the appropriate level of granularity. So you can have it as a day timestamp, or an hour, or however much granularity makes sense for your query patterns. And that should be transparent to the user actually querying [00:14:30] the table; it should be translated by the engines that are accessing the data using the Iceberg spec.

Going on to the support for cloud stores: as we said, listing is expensive. Don’t do it. Come up with a spec that just doesn’t require you to be listing constantly. Have something maintained within the spec that tells you the files that are needed. And then the other goal was abstraction from the [00:15:00] underlying file system: provide a pluggable I/O layer that can be configured to your needs and can have multiple implementations depending on what your setup is.

And then lastly, reliability. You want to provide serializable isolation so that the state of a table can be tracked through a linear history of snapshots, so that the state of the table from one [00:15:30] commit to the next is clear and there is a clear point where the data changed. Unlike the setup we had with Hive, where the underlying data could be changed underneath and the Hive metastore knew nothing about it, you have a way of knowing when data has actually changed without checking the involved files yourself.

We wanted to be able to support many concurrent writers.
So the spec had to be able [00:16:00] to account for optimistic concurrency, where writers all write concurrently, assume there are no other writers, and check at commit time whether they need to retry. And retries should be intelligent, so that new data that’s not conflicting… if it doesn’t have any conflicts, it should still be able to commit. And then we wanted the ability to do some advanced filtering by storing some of the metadata upfront. For example, [00:16:30] being able to store min and max values for a partition range per file, so that files can be pruned at planning time rather than at scan time. One of the things that we found is that we would have jobs with thousands and thousands of files, even though in the end, maybe only a small fraction of those files actually even had [inaudible 00:16:52].

Even though with Parquet, engines can go look at the footer and see if they need to scan a file, they still need to have a task for that file, open it up, [00:17:00] read the footer, and then say, “Okay, I have nothing to do.” So you have thousands of tasks that just do nothing. Because of that, we wanted to be able to support low-latency queries from the query engines in our ecosystem. So that’s the spec that was developed for Iceberg: a table format for slow-moving, or slow-evolving, data.

[00:17:30] And that’s been in place here at Netflix for a little while now. We have some streaming use cases, some use cases that use the advanced filtering, and some that rely on the snapshot isolation. I’m going to talk about each of these a little independently.

So the streaming use cases: we have, as you can probably imagine, a large number of streaming inputs into our data warehouse. We have Kafka streams feeding into Flink jobs [00:18:00] that use Iceberg’s ability to checkpoint the data and make consistent commits to it.
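Backing up a moment, the per-file min/max pruning described above can be sketched as follows. This is an illustrative toy, not the Iceberg metadata format: the key idea is that column bounds live in table metadata, so no Parquet footer ever needs to be opened for a skipped file.

```python
from dataclasses import dataclass

@dataclass
class DataFile:
    """Per-file column statistics kept in table metadata (toy version)."""
    path: str
    min_id: int  # lower bound of the column's values in this file
    max_id: int  # upper bound

def prune(data_files, wanted_id):
    """Plan a point lookup: only files whose [min, max] range can
    contain the value get a scan task."""
    return [f.path for f in data_files if f.min_id <= wanted_id <= f.max_id]

files = [
    DataFile("f1.parquet", 0, 999),
    DataFile("f2.parquet", 1000, 1999),
    DataFile("f3.parquet", 2000, 2999),
]

# A lookup for ID 1500 plans a single file instead of a task per file.
plan = prune(files, 1500)
```

With thousands of files, this is the difference between thousands of open-the-footer-and-do-nothing tasks and a handful of tasks that actually read data.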
And we support having Iceberg as a source, because Kafka is not always practical as a source for Flink for large backfills and things of that nature, where the storage on Kafka would be prohibitive. We of course support having Iceberg as a sink from Flink, [00:18:30] as well as a side input.

And then one of the use cases that we were pretty excited about once we had Iceberg was Data Mesh, which is our ability to do change data capture from some of our other relational systems in our [inaudible 00:18:46] environment. One of the nice things about it is that because Iceberg supports schema evolution, we can match the schema evolution of our source systems in predictable ways, whereas before that was just very difficult to support. And then [00:19:00] lastly, we have an internal streaming infrastructure called Mantis, for which we also support Iceberg sinks.

Additionally, because some of those streaming infrastructures don’t necessarily land data in ways that are optimal for our query patterns, with Iceberg we were also able to develop a tool called Auto Optimize. In the case of Flink, where we [00:19:30] may have jobs landing data in multiple regions, it can do an auto-lift and move that data into the buckets that are co-located with the rest of our big data infrastructure. There is also the optimization side of things, which looks at the compression types, the file sizes, and the query patterns, and tries to do some optimizations by merging and rewriting files and things like that. [00:20:00] Because Iceberg provides an atomic file-level commit instead of just a partition-level one, that made the tooling a lot easier to develop. There are actually great posts on Auto Optimize on the Netflix tech blog if anyone wants to read them.

And then for high-cardinality filters, there are a number of surprising use cases that we came up with. For one, we have a system called Atlas that does all of our [00:20:30] service monitoring.
Think of all the microservices in Netflix; they’re all reporting all the metrics they have to a system called Atlas. The data is just enormous. Previously, before Iceberg, it had millions and millions of files, so even just the plan time for queries was taking tens of minutes to do all the listing. Once we were able to put it on an Iceberg table, we were able to get the run time [00:21:00] to be less than the plan time on the [inaudible 00:21:02] table because of that.

The advanced filtering also helped to support use cases around governance, like GDPR, and allows us to look up highly granular data, down to something like the account ID level, whereas that used to be an extremely expensive computation to do.

Another use case where we’re using advanced filtering is one [00:21:30] called Sizzle, which allows us to look up show-level data at a very low-latency SLA, feeding into custom JavaScript apps. It has allowed us to remove some of the more complicated architectures we had, where we were looking at extremely expensive search clusters and at moving data between [00:22:00] different data marts. We were able to support very similar use cases by just putting Trino on top of Iceberg and putting that in front of a custom JavaScript app, and still get response times in the low seconds, enough that it can be interacted with on a website.

The snapshotting has been something that’s really been transformative in our infrastructure. We’ve [00:22:30] always had this concept of WAP, which is write, audit, publish. It’s really common across our data warehouse, and we had kind of hacked Hive tables to make that work, moving around pointers and looking at high watermarks, and it was just a lot of additional infrastructure.
With Iceberg, it became very simple, because now we are able to stage snapshots, reference them in queries, and do any kind of auditing on [00:23:00] that data, and everything has to pass audits before it can actually be published for the commit.

Another use case that was opened up was materialized views. Because now we can look at all the dependencies of a view, we can see which tables have been updated, and we can know whether the view is fresh or not based on whether the snapshot IDs match [00:23:30] the current snapshots. And then we also have an infrastructure for doing Snowflake external table syncing, which was also difficult before on the Hive infrastructure, because Snowflake’s external table format, at the time, was a directory-based thing, whereas now we can provide the manifest of Iceberg files in [inaudible 00:23:53] table and keep that in sync as snapshots change.

Lastly, [00:24:00] because I think I’m starting to come up on time here: what’s next? What’s the dawn of a new day? We have Iceberg. What are we working on now? One thing that we’re pretty excited about is using Iceberg for secure tables. We have a nice way in Iceberg to inject a file system implementation that allows us to do some authorization on the side, so we [00:24:30] can do file-level authorization. That’s currently something we’re trying to push out. There are also row-level deletes, which are coming in the Iceberg V2 spec. That’s obviously a very important feature for change data capture and for any kind of high-granularity deletes.
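The write-audit-publish flow described a moment ago can be sketched as a tiny state machine. Every name below is hypothetical, not Iceberg's API; the point is that staged snapshots are invisible to readers until one atomic pointer swap publishes them.

```python
class Table:
    """Toy table with a published pointer and staged snapshots."""

    def __init__(self):
        self.current = None   # published snapshot id; what readers see
        self.snapshots = {}   # snapshot id -> rows (staged or published)

    def stage(self, snapshot_id, rows):
        """Write: the snapshot exists, but readers still see `current`."""
        self.snapshots[snapshot_id] = rows

    def audit(self, snapshot_id, check):
        """Audit: run a validation against the staged, unpublished data."""
        return check(self.snapshots[snapshot_id])

    def publish(self, snapshot_id):
        """Publish: one atomic pointer swap makes the data visible."""
        self.current = snapshot_id

t = Table()
t.stage("s1", [{"id": 1}, {"id": 2}])
if t.audit("s1", lambda rows: len(rows) > 0):  # e.g. a row-count audit
    t.publish("s1")
```

If the audit fails, nothing is published and readers never observe the bad data; with Hive-era tables, that guarantee required the extra pointer-moving infrastructure Ted mentions.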
Another thing that we commonly find in our data warehouse: [00:25:00] we’ve always had a hard time [inaudible 00:25:02] the reporting structure supporting large fact-to-fact joins on Trino, because we didn’t have a good way of doing bucketed joins. So with Iceberg, there’s a lot of work being done internally to do bucketing on our Iceberg tables so that we can support those large joins.

And then one thing that I personally did some work on that I’m excited about is the open-source native Python client for [00:25:30] Iceberg. We have an implementation internally, and we’re starting to push this out as open source, but it’s currently just myself and a few other contributors. It’s a great tool in the Iceberg stack because, depending on what your infrastructure is, it requires very little to get running. You can interact directly with tables in Jupyter notebooks or in Python [inaudible 00:25:59]. And [00:26:00] we’d really like to see more people contributing to it. So if that’s something you’re interested in, please reach out to me. There’s a lot of work to be done there still. It’s very early on, but we’re excited about the potential for companies that might not be running infrastructure as large as a company like Netflix, but still want to leverage Iceberg and the features it provides.

So that’s basically all I had. [00:26:30] Thank you for your time. I hope that was informative, and I hope you enjoyed it. I’d love to answer any questions that you have, so I’ll hand over to Pritty to moderate, and thank you.

Pritty: Thank you so much, Ted. That was a very informative session. Thanks for your time. Now we would like to open up the session for questions, and it looks like we already have a couple of them.

So there’s a question from [Frotz 00:27:00].
It [00:27:00] says, “Does Iceberg allow for schema on read like Hive, or is it a particular format so that the data needs to be transformed to be available for Iceberg?”

Ted Gooch: Iceberg is schema on write, because we do the resolution of fields by ID. So the IDs that are written to the Parquet files or [inaudible 00:27:26], depending on what format your fields [00:27:30] are in, must line up with the Iceberg schema, otherwise it will not be intelligible.

Are you going to keep reading the questions, or should I just go through the ones that are here? You’re on mute, Pritty.

Pritty: Yeah, there’s one more question. It says, “How does Iceberg [00:28:00] fit into a native AWS environment/stack, or is it mainly if you’re moving from Hive?”

Ted Gooch: Iceberg fits into any stack that has a system using a tabular format of data. In the AWS stack, there is actually a Glue catalog implementation for Iceberg in Java. There’s also a [00:28:30] DynamoDB catalog implementation, as well as just a generic JDBC one. So there is not necessarily any requirement to use Hive. All that Iceberg actually requires is something that can provide an atomic swap of the table metadata. In our case, we use a service called Metacat that acts like a Hive metastore, but the full capabilities of a Hive metastore aren’t actually even required.

And then I noticed someone [00:29:00] also had a question asking what query engines we are using. We use Trino and Spark SQL primarily on our data side. And then we use Snowflake as a BI acceleration layer. The use case that I mentioned on Snowflake is specifically for very large tables, where we don’t want to move the table on-cluster on Snowflake. We’re able to use [00:29:30] the Iceberg metadata to project an external table [inaudible 00:29:37] if needed. It’s not super complimentary but it’s there.

And then, how would you compare Iceberg to Hudi? I’m going to be honest.
I’m not super familiar with the capabilities of Hudi, but my understanding was that it’s really oriented around streaming, [00:30:00] a table format for handling streaming data. There is definitely some overlap, but I think the projects have somewhat different goals.

Pritty: Sounds good. We are actually up on time, so I just wanted to mention that there’s a Slack channel you can use to interact with Ted. If you have any questions after the session, you can connect with him on the Subsurface [00:30:30] Slack that we have. You can look him up by name, Ted Gooch, and connect with him.

And, Ted, since you’re not kicked out of the session, I guess we can still ask people if they have any more questions. If not, the session might actually end any minute now. In the meantime, we’ll wait for… if there are any questions.

People are congratulating you on a great session, Ted, so thank you. [00:31:00] And someone’s just said that there’s “No such state as over-caffeinated.”

Looks like we addressed all the questions. So thank you so much, Ted. Thanks for your time. We’ll end the session now, but you can always connect with Ted on Slack. Thank you very much.

Ted Gooch: I’ll be on Slack for a while, so if you have any questions, please reach out. I’m happy to answer anything.

Pritty: Thank you, Ted.