Table Formats Apache Iceberg Subsurface LIVE Sessions

Session Abstract

How can data processing and querying in the data lake be improved at scale? Given that our data lake supports hundreds of customers with wide (thousands of columns), heavy (tens to hundreds of TB of data), and fast-changing (thousands of appends, deletes, and other changes) datasets, we need reliability and scalability for a fast-changing data environment. At Adobe Experience Platform we try to use the right tool for the right use case. So for our data lake, we chose Apache Iceberg as our core foundation and Hyperspace as our indexing subsystem. In this session, Andrei Ionescu explains how Hyperspace integrates with Iceberg and other formats to speed up processing while keeping the data consistent and performant. Watch to the end of the talk to see a demo of Apache Iceberg and Hyperspace.

Video Transcript

Andrei Ionescu:    Hi everybody. My name is Andrei Ionescu, and I’m a senior engineer with Adobe, working on the data lake in Adobe Experience Platform. This talk is about data processing and querying in the data lake and how it can be improved with indexes, working together with Apache Iceberg, the core table format of Adobe Experience Platform.

[00:00:30] I will start by giving you an introduction to Adobe Experience Platform, what it is, and what it is for. Next, we will talk about why Iceberg and the benefits of Adobe [inaudible 00:00:42]. And then we’ll get into the indexes and Hyperspace, introducing you to its concepts and APIs, discussing how it fits into Adobe Experience [inaudible 00:00:52]. We’ll also look at how Hyperspace works internally, we’ll discuss consistency, and have a small demo if I’m [inaudible 00:01:00].

[00:01:00] So Adobe Experience Platform is a data platform providing real-time, personalized experiences. Producers, including our customers, partners, and internal solutions, send data through either a batch or a streaming interface. The types of data sent to us include profile data such as CRM system data, dimensional data such as product catalogs, and time series data such as clickstream data for a website. [00:01:30] When data is sent in via batch through a bulk ingest API or connectors, it is validated, transformed, partitioned, and compressed before being written out to the data lake. Downstream consumers are notified when the data is ready, and they can then query the data from the lake. Data can also be sent over streaming, using our pipeline service. That data will be validated and transformed before being made available downstream, and also being forked off into the data lake.

[00:02:00] So let’s take a look at the consumers. First, we have the unified profile and unified identity service.
These two combined enable the merging and stitching of user profiles [inaudible 00:02:15] of an identity graph generated from various data sources to provide a 360-degree view of our customers. Next is the experience query service, which is a high-performance distributed SQL engine that enables [00:02:30] customers to query their data and integrate it with external business intelligence tools. Next is the data science workbench, which enables data scientists to build intelligent capabilities into their user experiences using Adobe Sensei. And finally, we have customer journey analytics, which provides out-of-the-box slicing and dicing of behavioral data.

In the AEP, or Adobe Experience Platform, we have hundreds of [00:03:00] customers and thousands of datasets. And we are growing quickly. Currently we are processing about 10 terabytes per day, but the more interesting data points in regard to our [inaudible 00:03:12] are the following. The datasets usually have thousands of columns, an average of 10 terabytes, and thousands of appends or changes. Let me say it again. Datasets have 1,000 columns, a very wide schema, 10 terabytes of data, and [00:03:30] they are changing a lot, 5,000 changes. With this in mind, let’s switch to Iceberg and indexes.

So what is it, and why do we need this thing called Apache Iceberg? For those not familiar with it, Iceberg is a table format. We define a table as a collection of data that fits into [inaudible 00:03:54] schema. Data of any respectable size will be spread across a number of partitions and data files. But a [00:04:00] table format gives us metadata about all the partitions and files that make up a given table. So what makes Iceberg so powerful for us? There are three things. First is data reliability. Through its use of multi-version concurrency control, Iceberg enables concurrent readers and writers to operate with snapshot isolation.
Iceberg further adds ACID support, allowing us for the first time to safely restate data in the data lake for use cases such as GDPR. In terms of [00:04:30] reads, Iceberg’s design allows query planning to be done in a single process in constant time, O(1), while interacting with the file system, rather than the linear cost, O(N), that we are typically familiar with. Iceberg also tracks statistics on the partitions, files, and columns in your data, enabling data skipping optimizations. And finally, we have scalability. Iceberg is a lightweight library that allows us to store our [00:05:00] metadata [inaudible 00:05:00] next to the data. So there is no need for an additional metadata service that can act as a single point of failure.

This is what Adobe Experience Platform is and what the data lake uses as its core layer. What is missing? As the majority of databases have an index system, our data lake needs that too. Sorry. And another advantage of having indexes is that we gain [00:05:30] the possibility of trading processing costs for storage costs, which makes cost modeling easier. There are two main categories of indexes, clustered and unclustered, and the covering index that we will discuss further is part of the unclustered category, along with the columnstore index, B-tree and any other tree index, [inaudible 00:05:49] or index, et cetera.

A covering index is an index that contains all the information required to resolve the query. It completely covers [00:06:00] the query. We need it because, first, Iceberg’s data skipping mechanism is inefficient in cases where the min/max ranges of columns have lots of overlap. Second, we need it because in the case of datasets with thousands of columns, filtering out the majority of columns to select only a couple is inefficient. And third, it lets us trade processing cost for storage cost.

[00:06:30] Next we’ll discuss Hyperspace, and the support for covering indexes.
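The point about overlapping min/max ranges can be illustrated with a small sketch. This is a toy model in plain Python, not Iceberg code: the `FileStats` structure, the file names, and the id ranges are all hypothetical, chosen only to show why per-file statistics prune well on clustered data and poorly when ranges overlap.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileStats:
    """Per-file min/max statistics for one column (hypothetical structure)."""
    path: str
    min_id: int
    max_id: int


def files_to_scan(files, wanted_id):
    """Keep only the files whose [min, max] range could contain wanted_id."""
    return [f.path for f in files if f.min_id <= wanted_id <= f.max_id]


# Well-clustered data: ranges do not overlap, so pruning leaves one file.
clustered = [
    FileStats("a.parquet", 1, 100),
    FileStats("b.parquet", 101, 200),
    FileStats("c.parquet", 201, 300),
]

# Poorly clustered data: every file spans almost the whole id range,
# so the statistics cannot rule any file out.
overlapping = [
    FileStats("a.parquet", 1, 290),
    FileStats("b.parquet", 5, 300),
    FileStats("c.parquet", 2, 295),
]

print(files_to_scan(clustered, 150))
print(files_to_scan(overlapping, 150))
```

In the overlapping case every file still has to be read for a single id, which is the situation where a separate index over that column starts to pay off.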
As stated on the project page, Hyperspace is an open source indexing subsystem that brings index-based query acceleration to Apache Spark and big data workloads. It’s a simple system that allows users to build, maintain, and leverage indexes automatically for query workload acceleration. Hyperspace comes with a simple API, [00:07:00] sorry. And we’ll see that a bit later. It comes with versioning and isolation; consistency through incremental refresh and hybrid scans; multi-language support for Scala, Python, and .NET; a multi-user concurrency mode; and again, the possibility to trade processing costs for storage costs that we are very interested in.

Microsoft is the main supporter of Hyperspace, and they did run the TPC benchmark [00:07:30] with Hyperspace, and the average improvement over multiple kinds of queries is two times, with a maximum of 10 times. We did run our own tests for our own use cases, and we noticed 20 times better performance when using Hyperspace. Let’s get into the Hyperspace basic APIs.

To understand the API, let’s go through the full process of creating an index, using it, updating it, and finally disposing of it. First it is necessary [00:08:00] to import Hyperspace, and just adding these two lines is enough. Then we need to instantiate it. To create the index, we can use the create index method against a dataset, providing a name for the index and the index definition, where indexed columns are the columns used for join or filter operations, and included columns are the ones used for project operations. We will be able to use this index that we just created [00:08:30] for queries over the ID column, projecting the name column, as exemplified with the query below. You can observe that the query is just a simple query that does not have any reference to Hyperspace at all.

Next, we can check how the plan will be changed to make use of the index that we just created. To do that, we use the Hyperspace explain method.
This will output the plan without the index applied, the plan with the index applied, the selected index, and [00:09:00] the differences between the two plans. The important change when the index is applied is the change of the file scan action. As you can see in the plan with indexes, there is a file scan Hyperspace-specific action, while in the plan without indexes, there is the standard Iceberg way of accessing data through the ScanV2 action. In the indexes used section, we can see the selected index for the given query, and in the physical operator stats, we can see that actions were removed, and one added. [00:09:30] This shows that Hyperspace improves the plan by modifying the scan step and removing any extra action that is no longer needed when the index is used. For example, without the index, there is a project action required to limit the selected number of columns, while when the index [inaudible 00:09:46] used, the project action is [inaudible 00:09:49] longer needed, because the index already has only those two columns.

The explain method only shows the changes that will be applied to the plan, but [00:10:00] to execute them at query time over data, Hyperspace needs to be enabled using the enable Hyperspace command before executing the query. Code-wise, the queries will remain as they are, no changes required on them, making the adoption of Hyperspace quite easy, and the learning basic. To disable Hyperspace, the disable Hyperspace command can be used. To get insight into what indexes are available, Hyperspace gives us the [00:10:30] indexes dataframe, where details about the created indexes are displayed. The details are the index name, the indexed and included columns, the schema of the index, the physical location, and its state.

Hyperspace has other management APIs that are very useful. To delete an index, we have the delete index command.
It is a soft delete, and to restore a deleted index we can use the restore index command, which will bring it back. To totally [00:11:00] get rid of a deleted index, use the vacuum index command, which will hard delete the index. The refresh index command is very useful in the case of moving datasets or changing datasets, where you need to update the index. Hyperspace does not update the index automatically, you have to do it yourself. There are three types of index refresh: full, incremental, and quick. The full refresh scans the whole dataset and recreates the index. The incremental refresh only scans [00:11:30] the new changes and updates the index, adding new files to the index. The quick one is just a metadata operation that is useful in the context of hybrid scans, but more about hybrid scans a bit later.

If new data is coming fast and the index is refreshed very often, the number of files created in the index will grow and the index will become less efficient over time. To alleviate this issue, the optimize index command can be used to modify the index file layout [00:12:00] to minimize the number of files.

Now let’s understand how it works. Given a dataset with id, name, value one, nested, and other columns, as seen on the left, creating an index with ID as the indexed column and name as the included column means that a new dataset will be created and stored as multiple Parquet files, 200 buckets by default, on the disk, [00:12:30] containing the ID and name columns, with all the values extracted from the data table. It’s like a subset of the first dataset. So the red table on the left is the dataset that is behind Hyperspace as an index. On the right, Hyperspace first will validate that the created index qualifies for the provided query, and will use that index instead of the original dataset, adding the changes that did not make [00:13:00] it into the dataset in there [inaudible 00:13:01].
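The index layout described above (a narrow copy of the table holding only the indexed and included columns, split into buckets by the indexed column) can be modeled in a few lines of plain Python. This is only a sketch under stated assumptions: it does not use the Hyperspace API, the function names and sample rows are made up, and the bucket count is set to 4 instead of the default of 200 mentioned in the talk so the result is easy to inspect.

```python
from collections import defaultdict

NUM_BUCKETS = 4  # the talk cites 200 as the default; kept small for readability


def build_covering_index(rows, indexed_col, included_cols, num_buckets=NUM_BUCKETS):
    """Copy only the indexed and included columns, grouped into buckets
    by a hash of the indexed column value."""
    keep = [indexed_col, *included_cols]
    buckets = defaultdict(list)
    for row in rows:
        bucket_id = hash(row[indexed_col]) % num_buckets
        buckets[bucket_id].append({col: row[col] for col in keep})
    return dict(buckets)


def lookup(index, indexed_col, value, num_buckets=NUM_BUCKETS):
    """Answer a filter on the indexed column by reading a single bucket."""
    bucket = index.get(hash(value) % num_buckets, [])
    return [row for row in bucket if row[indexed_col] == value]


# A wide source table (only a few of its columns are shown here).
table = [
    {"id": 1, "name": "alice", "value1": 10, "other": "x"},
    {"id": 2, "name": "bob", "value1": 20, "other": "y"},
]

index = build_covering_index(table, "id", ["name"])

# SELECT id, name WHERE id = 2 is fully covered: the wide table is never read.
print(lookup(index, "id", 2))
print(len(index))  # number of non-empty buckets
```

With two distinct id values the sketch ends up with two non-empty buckets, which mirrors the two index files seen later in the demo.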
This is the hybrid scan functionality that Hyperspace has. It uses the index and adds on top of it only the data that was not incorporated in the index at the moment of execution. On the right, you see the red table plus the new file that contains two rows, and the result is below.

As you can see here in this GUI, the previous plan did use a scan over the Iceberg table, but when [00:13:30] the index is used, that gets changed into a hybrid file scan action that contains the index data files, the ones in green, and the new data file, the one in blue. This provides consistency between the data and the index. There are other more complex use cases that Hyperspace efficiently covers, like data being removed, or files being removed, but we won’t get into the details of those modified plans, because the plans are too big to be listed [00:14:00] here.

We’ve seen the APIs, we’ve seen how Hyperspace works. Now, there is an important question that we need to answer, a question related to performance and consistency: how to keep the data and the index in sync? This is a very good question, but if we think about [inaudible 00:14:17] that Hyperspace has and provides consistency, a better question would be, how often should I refresh the index to be performant? And the answer to this question is, [00:14:30] it depends on the ingestion pattern. It really depends on how much data is ingested and how often. It depends on the schema, and of course it depends on the read side, the queries executed over the data. Now, hybrid scan makes possible asynchronous writes to both data and index, so neither of them gets blocked by the other. At the same time, it makes the most of the index that is already computed, and adds to it only what’s not covered, only what’s new. [00:15:00] Hybrid scan accommodates all the use cases. For simple ones, like appending new files, we have a hybrid file scan, as we’ve previously seen.
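The hybrid scan idea can be sketched as a set comparison between the files the index was built from and the files the table has now. The code below is a toy model in plain Python, not Hyperspace internals: the function name, the file names, and the choice to tag each index row with its source file are assumptions made for this sketch.

```python
def hybrid_scan(index_rows, indexed_files, table_files, columns):
    """Combine a possibly stale index with the current table contents.

    index_rows:    list of (source_file, row) pairs already in the index
    indexed_files: set of file names the index was built from
    table_files:   dict of file name -> list of full rows currently in the table
    columns:       the indexed and included columns to project
    """
    current = set(table_files)
    appended = current - indexed_files  # files the index has not seen yet
    deleted = indexed_files - current   # files removed since the index was built

    # Reuse the index, dropping rows whose source file no longer exists.
    rows = [row for source, row in index_rows if source not in deleted]

    # Read only the appended files from the table and project the needed columns.
    for name in sorted(appended):
        rows.extend({col: r[col] for col in columns} for r in table_files[name])
    return rows


index_rows = [
    ("f1.parquet", {"id": 1, "name": "alice"}),
    ("f1.parquet", {"id": 2, "name": "bob"}),
]
indexed_files = {"f1.parquet"}

# The table gained a file after the index was built.
table_files = {
    "f1.parquet": [
        {"id": 1, "name": "alice", "value1": 10},
        {"id": 2, "name": "bob", "value1": 20},
    ],
    "f2.parquet": [{"id": 3, "name": "carol", "value1": 30}],
}

result = hybrid_scan(index_rows, indexed_files, table_files, ["id", "name"])
print([row["id"] for row in result])
```

The query sees the row with id 3 even though the index was never refreshed, and only the one appended file is read from the wide table.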
Complex use cases will create a union between the index data and the new data, while deletion will filter out the deleted rows from the index.

Now let me show a short demo of what we’ve covered. Okay. So I’ve put [00:15:30] together this notebook for us to play with and see the integration between Iceberg and Hyperspace. So first, we need to set a table location for the Iceberg table, and an index name for the future index [inaudible 00:15:48]. Next is to import the Hyperspace library and instantiate it. Next is to [00:16:00] create a dataframe. We’ll create a simple dataframe with two rows, with a simple schema: you see a couple of integers, a few strings, and a struct. It has only two rows, you can see those here. And we’ll use this dataframe as the base for our Iceberg table.

Now we’ll create the Iceberg table. So this create Iceberg table is a helper method that I put together, [00:16:30] because when you create an Iceberg table from scratch, it requires four or five lines of code, like setting the schema, setting the partition spec, and some other options when you create the Iceberg table. If there are questions about this function, we can go into it and look into it. The next thing is to read, let me scroll a little bit, is to read the Iceberg table using the Spark Iceberg data source. And [00:17:00] as you see, it’s the same structure and the same data. But this time we did read it with the Iceberg data source. Now let’s create an index over this dataframe called ISDF. The name is the one we have given at the beginning, at the top of the [inaudible 00:17:14], index zero one. And the index definition uses name as the included column and ID as the index column.

[00:17:30] Now we’ll look at the indexes dataframe, and see what indexes are there. Let’s wait a bit for it to finish. Okay, it took a while. It should have taken under a second, but I don’t know why.
So we have the index zero one, [00:18:00] the index column, the included column, the default number of buckets, 200, the schema of the index that has the ID that is an integer and the name that’s a string, and also the location and the state, active. I’ll just create a query. As you can see, we’ll select only two columns out of those six or seven columns that we have, and we’ll filter, only [00:18:30] searching for ID with value two or ID with value three. At this time, we didn’t execute the query, we just created it. Now we’ll look at the explain method that we’ve seen in the presentation. So it’s the same. I won’t go through it in detail, only that the project step is removed [inaudible 00:18:48] and we only have this file scan Hyperspace. The location of the index is here and the differences between the two plans are listed below.

Now [00:19:00] let’s run the query without Hyperspace. By default, Hyperspace is disabled. So we run it without and see what happens. So it only did find the row with ID two, because we only have one and two. Let’s have a look at the SQL. So this is the plan: data source V2 scan, filter, and project, as we’ve seen in the plan. Now let’s run it with Hyperspace enabled. As you see, we enable [00:19:30] it with spark.enableHyperspace. The same results. Now let’s have a look at what happens behind the scenes. We have two jobs, but first let me show you the query, and then I’ll explain why we have two jobs. So we have this scan Hyperspace, and then the filter. And in this case, you can see that we do have two files, which gives us the parallelization that we see with two [00:20:00] jobs. This is due to the fact that the index is bucketed by the index column. In our case, the index column is ID, and ID has two values, one and two. It is bucketed into files. One file contains one, and the other [inaudible 00:20:16]. That’s why we have two files. So the parallelization is increased here.

Now let’s do some management. Let’s delete the index and see how it looks.
So we just deleted the index, [00:20:30] but we didn’t hard delete it yet, it’s just marked with the state deleted. If we executed the query again, it would not be that interesting, it would not do anything. If you want to properly delete it, we can use vacuum index and it will disappear from this list. Now let’s put it back because we want to do some more stuff. Okay? We have it active again.

Now let’s [00:21:00] do something a bit more complex. Let’s add a new row to the Iceberg table. And to begin with, we’ll keep the index not synchronized. So we added a new row. Now let’s execute the query on the updated Iceberg table, but without Hyperspace, to show you the results. So now we have two and three because we added the row with ID three, and we have [00:21:30] the standard Iceberg reading plan. All right, now let’s execute it with Hyperspace enabled. [inaudible 00:21:46] see already we have the parallelization of two jobs. [inaudible 00:21:49] the query. Scan Hyperspace, but this time we can see that we have three files. So [00:22:00] two files are from the index, and one file is from the data itself. Because the row that we added, the row with ID three, has been added as a file to the Iceberg table, and Hyperspace detected that the index is not up to date, so it takes what’s inside the index and adds the new file on top of it.

Then let’s go further. Let me show you the files. This is another helper function that I put together to extract the files from the plan. So the [00:22:30] first file is coming from the index zero one, right? The first value’s up to here. The second file is this one, it’s index zero one. The third file comes from the Iceberg table. So this is the hybrid scan that I’ve been talking about. Now let’s refresh the index, update the index and put it in sync with the data. Now let’s run [00:23:00] the query again. We won’t disable Hyperspace this time, we’ll just run the query and look at the jobs and at the files.
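The delete, restore, and vacuum steps from the management part of the demo behave like a small state machine over the index state. The sketch below models that in plain Python. It is not the Hyperspace API: the `IndexCatalog` class and its method names are hypothetical, and the rule that vacuum is only allowed on an index already in the deleted state is an assumption of this sketch, based on how the talk describes it.

```python
class IndexCatalog:
    """Toy catalog that tracks the state of each index by name."""

    def __init__(self):
        self._state = {}

    def create(self, name):
        if name in self._state:
            raise ValueError(f"index {name!r} already exists")
        self._state[name] = "ACTIVE"

    def _transition(self, name, expected, new_state):
        current = self._state.get(name)
        if current != expected:
            raise ValueError(f"index {name!r} is {current}, expected {expected}")
        if new_state is None:
            del self._state[name]  # hard delete: the entry disappears
        else:
            self._state[name] = new_state

    def delete(self, name):
        """Soft delete: the index stays listed, marked as deleted."""
        self._transition(name, "ACTIVE", "DELETED")

    def restore(self, name):
        """Bring a soft-deleted index back."""
        self._transition(name, "DELETED", "ACTIVE")

    def vacuum(self, name):
        """Hard delete an index that was already soft-deleted (sketch assumption)."""
        self._transition(name, "DELETED", None)

    def listing(self):
        return dict(self._state)


catalog = IndexCatalog()
catalog.create("index01")
catalog.delete("index01")
print(catalog.listing())  # still listed, state DELETED
catalog.restore("index01")
print(catalog.listing())  # back to ACTIVE
catalog.delete("index01")
catalog.vacuum("index01")
print(catalog.listing())  # no longer listed
```

Keeping the soft-deleted entry around is what makes the restore step in the demo possible; only the vacuum step removes it from the listing.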
So two jobs, let me show you. The number of files is three, but this time I’ll show you that all three files come from the index, because it’s up to date. [00:23:30] Yep. So this is index zero one, the second is index zero one, the third is index zero one here. So what refresh did is, it did read that file, extracted the two columns from there, created an updated file, and updated the index. So this concludes the demo. Let me get back to the presentation, I have just a couple of slides.

So Hyperspace is a new technology, [00:24:00] and of course it has some gaps, and a roadmap to close the gaps. It lacks support for nested fields. Indexing on a property of a struct type is not possible right now, but work is being done to close this gap. The current release only supports covering indexes, although there are quite a few other types out there, and the plan is to support multiple types of index. And there is work being done in this area. The second index type coming in Hyperspace is the data skipping index. Hyperspace works with [00:24:30] Parquet, with open source Delta, and Iceberg. [inaudible 00:24:34] support for other file types or table formats like Avro, ORC, and Hudi or [inaudible 00:24:40]. In terms of where Hyperspace is used, [inaudible 00:24:46] Azure products, Microsoft Azure products: Azure Synapse, Azure HDInsight, Azure Cosmos. We on our side are doing work on trying to add it to the data lake [00:25:00] in the Adobe Experience Platform. This concludes my presentation, and at the end we have some more resources. And please, let’s start with questions, if there are any.

Speaker 2:    Yeah, thanks Andrei. Let’s go ahead and open it up for Q&A. If you have a question, use the button in the upper right. If not, put it in the chat. I think we have one coming in.

Andrei Ionescu:    [00:25:30] So I’ll keep this notebook open a little bit more, because maybe there are some questions in regard to…

Speaker 2:    It looks like Furbish Kumar.
Do you have a question for Andrei? Your microphone is not on if you’re speaking. Let’s move on to a question that is in the chat right now. [00:26:00] Abraham asks, does Hyperspace work with Hudi or Delta Lake format as well, in addition to Iceberg?

Andrei Ionescu:    Hyperspace does work with Delta from Databricks, right, Delta from Databricks in the open source version. It doesn’t support Hudi though, but I’m guessing that that would come on the roadmap. Hyperspace is a pretty new technology, so [00:26:30] the roadmap is just starting to be built. So there are a lot of improvements to come there.

Speaker 2:    Okay. We have another question from CT Rama: is Hyperspace creating a duplicate dataset with all the columns needed, indexed by the ID column specified?

Andrei Ionescu:    So the short answer is yes, it creates a separate dataset in the location where Hyperspace is [00:27:00] set to store the data. By default that’s Hive Metastore. I mean the Hive warehouse path. It’s creating a dataset based on the original dataset, extracting only the columns that we specified as the indexed columns and included columns. So yeah, it’s creating duplicate data, and that is why there is the question [00:27:30] of how consistent Hyperspace is. Is it consistent or not? But if we use hybrid scans, it will be consistent.

Speaker 2:    Another question came in from Hamid. Can you read the stale data from the index for snapshot queries from the past, even though the index is not refreshed?

Andrei Ionescu:    If the question is related to Iceberg, there are a [00:28:00] lot of strategies that can be implemented in Iceberg. No, stale files, no, we don’t read. We only ask Iceberg, give me what’s new from this moment up to now, or the other strategy is, give me all the files used by this dataset, and I can compare them with the ones that I have.

Speaker 2:    [00:28:30] Got it. And then the last question here is from Munir.
Does Hyperspace work with Delta Lake or only Iceberg?

Andrei Ionescu:    It works with Delta Lake, but with the open source version. So if you try to run Hyperspace in Databricks workspaces, it will fail, because the Delta version inside Databricks is very performant and they have their own improvements there. [00:29:00] And there are some conflicts inside there which make it crash. So, yeah. But if you use open source Delta, yeah, it works with that. [inaudible 00:29:16] like time travel. So it will use these when the index is created, it will use the time travel when the query’s done. I mean, it’s pretty impressive [inaudible 00:29:29] with [00:29:30] Hyperspace and Delta.

Speaker 2:    All right. We got one. When Hyperspace is used to read index data, how does it fit into the metadata reads in Delta/Iceberg, reading both or only the unindexed?

Andrei Ionescu:    So when it creates the index, first it goes to the format reader and asks [00:30:00] the format reader, please give me the data, right? And the files. And it receives the list of files, and it scans the files, and it makes an index on those files with the content that’s inside. That’s how the index is created.

Speaker 2:    Do you require a unique key to do the refresh incrementally? This is from Hamid.

Andrei Ionescu:    No, it’s not required. There is no key for that, but Hyperspace supports versioning. [00:30:30] I can show you that here. If you look, you can see now, after the refresh, I have version one. So if you are interested in making a synchronous [inaudible 00:30:45] Hyperspace and snapshots on the other table, like Delta or Iceberg, you can do that, but that’s not supported by Hyperspace, you need to do it yourself. Hyperspace works by scanning the files or asking [00:31:00] the format, the table, what’s new from this point in time. It has other strategies. It doesn’t keep.
And putting it in sync will limit your asynchronicity when using these two technologies.

Speaker 2:    And this is the last question: is Hyperspace enabled by default in Synapse Spark?

Andrei Ionescu:    Ah, that’s a good question. I don’t have the answer. But I think that’s the plan, to have it. I’m not sure if they already have it in [00:31:30] Synapse.

Speaker 2:    Got it. Well, everyone, thank you. That’s all the questions we have time for in this session. If we didn’t get to your questions, you’ll have the opportunity to ask Andrei himself in the Subsurface lap before you leave. We’d appreciate it if you could please fill out the survey. It’s a super short Slido session survey. The next sessions are coming up in just five minutes, and the expo hall is also open. So thank you everyone. And have a good day.

Andrei Ionescu:    [00:32:00] Thank you guys. Thank you everybody.

Speaker 2:    Thank you, Andrei.