March 1, 2023

12:20 pm - 12:50 pm PST

Rewriting History: Migrating Petabytes of Data to Apache Iceberg Using Trino

Dataset interoperability between data platform components continues to be a difficult hurdle to overcome. This difficulty often results in siloed data and frustrated users. Although open table formats like Apache Iceberg aim to break down these silos by providing a consistent and scalable table abstraction, migrating your pre-existing data archive to a new format can still be daunting. This talk will outline challenges we faced when rewriting petabytes of Shopify’s data into the Iceberg table format using the Trino engine. A rapidly evolving landscape, I will highlight recent contributions to Trino’s Iceberg integration that made our work possible while also illustrating how we designed our system to scale. Topics will include: what to consider when designing your migration strategy, how we optimized Trino’s write performance, and how to recover from corrupt table states. Finally, we will compare the query performance of old and migrated datasets using Shopify’s datasets as benchmarks.

Topics Covered

Open Source

Sign up to watch all Subsurface 2023 sessions


Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.

Marc Laforet:

My name is Mark. I’m a data engineer at Shopify. And in this talk I’m in a recounted project that I worked on at Shopify where we migrated petabytes of data to Apache Iceberg format using Trino as a compute engine, which is, I think, what makes this project a little bit more interesting or different than maybe some other migration projects. So, let’s go over the agenda. First, I’m going to set up some context of why we wanted to do this. No one willingly migrates petabytes of data is actually hard to do. So why did we want to do it? How did we get there? Why did we choose Iceberg as what we are going to migrate to? I’m going to briefly go over some migration strategies that if you find yourself in the same position you would want to consider.

I’m then going to go over the Trino Iceberg connector. It’s integration and how that evolved over the course of our project. I’m going to show you guys some benchmarks of the pre and post migrated data sets, and then I’m going to finish off with some reflections about the project and things I might do differently if I have to do it again. 

Lakehouse Data Sets

Let’s start off with some context. I’m on the Lakehouse team at Shopify, and we have two generalized mandates. We are mandated with ingesting all our first party data, but also we’re in charge of the lifecycle of data on disk. And so I’m focusing on the storage layer. The lifecycle of data on disk. And initially our team was really focused on this one class or bucket of data called raw data, which had three main subdivisions. They were event data, cdc data, or change data capture data, and third party data from using HubSpot or something like that, that we want to ingest into our warehouse.

Because our team was initially focused on this data source, most of these actually used rational table formats. Most of them are in hive table format. But as our team matured, we realized that we were missing a large chunk of our data sets at Shopify. And that was our model data. As we looked at these data sets, we found that there were a lot of silos between the data sets that were determined by the tools that were built to work with the data themselves. So we found that there were tools that were specifically built to work on top of Spark. There were tools that were just raw SQL, and then there were also tools built on proprietary engines like BQ, BigQuery and when we pulled back the layers here, we actually found a horror story.

A lot of these tools actually invented their own table formats. So they invented what they would consider a data set or a table to look like. And as one might think this made interoperability between data sets really, really difficult. And so if I wanted to spin up a new tool, I actually first had to build adapters into that tool so that it could actually interpret what a data set from other tools were meant to look like. And so this meter code really bloated, it made it complex to maintain and it doesn’t really scale that well. So even worse than that is data scientists would come to us and ask really simple questions like, Hey, I used tool one, I produced a data set. I want to use that as input to another tool. How do I do that?

Lakehouse Architecture

And that’s actually really hard. So, something had to change. We wanted to re-architect our data lake and make that the central layer where all data came into the narrative and produce a specification that all tools would use to store their data on disk. We decided we needed to determine what open table format we wanted to use. And we already had some in-house experience with Iceberg. One thing that was really nice is because we had so many different tools at the company that were reading, and writing data, Iceberg actually produced a specification which we could give to other teams that they could build their tools adhering to, which means that although we weren’t ready on day one to adopt Iceberg there’s a spec that we could all build toward. And at the end we would know that we could come together and our tools could all talk to each other. 

Open Table Formats

So, we’re at Subsurface. I’m sure a lot of you already know what Open Table formats are, but just to go over it really quickly, it really is just a library that keeps track of files that comprise the table. When I first started learning about open table formats, I got confused about this because it’s an abstraction really. One colleague came up with a really useful example that made it really click for me. What she said is that it’s like a file system is to a hard drive is like Iceberg is to an object store. So you have your file system. You have a file that you’re interested in looking at, and you have to open it.

You don’t have to know where that file is stored on disk. That’s the power of the abstraction that the file system provides you. It’s the same thing with Iceberg. You want to select star from a table. You don’t need to know all the files that you need to read now on your object store. That’s the abstraction, the power of it, the power of the abstraction that Iceberg gives you. So you don’t need to track that anymore. And so, as I said, there’s better and more extensive talks on Iceberg here. I just want to show you the Iceberg spec, the diagram. 

