March 1, 2023
9:35 am - 10:05 am PST
How Our Iceberg Migration Achieved 90% Cost Savings for Amazon S3
At Insider, we migrated a data lake with hundreds of terabytes of data in Amazon S3 from Hive to Iceberg using Apache Spark and reduced our Amazon S3 cost by 90%. During the migration we changed the column structure and partition structure of some tables, the file type, and the compression algorithm.
The session explains:
– Why we decided to migrate from Apache Hive
– Why we selected Apache Iceberg
– How we designed and executed the migration process
– The outcomes from a cost and performance perspective
Sign up to watch all Subsurface 2023 sessions
Note: This transcript was created using speech recognition software. It may contain errors.
Hello everyone, thank you for joining this session today. I'll be walking you through our Iceberg migration and the cost savings we achieved along the way. Let me start by introducing myself. I'm Dennis. I've been working at Insider for the last six years, mainly focused on machine learning and data solutions on AWS. On today's agenda, we'll see an end-to-end view of our migration from Hive to Iceberg. I'll start with data lake usage at Insider, why we decided to migrate from Apache Hive, and why we selected Apache Iceberg. Then I'll talk about how we designed and executed the migration and what the outcomes were. So let's start with our data lake. Insider is a SaaS marketing platform working with customers all around the world. We have more than a thousand clients, and they are mostly e-commerce companies.
We collect their end-user data, like clickstream activities on websites and mobile applications, their CRM data, and their activities in messaging channels like web push, SMS, WhatsApp, and so on. We use this data to improve user experiences and provide marketing automation. We ingest this data into our data lake in JSON format, and we run streaming, hourly, and daily ETLs to clean, improve, and insert this data into our structured datasets. Some of our pipelines use this structured data in our data lake directly. In particular, our machine learning pipelines require machine-learning-ready features, which are extracted from our structured data by several feature extraction jobs. These apply operations like aggregations or extracting embeddings, and write the results back to our data lake for downstream machine learning operations like training and inference.
Overall, we run more than 15,000 Spark jobs every day to feed our predictive customer segmentation, product recommendations, channel engagement prediction, reporting, and so on. Most of these jobs run in our machine learning pipelines, which run for more than ten products internally and consist of feature extraction, feature selection, model training, evaluation, and inference tasks. We had been using Apache Hive for six or seven years before our migration, but throughout the years, as we scaled out, we saw several issues with it. Let's start with correctness. Hive has no ACID support, so we have seen correctness issues. Since write operations are not atomic, you can end up in a bad state if you have an incomplete write: if you have written some data files but failed to write others, Hive is completely okay with this.
It just leaves your dataset in a bad state. From the point of isolation, if you have parallel writes to the same dataset and the same partition, you may have race conditions. And if you are reading a partition while other write or delete operations are happening, you don't have a consistent view of the dataset. These are the correctness issues we saw. On the other side, there are the performance issues. Our query engine is Apache Spark, and for Spark to execute a query, first it needs to plan the query. During query planning, it first needs to find out which partitions of the table need to be read, and to do that with Hive, we query the Hive Metastore, which is a MySQL database.
So we query the Hive Metastore to find out which partitions match the query we have provided. After finding the partitions, we also need to find the actual data files, because we are using Amazon S3, and with any object or file storage, you need to know which data files to read. For S3, for example, you need to execute an S3 GET API call for each and every object key to read the dataset, so you need to know which keys exist. In Hive, the data files within the partitions are discovered using directory listing, and in an object storage like S3, a directory listing is basically searching for objects with the same prefix. So it's a costly operation, and if you have n partitions, it's an O(n) time complexity operation, which is expensive.
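To make that planning cost concrete, here is a minimal illustrative sketch (not Insider's actual code): a plain Python stand-in for S3 listing shows why Hive-style file discovery needs one LIST call per partition prefix, i.e. O(n) calls for n partitions.

```python
# Illustrative sketch (not Insider's actual code): why Hive-style file
# discovery on S3 costs one LIST call per partition, i.e. O(n) calls.

def list_objects(store, prefix):
    """Stand-in for a single S3 ListObjects call: keys under one prefix."""
    return [key for key in store if key.startswith(prefix)]

def hive_plan_files(store, partitions):
    """Hive-style planning: a separate LIST call for each matching partition."""
    files, calls = [], 0
    for partition_prefix in partitions:
        files += list_objects(store, partition_prefix)
        calls += 1
    return files, calls

# A toy "bucket" with two partitions, two data files each.
store = [
    "table/dt=2023-01-01/part-0.orc",
    "table/dt=2023-01-01/part-1.orc",
    "table/dt=2023-01-02/part-0.orc",
    "table/dt=2023-01-02/part-1.orc",
]
files, calls = hive_plan_files(store, ["table/dt=2023-01-01/", "table/dt=2023-01-02/"])
print(calls)  # one LIST call per partition
```

With half a million partitions, as in our data lake, that per-partition listing adds up quickly, both in planning time and in S3 request cost.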
Finally, when we looked at our data lake from the cost perspective, we found several possible optimization points. First of all, as we just discussed, the S3 ListBucket calls for directory listing in Hive were expensive. Then there are the S3 HeadObject and GetObject calls to read each and every data file, and if you have a small-files problem, there are a lot of files to read, which is expensive. Of course, this is not directly related to Hive, but in our data lake we had this problem for several tables, and we wanted to solve it. Then we thought we could even decrease the storage size by using more recent compression techniques; we were using Snappy at that time. And then there are the wasted compute resources: our EC2 instances and EMR clusters were just waiting for Spark query planning to be finished.
And last but not least, we were running an RDS instance, a MySQL instance, solely for the Hive Metastore, and we wanted to get rid of that. So those were the points that pushed us toward a migration, to find a better solution. So why did we select Apache Iceberg? If we are talking about lakehouse architectures, here are the usual suspects: Apache Iceberg, Apache Hudi, and Delta Lake. Basically, they are similar: all of them have ACID transactions, which is crucial; they have the time travel feature, which is really nice for rollback scenarios; they keep metadata about the data files; and so on. But there are distinguishing features for each option. For Delta, at the time we were evaluating, the performance features were not included in the open source version.
And as we were using Amazon EMR for our Spark clusters, that was a big no for us. This has changed recently, but at that time it was a red flag for us, so we just eliminated it. Then we were left with Iceberg and Hudi. For Hudi, we actually liked the primary keys and the deduplication features. On the Iceberg side, we liked partition evolution and hidden partitioning. We had previously had problems with changing the partition schema of a table: when we wanted to change the partitioning, we had to do a full migration for that table in Hive, which was a big operation. So we really liked partition evolution and the hidden partitioning feature, and we selected Iceberg.
So let's see how Iceberg solves our issues, and let's start with the correctness issues first. As Iceberg supports ACID transactions, the correctness issues are actually solved out of the box. The atomicity issue we talked about is solved: write operations are atomic, and the metadata is only updated once the data files are fully inserted. So if you have an incomplete write in Iceberg, the metadata file is simply not updated, and the data files you have just written are not part of the dataset. For isolation, parallel writes are supported via optimistic concurrency, and you can do even better if you are using the REST catalog. From the reader's point of view, since Iceberg uses metadata and snapshots to read a dataset, while you are reading the dataset you go through the snapshot in the metadata, so you know which data files are in that snapshot.
So even if some other producer is writing new data, or even if data files are deleted by a commit, your read operation is not affected, and you have a consistent view of the dataset. On the other hand, from the performance side: during Spark planning, we now use the metadata to find both the partitions and the data files, so the Hive Metastore bottleneck is removed, and using the metadata, the file pruning and predicate pushdown operations are now part of distributed planning. Also, to find the data files, we use the metadata file, which has the list of data files, so with a single RPC call we can find out which data files to read. It's now an O(1) time complexity operation for n partitions, which is a huge step forward, especially if you have a lot of partitions.
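A toy model may help here. This is a simplified, hypothetical sketch of the snapshot mechanism just described, not Iceberg's real implementation: a reader pins an immutable snapshot, so a concurrent commit that swaps the file list cannot affect that read.

```python
# Simplified, hypothetical model of snapshot isolation (not Iceberg's
# real implementation): a reader pins an immutable snapshot, so a
# concurrent commit that swaps the file list cannot affect that read.

class Table:
    def __init__(self):
        self.snapshots = [()]  # snapshot 0: an empty, immutable file list

    def current_snapshot(self):
        # A single metadata read tells the reader exactly which files exist.
        return self.snapshots[-1]

    def commit(self, files):
        # A commit appends a new immutable snapshot; old ones stay valid.
        self.snapshots.append(tuple(files))

table = Table()
table.commit(["f1.parquet", "f2.parquet"])
reader_view = table.current_snapshot()   # reader pins this snapshot
table.commit(["f3.parquet"])             # a writer commits new files meanwhile
print(reader_view)                       # the pinned view is unchanged
```

The key property is that a snapshot is never mutated in place; writers only ever add new snapshots, which is what gives readers their consistent view.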
So finally, before the migration, we had to select the new data file format and compression algorithm to move forward. We ran several benchmarks and selected the Parquet file format with the ZSTD compression algorithm. We also found that when you order the string columns, you can get even better compression ratios. Here we see a sample data file of around 550 megabytes uncompressed, which can be compressed down to 177 megabytes, around 70% compression, when using ZSTD and ordering the string columns. Of course, this can change with respect to the data you are storing and the cardinality of the string columns, but it was a better option than Snappy, and ordering the string columns was definitely beneficial. That was all the work before the migration; let's continue with the requirements, planning, and execution of the migration.
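The effect of ordering string columns can be shown with a small experiment. This is illustrative only: the talk used Parquet with ZSTD, while this sketch uses Python's built-in zlib for portability, but the principle is the same: sorting a low-cardinality string column groups repeated values together, which compresses better.

```python
# Illustrative demo: sorting repetitive string data improves compression.
# (The talk used Parquet + ZSTD; zlib is used here only for portability.)
import random
import zlib

random.seed(0)
# A low-cardinality "string column": 10,000 values drawn from 20 categories.
values = [f"category-{i % 20}" for i in range(10_000)]
random.shuffle(values)

compressed_shuffled = zlib.compress("\n".join(values).encode())
compressed_sorted = zlib.compress("\n".join(sorted(values)).encode())

# The sorted version groups identical strings into long runs,
# so it compresses to a smaller size.
print(len(compressed_sorted) < len(compressed_shuffled))
```

The exact gain depends on the data and the cardinality of the column, exactly as noted above, but the direction of the effect is consistent.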
So let's start with the requirements. At the time of the migration, the state of our data lake looked like this: we had 15 tables, around half a million partitions, and more than 100 terabytes of data files. We wanted to change our file type from ORC to Parquet, and we also wanted to use the ZSTD algorithm for compression instead of Snappy. Finally, we also wanted to make table-level modifications, like changing the partition schema and even the data for several tables. Looking at this, it was pretty easy to see that an in-place migration was not possible for us; we had to perform a full migration. We planned the full migration without downtime, and it went like this: first, migrate all the available data to Iceberg; then, in parallel, start writing new data into both Hive and Iceberg and keep the datasets synced. When the datasets are synced, start reading data from Iceberg. After all the readers are updated to use Iceberg, take some time to monitor the system, and when everything is all right, stop writing to Hive, delete all the data in Hive, and terminate all the resources. That was the plan. To execute the plan, we built a temporary pipeline for the migration: we used Spark for the heavy lifting and Redis for storing the migration metadata. Finally, we used Grafana to monitor the migration process itself.
The Spark job was responsible for reading data from the Hive table, transforming it if required for that table (for example, we might have changed the schema, or we might want to apply some transformations at the data level), and then writing it into the Iceberg table. Each Spark job takes a table name as a parameter, so a single Spark job runs for a single table in its lifetime, and it finds out which partitions are to be migrated from a Redis instance. In Redis, we store the partitions of the Hive tables in a Redis set, so we know which partitions are not migrated yet. The Spark job, given the table name, goes to Redis and reads the Redis set, and until the set is empty, it pops a partition's migration metadata, reads the Hive table using that partition metadata, writes into the Iceberg table, and continues like that.
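The worker loop just described can be sketched as follows. This is a simplified stand-in, not the actual job: the names are made up, a plain Python set replaces the Redis set (and its SPOP-style pop), and `migrate_partition` replaces the Spark read-from-Hive / write-to-Iceberg step.

```python
# Simplified stand-in for the migration worker loop (names are made up;
# a plain Python set replaces the Redis set, and migrate_partition
# replaces the Spark read-from-Hive / write-to-Iceberg step).

def migrate_partition(table, partition, migrated):
    # Real job: read this Hive partition with Spark, transform if needed,
    # then write it into the Iceberg table.
    migrated.append((table, partition))

def run_worker(table, pending, migrated):
    """Pop partition metadata (like Redis SPOP) until the set is empty."""
    while pending:
        partition = pending.pop()
        migrate_partition(table, partition, migrated)

pending = {"dt=2023-01-01", "dt=2023-01-02", "dt=2023-01-03"}
migrated = []
run_worker("events", pending, migrated)
print(len(migrated))  # every pending partition is migrated exactly once
```

Because each pop removes the partition from the shared set, several workers can run this loop against the same table at once without migrating the same partition twice, which is what makes the scaling approach below possible.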
This approach enabled us to scale the migration process by simply adding new Spark jobs, and to decide on that, we used our monitoring dashboard in Grafana. We wrote the progress from the Spark job to the Redis instance, and we also logged it in the Spark driver logs. On the right-hand side, you can see a screenshot from our staging environment. It shows the progress and the number of partitions; we wrote this to the Redis instance and, using the Redis plugin in Grafana, showed it to our internal users. So we had the progress for each table: what percentage of it was completed, how many partitions were left to migrate, and what the estimated time was. If we thought the estimated time left to finish the migration for a table was longer than required, we could simply add new Spark jobs, adding new resources to decrease the time by parallelizing and accelerating the migration. This allowed us to see the progress of the migration.
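The per-table dashboard numbers boil down to simple arithmetic. Here is a minimal sketch (function and parameter names are made up for illustration) of how percent-complete and estimated time remaining might be derived from counters like the ones kept in Redis.

```python
# Hypothetical sketch of the per-table dashboard math: percent complete
# and estimated minutes remaining, from counters like those kept in Redis.

def progress(total_partitions, remaining, partitions_per_minute):
    done = total_partitions - remaining
    percent_done = 100.0 * done / total_partitions
    eta_minutes = remaining / partitions_per_minute
    return percent_done, eta_minutes

# Example: 10,000 partitions total, 2,500 left, current rate 50/minute.
# Adding more Spark jobs raises the rate, which shrinks the estimate.
pct, eta = progress(total_partitions=10_000, remaining=2_500, partitions_per_minute=50)
print(pct, eta)  # 75.0 50.0
```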
And finally, let's see what the outcomes of our migration were, from the cost-saving and performance perspectives. Half of our savings were on the S3 side: the S3 cost for the bucket we used for our data lake went down by 90%, and the main driver for this was solving the small-files issue. Of course, there are other ways to solve this issue, like tuning the number of Spark partitions, et cetera. But as a SaaS product, we had the same dataset at several megabytes for some clients and tens of gigabytes for other clients, so tuning the number of partitions wasn't the optimal solution. Iceberg just solved it out of the box, so it was a pretty easy fix.
How does Iceberg do it? Out of the box, Iceberg writes out data files of 512 megabytes, and this is configurable for each table via the write.target-file-size-bytes configuration. By just updating the table properties for this configuration, you can change the target data file size for that particular table. If you're interested in managing data files in Iceberg, there is another session tomorrow by Russell Spitzer from Apple called Managing Data Files in Apache Iceberg, and I encourage you to join that session as well if you are interested in that topic. This feature enabled us to reduce the object count in S3 by 94%, and our storage, the number of bytes and therefore the cost, was reduced by 35% using ZSTD.
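As a sketch of what that table-property change might look like (the table name is a placeholder; with a live Spark session attached to an Iceberg catalog you would execute the statement via spark.sql):

```python
# Sketch of the table-property change ('db.events' is a placeholder name).
# Iceberg's default target data-file size is 512 MB; the property below
# sets it explicitly for one table.
target_bytes = 512 * 1024 * 1024  # 536870912 bytes

ddl = (
    "ALTER TABLE db.events "
    f"SET TBLPROPERTIES ('write.target-file-size-bytes' = '{target_bytes}')"
)
# With a live Spark session attached to an Iceberg catalog, you would
# execute this as: spark.sql(ddl)
print(ddl)
```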
And since we had bigger files, the compression also worked better (the bigger the file, the better the compression works), so solving the small-files problem helped reduce the storage amount as well. That was the S3 side, and the other half of the savings were on the compute side. Basically, the cost of our EC2 instances and EMR clusters went down by 24%. This was achieved by removing the bottlenecks of querying the Hive Metastore and doing directory listing on S3; removing these accelerated query planning a lot. Also, having fewer and bigger data files helped us get higher performance. Our Spark jobs enjoyed around a 24 to 25% acceleration, which of course directly reduces the bill for EMR and EC2. When we combine our savings for storage and compute, we ended up with five-digit US dollars per month, which was amazing. I think that was the biggest ROI we have ever seen from a migration at Insider. And that was it from my side. Thank you for joining this session and listening to me until the end.