Demo-ing a 3-step OLTP database replication to data lake tables

Session Abstract

OLTP databases are a key source of data for analytics. Many CDC tools land database changes in cloud object storage but can’t automatically provide and maintain queryable tables.

Upsolver’s new CDC capability comes with unique features such as:

  • A simple 3-step flow to build the replication pipeline
  • UPSERT support for replica consistency
  • Stateful transformations
  • Automatic table optimization
  • Dynamic table split based on key field
  • Schema evolution handling
  • Rewind and replay data

This talk will cover:

  • Popular methods for database replication
  • The challenges with replicating a database into data lake tables
  • What Upsolver CDC offers that’s new and exciting
  • A live demo of Upsolver CDC in action

Video Transcript

Ori:    Okay. So my name is Ori, the co-founder of Upsolver. Today, we are going to talk about the process of replicating databases via CDC into tables that are in your data lake.

Before Upsolver, I spent almost 16 years working in databases and data integration, and led the data integration domain in my unit in the Israeli army.

[00:00:30] And I think that time, and my experience at AI startups after that, were the inspiration for Upsolver. Basically, we were trying to iterate as fast as we could, and being an AI startup required that we move really fast with our analytics. And I was a database person who could do a lot of that work myself. I really remember the feeling of looking at a task that I could do in maybe three hours [00:01:00] in SQL, and then needing to wait 30 days for it to be completed by data engineers who were really impossible for me to hire. That pain caused us to start building internal tools so we would be able to iterate much faster on top of object storage, S3 at the time. Eventually those internal tools turned into a cloud product, into what Upsolver is today. Upsolver is a visual product that allows you [00:01:30] to do the analytics engineering cycle, basically from raw data to tables that you can query on your data lake. The visual part and the ability to also use SQL are the main things about the platform, the ease of use.

Upsolver is deployed for thousands of users, and we have thousands of production workloads, ranging from terabytes up to a petabytes-per-month scale from a single customer. We work with enterprise customers, [00:02:00] with digital-native startups, and we are partnered with AWS. We are actually the only official partner for Amazon Athena, which testifies to our focus on creating tables and querying them on top of the data in the [inaudible 00:02:17] data lake.

So the topic here today is replicating databases into your data lake. The first question is, why do I want to do that? I already have a database. And I think [00:02:30] the two most common answers I'm getting are, first, that I want to combine my data in an OLTP database with other data sets, and all my data sits in the lake, so I need to bring the OLTP data into the lake. The second answer is that I can't really run analytics on my OLTP database, because it's a production database used by my applications, and I need to give the analytics people a different environment to work on. That's why I want to [00:03:00] replicate the database, basically to de-risk.

So the architecture for doing CDC to a data lake includes several steps. On the left, we have the OLTP database. The first step is an ingestion system that basically takes the database, generates a CDC stream, and puts it either on object storage or in Kafka. Upsolver is a solution that can do that; Debezium is the open-source option, [00:03:30] and there are companies like Attunity. Once that stream is generated, the second phase, and the very challenging phase, is to actually turn it into a table in your data lake. That table needs to accept updates and deletes, and basically look exactly the same as it does in the OLTP database.

Let’s discuss the alternatives for a second, and why we are talking about CDC as the option for replication. [00:04:00] There are three ways to really replicate data out of an OLTP database. One is to do a traditional export/import. This is very reliable and doesn't create a lot of stress on the database, but it just doesn't really work in real time. So once you want to start getting those updates in a continuous way, the next option would be to query the database. It's a database, it's meant to be queried, but ultimately queries are not as reliable as just [00:04:30] replicating data that's already written to files. If I'm doing queries, those queries might fail because there is stress on the database, or those queries can cause stress on the database, coming back to the risk we mentioned before.

And also, if you do query-based replication, deletes are usually not something that's well supported; usually you would need to implement those deletes on the [inaudible 00:04:58] side, so what's called a logical [inaudible 00:05:00]. [00:05:00] And that's why I think CDC became the standard for replicating data from databases. It works in real time, it's basically a stream, and it's very reliable because you're just copying binlogs, the database log files. It supports everything the database does, so you can apply those updates and deletes and trust that you're not going to miss anything just because you didn't query the right timeframe of data.
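
To make the delete problem concrete, here is a minimal sketch of what query-based (watermark) extraction typically looks like; the table and column names are hypothetical, not from the talk:

    -- Hypothetical watermark query: pull every row changed since the last run.
    -- :last_extracted_at is a placeholder for the previous run's high-water mark.
    SELECT order_id, customer_id, status, updated_at
    FROM orders
    WHERE updated_at > :last_extracted_at;
    -- Rows that were hard-deleted since the last run simply never appear in this
    -- result, which is why query-based replication usually depends on "logical"
    -- (soft) deletes, e.g. an is_deleted flag maintained on the source side.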

[00:05:30] Now for the second set of alternatives we need to review. The first was how we extract the data from the database; the second part is how I'm going to represent a table in my data lake, what that table is going to look like. One option is to use Parquet with the very common Glue catalog or Hive Metastore. I think the main challenge companies see with that is that it doesn't support updates and deletes natively. It's hard to implement updates [00:06:00] and deletes on top of [inaudible 00:06:01] only Parquet files. Upsolver is addressing that challenge, and I'm going to show that during the demo. There are other technologies, I'm sure many of you know of Apache Iceberg or Delta Lake, and those do accept updates and deletes; they're basically table formats. But I think we are still seeing a compatibility problem: not all query engines support those new table formats. And also, [00:06:30] those tables usually need to be updated in a streaming way but also need to go through operations like compaction, and the use of a single commit service limits the ability to do CDC well. That problem doesn't really exist when you're using Parquet, because you're [inaudible 00:06:48] Parquet and you're updating your catalog; there isn't a single commit service that throttles you.

So I want to move to the demo, [00:07:00] it's probably the best way. I want to explain the architecture of what we are going to see first. On the left, we have a MySQL database. For that database, we first need to run an initial sync to get the current state of the tables into the lake, and then we also need to get the incremental CDC log. Upsolver uses the Debezium open source project, so we embedded Debezium in [00:07:30] order to get the CDC log, and we are saving a copy of those logs on S3. That's step number one. Step number two is to take those logs we saved on S3 and actually create the tables in the lake, connect them to the catalogs, and put the Parquet files on S3.

I'm going to dive a little deeper into what I mean by one master table and one view per table a little later [00:08:00] in the demo. On the other side, once those tables are created, you can use any of the query engines on the market to query the data. So if I jump to the end of the demo, I can query the data from Dremio, and here are my replicated tables. I can see that those tables exist in Glue, and this is how we are integrated with Dremio. So I'm going back to the beginning of that architectural [00:08:30] diagram, and you can see this is my MySQL database. It has several databases within it, and I'm specifically planning to replicate the Northwind database, which is a sample or sandbox database used with databases like SQL Server for as far back as I can remember.
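
For example, once the replicated tables are registered in the Glue catalog, an engine like Dremio or Athena can query them with plain SQL. The database and table names below are assumptions for illustration, not the exact names created in the demo:

    -- Assumes replication created a Glue database called "northwind" with one
    -- queryable table (or view) per source table; names are illustrative.
    SELECT customer_id, COUNT(*) AS order_count
    FROM northwind.orders
    GROUP BY customer_id
    ORDER BY order_count DESC
    LIMIT 10;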

And I also wanted to show how the incremental CDC log works, so in addition to taking that database and [00:09:00] replicating it, I also did a few update and delete operations, and I want to show you exactly how that looks in Upsolver. So my next step, I'm going to open Upsolver. Upsolver is a service, so you sign up to it with a user and password, and you're basically giving Upsolver permission to run a data [inaudible 00:09:22] in your account. Upsolver runs on EC2 spot instances, it also runs on Azure, and you don't [00:09:30] need to actually install it. You just give it permission using a CloudFormation script.

What you see here is the first page you're really going to see when you go into Upsolver, and that's the data sources page: where can I ingest data from? I can bring data from object storage, I can bring it from streams, and I can bring data using CDC. There is also the JDBC method of getting data, but there's also CDC, and I'm going to choose the MySQL [00:10:00] option. Once I fill in this form, I'm basically going to complete everything I need in order to do the replication. I already set up the connection to the MySQL database, so I'm going to choose that and test the connection to see that it works. Now I want to choose which tables I actually want to replicate. If I don't choose anything, it's just going to replicate all the databases and all the tables that are on this MySQL instance. In [00:10:30] this case, we said that we only want to get the Northwind tables, so I'm marking those, and I can see the checkboxes. The next thing I want to do is choose where I want to store the data. So I'm going to choose the Glue catalog as the place I want to… Where is it?

Okay, I'm going to do it using Athena. I chose the Glue catalog basically as my [00:11:00] destination, it is my Glue catalog… Sorry, I actually wanted to do this and this. And now I'm going to say I want to create a database in my lake called Northwind, and there is something here called the table prefix, which we are going to touch on at the end. This is basically going to be one underlying table that keeps all the data; that [inaudible 00:11:24] will be more clear at the end. Now that I've finished step number two, so first was the connection and the tables, [00:11:30] and now the destination, the last thing is I need to give this job resources. Upsolver runs on clusters; in this case I already have a cluster defined called default, and I could change it from here. So I defined the cluster and the target storage with these names, so this will be MySQL to data lake at Subsurface. Once I click on continue, the process would… [00:12:00] Oh sorry, I already have this table created.

Okay, so now I'm going to start, and you can see that I have a process here that describes how we are going to bring in each one of these tables. Right now everything is pending. What we are going to do now is actually run the initial sync, and then the CDC log is going to start [00:12:30] being applied. I'm going to click on data sources. Before this session, I already created such a replication job; it's called MySQL [inaudible 00:12:41] application. What you're seeing here is basically a representation of the CDC log. Right now, what you see here on the left is that I'm replicating the following tables, and I'm going to see the operations that [00:13:00] were used to get the data. So for example, 60% of the operations were snapshots, 40% were updates, and a very marginal part were deletes.

If you look at the graph over here, it actually makes a lot of sense. I can just zoom in on this part, and I can see that the only operation in this initial peak was the snapshot; that's the first thing I did. And now I'm going back, and if I zoom in on this part, I can see that some updates and deletes were [00:13:30] done later, and these are in that part of the stream.

Next, let's look at the log a little deeper. You can see a report called [inaudible 00:13:41] and a whole bunch of names. Those names represent the columns that we have replicated. I can actually go and filter that, and I'm going to use the order details table; if you remember, that's the table I ran the updates and deletes [00:14:00] on. So if I filter by that table, I can see that I only have five columns being replicated, from discount through unit price, and I also get some statistics on those columns. I can see that product ID has 77 distinct values, and I can see the exact distribution. This is all information that doesn't need to be edited; it's all auto-generated after we defined the [00:14:30] replication job.

And the next step. Right now, we are here: we have gathered the CDC log and we have stored it on S3, and now we actually want to start creating the tables, creating the mirror image of MySQL in the lake. So I'm going back to the data source, and I have a tab called lineage. In lineage, I see I already have an output [inaudible 00:14:57]. An output is basically a data pipeline; [00:15:00] it's a job copying data from point A to point B. This output was automatically generated at the beginning.

I'm going to dive into that output; I actually have it open over here. This is the definition of the transformation we are doing. On the left we have that CDC log we extracted using Debezium, and now we want to create a table for each one of the source tables. So what I'm doing here is defining that transformation [00:15:30] with SQL. Each table that goes to the target has a primary key, and we are getting all the columns that are under the row record here on the left. The use of star, basically SELECT *, is important, because if you change your MySQL database and add a column, that column is automatically going to be replicated to the lake. So the entire process of schema evolution [00:16:00] is completely automated. It's not something that you need to think about. You can turn it off, but schema evolution works out of the box.

The last thing on this transformation job, I want to talk about this special syntax. What we're doing here with replace on duplicate is basically saying, every time we see an update or a delete, we want to update or delete the record based on my primary key, instead of [00:16:30] doing append-only. That's a capability Upsolver developed on top of vanilla Parquet files, and I'm going to explain how the under-the-hood part works at the end of the demo. The second row is delete where. In this schema, if I go under metadata, there is a field called is delete. So if there was a delete in the source MySQL database, that field is going to be marked as true, and then I'm going to delete based [00:17:00] on my primary key; that's the second row here. And the last row is the split table. Since we have multiple different tables, we need to split the CDC log into multiple output tables, and that's why you have this syntax.
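
To make the replace-on-duplicate and delete-where semantics concrete, here is roughly the equivalent behavior expressed as a standard SQL MERGE over one batch of change records. This is an illustration only, not what Upsolver runs internally; the table and column names (lake.order_details with its Northwind columns, cdc_log_batch, and an is_delete flag) are assumptions:

    -- Illustration of upsert + delete semantics keyed on the primary key;
    -- not Upsolver's actual implementation.
    MERGE INTO lake.order_details AS t
    USING cdc_log_batch AS s                 -- one micro-batch of Debezium change records
      ON  t.order_id   = s.order_id          -- match on the table's primary key
      AND t.product_id = s.product_id
    WHEN MATCHED AND s.is_delete = true THEN
      DELETE                                 -- a delete in MySQL removes the row in the lake
    WHEN MATCHED THEN
      UPDATE SET unit_price = s.unit_price,
                 quantity   = s.quantity,
                 discount   = s.discount     -- an update in MySQL overwrites the row
    WHEN NOT MATCHED AND s.is_delete = false THEN
      INSERT (order_id, product_id, unit_price, quantity, discount)
      VALUES (s.order_id, s.product_id, s.unit_price, s.quantity, s.discount);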

One last reminder on this output: this is a running entity. If I click on monitoring and then on progress, I can see that I've finished ingesting my data source and I have no [00:17:30] delay, and my output is almost up to date, with a two-minute delay; it's probably going to catch up very, very soon. So this is running all the time.

Now let's understand what exactly happened here. We went to the first page, the data sources page. We defined a connection, the tables that we want to bring, and the target that we want to send the data to. We also defined the compute [00:18:00] resources to process the CDC log. Filling in the form took 30 seconds. What was created behind the scenes in Upsolver is a data source and a data output. The data source extracts the CDC log; the data output creates the tables in the Glue catalog and allows engines like Dremio to eventually query the data. Going back to the presentation, I [00:18:30] think we will finish with the end to end. We went all the way from the left, from a MySQL database, to its exact replica in the lake, and we saw it in the Glue catalog and in Dremio, so I can query the data using SQL.

Now, I think the architecture under the hood is interesting. What exactly is the file system being created on S3? This is how Upsolver decided to solve the problem. Once we've extracted [00:19:00] the CDC log and we are going to write the tables, each replication job, and we defined one replication job today, is going to create a folder on S3. That folder is going to hold my master table. The master table contains the data from all my tables combined, and it's queryable through the Glue catalog. But what we really want is multiple tables, so under that folder there is one folder for each one of my replicated [00:19:30] tables. That folder is divided into two types of sub-folders. One is the updates: every time there is a delete or an update, we immediately write a small Parquet file. That's partition type number one.

Other than that, there is all the rest of the data that we have, and what Upsolver is doing is basically compacting that data all the time. If we didn't compact the data all the time, the performance [00:20:00] relative to MySQL performance for those [inaudible 00:20:03] would be really bad. So the task we still have at hand is taking that list of updates, taking all the data that we already have, and merging them into a consistent view. Eventually, when I run my SQL, I want to get the exact same results as I would get from MySQL. So Upsolver creates a Hive view, a Glue catalog view, and that view joins the updates [00:20:30] and all the other sub-partitions together into the result that represents the data in MySQL. So it's the combination of one partition that is the write-ahead log for all my changes, and constant, or I would say non-changing, partitions or sub-partitions. Basically, those together can give me a consistent result. The view gets me that consistent result, but the performance of that view would be horrible [00:21:00] without compaction. I think the key here is to keep the list of small Parquet files as small as possible. As fast as possible, we want to take those small Parquet files and basically fold them into the compacted Parquet files.
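
As a generic sketch of that merge-on-read pattern, the view could look something like the following. This is not the view Upsolver actually generates; the table, partition, and column names (including change_time and is_delete) are assumptions for illustration:

    -- Union the compacted Parquet files with the small per-change "updates" files,
    -- keep only the latest version of each primary key, and drop deleted rows.
    CREATE VIEW northwind.order_details AS
    SELECT order_id, product_id, unit_price, quantity, discount
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY order_id, product_id
                                ORDER BY change_time DESC) AS rn
      FROM (
        SELECT * FROM northwind_master.order_details_compacted  -- large compacted files
        UNION ALL
        SELECT * FROM northwind_master.order_details_updates    -- small recent changes
      ) all_versions
    ) ranked
    WHERE rn = 1                -- the latest version of each primary key wins
      AND is_delete = false;    -- rows deleted in MySQL disappear from the view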

Last slide, and then we can go into Q&A. I wanted to run through some of the bigger challenges that we see with CDC very [00:21:30] often. I think schema evolution is an area that always comes up. If you're doing CDC, you're probably going to have a lot of tables and columns, and additions of columns, and customers are asking us for that to be automatic. That's why we added the SELECT * syntax, because no one wants to handle the manual addition of columns, and sometimes not even the manual addition of tables; see the hypothetical example below. The second thing is that compaction is really not optional. [00:22:00] CDC is a stream, a stream creates small files, and small files break queries on top of S3. So compaction is something you have to do in order to query the data at the target. The third thing is guaranteeing exactly-once delivery and good data freshness. This is a two-phase process: there is extracting the CDC, and there is applying the CDC to my data lake. Making sure there is no duplication, no loss, [00:22:30] and that you're getting the exact same data is something a data engineer doing this by themselves needs to think about.
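
As a hypothetical illustration of what automated schema evolution saves you from doing by hand: every time a column is added in the source MySQL table, someone would otherwise have to update the lake table's schema with DDL like this (the table and column names are made up):

    -- Manual schema change that automated schema evolution makes unnecessary;
    -- promo_code is a made-up example column.
    ALTER TABLE northwind.order_details ADD COLUMNS (promo_code string);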

Last part, and that's here. We see both types of examples, but often the data in a MySQL database is application data. It's very sensitive, and organizations want to host the solution that does that replication. They don't want to take the CDC stream, drive it to some other cloud, and then get that data [00:23:00] back into their own VPC; that's a security risk and definitely a long process to go through. So that's why we run Upsolver within the customer's own VPC, in order to avoid that potential leak of data.

That's it for me, this is my last slide. I'm happy to take any questions and connect with you after the session, either via email or Slack, or whatever you [00:23:30] want to do. You can try Upsolver yourself, there is a community edition, and we run monthly workshops that allow you to actually try the product on your own and do the end-to-end. And of course, I'm happy to get résumés; Upsolver is expanding, and we're happy to hear from people in this great community.

Speaker 2:    Awesome. Well, thank you so much for that great session, Ori, it was really informative. Really cool, I always love seeing demos, so thank you for that. So let's go ahead and open it up for questions. [00:24:00] We do have one question in the chat, which I will get to in a second. But again, if you want to ask a question, you can either put it in the chat or you can share your audio and your video, and you'll be automatically put in a queue, and then I will be able to bring you in there to ask the question. So with that being said, the question in the chat is, can you install Upsolver on premises?

Ori:    So not at the moment. Upsolver is available on Azure and on AWS; the play here is [00:24:30] definitely a cloud play, and there are no on-premises plans in the near future.

Speaker 2:    Okay. All right. Thank you for that. All right. Are there any other questions out there? All right. Well, Ori, thank you so much for your time today and for the session, that was great. Again, just a reminder to everybody, as you can see in the chat, we do have a Slack community, and you can see the workspace there. So if you do have some questions, you can absolutely [00:25:00] sign up for that workspace, and then you can talk to Ori; his name is going to be the channel, if you want to follow up with an additional question later on. And then also, please make sure that you fill out the session survey, wow that's a mouthful, in the Slido tab at the top right of your screen. We do have one other question that just came in from Gary: how would you reload a target?

Ori:    I'm reading it; I'm not sure that I completely understand, so I'll try to answer it in two ways. One option [00:25:30] is just to create another replication job and reload the target: you can delete the old target and recreate your replication job, and since the process starts with an initial sync, it wouldn't take long to recreate your target. Your other option is to go to the output that I showed, edit it, and then apply changes to your CDC flow, basically change the table, and you can do it with replay. So [00:26:00] I can choose a point in time, and from that point in time, I can apply all the changes with a different transformation logic than the default one I showed during the demo.

Speaker 2:    Great. Gary, hopefully that answers your question. If you would like to dive into that a little bit deeper, then you can absolutely go on the Slack channel. So we do have many more sessions today and tomorrow, but in the meantime, I invite you all to check out the expo hall, check out some of the booths from our great sponsors, check out some more demos, and [00:26:30] there are some tech giveaways as well. So please take some time to go through the expo hall as well. Thank you everyone for joining. Ori, thank you so much for the great session today, and everyone enjoy the rest of Subsurface.

Ori:    Thank you. Bye-bye.