I think the only real important thing for my talk is that there’s a metadata layer which points to your data files. That’s all I really want to go over. So at Shopify, I don’t want to take too long on this, but just getting to this point where we were able to convince senior leadership that this was a problem, that this was the right layer to solve the problem, was a huge endeavor and something that took a lot of time. However, once we got there, people were a lot more excited to write new data, but what’s actually holding you back is compatibility with your old data. And so this is where you say, “oh, crap.” Okay. Now, I need to migrate my entire data lake to this new table format.

Migration Strategies

Okay, so to go over the agenda quickly, we’ve gone over the context. I’ve explained why we decided to go with Iceberg, and now you have to do your migration. To do our migration, we decided that we were going to first focus on one subset of our data. So as I said, in our raw data bucket, most of our tables were already stored as hive tables. But, one thing that’s really sad is that the file format was actually in JSON. And that will be more important later. But now that we want to, we’ve identified one dataset or a class of data sets that we want to first migrate. We have to start thinking about the different migration strategies. This depends on what your current state is. If your data is not in a supported file format, then that means you’re going to have to restate or rewrite everything if your data is in a supported file format.

But you want to change the partitioning strategy, so how it’s organized on disk. Then again, you’re going to have to restate or rewrite all your data. If you’re in the lucky position where your data is in a supported file format for Iceberg, that means your data is in Parquet or Avro and it’s properly partitioned, well then you’re actually in a really good spot because this means you can get away with only generating those manifest files. You don’t have to restate any of your data, which is the real time consuming and expensive part. So as I previously mentioned, our data format was actually, JSON. So this meant that we had to rewrite everything.

System Designs

We came up with a system that would rewrite our data. We decided that we would have this table manifest file. We would check that into DAG generator, kinda like dynamically generate DAGs based on that table manifest. And you would get one DAG per table if we blow up into what one DAG actually looked like. It had three basic steps. It filled a queue with all the partitions to be rewritten. And then we had a worker which would, in that process, take a partition off the queue and rewrite that data using Trino. We would then reconcile the partitions using Trino as well. We would do counts between our old and our new data. If they didn’t match, we would drop that partition and try again. Basically, what Trino’s just doing though is reading data into memory and then you’re writing it down in Iceberg format.

Trino Iceberg Integration

Now that we’ve gone over a few different migration strategies, I want to take the next little while to talk about the Trino Iceberg integration. What was really interesting about this project was we were actually developing our project when the Iceberg connector was being developed in Trino. We hit a lot of the first roadblocks, I think, and reported them. What was really cool was that the Trino community was really quick to address our concerns and fix all the bugs that we were finding. In our current setup, we found that for our larger tables it was going really slow.  You have your queue, you have your worker, your workers just taking partitions off one by one and rewriting one by one. I think if most developers found themselves in this case, they would just think to scale the number of workers or increase concurrency of the process.

So, you have end workers pulling off of the queue at different times. And so that’s what we did. But when we did this, we actually started getting this error a lot. We got commit failed exception, and you can read what’s happening, but I’m going to explain it in some diagrams. Let’s say we have two processes that they’re trying to make a pens or any changes to your Iceberg table. Well, in Iceberg’s snapshot model, what it does is your snapshot model, has to increase sequentially. So your metadata, if you decide to append to it and needs to reference the snapshot ID that’s previous to it. If one of your processes succeeded, and then your old pro, your other concurrent process tries to append to it, it actually isn’t referencing the head of your table anymore. You’ll get that exception. This is something that’s changing in Iceberg. Yeah, it’s changing in Iceberg as it evolves, but at the time we were doing this project, it was still in this model. Iceberg also allowed you to do something called a commit retry, which meant you didn’t have to rewrite your data files. You could just regenerate your manifest files, which is a really lightweight operation and reference the new head of the table.

But what had to happen is you had to throw the proper error so that you could make use of Iceberg’s commit retries. And that’s what was patched by Alex Joe, in May of 2022 in this PR and was in the release of Trino 379. What was interesting about this is that Alex wasn’t actually entirely focused on this bug. He just patched it as he was working and you see it as a commit message through proper exception. And I don’t know if he realized how much help that would’ve been at the time, but it totally unblocked our project. So that was really nice that we could now scale our writers and really rewrite data is at petabytes. So to scale.

Missing File Problem

Another issue that we started running into a lot was this thing called the missing file problem. What this actually showed was that once we started writing in Iceberg, there would be files that Iceberg thought were part of its manifest in this case, but they actually didn’t exist on GCS or whatever your object story is. And so what was happening here was that Trino was writing to the Hive metastore while doing the Hive metastore of reference, but the connection between Trino and the Hive metastore actually was severed. And so it couldn’t, it didn’t know if its commit actually succeeded or failed. Trino was trying to be nice. So, it was trying to be nice. So, it said I don’t know what the result of my commit was, so I’m going to go and proactively clean up the files that I just wrote.

