Thank you very much. I’m Adrian, dialing in all the way from London, England, where it’s unseasonably hot and sunny, and Christine is about 150 kilometers away from me over in Bath. So we’re going to be talking today about the experiences we’ve had over the last nine months or so working on what we call Next Generation Data Lake Technologies. We’ve both been working on the Expedia Group data lake platform, and looking into what it could, and should, evolve into in the near future. And Iceberg came up as a table storage format that looks really interesting. So we took a look at it and we did a bunch of work adding Hive read support to Iceberg. We’ve imaginatively called that Hiveberg, and that’s what we’re going to be talking about today.
So for those of you who don’t know, the Expedia Group covers many different brands and companies, as you can see on the slide. And we provide a wide range of travel solutions from hotels to car rental, to flights, to vacation rentals, and much more. So, as I’m sure you can imagine, this means we have a huge variety of different data. We also have a lot of it, and if we want to get a complete picture over all of this, it’s the responsibility of us on the data lake team to manage all of it and make it accessible. So before we dive into Iceberg and Hiveberg, I think it’s useful to understand just a bit of the broader context of our data lake journey and how we got to where we are today.
So those Expedia Group companies that I just showed you, some of them have been around for a couple of decades, some of them are a bit newer, but the point is, we didn’t create our current data platform from scratch. It’s something that’s evolved over quite a long period of time. And we followed a path that I’m sure is similar to many other companies out there. So we started off with what would now be called a traditional data warehouse, with most of the data stored in relational databases, often proprietary. And this was running on specialized hardware, like a mainframe.
So then with the rise of Hadoop and big data, we moved a lot of the data sets out into Hadoop to take advantage of the improved scalability that it provided. So we set up a shared on-premise Hadoop cluster, many hundreds of nodes, distributed storage, and we put Hive on top of this to keep track of the table schemas and so on. Because we had such a large SQL footprint from our relational database days, a lot of the data processing and querying that we did in Hadoop was also done using SQL via Hive’s query engine.
But over time, we ran into problems with not being able to scale out our on-premise cluster to meet peak demand. And we also found that the upgrade path for both the hardware and the software was expensive and painful, as were the operational costs of maintaining all of those machines ourselves. So we decided to move our data platform out into the cloud to take advantage of elasticity, operational costs, and the more flexible upgrade paths, and we’ve created a cloud data lake. So what do we mean by a cloud data lake? If we start off with what we consider to be the bare minimum, for us, it is the data stored in a distributed file system like S3, and then metadata above this data stored using Hive’s metastore service.
So the metadata consists of the schemas for every data set. And these also then point to the physical locations of the data. The Hive metastore has got wide support for integration with a number of compute and query engines, which we can then use to access the data. So now we’re not limited to just Hive’s SQL query interface. This slide shows some of what we’re using today or plan to use in the future. So these are things like Amazon’s EMR, Spark, Presto’s [inaudible 00:04:23] query engine, but then also things like Tableau, Dremio, etc. This obviously isn’t an exhaustive list, but the point to make is that Hive provides us with an interoperability and consistency layer that allows us to be fairly flexible in how we access our data assets.
So there are a number of things we already like in this setup. Most of the foundation is open source or, at the very least, commonly used commodity software. For the most part, it’s all being very actively developed. It’s evolving constantly, and we’ve engaged ourselves and contributed a fair amount, which has been great. As I just explained, we’ve got interoperability with a wide variety of tooling, so we don’t get locked into any one data access technology. And then, because of our relational database history, we’ve got a large number of users who are obviously skilled in writing SQL. So being able to keep those skills and reuse them in our cloud data lake is a huge benefit. But at the same time, we’re not limited to this. We’re also able to use more modern languages and frameworks like Spark, R, etc. And finally, we found all of this to be massively scalable, and we have tens of thousands of data sets, thousands of users, jobs, and systems interacting with the data lake to produce and manipulate multiple petabytes of data.
So for the most part, we’re happy. We’re getting a lot of value out of the flexibility and scalability of this cloud data lake, but like most things in life, there’s room for improvement. So I’d like to focus on some of the challenges we face just dealing with the metadata layer that’s currently stored in Hive. We found that in certain situations where, say, you have a data set that’s partitioned by hour, and you have a few years’ worth of data, if for some reason you want to access all of the data for some historical query, or you need to reprocess all of the data, this means that the Hive metastore needs to load up tens or possibly even hundreds of thousands of partitions. And this is not only slow, but it takes up a lot of memory. If we have a few users doing this at the same time, it has a noticeable negative impact on our systems.
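To put rough numbers on the partition counts mentioned above, here is a minimal back-of-the-envelope sketch. The function name and the extra partition key are illustrative assumptions, not figures from the Expedia Group data lake.

```python
HOURS_PER_DAY = 24
DAYS_PER_YEAR = 365  # rough estimate, ignoring leap years


def hourly_partition_count(years: int, other_key_values: int = 1) -> int:
    """Partitions accumulated by a table that is partitioned by hour.

    other_key_values models a hypothetical second partition key (for
    example a brand column) that multiplies the number of partitions.
    """
    return years * DAYS_PER_YEAR * HOURS_PER_DAY * other_key_values


for years in (1, 3, 5):
    print(f"{years} year(s), hour only: {hourly_partition_count(years):,}")
print(f"3 years, hour x 10 brands: {hourly_partition_count(3, 10):,}")
```

Three years of hourly partitions is already in the tens of thousands, and one additional partition key with only ten values takes it into the hundreds of thousands.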
Hive’s partitioning scheme tracks data at the folder level. So this means that query planning and execution involves doing file listing operations and potentially reading many files that aren’t actually even relevant for the query. We could look to resolve some of these issues by partitioning the data in a table differently over time. So, for example, you could keep hourly partitions for the most recent month’s data, which gets accessed the most often, and then you could repartition older, less frequently accessed data into daily or possibly even monthly partitions. Unfortunately, Hive doesn’t allow you to evolve your partitioning scheme like this. It’s fixed when the data set is created, so you then have to come up with workarounds, like having different tables for different partition grains, which isn’t ideal.
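The mixed-grain idea can be made concrete with a small sketch. It only models the choice of partition key (hourly for the most recent 30 days, daily for anything older) and counts the resulting partitions; the 30-day window and the key formats are assumptions for illustration, and this is not how any particular table format implements partition evolution.

```python
from datetime import datetime, timedelta

RECENT_WINDOW = timedelta(days=30)  # assumed cut-off for "recent" data


def partition_key(event_time: datetime, now: datetime) -> str:
    """Hourly key for recent data, daily key for older data."""
    if now - event_time <= RECENT_WINDOW:
        return event_time.strftime("%Y-%m-%d-%H")
    return event_time.strftime("%Y-%m-%d")


# One event per hour over the 365 days leading up to `now`.
now = datetime(2020, 6, 30, 12)
start = now - timedelta(days=365)
events = [start + timedelta(hours=h) for h in range(365 * 24)]

hourly_only = {e.strftime("%Y-%m-%d-%H") for e in events}
mixed_grain = {partition_key(e, now) for e in events}
print("hourly partitions only:", len(hourly_only))
print("hourly + daily partitions:", len(mixed_grain))
```

Keeping the fine grain only where it is queried most often cuts the number of partitions the metadata layer has to track by close to an order of magnitude in this example.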
And finally, we have a lot of data processing that restates existing data. So we’re not just appending data, but if we want to allow consistent, concurrent read and write access to the data whilst changing it, we effectively have to perform a copy-on-write of the entire affected partition whenever anything in it changes, even if we’re just updating or deleting one row in a partition. So this obviously has negative performance impacts, but there are also operational issues, because we have to keep track of what data is in use and when data becomes orphaned and can be scheduled for deletion, and there are also financial storage and processing costs of having to do rewrites at such a coarse grain. But we’re not alone in experiencing these issues, and a number of next generation table formats have been designed to deal with precisely these problems. One of these is Apache Iceberg, which aims to support the storage, management, and usage of petabyte-scale data sets.
So Iceberg has a very rich feature set, and there’s an entire presentation later on today discussing it in much more detail. So I’m just going to focus on what we thought was the most valuable to us for possible inclusion in our data lake in the future. The fact that it’s an open table format is obviously very important. At our scale, if we’re going to commit to storing petabytes of data, going back many years and possibly many years into the future, we want to make sure that this format isn’t proprietary in any way whatsoever, nor is it controlled by a single entity. We’d also like to have the option to contribute to the code base in case we need to add functionality or fix bugs. And what’s interesting about Iceberg is that it has very good integration with Spark, but it doesn’t necessarily require Spark, so there are Java and Python APIs.
And what’s nice about this is you are not tied to Spark as an execution engine. You could potentially use it from a wide variety of different tools and platforms, and even maybe integrate it into tooling that doesn’t exist today, directly from those Java and Python bindings. And one of the key concepts of Iceberg is that it tracks data at the file grain, not the folder grain. So this allows query planning and execution to avoid these potentially costly and eventually consistent file listings. And in turn this results in improvements in query performance. Iceberg handles schema evolution very well, but it also has more advanced concepts like being able to evolve the partition layout over time. So, coming back to my earlier example, you can repartition data to be at a daily or a monthly grain instead of an hourly grain in an existing data set, without having to recreate the entire table.
It then has a number of advanced features, which Christine is going to touch on shortly. So these are things like time travel, which is especially useful for things like model training and machine learning. There’s flexible schema evolution. So you can evolve how you model your data over time, or potentially even have producers and consumers of the data having slightly different views over the same data set. And it then aims for a high level of data correctness and consistency, which we think is crucial when you’re making important business decisions on your data. And it allows for concurrent data processing with isolated reads and writes. So I’m now going to hand over to Christine, who’s going to talk a little bit more about how Iceberg works under the hood, and then the work we’ve done to add Hive read support.
So, as Adrian mentioned, the key idea behind Iceberg is being able to track all of the files in your table over time, versus something like Hive, where you’re only able to track at the partition folder level. And this is enabled using this concept of snapshots, where every time you make some change to your table, such as a write to add some data, you create a new snapshot. And this creates a really nice linear progression of the changes that have happened to your table over time. It’s also the feature that enables time travel. So this here is a diagram of what the Iceberg table format actually is. The table format is made up of multiple layers of different metadata files. At the top layer, we have the snapshot metadata file, and this essentially contains a complete list of all of the files that exist in your table for that snapshot.
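The snapshot idea can be modelled in a few lines. This is a toy sketch under stated assumptions, not Iceberg’s implementation: the `Snapshot` and `Table` classes are hypothetical, and the sequential integer IDs are chosen only to keep the example readable.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, Iterable, List, Optional


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    files: FrozenSet[str]  # complete set of data files visible in this snapshot


@dataclass
class Table:
    snapshots: List[Snapshot] = field(default_factory=list)

    def commit(self, added: Iterable[str] = (), removed: Iterable[str] = ()) -> Snapshot:
        """Every change produces a new snapshot; old snapshots are untouched."""
        current = self.snapshots[-1].files if self.snapshots else frozenset()
        files = (current - frozenset(removed)) | frozenset(added)
        snapshot = Snapshot(len(self.snapshots) + 1, files)
        self.snapshots.append(snapshot)
        return snapshot

    def files_at(self, snapshot_id: Optional[int] = None) -> FrozenSet[str]:
        """Time travel: list the files as of a given snapshot (default: latest)."""
        if snapshot_id is None:
            return self.snapshots[-1].files
        for snapshot in self.snapshots:
            if snapshot.snapshot_id == snapshot_id:
                return snapshot.files
        raise KeyError(f"unknown snapshot id {snapshot_id}")


table = Table()
table.commit(added=["a.parquet"])
table.commit(added=["b.parquet"])
table.commit(added=["a-rewritten.parquet"], removed=["a.parquet"])

print(sorted(table.files_at()))   # latest state of the table
print(sorted(table.files_at(2)))  # the table as it was at snapshot 2
```

Because each snapshot carries its own complete file list, a reader pinned to an older snapshot keeps seeing a consistent table while writers carry on committing new ones.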
It contains information like the table schema, the partition spec, as well as the location of the manifest list. The manifest list is the next layer down and is, as it seems, a file that contains a list of all of the manifest files for that snapshot. For each manifest that it lists, it contains some information about that file. So that’d be things like the range of values that the manifest file spans for the partition column. And this is what Iceberg can use in order to skip reading entire manifest files if the value that you’re looking for in your query doesn’t exist in that manifest file. And it’s the part that allows Iceberg to significantly improve the query planning time. And then at the next layer, we have the manifest files, and these contain lists of data files. Per data file, a manifest also contains some information about that file, such as per-column upper and lower bounds, which Iceberg can also use to skip reading individual data files that don’t need to be read for a specific query.
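The two levels of skipping described so far can be sketched as follows. This is a simplified model, assuming a table partitioned by a date column and a single numeric `price` column with per-file bounds; the class and field names are hypothetical and do not mirror Iceberg’s actual metadata classes.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class DataFile:
    path: str
    price_bounds: Tuple[int, int]  # per-column lower and upper bound


@dataclass
class Manifest:
    path: str
    date_range: Tuple[str, str]  # range of partition values covered (ISO dates)
    data_files: List[DataFile]


def plan_scan(manifest_list: List[Manifest], date: str, min_price: int):
    """Return the data files to read, plus how many manifests were opened."""
    selected, manifests_opened = [], 0
    for manifest in manifest_list:
        low, high = manifest.date_range
        if not (low <= date <= high):
            continue  # skip the whole manifest using the manifest list entry
        manifests_opened += 1
        for data_file in manifest.data_files:
            if data_file.price_bounds[1] < min_price:
                continue  # skip the file using its upper bound
            selected.append(data_file.path)
    return selected, manifests_opened


manifest_list = [
    Manifest("m1.avro", ("2020-01-01", "2020-01-31"),
             [DataFile("jan-a.parquet", (10, 90)), DataFile("jan-b.parquet", (100, 500))]),
    Manifest("m2.avro", ("2020-02-01", "2020-02-29"),
             [DataFile("feb-a.parquet", (5, 50)), DataFile("feb-b.parquet", (60, 400))]),
]

files, opened = plan_scan(manifest_list, date="2020-02-14", min_price=100)
print(files, opened)
```

No directory listing happens anywhere in this planning step; everything is decided from metadata, which is the property the talk attributes to tracking data at the file grain.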
And then at the bottom layer, we have the physical data files, and they can be written in things like Parquet, ORC or Avro. When we were first investigating Iceberg, we took notice of the different query engines that Iceberg supports. Iceberg comes with really good support for Apache Spark, where you can both read and write Iceberg tables using the DataFrames API, and I believe there’s also some work going on to integrate Spark SQL with Iceberg in Spark 3. And the community over at PrestoSQL have also written a connector that allows you to read and write Iceberg tables as well, which is really nice.
However, when we were investigating Iceberg, there wasn’t, at the time, any integration of Iceberg and Hive. This is a really key component for us to unlock a shift in our data lake from Hive to Iceberg, as we really need this bridge between the older data sets in Hive and any newer data sets in Iceberg in order to not have a barrier between the two technologies, so that we don’t have to do a big-bang migration of all data sets in one go. And that’s what led us to creating this project called Hiveberg, which aims to add read support for Iceberg tables from Hive.
So when we first started thinking about Hiveberg, we first engaged with the Iceberg community and reached out to see if anyone was interested in the idea of adding this feature. And we got some good general support for the idea, and we got some pointers and some feedback about how we could go about implementing this. Our initial aim was actually to contribute the project directly to Iceberg, so forking the project and developing in a branch of Iceberg, but what ended up happening was that we encountered a large number of issues with dependency conflicts between Hive’s older dependencies and Iceberg’s newer dependencies. So this ended up with us deciding to create Hiveberg as a temporary stopgap project, where we could learn and develop in a simple, separate project where we weren’t being impeded by these dependency conflicts, because it turned out to be a much bigger issue than we initially thought it would be, as it required a change in the dependency resolution plugin that was being used with Gradle.
And we got some help from the Iceberg community to fix that issue and get that changed so that we could finally start contributing some of this project back to Iceberg. And then when we first started the project, we decided that we were going to just focus on getting the read path working from Hive, meaning that you can’t currently write Iceberg tables via Hive. And this really meant that we could ensure that our existing tooling would be able to access any new Iceberg data sets that we were writing, allowing for some incremental migration of data sets over from Hive to Iceberg. And it’s the part that would really allow us to make sure that we’re not creating any data silos. So how does Hiveberg actually work? The core of Hiveberg is centered around the input format. And this is the bit that essentially integrates Iceberg with Hive’s execution framework, MapReduce.
So it’ll take files retrieved from Iceberg tables, split them into manageable chunks for the mappers, and create Iceberg records from those chunks. Then the translation layer is the SerDe bit, which will take Iceberg records and convert them into fields and values that Hive can interpret. There’s also some translation going on there between Iceberg types and Hive types. And then we have the storage handler, which bundles all the different parts together into one class and makes it easy for a user to pick up the input format and start using it. It also adds some extension points for us to be able to add some more advanced features to the project. And then the bit that ties everything together: we’ve been extensively using this project called HiveRunner, which allows us to perform integration-style tests by running Hive in memory. So this meant that we could test the input format by running unit tests within our IDE. And it meant we didn’t have to go through all the trouble of setting up an external Hadoop cluster to test the input format.
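As a rough picture of what the translation layer has to do, here is a small sketch that maps a schema and a record from one type system to the other. The mapping table is an illustrative assumption covering a few primitive types only; it is not Hiveberg’s actual SerDe code, and the function names are hypothetical.

```python
from typing import Any, Dict, List, Tuple

# Assumed mapping from Iceberg primitive type names to Hive type names.
ICEBERG_TO_HIVE = {
    "boolean": "boolean",
    "int": "int",
    "long": "bigint",
    "float": "float",
    "double": "double",
    "date": "date",
    "timestamp": "timestamp",
    "string": "string",
    "binary": "binary",
}


def to_hive_type(iceberg_type: str) -> str:
    """Translate one type name, failing loudly on anything unmapped."""
    try:
        return ICEBERG_TO_HIVE[iceberg_type]
    except KeyError:
        raise ValueError(f"no Hive type mapped for Iceberg type {iceberg_type!r}")


def to_hive_columns(schema: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Translate (field name, Iceberg type) pairs into Hive column definitions."""
    return [(name, to_hive_type(iceberg_type)) for name, iceberg_type in schema]


def to_hive_row(record: Dict[str, Any], schema: List[Tuple[str, str]]) -> List[Any]:
    """Flatten a record into values ordered by the schema's field order."""
    return [record.get(name) for name, _ in schema]


schema = [("id", "long"), ("name", "string"), ("price", "double")]
record = {"name": "hotel", "price": 120.5, "id": 42}

print(to_hive_columns(schema))
print(to_hive_row(record, schema))
```

Failing on an unmapped type rather than guessing keeps a read-only bridge like this from silently returning wrongly typed values.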
This here is a diagram of how we might want to integrate Hiveberg into our existing data lake. So we have here at the bottom S3 as our storage layer, where both Spark and Presto can read and write Iceberg tables. But then if we have any users using some of the older technologies, such as Hive and Tableau, and they need some of these data sets, they can still access them via the Hive metastore and Hiveberg. So this means that we’re not sectioning off any of the newer data sets from users still using older tools. And everyone has visibility of all the different data sets that they might need to complete the task that they’re doing. So these next couple of slides are going to show a bit of example code of how you might be able to use Hiveberg. The one caveat here is that, in order to use Hiveberg, you have to actually create your Iceberg table using some other technology.
And here in this example we’re using Spark and the Iceberg Java API, and we’re just creating a Spark session and choosing the Iceberg catalog that we want to use to create our table. In this case, we’re just using HadoopTables. And to create our Iceberg table, we use the catalog to create a table using an Iceberg schema and the location where we want our table to exist. And then we’re just adding a bit of data to the table by reading a JSON file and selecting the columns that we want to append to the table.
So once you’ve created your Iceberg table, you can then go to Hive to create your Hiveberg table. To do this, you create a new table and you specify that you want to store it using the Iceberg storage handler, and you give the same location as we just used, where your actual Iceberg table exists. That’s all you need to do, because then if you run a Hive query like this one here at the bottom, you should see the same results as you’d expect to see if you ran the query directly against your Iceberg table.
Once we got the core functionality working, so we could actually read the tables from Hive, we started to have a look at some additional features that we wanted to add, so that we could leverage some of the cooler capabilities of Iceberg that were really bringing the benefits that we were looking for. And the first one was this concept of filter pushdown, where you take the filter provided by the user in the WHERE clause of the query and push it down into the Iceberg readers, so they can use that filter to prune away files that don’t need to be read for that query. And this is the part that allows Iceberg to significantly speed up the query planning phase, and it’s a really big benefit. We then also had a look at adding column projection, which is where you take the columns in the SELECT part of the statement and push them down into the readers, so that the readers can skip reading certain columns and only read the ones they need. That also speeds up query execution.
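Filter pushdown and column projection can be illustrated together with a toy reader. This is a sketch under assumptions: files are plain dictionaries with per-column min/max statistics, the WHERE clause is reduced to a single `(column, operator, value)` triple, and none of the names correspond to real Hive or Iceberg classes.

```python
import operator
from typing import Any, Dict, Iterator, List, Tuple

OPS = {"=": operator.eq, "<": operator.lt, "<=": operator.le,
       ">": operator.gt, ">=": operator.ge}


def may_match(stats: Dict[str, Tuple[Any, Any]], column: str, op: str, value: Any) -> bool:
    """Conservative check: False only if no row in the file can match."""
    if column not in stats:
        return True  # no statistics, so the file has to be read
    low, high = stats[column]
    if op == "=":
        return low <= value <= high
    if op in (">", ">="):
        return OPS[op](high, value)
    return OPS[op](low, value)  # "<" or "<="


def scan(files: List[dict], where: Tuple[str, str, Any], select: List[str],
         opened: List[str]) -> Iterator[dict]:
    """Yield projected rows; record which files were actually opened."""
    column, op, value = where
    for data_file in files:
        if not may_match(data_file["stats"], column, op, value):
            continue  # pruned by the pushed-down filter
        opened.append(data_file["path"])
        for row in data_file["rows"]:
            if OPS[op](row[column], value):  # row-level filter still applies
                # A columnar reader would avoid decoding the other columns.
                yield {name: row[name] for name in select}


files = [
    {"path": "f1.parquet", "stats": {"price": (10, 90)},
     "rows": [{"id": 1, "price": 10, "city": "Bath"}, {"id": 2, "price": 90, "city": "London"}]},
    {"path": "f2.parquet", "stats": {"price": (100, 500)},
     "rows": [{"id": 3, "price": 100, "city": "Paris"}, {"id": 4, "price": 500, "city": "Rome"}]},
]

opened: List[str] = []
rows = list(scan(files, where=("price", ">", 100), select=["id", "city"], opened=opened))
print(rows, opened)
```

Note that the file-level check can only rule files out. A file that passes it may still contain rows that do not match, which is why the row-level filter stays in place.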
And then we also really wanted to add this feature for users to be able to write time travel queries, allowing the user to specify the snapshot ID for the snapshot that they want to run the query against. This is sort of coupled together with adding an extra feature of exposing Iceberg system tables as Hive tables. So this is where users would be able to see the different metadata tables that they have related to their Iceberg table via Hive, so they can use some of that information in some of their queries, such as a snapshot ID for a time travel query.
So if you wanted to perform a time travel query, you create your Hive table similar to how we did earlier, by creating a new table and specifying you want to use the Iceberg storage handler and the same location as before, but then when we run a Hive query, in the WHERE clause we specify the snapshot ID we want to run it against.
But if you wanted to actually figure out which snapshot IDs you have available for your table, you can create a system table to see that information. To do this, you create a new table and you still specify that you want to use the Iceberg storage handler, and you give the same location as before, except you add this extra suffix at the end of #snapshots, which is the bit that specifies to the input format that you want to be loading data from the metadata table versus the original data table. And if you were then to run a select star against this table, you’d see information like this here at the bottom, where each row is a different snapshot that’s part of your table, and you’ll get information about that snapshot, like the timestamp when it was committed, as well as the snapshot ID, which is what you can use for the time travel query.
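Picking a snapshot ID out of the rows that such a snapshots table returns is a small lookup. The sketch below assumes each row has a `committed_at` timestamp and a `snapshot_id`; those field names and the ID values are made up for illustration and are not taken from the slides.

```python
from datetime import datetime
from typing import Dict, List, Optional


def snapshot_as_of(snapshots: List[Dict], as_of: datetime) -> Optional[int]:
    """ID of the latest snapshot committed at or before `as_of`, if any."""
    candidates = [s for s in snapshots if s["committed_at"] <= as_of]
    if not candidates:
        return None  # the table did not exist yet at that point in time
    return max(candidates, key=lambda s: s["committed_at"])["snapshot_id"]


# Hypothetical rows, as they might come back from the snapshots table.
snapshot_rows = [
    {"committed_at": datetime(2020, 6, 1, 9, 0), "snapshot_id": 1001},
    {"committed_at": datetime(2020, 6, 8, 9, 0), "snapshot_id": 1002},
    {"committed_at": datetime(2020, 6, 15, 9, 0), "snapshot_id": 1003},
]

print(snapshot_as_of(snapshot_rows, datetime(2020, 6, 10)))
```

The returned ID is the value you would then put into the WHERE clause of the time travel query described above.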
So this here is a timeline of how we wanted to contribute the project back to Iceberg. We started off with some initial PRs for all of the core functionality, just to be able to read an Iceberg table. So we got the [inaudible 00:21:32] first, and then we had a PR open for the initial input format. But we got some feedback from the community and we decided to change our approach. We collaborated with another developer and then got a new version of the input format [inaudible 00:21:48]. And then quite recently, we also got the storage handler part merged in as well. And now that all of the core functionality has been merged into Iceberg, we are looking at opening more PRs to add the extra functionality like the filter pushdown and the time travel stuff. Then once all of the coding stuff has been merged in, we’re going to make sure that we add some good documentation on how you’d be able to use the feature. We know documentation is one of the most important bits.
So Hiveberg is very much a project that’s still in development, and it’s still a very fresh project. But it really has the ability to unlock a lot of capability within our data. And one of the key takeaways of this project is that we want to ensure that we have backwards compatibility within our data lake, so that any new data sets created with Iceberg aren’t siloed off from other users using older technology. Adopting Hiveberg would also help us improve existing use cases. So queries that look at a lot of historical data would have significantly improved performance, and general performance would be better across the board, but also having the additional flexibility of being able to evolve a table schema and a partition spec is a really big benefit. It would also help enable some new use cases, such as version monitoring, having data backups, and also being able to roll back our tables to older versions as well.
Then a key part for us has really been this concept of future-proofing. So the fact that the table format is open and that the project is open source means that we can contribute different features back to the project that would really help us to be able to adopt the technology, like the work we’ve been doing here with Hiveberg. We’ve really enjoyed, and are excited about, all of our contributions making it into Iceberg, and we look forward to all the further PRs and the work we’ll be doing in the future. If anyone’s interested in the project, please go have a look. We welcome all feedback and comments. So I believe I can direct you over to the Slack channel. If you have any questions, we’ll be around for about an hour. And thank you for listening.
Thank you, Christine and Adrian. We actually do have about seven minutes left in this session. If you would like to take the first question, we have one here from June. Let’s see, June, are you there? Okay, it looks like June may have dropped off. How about Alberto?
Hey, so my question is about what made you decide on Iceberg and not Delta, Hudi or Hive 3, and the second question is about the limitation of the S3 object store, which is eventually consistent. So how did you solve the [conclude and Bryce 00:25:06], if you have these use cases in your setup? Thank you.
I can take this. So the first one, about the choice of technologies: Hive 3 could be interesting, but getting from Hive 2 to Hive 3 would have been quite a journey. And it has some benefits, but I’m not sure it has as many as some of these next generation table formats that you mentioned. Delta is interesting, and we do use it in parts of Expedia. We use Databricks and that comes with Delta. So it has lots of good features. Unfortunately, some of its nicer features aren’t in the open source version. So that comes back to my point that, whatever we commit to, we would like it to be 100% open source.
Hudi we’ve looked into also, and I want to stress that we’re still investigating all of this; we’re evaluating all three of them at the moment. But what we liked about Iceberg is that it just seems to be really well architected, really well thought through, and we’ve just found the community very open and engaging. We’ve had a number of conversations with them, and it just feels like we’re really nicely aligned with the vision of where they want this table format to go. So I think it lines up nicely with where we could possibly go in the future. For the eventual consistency issue, I think Iceberg is built to solve a lot of those problems. It tracks things at the file level. Those manifest files that Christine talked about in the snapshots, each of them has, at a point in time, all the files that are there. So you don’t have to do those list operations. And that’s definitely one of the goals of the community, to get around that and to make sure that your data lake is consistent, not eventually consistent, for concurrent access.