March 1, 2023

11:45 am - 12:15 pm PST

Lakehouse: Smart Iceberg Table Optimizer

The lakehouse architectural pattern provides many technical benefits, such as ACID support, time travel for machine learning, and better query performance. Apache Iceberg implements this pattern and offers the flexibility to build further enhancements on top of it based on real-world needs. Using the Apache Iceberg table format requires regular maintenance, such as snapshot expiration and orphan file removal for data governance, and metadata and data compaction for efficient, fast access to data.

This talk will discuss how to handle these table operations at very large scale while keeping cost in mind, without compromising data engineering, ML, analytical, and BI use cases. Automating these operations makes life easier for engineers using the platform, freeing them from having to worry about how Iceberg internals work. We will share lessons learned from optimizing streaming and batch data sets in a cost-effective and efficient way.

Topics Covered

Enterprises
Iceberg
Open Source


Transcript

Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.

Rajasekhar Konda:

Hey, this is Raj from Apple. I work as an engineering manager for the data lakehouse team within the AML data platform. Our group provides storage and streaming application services for machine learning, data engineering, and analytical use cases. Today, we are going to talk about an Apache Iceberg table optimization service that we have built in-house. There are two parts to this talk. First, I’ll cover an introduction to Iceberg, which you probably already know about, but I’ll try to cover some of the benefits we have seen that matter to us, and what you need to run Iceberg at scale. My colleague Steve will talk about the core optimization service, which we call the smart Iceberg table optimizer, and he’ll explain why it is smart, along with its architecture and internals. Steve, would you like to introduce yourself?

Hongyue Zhang:

Hey, everyone, this is Hongyue and I go by Steve. Today, I’m really happy to share my learning about Apache Iceberg, and then I’ll pass back to Raj.

What Is Iceberg As Compared To Apache Hive

Rajasekhar Konda:

Thank you, Steve. I’ve already introduced myself, so I’m going to jump into the talk. What is Iceberg? We have all seen Parquet or ORC as file formats, but what is a table format? A table format is simply how you lay out the files in a table. Let’s talk about it by comparing with Apache Hive. Apache Hive is a distributed, open source data warehouse over HDFS. It organizes files as directories: database name, table name, and then partitions. All files in a directory are actual data files that contribute to that partition. Depending on your predicate condition, a query looks at the files in one partition or at all the files in the table. Iceberg manages these files in a slightly different way. It follows a tree-like structure of metadata files. The metadata layer in the middle of the image on the right has metadata files, manifest lists, and manifest files. Let’s not jump into what each of these actually contains; assume that these are the files that hold the actual metadata and the references to the data files, organized around snapshots. Let’s talk more about the snapshot. On every commit operation, Iceberg creates a new snapshot. A snapshot refers to the actual data files through these metadata files. The interesting point is that not all files in the directory of an Iceberg table contribute to the table or to a partition; that is all determined by the snapshot.
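
For illustration, here is a minimal PySpark sketch that lists a table's snapshots through its snapshots metadata table. The catalog name demo and the table db.events are placeholders; the Spark session is assumed to already be configured with an Iceberg catalog.

```python
from pyspark.sql import SparkSession

# Assumes a Spark session already configured with an Iceberg catalog named "demo";
# the table demo.db.events is a placeholder.
spark = SparkSession.builder.getOrCreate()

# Every commit to an Iceberg table creates a new snapshot. The snapshots metadata
# table exposes them without reading any data files.
spark.sql("""
    SELECT snapshot_id, parent_id, committed_at, operation
    FROM demo.db.events.snapshots
    ORDER BY committed_at DESC
""").show(truncate=False)
```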

Why We Considered Iceberg

We’ll discuss a few benefits and why we considered Iceberg in the first place while building this large-scale data platform. Iceberg is ACID compliant, which you already know, and it allows concurrent IO operations. This is not very common in the traditional data lakes you have seen. Users can perform update and delete operations, which was very critical in our decision about why Iceberg matters to us. This has also become very important for infrastructure needs, especially where we have privacy and GDPR types of compliance requirements. In the case of Hive, the general practice for a delete or update is to insert-overwrite everything: even if only one file or one record is impacted, we have to read and write terabytes or petabytes of data, whatever the table size is. That is not efficient, and it’s a costly operation even in cloud environments.
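
As a rough sketch of those row-level operations, assuming a Spark session with the Iceberg SQL extensions enabled and an Iceberg catalog named demo (table and column names, and the corrections table, are hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Null out a single user's email without rewriting the whole table.
spark.sql("UPDATE demo.db.events SET email = NULL WHERE user_id = 'user-123'")

# Apply a batch of corrections (e.g. privacy requests) as an upsert.
spark.sql("""
    MERGE INTO demo.db.events t
    USING demo.db.corrections s
    ON t.event_id = s.event_id
    WHEN MATCHED THEN UPDATE SET t.email = s.email
    WHEN NOT MATCHED THEN INSERT *
""")
```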

Time Travel

Time travel is another cool feature. We have seen growing use cases on the ML side: a lot of our ML users are trying to adopt Iceberg because of the time travel feature. Before we chose Iceberg, we had built in-house tools to provide these kinds of time travel capabilities. The main reason is that ML engineers would like to test their models against a previous version of the data. If a model changes, they want to compare the scores: what the previous score was versus the new score after the model changed. That is very critical for validating the accuracy of the model, and snapshots are what make time travel and rollback possible here.
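
A minimal sketch of time travel and rollback in Spark SQL, again assuming an Iceberg catalog named demo; the snapshot id is a made-up example:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Read the table as it looked at an earlier point in time (timestamp or snapshot id).
spark.sql("SELECT count(*) FROM demo.db.events TIMESTAMP AS OF '2023-01-01 00:00:00'").show()
spark.sql("SELECT count(*) FROM demo.db.events VERSION AS OF 8744736658442914487").show()

# Roll the table back to a previous snapshot (snapshot id is hypothetical).
spark.sql("CALL demo.system.rollback_to_snapshot('db.events', 8744736658442914487)")
```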

Hidden Partition

Hidden partitioning means we don’t have to stick to one partitioning scheme: partitions can evolve over time. Iceberg avoids reading unnecessary partitions automatically, and consumers don’t need to know how the table is actually partitioned or add extra filters to their queries. These are things you do need to know when dealing with traditional Hive. We have also noticed better performance and more reliable writes to S3 using the S3 writer inside open source Iceberg. Without it, we have seen jobs run for a couple of hours and then fail; six hours of compute is wasted, and we have to delete whatever data was written and rerun the job. So reliability was a concern there, but using the Apache Iceberg library inside Spark, reliability has been a hundred percent: we have not seen a single job fail for this particular reason.
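
A small sketch of hidden partitioning and partition evolution, with placeholder names, assuming a Spark session with the Iceberg SQL extensions and a catalog named demo:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Partition by a transform of a column; readers never reference the partition column directly.
spark.sql("""
    CREATE TABLE IF NOT EXISTS demo.db.events (
        event_id BIGINT,
        user_id  STRING,
        ts       TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (days(ts))
""")

# Consumers just filter on ts; Iceberg prunes partitions automatically.
spark.sql("SELECT count(*) FROM demo.db.events WHERE ts >= '2023-02-01'").show()

# Partition evolution: switch new data to hourly granularity without rewriting old files.
spark.sql("ALTER TABLE demo.db.events DROP PARTITION FIELD days(ts)")
spark.sql("ALTER TABLE demo.db.events ADD PARTITION FIELD hours(ts)")
```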

Column Level Stats

Let’s talk about the last point, column-level stats, which are very critical if you really want to optimize and get Iceberg’s full benefits: sort orders and the per-file min/max stats. These min/max stats help you read less data, because if you want to increase performance, you should read as little as possible. We all know partitioning, which falls under this category, but with Iceberg you can achieve more with these column-level stats.
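
To make that concrete, a sketch of where those per-file stats live and how writes can be clustered so the stats prune more files (names are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Per-file record counts and column min/max bounds live in the files metadata table;
# the planner uses them to skip files whose value ranges cannot match a predicate.
spark.sql("""
    SELECT file_path, record_count, lower_bounds, upper_bounds
    FROM demo.db.events.files
""").show(truncate=False)

# Clustering writes on a frequently filtered column keeps those min/max ranges tight.
spark.sql("ALTER TABLE demo.db.events WRITE ORDERED BY user_id")
```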

Data Retention

Data retention: there are two sides to data retention. One is soft delete. If you perform a delete on an Iceberg table, it is actually a soft delete; it does not delete anything from the file system. A hard delete is what we treat as actually discarding the files. It is important for various reasons. One is cost: you can decrease your S3 cost. The other reasons are functional: privacy and GDPR. Even though the files are no longer accessible to read operations, they need to be physically discarded, which is very critical. Steve will talk about how to achieve these hard deletes without users having to worry about how to deal with the data that was in previous snapshots.
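
A minimal sketch of the soft-delete behavior described here; the hard delete only happens later, when snapshots are expired (see the expire-snapshots sketch further down). Names are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# A DELETE commits a new snapshot that no longer references these rows, but the
# underlying data files stay on S3 because older snapshots (time travel) can still
# reach them. They are only physically discarded once those snapshots are expired.
spark.sql("DELETE FROM demo.db.events WHERE user_id = 'user-123'")

# The files metadata table only shows files referenced by the current snapshot;
# previously referenced files remain on storage until snapshot expiration runs.
spark.sql("SELECT count(*) AS live_data_files FROM demo.db.events.files").show()
```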

Query Performance

Query performance: apart from the column-level stats that I explained on the previous slide, data and metadata compaction increase query performance by avoiding scanning too many files. We have noticed that if you try to list the partitions without running the manifest rewrite operation, it can take 15-20 minutes for tables with hourly commits. Hive maintains all the partitions in its metastore, so you get the partitions immediately, whereas Iceberg still needs to go to the manifest files and derive all the partitions. What we have done is run the manifest rewrite operation so the metadata is optimized, and we have seen the list of partitions for a table returned in less than a minute. I’m talking about the large tables here.
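
A sketch of both halves of that: listing partitions through the partitions metadata table, and compacting metadata with the rewrite_manifests procedure (catalog and table names are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Listing partitions goes through the manifests; on tables with hourly commits this
# is the operation that gets slow when manifests pile up.
spark.sql("""
    SELECT partition, record_count, file_count
    FROM demo.db.events.partitions
""").show(truncate=False)

# Metadata compaction: rewrite manifests so planning and partition listing stay fast.
spark.sql("CALL demo.system.rewrite_manifests('db.events')").show()
```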

Smart Table Monitoring

If you’re running Iceberg at production scale, with thousands of tables, millions of files, and petabytes of data, some of these metrics help. The number of snapshots: it’s not optimal to keep snapshots around for too long, and it’s good practice to expire them. Data and metadata file sizes are very important: if you want to trigger something like data compaction or metadata compaction, these metrics provide the inputs for the underlying system to run those jobs. The earliest and latest snapshot tell you whether a table is live or not. I’ll hand it over to Steve to talk about the second part of the talk.
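
As an illustration, those monitoring signals can be pulled straight from Iceberg's metadata tables; a minimal sketch with placeholder names:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Number of snapshots plus earliest/latest commit tells us whether a table is live
# and whether snapshot expiration is overdue.
spark.sql("""
    SELECT count(*)          AS num_snapshots,
           min(committed_at) AS earliest_commit,
           max(committed_at) AS latest_commit
    FROM demo.db.events.snapshots
""").show(truncate=False)

# Data file counts and sizes are the inputs that trigger data/metadata compaction.
spark.sql("""
    SELECT count(*)                AS num_data_files,
           sum(file_size_in_bytes) AS total_bytes,
           avg(file_size_in_bytes) AS avg_file_bytes
    FROM demo.db.events.files
""").show(truncate=False)
```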

Architecture Of The Smart Table Optimizer

Hongyue Zhang:

Thank you, Raj. We’re trying to solve the problems above with our smart table optimizer. It’s our in-house solution for managing Apache Iceberg at scale, and in the next few slides I’ll talk about it in more detail. At a very high level, there are three components: the control plane, the persistence layer, and the data plane. The control plane is where we identify the optimization candidates and write their optimization parameters into the persistence layer. The persistence layer is a collection of our own Iceberg tables. The data plane is what actually applies the desired optimizations, using Apache Spark in a distributed manner; at the end of execution, it also writes the optimization results back to the persistence layer. The diagram below shows how the control plane and data plane work together. In the control plane, we need to integrate with different plugins to source the list of optimization candidates.

We also need to apply reasonable defaults, so we built conflict resolution and deduplication logic into our spec processor: it processes the tables in order, applies the defaults, and then writes these records in a batch to our own internal set of Iceberg tables in the persistence layer. We decided to use Iceberg tables for our own internal data storage; this helps simplify our dependency graph, and we have relatively simple read and write patterns in the control plane and data plane jobs. The data plane, as you can see on the right-hand side, is actually much larger. We group jobs by functionality, and some of the data plane jobs are just a parallelizable wrapper on top of Iceberg’s Spark actions: expiring snapshots, removing orphans, compacting small data files, and rewriting manifests.
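
To make the control-plane/persistence split concrete, here is a hypothetical sketch of what one optimization spec record might look like and how it could be appended to an internal Iceberg table. The schema, field names, and the demo.ops.optimization_specs table are invented for illustration; the talk does not publish the real ones.

```python
from datetime import datetime, timezone
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# One optimization spec produced by the control plane for the data plane to pick up.
spec = Row(
    table_identifier="db.events",                  # candidate table
    operation="rewrite_data_files",                # or expire_snapshots, remove_orphan_files, ...
    parameters='{"strategy": "sort", "sort_order": "user_id"}',
    silo="whales",                                 # logical resource group
    requested_at=datetime.now(timezone.utc),
)

# The persistence layer is itself a set of Iceberg tables; the target table is
# assumed to exist already.
spark.createDataFrame([spec]).writeTo("demo.ops.optimization_specs").append()
```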

Optimization Of Tables

Those are the required and optional maintenance tasks suggested by the Iceberg community. Other jobs, such as monitoring, give us visibility into business adoption and cost reporting, and make sure Iceberg stays performant over time. One of the questions we asked ourselves, as we get more and more tables, is whether every table we find in the Iceberg catalog is worth optimizing. Optimizing helps query performance and gives users a consistent and robust experience, but at the cost of limited compute resources. The answer depends on business needs, but what we found works best for us is to decide based on historical usage. So we built integrations with the Spark listeners and Trino execution events to figure out which hot partitions are being used, since those can benefit the most from our optimizations. But we also know the past doesn’t represent the future, so we implemented an explicit opt-in and opt-out mechanism to cover the tables we know about for certain.

Tracking Execution History

Another problem we had in the past is how to track our execution histories. We started small: we could just search our Spark application logs to figure out past executions. But as we bring more and more tables under management, we decided to persist all of the optimization inputs and outputs into a table format so that we can query the results later. Below, you can see the example for orphan file removal. The Spark action on the right-hand side needs a table identifier and an older-than timestamp to execute. On the left-hand side, we model this input and output into corresponding tables so that we can write the number of orphan files removed and the Spark application IDs into our own tables, using Iceberg to store our own internal data sets.

We can easily debug a particular job failure, and we can see the aggregate trend of our optimizations over time. And the best part is we can learn from our history to decide what kind of optimization we should prioritize in the future.
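
A hypothetical sketch of persisting one run's input and output into an internal history table, mirroring the orphan-removal example; all table and field names (and the counts) are invented for illustration.

```python
from datetime import datetime, timezone
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Record what was requested and what happened, so failures can be debugged and
# trends queried later. The removed-file count is a placeholder for the job's output.
run = Row(
    table_identifier="db.events",
    operation="remove_orphan_files",
    older_than="2023-02-01 00:00:00",
    orphan_files_removed=42,
    spark_application_id=spark.sparkContext.applicationId,
    finished_at=datetime.now(timezone.utc),
)
spark.createDataFrame([run]).writeTo("demo.ops.optimization_history").append()

# Aggregate the history to decide which optimizations to prioritize next.
spark.sql("""
    SELECT table_identifier, operation, count(*) AS runs,
           sum(orphan_files_removed) AS total_files_removed
    FROM demo.ops.optimization_history
    GROUP BY table_identifier, operation
""").show()
```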

Deep Dive Into Removing Orphan Files

Speaking of executions, let’s dive deep into removing orphans. Essentially, the way Iceberg removes orphans is by comparing the desired state with reality. The desired state in this case is the set of files tracked by Iceberg metadata, and reality is the set of files we find on the underlying file system. If there’s a divergence between the desired state and reality, those extra files are the orphan files. Together with a condition on how old these files are, we can decide whether a given orphan can be safely removed or not. It is important to know that this operation actually needs to scan all of the files tracked by Iceberg as well as all the files on the file system.

So this can be the most time-consuming part of our data plane jobs, and it becomes even more relevant if the underlying data storage is on S3, because S3 has a known limitation: a single request can only list up to 1,000 objects under a given prefix. In this diagram, there are eight files tracked by the Iceberg metadata, but we actually found nine files on the file system. That particular d3.parquet file is the orphan, and it should be removed. Given that the more data is tracked in an Iceberg table, the longer the entire operation takes to complete, you can try to improve it by either reducing the number of files tracked by the Iceberg table or reducing the number of files found on the file system under the given prefix. This leads us to the next slide, on data compaction.
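
A minimal sketch of the orphan removal operation itself, using Iceberg's remove_orphan_files procedure (catalog, table, and cutoff are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Compare files tracked by Iceberg metadata with files actually found under the
# table location, and delete the difference once the files are old enough.
orphans = spark.sql("""
    CALL demo.system.remove_orphan_files(
        table => 'db.events',
        older_than => TIMESTAMP '2023-02-01 00:00:00'
    )
""")
orphans.show(truncate=False)  # one row per removed file location
```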

Compact Data Files

Apache Iceberg offers a way to stream data into a table with high parallelism, but oftentimes this means we also run into the small files problem. Compacting those small files together, along with sorting, improves performance: reducing the number of files means we spend less time opening and closing files when we read. Currently, data compaction lets us optimize at the partition level as we compact the data files found in the table. However, data compaction only creates larger files in a new snapshot; it does not actually clean up the previous small data files. To handle that destructive part, we need to look at another Iceberg operation called expire snapshots.
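
A sketch of compaction using the rewrite_data_files procedure, scoped to a recent partition and sorted on a commonly filtered column; all names and the filter value are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Compact small files into larger, sorted ones. Scoping the rewrite to a recent hot
# partition keeps the job cheap and lowers the chance of commit conflicts.
spark.sql("""
    CALL demo.system.rewrite_data_files(
        table => 'db.events',
        strategy => 'sort',
        sort_order => 'user_id',
        where => 'ts >= "2023-02-28 00:00:00"'
    )
""").show()
```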

Expired Snapshots

A snapshot, as Raj mentioned, is the Iceberg primitive that tracks every data change. It enables the beloved time travel feature, where data consumers can go back in time and look at the data at a particular timestamp. However, this means things get more complex when we try to remove a file: a file can only be safely removed from Iceberg if no metadata references it. This is where the expire snapshots action comes to the rescue: it identifies the snapshots to expire first, and then removes all the files that are reachable only from those expired snapshots. Let’s look at an example, using the data compaction example from the previous slides. Manifest M2 and data files D2 and D3 can be safely deleted once snapshot S1 is expired, because those metadata and data files are reachable only from the expired snapshot.
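
A minimal sketch of the expire snapshots operation via the expire_snapshots procedure (catalog, table, cutoff, and retain count are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

# Expire snapshots older than a cutoff while keeping a minimum number for time travel.
# Data and metadata files reachable only from the expired snapshots are then deleted.
spark.sql("""
    CALL demo.system.expire_snapshots(
        table => 'db.events',
        older_than => TIMESTAMP '2023-02-01 00:00:00',
        retain_last => 5
    )
""").show()
```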

Looking at the size distribution of all the tables under management, we realized a few interesting things. The tables are like marine animals. There are many small tables, like anchovies: they are not updated often and maybe get read once or twice per week. But there are also whale tables with extremely high throughput that are resource hungry to optimize. So it is important to group them into different silos. We can allocate the corresponding CPU and memory resources and schedule them at different cadences to meet our optimization promises and avoid resource contention between them.

Adding Silos To Scale

Since the data plane does the heaviest work, we started by separating the tables under management into logical groups we call silos. There are no overlapping tables across silos, and we can scale our data plane operations simply by adding more silos. There is also sometimes a need to separate data physically, so we built another mechanism to stand up a control plane in a new account and scale the entire system, both control plane and data plane. I think that’s all I have. I would like to pass it back to Raj.
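
A hypothetical sketch of the silo idea: assign each table to a logical group by its total data size (read from the files metadata table), so whales and anchovies get different compute pools and schedules. The thresholds, silo names, and table names are all invented for illustration.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # Iceberg catalog "demo" assumed to be configured

def assign_silo(total_bytes: int) -> str:
    """Group a table into a silo by total data size; thresholds are illustrative."""
    if total_bytes >= 100 * 1024**4:   # roughly 100 TiB and up: the whales
        return "whales"
    if total_bytes >= 1024**4:         # roughly 1 TiB and up
        return "tuna"
    return "anchovies"

for table in ["db.events", "db.clicks", "db.profiles"]:   # hypothetical tables
    row = spark.sql(f"SELECT sum(file_size_in_bytes) AS total FROM demo.{table}.files").first()
    total = row["total"] or 0
    print(table, assign_silo(total))
```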

Contributions To Apache Iceberg

Rajasekhar Konda:

Thank you, Steve. This is the last slide of the talk. In this slide we’ll cover some of the contributions we have made back to the upstream project. We use open source Iceberg. We are on 1.0, but we have branched it for internal use and have some patch customizations that work with the rest of our ecosystem. We have continuously contributed commits with features, bug fixes, and spec changes upstream; in the last quarter alone, we made 20-plus commits. I’ve listed some of those here. We are also looking at implementing IRC, the Iceberg REST Catalog. That is another area: the spec has been released, but there is no implementation today. For catalogs, the options today are Hive Metastore, the Glue catalog, Project Nessie, and some of the other enterprise catalogs. We don’t have a proper catalog that supports Spark consumers as well as non-Spark consumers. It supports Spark and Trino, but what if the distributed processing engine is something else? So we are looking in that direction. That’s it for the talk.

Q&A

Moderator:

Great.

Moderator:

There we go. Thanks Steve. Thanks Raj. Any questions here in the room? 

Audience Member:

Hi. Do y’all do any data quality analysis on your metadata? And if so, what kind of metrics do you collect?

Hongyue Zhang:

The question is: do we do any data quality checks on the metadata? This is actually one of the most highly requested features from our customers. Our downstream users are data engineers and data analysts, and they care about the actual data quality. For us, our team is mostly data infrastructure: we provide the ways and the tools for them. Because we don’t know the specifics of the data sets, and there are too many of them, we can’t have a generic quality check across all of them. So we provide the tools and runbooks so they can run Iceberg metadata queries to check whether the data ingested into the tables is right, tested against their own scenarios. Every team has different quality standards, whether that’s having enough records or specific null value counts in the tables. We show them how this can be done with Iceberg tables, but we don’t do it at scale for all of our customers. I hope that answers your question.

Moderator:

Great, thanks. Any other questions here in the room? Cool. We do have a couple here online. If anyone else comes up with a question, let me know or raise your hand. One of the questions was: you said you used the Spark listener and the Trino event listeners. Is there a reason why you chose those over the existing Iceberg listener framework?

Hongyue Zhang:

I think because our downstream users actually query the Iceberg tables using Spark and Trino. We started there because that covers the maximum number of our users querying Iceberg tables, so I figure it’s more representative of how data is being used in our organization. So we started there, but I think we’re also going to look into Iceberg listeners, if there are any.

Moderator:

Cool, that makes sense. Thanks. There was another question here about the work you’ve done around automatic optimization, so not the Spark actions themselves; rewrite data files and that kind of thing are already in the project. Are you planning on contributing any of the automatic optimization services to the community, to the open source project?

Rajasekhar Konda:

Yeah, we’re working on it internally. We are planning and have initiated the discussions; it takes a couple of phases of review and approvals. But the plan is to contribute the service Steve talked about, the smart table optimization service, back to the community. And it is very smart; it has saved us a lot of money. We don’t blindly go and optimize every data set. The smartness of this process is finding the right candidate tables to optimize at the right interval of time. That is very cost effective, because if we started optimizing every table, we would spend a lot of money simply optimizing tables regardless of whether anybody is reading them or not. So yes, we do.

Moderator:

Great. Yeah, looking forward to that. That’s awesome. There was another question here: how are conflicting data files handled when you’re doing compaction and other writer processes are coming in? In terms of scheduling, how do you do conflict resolution?

Hongyue Zhang:

Yeah, for this we actually rely on a benefit of Iceberg: as Raj mentioned, it allows concurrent modification of tables, and the changes are separated by snapshots. Compaction by itself does not change the content of the data files; it basically just changes file sizes, with some sorting, unless a concurrent write impacts the schema and causes a conflict. Most of the compactions we have seen do not interfere with writes issued by users.

Moderator:

Cool. So the built-in stuff you find in the project works pretty well?

Hongyue Zhang:

Yeah, works pretty well for our use case.

Rajasekhar Konda:

It really works very well. Maybe I can share one of the large tables we deal with: a single table containing over 10 petabytes of data. That’s one example, and we have thousands of tables covering lots of data of different varieties. We have small tables, large tables with thousands of columns, tables of different dimensions. Fitting everything is not that easy; one size does not fit all. So we have to do all these customizations to make sure it works very well for the rest of the ecosystem.

Moderator:

Nice. Yeah, it makes sense. Definitely excited to see the work when you contribute it. Do you have any preference on, I think more generally, normalized versus denormalized? I think they mean it in terms of schema design. How do you go about normalized versus denormalized table design and the decisions there?

Rajasekhar Konda:

So, that’s data modeling, right? It depends on how the tables are being queried by the consumers for reporting purposes. Most data engineering tables come onto the data lake following the flat table concept. If tables have various levels of hierarchical dimensions, then joins are expensive, so flat tables are what most people prefer, as much as they can. But if they have proper use cases, and I’ve seen that, not in this particular case but in past finance or marketing use cases where different data sets come from third-party data, then they go with these other data modeling techniques. We support those data modeling techniques; we don’t actually do the data modeling for our users as a platform. We try to see what pain points they have, and if there’s a particular data modeling technique they want to implement that isn’t supported, then we’ll go and help them support it.

Moderator:

Great. Yeah, that makes sense. There’s another question here. Does the compaction apply to copy-on-write or merge-on-read tables, or both? Any major or minor compaction policy considerations that differentiate between them?

Hongyue Zhang:

Yeah, I think right now most of our data users are still using copy-on-write. For merge-on-read, we’re still exploring whether or not it’s safe to roll out. The interesting part we figured out recently is that when we do data compaction for V2 tables, which are merge-on-read, it does not actually clean up the delete files, because sometimes you’re only deleting a few records out of hundreds or thousands. By default, I think compaction only looks at files whose size is small enough to be worth compacting. I believe there is also work in progress in the open source community to handle these delete files specifically, so that we can properly remove them in the future. So I think we’re still exploring V2 compaction.

Moderator:

Cool. Great. Thanks. It looks like we have time for maybe one more question here online. I think you already addressed this, but is there anything else you want to add? If your tables can contain deletes or updates, compaction could fail due to a delete winning the commit race and causing conflicts. Anything to add on that? You kind of already addressed it.

Hongyue Zhang:

Yeah, one thing to add is that our compaction is done by partition; we don’t apply it to the whole table all at once. Say the table is partitioned hourly: we look at whether there is a specific reason, like a lot of small files, identify that first, and then apply compaction there. That reduces our chance of collisions, because we’re more selective about which partitions we optimize.

Moderator:

Yeah, it makes a lot of sense. Okay, so that is all the time we have. Thanks everybody for attending, both in person here eating lunch and online. There was a lot of good feedback online in the chat as well; people liked the session. So thanks a lot. Give a hand for Steve and Raj.

Rajasekhar Konda:

Thank you.

Hongyue Zhang:

Thank you Jason.