The only problem is, is that the commit actually succeeded. The data files and the manifest files were actually included in the Iceberg table. Now, after Trino went back and cleaned up after itself, your table’s corrupt and it can’t actually read any of the data. And so this was patched, by Marius Grama in September of 2022. And basically what happened in this PR is that they just avoided the proactive file cleanup on the Hive metastore timeout. And this is really the approach that most engines are taking, these days. And it’s because Iceberg comes with out-of-the-box orphan file cleanups. So having files that aren’t actually included in the table in your object store isn’t a big deal. You can just go back after the fact and clean it up. So might as well just keep them around.

Benchmarks At Shopify

Okay, so we’ve gone over context, why we chose Iceberg, different migration strategies, and I just finished going over some of the most interesting changes in Trino Iceberg integration. Next, I’m going to go over some benchmarks of some of the data sets that we migrated. So this first data set, as I said, I work at Shopify. Shopify is an e-commerce platform. We host a lot of different stores and people buy things from those stores. We have one table, which is all the orders of all the stores at Shopify. That table is 160 terabytes in size. I decided to just make some contrived examples building models on top of that table. I ran these queries with 60 and two standards, 64 workers, and used Trino, obviously. So we can see that the migration from Hive and JSON to Iceberg and Parquet really improved the planning time.

We can also see that the memory pressure on Trino itself also was greatly reduced after these migrations. Maybe, in a practical sense, the most important thing is that the execution time went down. The execution time of the query that I ran went from taking one hour to three minutes. I know this is probably a big deal, but I always just want to think of the data scientist. They submit their query. They’re looking for data. They’re in their flow state, but then they have to wait an hour. That’s going to break their flow state. They’re going to go on their phone. Go do something else. Break all that context, then they’re going to have to come back and look at the results of their query. Now that it takes three minutes, they can stay in that flow scene and it greatly improves the speed that they’re able to innovate, which is why I like being a data engineer.

So it’s really, really rewarding seeing these results. I decided to build a little bit more complicated of a model and I decided that I wanted to see what stores had the most checkouts in the past month. I wanted to basically see what our best stores were in the past month. You can see planning time also went down, memory went down and execution time went from about 30 minutes to about two. So the last benchmark that I’m going to share with you is actually one of our biggest tables. Shopify has this query API, which we expose to any developers if they want to build applications on top of Shopify. This table is about one and a half petabytes, actually a little bit larger, but who cares? I decided that I wanted to know what were the most popular schema or endpoints on this API that people were using in the past month. So we can see that again, planning time greatly improved, cumulative memory greatly improved, and execution time goes from 1800 minutes to 1.78. So this is the difference between submitting a query and going to sleep versus submitting the query and getting it almost instantly.

Reflections On Completed Project

Those benchmarks speak for themselves, but we’re going to go over some reflections that I have of completing this project and maybe some things I would do differently if I were to do something like this again. So, I mean, we just went over the benchmarks. Using a modern table in file formats significantly improves query performance, and I think that’s a no-brainer. One thing that I was really depressed about in this project is in our initial design, we decided that we were going to write our old data and our new data to two separate tables, and that we would stitch them together via views. Unfortunately, right now in Spark, you can’t have something like an Iceberg view. They’re not supported, although that is changing in the near future. It wasn’t supported at this time. And so it was the last step of the project and we found out that it actually wasn’t possible.

It was really depressing. So what we ended up thinking of doing was that we were going to write to the same table and that would prevent all these issues. The next thing I’d recommend doing is constructing a migration priority list. With the benchmarks and with these speed improvements, you’ll have some data scientists that will come to you and be, oh, I don’t want to wait an hour, I want to wait a minute. And so you’ll prioritize their data sites that they want to move. There’s going to be a large class of data scientists that have their jobs running overnight on a schedule that don’t really care if it takes an hour or two hours. But, people who will care are the people who operate your query engines because this means that they’ll have a lot less load on their platform. So using that information like the load on Trino, we constructed a priority list for the data sets that we would have. The biggest bang for our buck if we migrated.

Trino Iceberg Integration Evolution

As I said, the Trino Iceberg integration has quickly been evolving. And so one thing that we had to get really good at is redeployment tooling. If Trino came out with a patch or a new feature, I wanted to get my hands on it right away. So I kept being a nuisance to the Trino team and asking can you redeploy Trino with the newest version? So getting really good at this redeployment was really important for us. The last thing is a tangent, but I daydream about alternate ways to rewrite history. Rewriting petabytes of data is really time consuming and costly. And so you might think that as data gets older, it actually gets queried less and less. So maybe you don’t actually need every single row. Maybe you can get away with  summarizing your historical data in terms of distributions or something like that. And then instead of rewriting every row, you just have a summary of that data that can be used in your queries. So that’s something I’ve been thinking about a lot lately. Lastly, I just want to acknowledge the other people that worked on this project with me, Sam Wheating and Victoria Bukta. And that brings me to the end of my presentation.