March 1, 2023

11:45 am - 12:15 pm PST

Dive Deep into Apache Iceberg on AWS

Join this session to learn about the common challenges of using traditional file formats on premises and how leveraging Apache Iceberg on AWS helps you overcome these challenges. You will also learn about the comprehensive and advanced features of Apache Iceberg with elaborate demos that showcase the unique capabilities of Apache Iceberg on AWS.

Topics Covered

Open Source



Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.

Veena Vasudevan:

What is Change Data Capture and General Data Protection Regulation?

All right, so this session is going to be a deep dive into Iceberg capabilities on AWS, and we have a lot to cover, so let’s get started. Let me start with some of the use cases that really motivated us to invest in the Iceberg space; we have spent a lot of time and effort integrating Iceberg into some of our products. Starting with change data capture, or CDC: CDC is basically keeping your immutable data lake up to date with the changes in your source system. This is especially useful for migration use cases. When my customers are migrating from on-premises to the cloud, they want to do incremental or delta pulls and migrate just the changes that have been made incrementally. Then you have privacy laws such as GDPR, the General Data Protection Regulation, or CCPA, which can require you to delete a customer’s records on request.

And then you have deduplication, especially when you’re working with streaming datasets, such as high-throughput streaming with Kafka. Those are pretty notorious for introducing duplicates into your system, and ideally you want to deal with those duplicates before they enter your data lake. Next, you want to enforce a minimum file size. If you’re working with big data systems, you already know about the small files problem: small files really hurt performance for readers. So you need to be able to compact your small files, either before they enter your data lake or on a periodic basis. And lastly, you have the time travel use case. Your data analysts may want to query previous versions of your table, or previous commits, to answer questions like: how did this particular record evolve over a period of three years?

Or maybe they want to run point-in-time queries: what did this particular record look like back in June 2020? Questions like these are very common, especially for claims systems or financial data. Now, starting with streaming data ingestion pipelines. You may have some source systems, say mobile apps, web APIs, or IoT devices, and you can collect the events from those systems using Kafka (in this case Amazon Managed Streaming for Apache Kafka), or you may use Kinesis or Confluent, whatever you use. These events can then be processed on a continuous basis using EMR, and you would ideally use Spark Streaming for this. Spark Streaming is well suited to the job: it consumes all the events from your source, applies some ETL transformations on top of them, and you write the results to your S3 data lake.
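The deduplication step mentioned earlier usually lives inside this streaming ETL stage. Here is a minimal sketch of it in plain Python, assuming each event carries a business key `event_id` and a monotonically increasing `offset` (both hypothetical field names); it keeps only the newest event per key before the batch is written.

```python
from typing import Dict, Iterable, List


def dedupe_latest(events: Iterable[dict], key: str = "event_id", order: str = "offset") -> List[dict]:
    """Keep only the newest event (highest `order` value) for each `key`."""
    latest: Dict[object, dict] = {}
    for event in events:
        k = event[key]
        # Replace the stored event only if this one is newer.
        if k not in latest or event[order] > latest[k][order]:
            latest[k] = event
    # Sort by key so the output order is deterministic.
    return [latest[k] for k in sorted(latest)]


batch = [
    {"event_id": "a", "offset": 1, "value": 10},
    {"event_id": "b", "offset": 2, "value": 20},
    {"event_id": "a", "offset": 3, "value": 11},  # duplicate delivery of "a" with newer data
]
print(dedupe_latest(batch))
```

In Spark Structured Streaming itself, `dropDuplicates` (typically combined with `withWatermark`) is the built-in way to drop repeated keys within a bounded window; the sketch above just makes the "latest wins" rule explicit.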

Challenges With Streaming Data Ingestion Pipelines

While implementing this use case, you may run into certain challenges. The first set is on the writer’s side. First, you want to make atomic changes, which means you never want a partial write to be visible. You also want reader and writer isolation, which is pretty important in ACID terms. You want high-throughput ingestion, which is pretty much why you’re doing streaming in the first place. You also want to compact your small files, because, like I said, streaming systems always produce too many small files, since they run on a continuous basis. And you may need to deal with concurrent writers, because you may have multiple streaming jobs trying to write to the same tables.
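Iceberg addresses atomic commits, isolation, and concurrent writers with optimistic concurrency: a writer prepares new metadata off to the side, then asks the catalog to atomically swap the table’s current-metadata pointer, and retries against the latest version if another writer committed first. The toy model below (a hypothetical `ToyCatalog`, not Iceberg’s code) sketches only that compare-and-swap loop; real Iceberg commits also re-validate the operation for conflicts before retrying.

```python
import threading
from typing import Tuple


class ToyCatalog:
    """Hypothetical stand-in for a catalog that atomically swaps a table's metadata pointer."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.version = 0                   # current metadata version
        self.files: Tuple[str, ...] = ()   # data files visible in that version

    def read(self) -> Tuple[int, Tuple[str, ...]]:
        with self._lock:                   # readers always see one consistent version
            return self.version, self.files

    def compare_and_swap(self, expected: int, new_files: Tuple[str, ...]) -> bool:
        with self._lock:
            if self.version != expected:
                return False               # another writer committed first
            self.version += 1
            self.files = new_files
            return True


def commit_append(catalog: ToyCatalog, new_file: str, max_retries: int = 10) -> int:
    """Optimistic commit: read the base version, build the new state, swap, retry on conflict."""
    for _ in range(max_retries):
        base_version, base_files = catalog.read()
        if catalog.compare_and_swap(base_version, base_files + (new_file,)):
            return base_version + 1
    raise RuntimeError("commit failed after too many conflicting commits")


catalog = ToyCatalog()
writers = [threading.Thread(target=commit_append, args=(catalog, f"file-{i}.parquet")) for i in range(8)]
for w in writers:
    w.start()
for w in writers:
    w.join()
print(catalog.read())
```

Each failed swap means some other writer succeeded, so with eight writers no single writer can fail more than seven times; that is why `max_retries=10` is enough here and no file is ever lost.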

Now, on the querying side, you have certain challenges while reading the data, too. First, you may want to run incremental queries; like I said, you may want to know what changes have been made to your dataset over a period of time. You may also want to run time travel or point-in-time queries to know how a particular record or table looked at a particular point in time. And lastly, you may want multiple engine support. The data analysts in your team may bring diverse skill sets: some of them may be well-versed with Athena, some with Dremio, and some may prefer engines like Spark, Hive, or Presto. So you need a table format that interoperates with all of these different engines and tools.
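Conceptually, a point-in-time query resolves to the snapshot that was current at the requested time: the last snapshot committed at or before it. A minimal sketch of that lookup, assuming the snapshot history is available as (commit time, snapshot id) pairs sorted by time, which is a simplification of Iceberg’s snapshot log:

```python
import bisect
from datetime import datetime, timezone
from typing import List, Tuple


def snapshot_as_of(snapshots: List[Tuple[datetime, int]], as_of: datetime) -> int:
    """Return the id of the last snapshot committed at or before `as_of`.

    `snapshots` must be sorted by commit time, oldest first.
    """
    commit_times = [ts for ts, _ in snapshots]
    idx = bisect.bisect_right(commit_times, as_of)
    if idx == 0:
        raise ValueError("no snapshot exists at or before the requested time")
    return snapshots[idx - 1][1]


history = [
    (datetime(2020, 1, 15, tzinfo=timezone.utc), 1),
    (datetime(2020, 5, 30, tzinfo=timezone.utc), 2),
    (datetime(2020, 7, 2, tzinfo=timezone.utc), 3),
]
# "What did the table look like at the end of June 2020?"
print(snapshot_as_of(history, datetime(2020, 6, 30, tzinfo=timezone.utc)))  # → 2
```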

Change Data Capture With AWS DMS and AWS Glue

Now, coming to the first use case: CDC is a very common use case that we hear about from our customers. Basically, you have an operational log, say a change log on your database; in this case it’s a MySQL database, but it could be Postgres or something else. You would use AWS Database Migration Service (DMS), which captures the changes in your source database as and when you make them and writes those changes to S3. This is your ingestion tier, where your raw data lives. Then you can have a downstream ETL process, which could be EMR or, in this case, Glue. You may have a Spark job that takes the data from your raw S3 bucket, applies some transformations on top of it, and writes it to an analytical tier that you can further expose for machine learning, dashboarding, or whatnot. In that case, you have the same challenges that I talked about earlier.

Apart from that, you also want to do upserts and deletes, because of course you want to propagate the changes in your source system to your analytics tier. The next use case is GDPR data erasure pipelines, which are very common too. We have customers who run monthly batch deletion pipelines, where they may need to go through the entire dataset to delete just one or a few records. Here, too, you have certain challenges: you want row-level upserts and deletes, because of course you want to delete the records, and you need to handle concurrent writers, because multiple users can schedule jobs that delete records from the same table. Now, moving on to Iceberg on AWS. You will hear about Iceberg a lot today and tomorrow, so I don’t want to rehash that information; I’ll just give a brief overview.

Basically, Iceberg is made for huge analytical datasets, and it helps you overcome all the challenges I talked about earlier. You get reader and writer isolation. You can do incremental querying. You can build incremental or transactional pipelines using Iceberg. You can do schema evolution: it’s made for evolving datasets, so it handles changes to your schema, changes to your partition structure, and so on. Then there is hidden partitioning. Let’s say you have a date column in your dataset and you want to extract the month from it and use that as a partition field. It’s as simple as using a transform function: Iceberg automatically takes the month from your date column and uses it as the partition field, and you as a user do not even see that happening.

That’s why it’s called hidden partitioning. Iceberg is also designed for optimizations, it helps you achieve very high scalability at both the data and the metadata level, and it offers interoperability as well. Like I said, you want to be able to integrate with several engines and tools, and Iceberg integrates with many of them, which really helps your data analysts. Iceberg is also supported by a wide variety of AWS services, and that’s what I’m going to talk about next. On the catalog side, Iceberg supports three catalogs on AWS. First is the Glue Data Catalog. If you’re already on AWS, this should probably be your default choice, especially if you’re already using AWS analytics services: maybe Redshift Spectrum or Athena to query, or Glue ETL or EMR.
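The hidden partitioning described above works because Iceberg’s spec defines its time transforms as whole units counted from the Unix epoch (`year`, `month`, `day`, `hour`), alongside `bucket` and `truncate`. A minimal sketch of the month, day, and hour transforms in plain Python, assuming UTC-aware timestamps; in Spark SQL you would declare something like `PARTITIONED BY (months(event_date))` and let Iceberg compute these values.

```python
from datetime import date, datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def month_transform(d: date) -> int:
    """Iceberg-style `month` transform: whole months since January 1970."""
    return (d.year - 1970) * 12 + (d.month - 1)


def day_transform(ts: datetime) -> int:
    """Iceberg-style `day` transform: whole days since 1970-01-01 (UTC)."""
    return (ts - EPOCH).days


def hour_transform(ts: datetime) -> int:
    """Iceberg-style `hour` transform: whole hours since 1970-01-01 00:00 (UTC)."""
    return int((ts - EPOCH).total_seconds() // 3600)


# The writer stores each row under the derived value; users only ever filter on event_ts.
event_ts = datetime(2020, 6, 15, 13, 45, tzinfo=timezone.utc)
print(month_transform(event_ts), day_transform(event_ts), hour_transform(event_ts))
```

Because a query filter such as `event_ts >= '2020-06-01'` can be mapped onto these same transform values, the engine prunes partitions without the user ever referencing a partition column.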

Why is AWS Glue the First Choice For Data Cataloging?

The Glue Data Catalog is a great, I would say universal, catalog for all of these services, and that should be your first choice. It is also highly performant, and it offers great security: it integrates with Lake Formation, which you can use to enforce fine-grained authorization and whatnot. Next is DynamoDB. I wouldn’t say it’s as common, but you can use it for niche use cases. Let’s say you want to implement secondary indexes on top of your metastore, such as global secondary indexes (GSIs) and local secondary indexes (LSIs); then you can use DynamoDB to achieve that. And lastly, there is support for a JDBC-based catalog on RDS as well. Many of our customers coming from an on-premises background have their Hive metastores on MySQL or Postgres.
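For Spark on EMR or Glue, choosing among these three catalogs mostly comes down to the `catalog-impl` class you register. A minimal sketch that builds the Spark properties for each option; the catalog names, bucket, RDS endpoint, and credentials are hypothetical, while the class names and the `dynamodb.table-name`, `uri`, and `jdbc.*` property keys come from Iceberg’s AWS and JDBC catalog modules.

```python
from typing import Dict, Optional

# Catalog implementation classes shipped with Iceberg's AWS and core modules.
CATALOG_IMPLS = {
    "glue": "org.apache.iceberg.aws.glue.GlueCatalog",
    "dynamodb": "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
    "jdbc": "org.apache.iceberg.jdbc.JdbcCatalog",
}


def iceberg_catalog_conf(name: str, kind: str, warehouse: str,
                         extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Spark properties registering an Iceberg catalog `name` backed by Glue, DynamoDB, or JDBC."""
    if kind not in CATALOG_IMPLS:
        raise ValueError(f"unknown catalog kind: {kind}")
    prefix = f"spark.sql.catalog.{name}"
    conf = {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": CATALOG_IMPLS[kind],
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    }
    # Catalog-specific settings, e.g. "dynamodb.table-name", or "uri" / "jdbc.user" for JDBC.
    for key, value in (extra or {}).items():
        conf[f"{prefix}.{key}"] = value
    return conf


# Hypothetical RDS for MySQL endpoint and credentials for a JDBC catalog.
jdbc_conf = iceberg_catalog_conf(
    "rds_catalog", "jdbc", "s3://my-bucket/warehouse/",
    {"uri": "jdbc:mysql://my-rds-host:3306/iceberg", "jdbc.user": "admin", "jdbc.password": "change-me"},
)
for key, value in sorted(jdbc_conf.items()):
    print(key, "=", value)
```

Each pair can then be passed to `SparkSession.builder.config(key, value)`; the JDBC driver for MySQL or Postgres must also be on the classpath.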

The customers I talk to are sometimes reluctant to change, especially very large enterprise customers, because migrating takes quite a bit of effort. In that case, they can keep using a MySQL or Postgres catalog on RDS, just as they do today. Next, coming to the ingestion and querying layer, there are several services on AWS that integrate with Iceberg for ETL. You can use EMR, which I’m going to talk about shortly. You can use Athena both for running queries on your Iceberg data lakes and for ETL purposes, which some of our customers do. There is support for Glue ETL. And there is also Kinesis Data Analytics (KDA), which is built on Flink.

Flink inherently supports Iceberg, which means you can bring the Iceberg dependencies into KDA and run them. On the storage side, Iceberg supports S3, and not only does it support S3, it also optimizes the way it stores files in S3, which I’ll talk about later. If you’re on AWS, S3 should be your default choice for storage; it offers a lot of benefits. Next is FSx for Lustre, which is for specific use cases. EMR offers a feature that lets you store your Iceberg metadata on FSx for Lustre; this is specific to EMR alone, and I’m going to talk about it next. And if you’re using EMR, you can use HDFS too, but that’s not something I would recommend.

Using EMR With Spark, Presto, and Hive

So, moving on to Iceberg support in specific services, let me start with EMR. EMR is our most popular big data platform. It supports up to 20 different applications like Spark, Presto, Hive, Trino, and so on. Not only that, it also supports several deployment options: you can run EMR jobs on EC2 instances, you can run EMR containers on EKS (Kubernetes) clusters, and you can also run fully serverless EMR jobs. Iceberg integration is supported in all of these flavors. Next is the Jupyter-based notebook interface: you can use EMR Studio, which is essentially an IDE for your data analysts and data scientists that gives you a fully managed Jupyter-based notebook experience, and you can use it to build Iceberg pipelines.

What Else Can You Do With EMR?

You can also schedule notebooks using the APIs directly. EMR supports all the catalogs I talked about earlier (DynamoDB, Glue, and RDS), and it supports both version 1 and version 2 Iceberg tables. And like I said, EMR is not just for Spark; it comes with many other applications, and Iceberg integration is supported in Flink as well. Next is the ability to cache your metadata files, which I mentioned earlier. This feature is specific to FSx for Lustre, and it’s a pretty interesting one. When you are working with Iceberg tables, Iceberg creates not just data files but also a lot of metadata. These metadata files are basically manifest files and manifest lists, and Iceberg leverages them to do things like pruning or filtering your data, reading data from specific commits or snapshots, or even merging delta files.
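One reason those metadata reads matter so much: manifests record per-file column bounds, and the planner uses them to skip data files that cannot match a filter. A simplified sketch of that pruning step, assuming a hypothetical `DataFile` record with only integer lower and upper bounds (real manifests also carry partition values, null counts, and more):

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class DataFile:
    """Simplified manifest entry: a data file path plus per-column min/max values."""
    path: str
    lower_bounds: Dict[str, int]
    upper_bounds: Dict[str, int]


def files_to_scan(entries: List[DataFile], column: str, lo: int, hi: int) -> List[str]:
    """Keep only files whose [min, max] range for `column` can overlap lo <= column <= hi."""
    return [
        f.path for f in entries
        if f.upper_bounds[column] >= lo and f.lower_bounds[column] <= hi
    ]


manifest = [
    DataFile("s3://bucket/data/a.parquet", {"order_id": 0}, {"order_id": 99}),
    DataFile("s3://bucket/data/b.parquet", {"order_id": 100}, {"order_id": 199}),
    DataFile("s3://bucket/data/c.parquet", {"order_id": 200}, {"order_id": 299}),
]
# WHERE order_id BETWEEN 150 AND 180 only needs b.parquet.
print(files_to_scan(manifest, "order_id", 150, 180))
```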

So for all these functions you need the metadata files, and it’s very important that metadata file operations are as fast as possible, especially on the reader’s side. Iceberg also has a feature called fast scan planning, I believe, which really speeds up metadata operations. But when you are working with an object store like S3, sometimes that alone is not sufficient; you need to make sure you are achieving the read and write throughput that you really want. That is the reason we let you store your metadata files on FSx for Lustre, which is a fully managed, very highly performant file system.

Data Streaming Demo Using FSx Lustre

Let me show you the architecture. If you want to see the demo of this architecture, feel free to scan this QR code, or it will be shared with you after the session, I believe, so you can watch it then. This feature is especially useful for streaming use cases, because streaming jobs run on a continuous basis: micro-batches happen every five or 10 seconds, or you may use continuous streaming. In that case, the job generates a lot of metadata files, and you really want to make sure that metadata operations are super fast. Here you have an EMR cluster that reads the real-time events from Kafka, applies certain ETL transformations, and stores the results in your Iceberg data lake.
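To put "a lot of metadata files" into numbers, here is some back-of-the-envelope arithmetic, assuming every micro-batch commits and each append commit writes at least three metadata files: a new table metadata JSON, a new manifest list, and at least one manifest.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def commits_per_day(trigger_interval_s: int) -> int:
    """Number of micro-batch commits per day if every trigger commits."""
    return SECONDS_PER_DAY // trigger_interval_s


def min_metadata_files_per_day(trigger_interval_s: int, files_per_commit: int = 3) -> int:
    """Lower bound: metadata JSON + manifest list + at least one manifest per commit."""
    return commits_per_day(trigger_interval_s) * files_per_commit


for interval in (5, 10):
    print(f"{interval}s trigger: {commits_per_day(interval)} commits/day, "
          f">= {min_metadata_files_per_day(interval)} metadata files/day")
```

With a five-second trigger that is 17,280 commits and at least 51,840 metadata files per day; with a 10-second trigger, 8,640 commits and at least 25,920 files.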

Mounting FSx Lustre to EMR Clusters

Now, the Iceberg table’s data files and metadata are stored on S3 as usual, but you can also bring your own FSx for Lustre file system: you create an FSx for Lustre file system and mount it on your EMR cluster instances, essentially through a bootstrap action. Then you configure certain properties; the demo goes into much more detail. Under the hood, as and when you write to your Iceberg tables, the Iceberg metadata in S3 is asynchronously copied over to FSx for Lustre. That happens automatically, without any user intervention. As a result, whenever you read from your Iceberg tables, the metadata reads are served by FSx for Lustre, while the writes to your Iceberg table are served by S3. This gives you isolation: S3 handles the writes but not the metadata reads, so it performs better, and FSx for Lustre serves the reads much faster.
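The read and write split described above can be illustrated with a small toy model. This is a conceptual sketch of the pattern only, not EMR’s implementation (whose configuration properties the talk does not show): writes go synchronously to a primary store standing in for S3, a background thread copies them to a cache standing in for FSx for Lustre, and reads prefer the cache but fall back to the primary store.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List, Tuple


class MirroredMetadataStore:
    """Toy model of the pattern above (not EMR's implementation).

    Writes go to the primary store (S3 in the talk) synchronously and are copied
    to a fast cache (FSx for Lustre in the talk) in the background.
    Reads prefer the cache and fall back to the primary store.
    """

    def __init__(self) -> None:
        self.primary: Dict[str, bytes] = {}
        self.cache: Dict[str, bytes] = {}
        self._copier = ThreadPoolExecutor(max_workers=1)
        self._pending: List[Future] = []

    def write(self, path: str, data: bytes) -> None:
        self.primary[path] = data  # the commit only waits for the primary store
        self._pending.append(self._copier.submit(self.cache.__setitem__, path, data))

    def read(self, path: str) -> Tuple[bytes, str]:
        if path in self.cache:
            return self.cache[path], "cache"
        return self.primary[path], "primary"  # not copied yet: read from the primary store

    def wait_for_copies(self) -> None:
        """Block until all background copies have finished (useful for tests)."""
        for fut in self._pending:
            fut.result()
        self._pending.clear()


store = MirroredMetadataStore()
store.write("metadata/v1.metadata.json", b"{}")
store.wait_for_copies()
print(store.read("metadata/v1.metadata.json"))
```

The fallback in `read` is the important design point: because the copy is asynchronous, a file that was just committed may not be in the cache yet, so correctness must never depend on the cache.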

Athena Table Maintenance: Compacting Small Files

So, moving on to Athena. Athena also offers ACID transactions and everything else I talked about, so I’m not going to repeat that, but I want to talk about a specific feature that I find really useful for your data analysts. If you are working with Iceberg, you know that you need to run certain maintenance activities, for example expiring snapshots that are no longer required, or compacting small files. These are maintenance tasks that you need to run periodically, every day or every week, to keep your data lake lean, so that you’re not storing unnecessary metadata and data files. On the top left, you can see the rewrite data files API, which helps you compact small files.

If you’ve used Iceberg, you know that to compact small files you would either use a Java API, like you see on the left-hand side, or, if you’re using Spark, Presto, or Trino, the CALL command that you see at the top right. You specify the strategy that you want to use for compaction, whether it’s bin-pack or sort; I think Iceberg offers those two. With Athena, however, you can compact your small files using just a SQL query. At the bottom of the screenshot, you can see the OPTIMIZE command: you just give it the table name and the strategy you want to use, and the small files are compacted with a single SQL statement. You can also set the compaction properties with an ALTER TABLE command.
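Whether you run Spark’s `CALL <catalog>.system.rewrite_data_files(table => 'db.tbl', strategy => 'binpack')` or Athena’s `OPTIMIZE db.tbl REWRITE DATA USING BIN_PACK`, the bin-pack idea is the same: group undersized files into rewrite groups close to a target size. A simplified first-fit-decreasing sketch; the file names, sizes, and the "small" threshold are hypothetical, and Iceberg’s real planner also works per partition and has more options.

```python
from typing import Dict, List, Tuple

MB = 1024 * 1024


def plan_binpack(sizes: Dict[str, int], target: int, min_size: int) -> List[List[str]]:
    """Group small files into rewrite groups of at most `target` bytes (first-fit decreasing).

    Files of at least `min_size` bytes are left alone; single-file groups are dropped
    because rewriting one file on its own would not reduce the file count.
    """
    small = sorted(((s, p) for p, s in sizes.items() if s < min_size),
                   key=lambda sp: (-sp[0], sp[1]))
    groups: List[Tuple[int, List[str]]] = []  # (bytes used, file paths)
    for size, path in small:
        for i, (used, paths) in enumerate(groups):
            if used + size <= target:
                groups[i] = (used + size, paths + [path])
                break
        else:
            groups.append((size, [path]))
    return [paths for _, paths in groups if len(paths) > 1]


sizes = {"f1": 300 * MB, "f2": 200 * MB, "f3": 150 * MB,
         "f4": 100 * MB, "f5": 50 * MB, "big": 600 * MB}
print(plan_binpack(sizes, target=512 * MB, min_size=384 * MB))
```

Here `big` is already large enough and is skipped, while the five small files collapse into two rewrite groups, each no larger than 512 MB.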

Using Athena VACUUM to Remove Orphan Files and Expire Snapshots

In the ALTER TABLE statement, I’m specifying the maximum file size that I should have after compaction, which I’m setting to 512 MB, I believe. So this is how you can easily compact your files with just a SQL query. The next one is the VACUUM operation. VACUUM in Athena does two things: it deletes orphan files, and it expires snapshots. What I mean by orphan files is this: Spark is resilient by nature, which means that if a task fails, it is retried a few times (four times by default). Your job itself may also fail for whatever reason. In those cases, the job can leave behind a lot of headless files that are not referenced by any table metadata, and those are what we call orphan files.
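Orphan-file detection boils down to a set difference: files listed under the table location minus files referenced by any valid snapshot. A minimal sketch with an age safeguard, because very recent unreferenced files may belong to a write that has not committed yet; Iceberg’s `remove_orphan_files` procedure takes an `older_than` argument for the same reason. The paths are hypothetical and the three-day margin is chosen for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Set


def find_orphans(listed: Dict[str, datetime], referenced: Set[str],
                 now: datetime, older_than: timedelta = timedelta(days=3)) -> List[str]:
    """Files in the table location that no snapshot references.

    Recent files are skipped: they may belong to a write that has not committed yet.
    """
    cutoff = now - older_than
    return sorted(p for p, modified in listed.items()
                  if p not in referenced and modified < cutoff)


now = datetime(2023, 3, 1, tzinfo=timezone.utc)
listed = {
    "data/a.parquet": datetime(2023, 2, 1, tzinfo=timezone.utc),  # referenced by a snapshot
    "data/b.parquet": datetime(2023, 2, 1, tzinfo=timezone.utc),  # left behind by a failed task
    "data/c.parquet": now - timedelta(hours=1),                     # write possibly in progress
}
print(find_orphans(listed, referenced={"data/a.parquet"}, now=now))  # → ['data/b.parquet']
```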

You want to handle orphan files regularly by removing them; of course, you don’t want lots of unnecessary files sitting in your data lake. Next is snapshot expiry. Remember the time travel use case I talked about? Let’s say your users only query data from 2020 onward, which means you don’t need any snapshots older than 2020. In that case, you want to regularly remove snapshots that are no longer needed, which keeps your metadata operations fast. Normally you would call the expire snapshots API and provide a timestamp, and anything older than that timestamp is expired. You can achieve that with just the VACUUM command, as you can see here, and you can also set the properties using ALTER TABLE commands.
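The expiry rule itself is simple: drop snapshots committed before a cutoff, but never the current snapshot or the most recent few. Iceberg’s `expire_snapshots` procedure exposes this as `older_than` and `retain_last`, and in Athena the equivalent limits are table properties such as `vacuum_max_snapshot_age_seconds` and `vacuum_min_snapshots_to_keep`. A minimal sketch of the selection logic with hypothetical snapshot ids:

```python
from datetime import datetime, timezone
from typing import List, Tuple


def snapshots_to_expire(snapshots: List[Tuple[int, datetime]], current_id: int,
                        older_than: datetime, retain_last: int = 1) -> List[int]:
    """Ids of snapshots committed before `older_than`.

    The current snapshot and the `retain_last` most recent snapshots are always kept.
    """
    by_time = sorted(snapshots, key=lambda s: s[1])
    keep = {sid for sid, _ in by_time[max(len(by_time) - retain_last, 0):]}
    keep.add(current_id)
    return [sid for sid, ts in by_time if ts < older_than and sid not in keep]


utc = timezone.utc
history = [(1, datetime(2019, 6, 1, tzinfo=utc)),
           (2, datetime(2020, 3, 1, tzinfo=utc)),
           (3, datetime(2021, 1, 1, tzinfo=utc))]
# Users only query data from 2020 onward, so snapshots committed before 2020 can go.
print(snapshots_to_expire(history, current_id=3, older_than=datetime(2020, 1, 1, tzinfo=utc)))  # → [1]
```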

Oh, and one more thing. The reason I talked about this feature is that it’s really useful for your data analysts. Data analysts usually connect to Athena using, say, the Simba JDBC or ODBC drivers, and they may connect it to Power BI, QuickSight, or whatever they use for dashboarding or other purposes. They may run into issues where they’re not getting the speed they expect; sometimes a query runs slowly, for whatever reason. In that case, they usually contact the ETL team, the upstream team, and say, “Hey, my table is slow for whatever reason. Can you do something to optimize it?”

Athena Makes it Easy For Analysts to Run SQL Commands

That is an overhead process, which is why Athena makes it easy for analysts to just run a couple of SQL commands. They don’t have to worry about what’s happening upstream, whether it’s an ETL job in Glue or EMR, and they don’t need any Spark knowledge. They’re just using a tool they’re familiar with and running a couple of commands, and with that they get a lot of performance benefits. It essentially helps them self-serve and optimize on their side.

Lastly, I want to talk about the Iceberg integration with S3. Iceberg interacts with S3 using S3FileIO, which is what it uses to read from and write to S3. Like I said, it supports S3 features such as multipart upload, server-side encryption, access control lists, and access points, including multi-Region access points. I’m not going to talk about those, but I do want to talk about a feature that has been invaluable for some of our largest enterprise customers, who have terabytes or even petabytes of data in their data lakes: the object storage file layout. It’s very easy to enable. When you create your Iceberg table, you just specify the two options you see highlighted in yellow: you set write.object-storage.enabled to true, and you provide the data path (write.data.path), which is an S3 location.
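For reference, here is how those two table properties look in Spark SQL DDL. A minimal sketch that renders the statement from Python; the table name, columns, and bucket are hypothetical, while `write.object-storage.enabled` and `write.data.path` are the Iceberg property names.

```python
from typing import Dict


def object_storage_ddl(table: str, columns: Dict[str, str], data_path: str) -> str:
    """Spark SQL DDL for an Iceberg table that writes data files with the object-storage layout."""
    cols = ",\n  ".join(f"{name} {sql_type}" for name, sql_type in columns.items())
    return (
        f"CREATE TABLE {table} (\n  {cols}\n)\n"
        "USING iceberg\n"
        "TBLPROPERTIES (\n"
        "  'write.object-storage.enabled' = 'true',\n"
        f"  'write.data.path' = '{data_path}'\n"
        ")"
    )


# Hypothetical table name, columns, and bucket.
ddl = object_storage_ddl("glue_catalog.db.events",
                         {"event_id": "bigint", "event_ts": "timestamp"},
                         "s3://my-bucket/iceberg-data")
print(ddl)
```

The rendered string can be passed to `spark.sql(...)` in a session where an Iceberg catalog named `glue_catalog` is configured.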

How Iceberg Stores Data Files in Hash Prefixes

Iceberg then neatly stores your actual data files under hash prefixes. You can see what that looks like: the hash prefixes are hexadecimal strings. I believe Iceberg uses a hash function to create these strings, and, as you can see below, the Parquet data file is stored under this hash prefix. Iceberg does this because S3, being an object store, imposes certain request-rate limits. For instance, you can make up to 5,500 GET requests and 3,500 PUT requests per second per prefix. Also, the way S3 scales depends heavily on how your prefixes are laid out. We are all used to year, month, and day partitions, which means your S3 prefixes always start with the same pattern.

Say, 2021 or 2022 or whatnot. That is fine for relatively small tables, say one or two terabytes. But we have customers running queries on hundreds of terabytes or even petabytes of data, and in that case they can run into 503 Slow Down throttling errors, or they may observe higher latencies. That is why Iceberg provides this option: storing files under hash prefixes really helps you mitigate the 503 throttling errors, which you have surely faced if you’re working with large datasets, and it also helps you improve S3 performance. All right, now moving on to the demo.
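The effect of hash prefixes can be sketched in a few lines. This example puts each file under a short hex hash of its name and estimates the aggregate request ceiling when load is spread evenly across prefixes. MD5 and the exact path shape are assumptions for illustration (Iceberg’s own location provider uses a different hash and layout), and the ceiling assumes S3 has scaled out to handle each prefix independently.

```python
import hashlib
from typing import Tuple

S3_GET_PER_PREFIX = 5_500  # GET/HEAD requests per second per prefix
S3_PUT_PER_PREFIX = 3_500  # PUT/COPY/POST/DELETE requests per second per prefix


def hashed_location(data_path: str, partition: str, file_name: str, width: int = 8) -> str:
    """Put a data file under a short hex hash of its name so writes spread across many prefixes.

    MD5 is used only for illustration; Iceberg's location provider uses its own hash and layout.
    """
    digest = hashlib.md5(file_name.encode("utf-8")).hexdigest()[:width]
    return f"{data_path.rstrip('/')}/{digest}/{partition}/{file_name}"


def aggregate_limits(num_prefixes: int) -> Tuple[int, int]:
    """Upper bound on (GET/s, PUT/s) if load is spread evenly over `num_prefixes` prefixes."""
    return num_prefixes * S3_GET_PER_PREFIX, num_prefixes * S3_PUT_PER_PREFIX


paths = [hashed_location("s3://my-bucket/iceberg-data", "event_date=2022-01-01", f"{i:05d}.parquet")
         for i in range(1000)]
# Split on "/": ["s3:", "", bucket, "iceberg-data", hash, partition, file]; the hash is index 4.
prefixes = {p.split("/")[4] for p in paths}
print(len(prefixes), aggregate_limits(16))
```

With a date-first layout, all 1,000 files for one day would share a single leading prefix; with hashing they are spread across roughly 1,000 distinct prefixes.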

Live CDC Demo

There are two demos here, apart from the one I showed you earlier. I’m just going to do this one; for the other one, you can scan the QR code as usual if you’re interested. This one is the change data capture demo. I already went over this architecture earlier: you take the changes from your source database and apply them to your raw input, and then an ETL job reads the raw input, applies some transformations, and publishes the results to your data lake on a continuous basis. Now I’m going to play a pre-recorded version of this demo. If you’re interested in reviewing the solution or any of the points I’m discussing, you can scan the QR code at the bottom of the screen, which will take you to my GitHub repository.

Creating an RDS for MySQL Database

There you have all the artifacts that are needed. I’m going to play the demo and voice over it for you. This solution has about 10 steps; I’ve already completed four of them to save time, but let me walk you through them one by one. First, you create an RDS for MySQL database, which I’ve already done here: the Iceberg CDC database that you see. Next, you need to turn on binary logging for this database, which you can do by following the article I’ve linked here. This just involves creating a parameter group and attaching it to your RDS instance, which I’ve already done. Next, you log in to your MySQL instance.

Basically, we’re going to create a database and a table and insert some records into it. Here I have created a database called SourceDB and a table called Products, and I’ve inserted five records into this table, product IDs 100 to 104. Now I’m going to go to DMS, the Database Migration Service, where I have to do a few things. First, I need to create the source and target endpoints. The source endpoint is obviously my MySQL database, and you can see my database endpoint here. As the target, I have an S3 location. It will be empty when I first create it, but as and when I write records to my source database, it will be populated with CSV files.
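The talk doesn’t show the Products schema, so the following is a local stand-in for this setup step: SQLite instead of MySQL so it runs anywhere, with hypothetical column names (`product_id`, `product_name`, `quantity`) and made-up values for the five seed rows.

```python
import sqlite3


def create_source_db() -> sqlite3.Connection:
    """In-memory stand-in for the SourceDB Products table (SQLite instead of MySQL)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE products ("
        " product_id INTEGER PRIMARY KEY,"
        " product_name TEXT NOT NULL,"
        " quantity INTEGER NOT NULL)"
    )
    # Five seed rows with product IDs 100-104; names and quantities are made up.
    conn.executemany(
        "INSERT INTO products (product_id, product_name, quantity) VALUES (?, ?, ?)",
        [(pid, f"product-{pid}", 10) for pid in range(100, 105)],
    )
    conn.commit()
    return conn


conn = create_source_db()
print(conn.execute("SELECT product_id FROM products ORDER BY product_id").fetchall())
```

Giving the table a primary key matters for the CDC steps that follow, since updates and deletes have to be matched to target rows by key.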

How to Create a Replication Instance in AWS DMS

Next, you need to create a replication instance, ideally in the same VPC as your RDS instance. Then you create the database migration task, providing the source and the destination, which are your MySQL database and S3. Here I have chosen full load plus ongoing replication, which means the task first does a full load, putting whatever records I inserted into the destination, and after that, any changes I make to this database are also published on an ongoing basis. I inserted the five records, 100 to 104, first, and if I go to the migration task and open the table statistics, I’m highlighting the part where it says applied inserts.

You can see the five inserts that came in as part of the full load, and a corresponding CSV file is created in S3 that contains the five records I inserted. Now I’m going to go to Athena and create a table on top of this raw input that we got from DMS, so that we can query it from the data lake. Here I’m creating a CSV-based table on top of the S3 location that DMS writes the CSV files into. I’m also going to create an Iceberg table, which is our target data lake; you can see I’m using the table type Iceberg. Of course, you can also use a Glue crawler to create this table if you’ve used one before.
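It helps to know what those DMS CSV files contain. With an S3 target, DMS writes CDC rows with a leading Op column (`I`, `U`, or `D`), while full-load files omit it unless the `IncludeOpForFullLoad` endpoint setting is enabled. A minimal parsing sketch, assuming the remaining columns follow the hypothetical Products schema used above:

```python
import csv
import io
from typing import Dict, List

COLUMNS = ["product_id", "product_name", "quantity"]  # hypothetical Products schema


def parse_dms_csv(text: str, has_op_column: bool) -> List[Dict[str, str]]:
    """Turn a DMS CSV file into dicts with an 'op' key (I, U, or D).

    Full-load files have no Op column unless IncludeOpForFullLoad is enabled,
    so their rows are treated as inserts.
    """
    rows = []
    for record in csv.reader(io.StringIO(text)):
        if not record:
            continue  # skip blank lines
        op, values = (record[0], record[1:]) if has_op_column else ("I", record)
        row = dict(zip(COLUMNS, values))
        row["op"] = op
        rows.append(row)
    return rows


full_load = "100,product-100,10\n101,product-101,10\n"
cdc = "I,105,product-105,10\nU,102,product-102,25\nD,103,product-103,10\n"
print([r["op"] for r in parse_dms_csv(full_load, False)], [r["op"] for r in parse_dms_csv(cdc, True)])
```

All values come back as strings; the Athena table over the raw input does this same job declaratively by assigning column types.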

Running Glue ETL on Top of Your Source

You don’t have to use Athena; I’m using it here just as an example. Selecting from the input table, you can see the five records that DMS ingested from our SourceDB. The next step is to run Glue ETL on top of this source, and for that we need a connector, the Iceberg Connector for AWS Glue, which you can get from AWS Marketplace. I’ve already subscribed to it, but if this is your first time, it will ask you to accept the terms and hit Subscribe. Once that is done, you configure the connector for Glue. Here I’m choosing Glue 3.0 and the software version that I want to use, which defaults to the latest version. Then you launch the software, which automatically navigates you to the Glue console where you can finish setting up your connector, and that’s what it’s doing now. You just name your connector and hit Create. My connector is created, and you can see the type is Marketplace.

Running the Glue ETL Job

The next step is the Glue ETL job that uses the connector we just got from the marketplace. I’m going to go back to the Glue console and open Glue jobs. The first one is the Glue job I created, and I’ll walk you through the script in just a moment. Before that, let’s look at the job details. To create this Glue job, you provide a name for the job, an IAM role, and so on. Here I’m using Glue 3.0, which matches the connector I got from the marketplace. If I scroll down to the advanced options, you can see that under connectors I’m using the Iceberg Connector for Glue 3.0. You also provide a job parameter: you want to set the Iceberg warehouse, which points to an S3 path. Of course, you can set it directly in your job if you prefer. Going back to the ETL script itself: if you’ve worked with Iceberg, you know that you need to set certain Spark properties to work with it. The most important properties I want to highlight are the ones instructing Spark to use the Glue Data Catalog and S3FileIO, which I talked about during the presentation.
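A sketch of that script preamble, under stated assumptions: the job parameter name `iceberg_warehouse` and catalog name `glue_catalog` are hypothetical, and `argparse` stands in for `awsglue.utils.getResolvedOptions` so the example runs outside Glue. The Spark properties are Iceberg’s own: the SQL extensions (needed for MERGE INTO), the Glue catalog implementation, and S3FileIO.

```python
import argparse
from typing import Dict, List


def resolve_warehouse(argv: List[str]) -> str:
    """Local stand-in for awsglue.utils.getResolvedOptions(sys.argv, ["iceberg_warehouse"])."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--iceberg_warehouse", required=True)
    args, _ignored = parser.parse_known_args(argv)  # Glue passes other arguments too
    return args.iceberg_warehouse


def glue_iceberg_conf(warehouse: str, catalog: str = "glue_catalog") -> Dict[str, str]:
    """Spark properties: Iceberg SQL extensions, the Glue Data Catalog, and S3FileIO."""
    p = f"spark.sql.catalog.{catalog}"
    return {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        p: "org.apache.iceberg.spark.SparkCatalog",
        f"{p}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{p}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{p}.warehouse": warehouse,
    }


# Roughly what a Glue job receives in sys.argv (values hypothetical).
argv = ["script.py", "--JOB_NAME", "iceberg-cdc-job", "--iceberg_warehouse", "s3://my-bucket/warehouse/"]
conf = glue_iceberg_conf(resolve_warehouse(argv))
for key, value in conf.items():
    print(key, "=", value)
```

In an actual Glue script, these pairs would typically be applied with `SparkConf().setAll(conf.items())` before the SparkContext and GlueContext are created.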

Now, scrolling down, let’s get into the meat of the solution. Here we have the MERGE INTO query, which you use for batch updates and batch upserts. The way DMS works is that whenever you make changes to your database, it creates a flag for each change. That flag tells you whether it’s an insert, update, or delete operation: I, U, or D. With that flag, you can use a MERGE INTO query to apply the changes to your destination accordingly: if the flag is D, you delete; if it’s U, you update; and if it’s I, you insert. That’s what I’m doing here. Whenever I run this ETL job, it automatically takes the changes from DMS and applies them to the destination. Now I’m going to run this ETL job. Not sure if it paused.
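A sketch of the kind of MERGE INTO statement described above, rendered from Python so the column lists stay consistent. The table, view, and column names are hypothetical, and it assumes the changes view holds at most one change per key, because Spark rejects a MERGE in which several source rows match the same target row. `I` and `U` rows both act as upserts here, and `D` rows delete.

```python
from typing import List


def build_merge_sql(target: str, changes_view: str, key: str, columns: List[str]) -> str:
    """MERGE INTO statement applying DMS-style changes (op = I, U, or D) to an Iceberg table.

    Assumes `changes_view` holds at most one change per key.
    """
    non_key = [c for c in columns if c != key]
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in non_key)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} t\n"
        f"USING {changes_view} s\n"
        f"ON t.{key} = s.{key}\n"
        "WHEN MATCHED AND s.op = 'D' THEN DELETE\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED AND s.op != 'D' THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )


sql = build_merge_sql("glue_catalog.db.products", "changes", "product_id",
                      ["product_id", "product_name", "quantity"])
print(sql)
```

In the job, the parsed DMS changes would be registered with `createOrReplaceTempView("changes")` and the statement run with `spark.sql(sql)`.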

Okay. Maybe somewhere around here. All right. The job that I ran has now succeeded. If we go back and review the Iceberg table, you can see the five records that were added from DMS. Now we are going to get into the CDC part: we are going to make some changes to our SourceDB and see how they are reflected in our target. It seems like it is pausing again; not sure why.

Live DMS Data Demo Results

All right. So I ran four queries here: two inserts, one update, and one delete. I inserted two records, 105 and 106, updated the quantity for record 102, and deleted record 103. Now I’m going to go back to DMS, which will automatically pull these changes, because that’s what ongoing replication does. Here you can see that applied updates is one and applied deletes is one, since we updated one record and deleted one. We also inserted two records on top of the five we inserted initially, so 5 + 2 = 7 applied inserts. It seems to have paused again; I’m not sure why. I think we are nearly at the end. The corresponding CSV file is also generated on S3 automatically; DMS does this for you, so you don’t have to do anything. When you refresh, you will see that a CSV file was created reflecting the changes we made in SourceDB.
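As a sanity check on the expected result: 7 applied inserts, 1 update, and 1 delete should leave 6 rows in the target. A minimal in-memory simulation of applying the demo’s changes in order (upsert for `I` and `U`, delete for `D`); the quantity values are made up because the talk doesn’t show them.

```python
from typing import Dict, List

Row = Dict[str, object]


def apply_changes(table: Dict[int, Row], changes: List[Row]) -> Dict[int, Row]:
    """Apply I/U/D changes in order, keyed by product_id: upsert for I and U, delete for D."""
    result = dict(table)
    for change in changes:
        key = change["product_id"]
        if change["op"] == "D":
            result.pop(key, None)
        else:  # I or U: upsert the row without the op flag
            result[key] = {k: v for k, v in change.items() if k != "op"}
    return result


initial = {pid: {"product_id": pid, "quantity": 10} for pid in range(100, 105)}  # full load
changes = [
    {"op": "I", "product_id": 105, "quantity": 10},
    {"op": "I", "product_id": 106, "quantity": 10},
    {"op": "U", "product_id": 102, "quantity": 25},  # new quantity is made up
    {"op": "D", "product_id": 103},
]
final = apply_changes(initial, changes)
print(sorted(final))  # → [100, 101, 102, 104, 105, 106]
```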

The next thing we are going to do is run the ETL job on Glue again to process these changes into our final data lake. I’ve already done that; it takes about a minute, so I just paused the video. Now, if we query the data lake table in Athena, you will see the changes that have been made. I’m going to preview the table now, and you can see the one update, one delete, and two inserts.

All right, so you can see the changes we made: we added records 105 and 106, deleted 103, and updated 102. Of course, here I ran the Glue ETL job manually, but in a typical production scenario you would use a Glue job trigger to run this job automatically every five or 10 minutes. Or you can use SNS: based on the PUT object calls that DMS makes to S3, you can trigger the job using Lambda. There are multiple ways to do that. That brings us to the end of our demo. This is the other demo, which I’m not going to cover, but you can scan the QR code; it combines the Spark Streaming real-time ETL and GDPR use cases. And that’s about it. Done.
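A minimal sketch of the Lambda option, assuming S3 event notifications are delivered to the function directly. If they go through SNS first, the S3 event arrives as JSON inside each record’s `Sns.Message` and must be unwrapped. The job name is hypothetical; `start_job_run(JobName=...)` and its `JobRunId` response field are from the boto3 Glue client, which is injectable here so the handler can be tested without AWS.

```python
from urllib.parse import unquote_plus

GLUE_JOB_NAME = "iceberg-cdc-job"  # hypothetical job name


def handler(event, context, start_job_run=None):
    """Lambda entry point: start the Glue CDC job when DMS writes new objects to S3.

    `start_job_run` can be injected for testing; otherwise boto3's Glue client is used.
    """
    if start_job_run is None:
        import boto3  # included in the AWS Lambda Python runtime
        start_job_run = boto3.client("glue").start_job_run

    keys = [
        unquote_plus(record["s3"]["object"]["key"])  # S3 event keys are URL-encoded
        for record in event.get("Records", [])
        if record.get("eventName", "").startswith("ObjectCreated")
    ]
    if not keys:
        return {"started": False, "keys": []}
    response = start_job_run(JobName=GLUE_JOB_NAME)
    return {"started": True, "run_id": response["JobRunId"], "keys": keys}
```

Because every new DMS file triggers a run, Glue may reject overlapping runs once the job’s maximum concurrency is reached. The scheduled alternative avoids this: a Glue trigger with a cron expression such as `cron(0/5 * * * ? *)` runs the job every five minutes.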