Rethinking Ingestion: CI/CD for Data Lakes

Video Session

At first glance, ingesting data into a data lake may seem like a one-step process — you simply add files to an object store. What else is there to do?!

It turns out that there is more you can do, and blindly writing new data introduces a host of potential problems. For example, how do you know the data you write is accurate and conforms to the expected schema? The truth is, once you’ve written it to the lake, in a sense, it’s already too late.

What we propose, and will cover in this talk, is a new strategy for data lake ingestion: one where new data is added in isolation, then tested and validated, before “going live” in a production table. Finally, we’ll show how git-for-data tools like lakeFS and Nessie enable this ingestion paradigm in a seamless way.

Video Transcript

Einat Orr: Hi everyone. I am Einat Orr and I will be talking about rethinking ingestion, CI and CD for data lakes. Just a few words about myself before I jump in: I am currently the CEO of Treeverse, but by profession I’m a data scientist or, as it used to be called, an algorithms developer, and later on a manager. And [00:00:30] right now I’m very enthusiastic about managing data and how we can do it better.

So since I’m assuming everyone in the audience is using a data lake, this very high-level architecture probably resonates with you very well. We use an object storage to store our data, either S3 or anything compatible in any of the other cloud providers. You [00:01:00] can see on the left-hand side the technologies that we use in order to ingest data into the lake, either through streaming or by replicating databases. Those of course are examples, it’s not an exhaustive list.

At the bottom we see the Hadoop ecosystem, currently mostly Spark and Presto, and of course using Hive, orchestrating with Airflow, and using the Hive metastore. And [00:01:30] going to the right-hand side, you can see data exploration and data visualization tools. And at the top we see the analytics engines, either Dremio, which works directly over the object storage, or analytics databases that have their deep storage within the object storage.

And however we look at it, our data, our single source of truth at the end of the day, resides within the object storage. And that’s actually very good. It’s an excellent architecture, [00:02:00] right? This is why we all use it. It is extremely scalable and cost-effective, the data is accessible and easy to use, the throughput is amazingly high, and there is a very rich ecosystem of applications that allow us to analyze the data while it’s in the object storage. We can use very many data formats, so it’s very comfortable, very general, and high performance.

So what is there to talk about? [00:02:30] There are still some challenges, very many challenges. And some of them are around the manageability of the data, and our ability to actually deliver quality data to our users or to our data consumption teams. The three things that I would like to focus on would be the inability, or the difficulty, to experiment, to compare, to [00:03:00] reproduce. Most of those actions will require copying. And if you have a complex enough environment, you manage data from many sources, you manage very large amounts of data objects within the object storage, then solving those problems with copying just doesn’t cut it.

It is difficult to enforce best practices. If you’re interested in having data in certain formats and enforcing schema [00:03:30] and enforcing other metadata requirements, there is no clear process. And again, if you would like to do that and expose only quality data to your consumers, you would have to be doing copying. So all in all, we work in an environment that is error-prone and where it is hard to ensure high quality of data.

So what would we be looking to do? We would be looking to solve this problem in a way [00:04:00] that is similar to the way the same problem was solved once upon a time for code. I mean, it is still being solved, it is constantly improving, but that’s the way it was solved for code. I think the first time I had the experience of “I wish I could manage the data the way I manage my code” was a situation where a retention script we ran in an organization I was the CTO of deleted [00:04:30] petabytes of production data. And of course it did it in a way that was not very consistent or centralized in one place. It could’ve been recovered, and it was recovered, but it took days to recover and it was a very serious production issue. And I remember just saying to myself, “revert, why can’t I just revert this action over the data?”

So when we are saying we want to manage our data the way we manage our code, [00:05:00] what would we be looking for? What, intuitively, are the things that we would be looking for? As in the example I just gave, the first thing we would be looking for is: if we have a data quality issue in production, we would like to revert to the last consistent, high-quality state of our data lake. We would want to do it in one atomic action that doesn’t take time, just revert. So you can see on the left-hand side [00:05:30] what we would aspire to have, and on the right-hand side, what it would look like if we had a tool that could do that for us. So just run the command revert and get the latest state that was high quality. And of course, consistent across all the datasets that we present to our consumers.

The second desire would be working in isolation in order to experiment. The solution would probably be just opening [00:06:00] a branch, just like we do with code, right? So you would have your own branch of the data that you can play with in isolation. If you want to reproduce either an issue in production or an experiment you did a very long time ago, but you want to reproduce the exact same result by running on the exact same data, then you would want to have a commit ID that you can just go back to. We’re talking about a commit of data, right? We’re not talking [00:06:30] about the code that produced the data, we’re talking about the data itself.

So imagine you could just commit the data, have a tag there, and be able to go back to that commit and see the data as it was at the moment you committed it. Then you can just run the code that was relevant for that data, with the right data, and you have full reproducibility of your data application.
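To make this concrete, here is a minimal PySpark sketch of what addressing a branch or a commit can look like. The repository name, branch name, and commit ID are all illustrative, and it assumes Spark is already configured to reach lakeFS through its S3-compatible interface.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("reproducibility-sketch").getOrCreate()

# Read the current state of a table from a branch: with lakeFS the reference
# (branch, tag, or commit ID) is simply the first element of the path.
current_df = spark.read.parquet("s3a://example-repo/main/events/")

# Read the exact same table as it was at an earlier commit by addressing the
# commit ID instead of the branch name. Re-running old code against this path
# reproduces the old result without keeping a physical copy of the data.
frozen_df = spark.read.parquet("s3a://example-repo/1a2b3c4d/events/")
```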

[00:07:00] If you want to ensure changes that are safe and atomic, then you can merge data into your main branch knowing that at that point all the changes that you were interested in were done in one atomic action that you can then return to. And if you want to experiment with very large and very dangerous changes, you can just do that by opening a branch and running [00:07:30] stuff on the side. We can see here, the example is an experiment with Spark. So we get the idea of what our aspiration is. We want to be able to manage ourselves in a way that allows the logic of CI/CD for code, running on our data.

Okay, let’s go a little bit more into the details and give a few more examples of what this idea would look like, and then we can talk about how it could be implemented. [00:08:00] If we want to do experimentation, it means we need isolation. We want to run our experiment on an isolated version of the data, so we will be opening a branch, as we suggested. Once you have that branch you can run your experiments on the branch, and you can then commit the results of experiments for reproducibility.

And if you would like to compare either your new data pipeline to your production pipeline, [00:08:30] what is referred to in many organizations as running parallel pipelines, or if you want to just compare two experiments in order to choose what works better, you can do that by [inaudible 00:08:43] schema. And what do I mean by that? For example, here we have the main branch, and that main branch is then used to open two different branches, [00:09:00] experimentation branch number one and experimentation branch number two. And we can run, for example, code in Spark and SQL in Presto, do the comparisons we want to understand which works better for us, then decide what we’re going to do, and just discard those branches. So we’re not creating a swamp, we didn’t copy data and forget to delete [00:09:30] it. We simply did a very, very clean metadata action.
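As a sketch of that “parallel pipelines” pattern, assuming the two experiment branches have already been created from main (for example with lakectl), each variant simply writes to its own branch path, and comparing or discarding them never touches the data under main. All repository, branch, path, and column names here are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parallel-pipelines").getOrCreate()

# Variant 1 runs against branch experiment-1; production jobs keep reading main.
raw = spark.read.parquet("s3a://example-repo/experiment-1/raw_events/")
(raw.groupBy("event_date").count()
    .write.mode("overwrite")
    .parquet("s3a://example-repo/experiment-1/daily_counts/"))

# Variant 2 would do the same against s3a://example-repo/experiment-2/...,
# for example as SQL in Presto. Comparing the two is just reading both paths:
v1 = (spark.read.parquet("s3a://example-repo/experiment-1/daily_counts/")
           .withColumnRenamed("count", "count_v1"))
v2 = (spark.read.parquet("s3a://example-repo/experiment-2/daily_counts/")
           .withColumnRenamed("count", "count_v2"))
v1.join(v2, "event_date", "outer").show()

# Whichever branch loses is simply discarded: a metadata-only action,
# with no copied objects left behind to clean up.
```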

And another option would be that we would want to actually incorporate one of those results back into main by merging it in. So we have the freedom to experiment on data that is similar to production data in a very easy and intuitive way. [00:10:00] Another thing we would be interested in doing is continuously ingesting data, or integrating data into the lake, in a way that ensures quality.

In order to ensure quality we would want to enforce best practices such as metadata validation, or any other validation that we would like to have before we actually present the data for analysis or to [00:10:30] anyone who uses it, whether it’s ML teams, analysts, or just data engineers running ETLs. How would we do that? So again, the intuition would be to do it just the way we do it with code: open a branch that would be the ingestion branch. Here it’s called new data one.

So that branch gets batches of new data, or micro-batches if you [00:11:00] are working with streaming, and we can now run a pre-merge hook that actually tests the data on the branch. The tests could be defined on testing platforms such as Monte Carlo or Great Expectations, or anything homegrown. And once the tests have passed, the merge actually happens, the data is merged to main, and then it is exposed [00:11:30] to the users. When organizations run their tests today, they usually do it in place. It means that first we create the data, it is created in main, and then we test it. And the risk here, which usually materializes at the worst possible moment, is that someone accesses the data, because it is already in main, before the tests have actually finished. And although we could protect ourselves in all kinds of ways, none of them is very smooth.
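As an illustration, the check a pre-merge hook runs can be as simple as a script like the sketch below: it reads the new data from the ingestion branch and exits non-zero if the data violates the expected schema, which blocks the merge. How the script is wired into a lakeFS pre-merge hook, and the column names used, are assumptions for the sketch; the same check could equally be expressed in Great Expectations, Monte Carlo, or anything homegrown.

```python
import sys
from pyspark.sql import SparkSession

REQUIRED_COLUMNS = {"event_id", "event_time", "user_id"}   # illustrative schema

spark = SparkSession.builder.appName("pre-merge-validation").getOrCreate()

# Read only the data sitting on the ingestion branch, before it reaches main.
df = spark.read.parquet("s3a://example-repo/new-data-1/events/")

missing = REQUIRED_COLUMNS - set(df.columns)
if missing:
    print(f"validation failed: missing columns {missing}")
    sys.exit(1)   # a failing hook blocks the merge, so main is never exposed

null_ids = df.filter(df["event_id"].isNull()).count()
if null_ids > 0:
    print(f"validation failed: {null_ids} rows with a null event_id")
    sys.exit(1)

print("validation passed; safe to merge new-data-1 into main")
```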

[00:12:00] So what we can do here is actually run the tests and expose the data to the users only if the tests passed. What happens if a test fails? If a test fails, we have a snapshot of the data at the time of the failure in order to debug the problem. And we all know that if we have complex data pipelines, debugging becomes a very heavy [00:12:30] problem. By the time we try to debug, we no longer have the snapshot of the data because the data continues to accumulate. If we work with this best practice we will be able to easily identify the problem.

And what about continuous data deployments? Here we’re talking about production data, either produced by our ETLs or by our ML engines, or by other sources that are exposing data [00:13:00] directly to our users. We want to prevent data quality issues. And the method would be very similar to what I’ve just presented. We want to be able to actually monitor the data constantly, because our code doesn’t change when we are running production jobs, but the data that comes in is new and it constantly changes. Data is stateful, it’s there, it is changing. We need to constantly make [00:13:30] sure that the assumptions we made about the data are holding, and we can do that and call it continuous deployment of data. And of course, as we mentioned, we want to be able to revert. If we do identify a problem, hopefully we identify it before our customers do, but once it is identified, we want to be able to just revert back to a consistent state of our data lake.

So [00:14:00] how would we be doing that? When we’re running production ETLs, and as [inaudible 00:14:06] mentioned earlier, I think you would probably know the problems I’m talking about if you are running a rather complex data lake [inaudible 00:14:15]. You have jobs running, for example, in Airflow, you have large amounts of data or very many objects, or you are running over very many data sources, so this Airflow DAG is not a stranger to you.

[00:14:30] Now, we want to make sure that we actually test the results of each one of those small jobs that we run through Airflow, right? That would be ideal. We would run a test after each one of the jobs. And we can do that just by running Airflow, right? And then calling the test after we call the job. The problem is that we’re not doing it in isolation. First we write the data into production, then we run the test over the data. So clearly the jobs in the Airflow DAG that is running will not be reading the intermediate results before the test, but someone else might. And I’m not sure if you’re familiar with that, but it has happened to me very many times when working with large amounts of data and with very many DAGs that are reading each other’s intermediate results.

So again, complexity makes the idea of not working in isolation very problematic. [00:15:30] What we are missing here is being able to run those tests over the production DAGs in isolation. And what do we mean by isolation? We mean on a dedicated branch. If those jobs run over a branch, they are not exposed to the main users and we get the isolation that we need. We can then utilize Airflow to run the jobs [00:16:00] for us one after the other, according to the DAG, and we can rely on pre-merge and pre-commit hooks to ensure that the tests we run monitor each and every one of the intermediate results and also the full result of the DAG. Note, of course, that if one of those tests fails, since we’re working on a branch we have the exact snapshot of the data at the time [00:16:30] of the failure to debug. So it’s kind of the same idea as ingestion, but implemented in production.
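A hedged sketch of what such a DAG can look like follows. The branch name, paths, the transform.py and validate.py scripts, and the use of lakectl in Bash tasks are all illustrative; exact lakectl flags may vary between versions. The point is that every task writes to the run’s branch, and only the last step merges into main.

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

REPO = "lakefs://example-repo"
BRANCH = "etl-2021-06-01"          # illustrative: one branch per DAG run

with DAG(dag_id="etl_on_a_branch", start_date=datetime(2021, 6, 1),
         schedule_interval=None, catchup=False) as dag:

    create_branch = BashOperator(
        task_id="create_branch",
        bash_command=f"lakectl branch create {REPO}/{BRANCH} --source {REPO}/main",
    )

    # Each transformation writes to the branch path, never directly to main,
    # so other DAGs reading main cannot see half-finished intermediate results.
    transform = BashOperator(
        task_id="transform",
        bash_command=f"spark-submit transform.py s3a://example-repo/{BRANCH}/",
    )

    test = BashOperator(
        task_id="test_intermediate_results",
        bash_command=f"spark-submit validate.py s3a://example-repo/{BRANCH}/",
    )

    commit = BashOperator(
        task_id="commit",
        bash_command=f"lakectl commit {REPO}/{BRANCH} -m 'ETL run output'",
    )

    # Only after the tests pass does the data become visible on main; a
    # pre-merge hook could run the same checks again at merge time. If they
    # fail, the branch keeps an exact snapshot of the data to debug against.
    merge = BashOperator(
        task_id="merge_to_main",
        bash_command=f"lakectl merge {REPO}/{BRANCH} {REPO}/main",
    )

    create_branch >> transform >> test >> commit >> merge
```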

The example here looks at micro-batches that come from Kafka, for example. So if we do ingest data from streaming, we can see that we have an ingestion branch here called [00:17:00] stream one, and we run those micro-batches that come in one after the other. Each one of them is actually being tested, and we can attach any metadata that we want to save. I’m sure we would want to do that, because Kafka itself has the ability to rewind, but we want to be able to revert the data lake together with it. So we would need the [00:17:30] Kafka offsets. We can save them as metadata together with those small data commits, and that would help us manage ourselves end to end with very high quality.
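As a sketch of that offset bookkeeping, assume a small consumer that lands each micro-batch as files on the ingestion branch; the topic, repository, branch, batch size, and the exact lakectl flags below are illustrative and may differ between versions. The idea is that the last consumed offset rides along as commit metadata, so a later revert of the lake tells you exactly where to rewind Kafka to.

```python
import json
import subprocess
from kafka import KafkaConsumer   # kafka-python; any consumer client would do

consumer = KafkaConsumer("events", bootstrap_servers="localhost:9092",
                         enable_auto_commit=False)

batch, last_offset = [], None
for message in consumer:
    batch.append(json.loads(message.value))
    last_offset = message.offset
    if len(batch) >= 10_000:                      # illustrative micro-batch size
        path = f"/tmp/batch-{last_offset}.json"
        with open(path, "w") as f:
            json.dump(batch, f)
        # ...upload `path` under s3a://example-repo/stream-1/events/ here...

        # Commit the micro-batch on the ingestion branch, recording the Kafka
        # offset as commit metadata (flag names are illustrative).
        subprocess.run(
            ["lakectl", "commit", "lakefs://example-repo/stream-1",
             "-m", f"micro-batch up to offset {last_offset}",
             "--meta", f"kafka_offset={last_offset}"],
            check=True,
        )
        batch = []
```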

Okay. Another important thing we would want from revert is the ability to make sure that when we revert, we revert to a consistent state of our data lake. So if we only revert [00:18:00] one dataset and the others are not reverted, we can find ourselves in a situation where the different datasets are not consistent with one another. In many cases, when we do need to revert in production and we do it manually, it either takes us time, and since the action is not atomic it might expose our users to inconsistencies.

Or we would simply have downtime. In order to [00:18:30] make this action atomic, we would want to be able to actually ingest data from several ingestion branches, as you can see here in this illustration. We have three different ingestion branches that are first merged one into the other, resulting in one commit to main of a consistent state of all those data sources. And then, if all our commits, or all [00:19:00] our merges to main, are of this form, when we revert main we always revert all datasets to a state that is consistent between them.
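A minimal sketch of that pattern, with illustrative branch names and lakectl invocations (exact flags may vary by version): the ingestion branches are folded into one another first, and only the final, already consistent state is merged into main, so a later revert of main moves all datasets back together.

```python
import subprocess

REPO = "lakefs://example-repo"
INGESTION_BRANCHES = ["ingest-orders", "ingest-users", "ingest-payments"]  # illustrative

def merge(source: str, destination: str) -> None:
    # lakectl merge <source ref> <destination branch>
    subprocess.run(["lakectl", "merge", f"{REPO}/{source}", f"{REPO}/{destination}"],
                   check=True)

# First fold the other ingestion branches into a single branch...
for branch in INGESTION_BRANCHES[1:]:
    merge(branch, INGESTION_BRANCHES[0])

# ...then expose all the new data to consumers in one atomic merge to main.
# Reverting main later therefore moves every dataset back to a consistent state.
merge(INGESTION_BRANCHES[0], "main")
```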

Okay, I think we got the idea of why managing data the way we manage code is a good idea, and why we need the ability to use Git-like operations to get isolation and the capabilities [00:19:30] of reverting, reproducing and so on. Now the question is how do we do that? What we’ve been working on at Treeverse is creating an open source project called lakeFS that actually gives you an atomic, versioned data lake on top of your object storage and allows all those actions we have presented so far. What [00:20:00] lakeFS does is wrap your object storage and provide an interface that includes, on the one hand, as you can see on the left-hand side, the API of the object storage itself. So you can do all the data manipulations you’re interested in.

And on the right-hand side, as you can see, any of the Git-like operations: branching, committing and merging. All those actions are metadata [00:20:30] actions. If there is an actual change to an object we use copy-on-write, and in any other case, of course, it’s only a metadata operation; copy-on-write is the method by which we create the versioning. In the middle runs the versioning engine. That is, of course, our own engine in lakeFS, but open source, so you can just have a look, read the documentation and dig in. It is something between [00:21:00] MVCC and managing very shallow Merkle trees, but it allows us to actually do those operations of merging, committing, and branching with very high performance on very large data lakes.

The way it looks when you use it is actually very simple. It just changes the path by which you access the data. Instead of just calling S3 with the [00:21:30] bucket, and the collection, and foo, you would also be mentioning what branch you’re accessing, or the commit ID that you want to access, and so on. Very much like what we are used to from Git. You can also see an example here of how it works with the Hive metastore. And it does work with the Hive metastore, assuming you access the data through Hive.
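Concretely, the only change for a client is the shape of the path: instead of bucket/collection/foo, you address repository/reference/collection/foo, where the reference is a branch, tag, or commit. A hedged boto3 sketch against the lakeFS S3-compatible interface, with the endpoint, credentials, and names all illustrative:

```python
import boto3

# Point the standard S3 client at the lakeFS S3-compatible endpoint instead of AWS.
s3 = boto3.client(
    "s3",
    endpoint_url="https://lakefs.example.com",   # illustrative endpoint
    aws_access_key_id="<lakefs-access-key>",
    aws_secret_access_key="<lakefs-secret-key>",
)

# Before: s3.get_object(Bucket="my-bucket", Key="collections/foo")
# After:  the "bucket" is the repository and the key starts with the reference.
obj = s3.get_object(Bucket="example-repo", Key="main/collections/foo")

# The same call against another branch (or a commit ID) reads that exact version:
old = s3.get_object(Bucket="example-repo", Key="experiment-1/collections/foo")
```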

So it integrates with your existing tools, and [00:22:00] it does that either by using a client or, in some edge cases, by calling a gateway. In the case where you call a gateway, we are actually the interface between you and the object storage. And if you’re running a client, then you would be actually calling the object store directly for the data, and lakeFS for the metadata. Which brings us to [00:22:30] the architecture of lakeFS, which explains exactly that.

As you can see on the left-hand side, you have a client, a [inaudible 00:22:40] client, and also of course clients if you run your own code in any reasonable language that data engineers use. Or, at the bottom, you can actually call our gateway. If you are using the gateway, then you would be actually accessing [00:23:00] the data through lakeFS, which is less desirable, and hence it is used only in cases where there is no other solution. Although some organizations actually like it; it makes for a very simple solution because it is one configuration of pointing the DNS to us. And the other option would be, of course, using a client. In that case you would be reading the data directly from the object storage, but [00:23:30] in order to resolve where the data is, you would be calling the lakeFS metadata manager, which resolves the path that includes the branching and provides back the actual path within the object storage.
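As a rough sketch of the gateway route from Spark, the only change is pointing the standard S3A connector at the lakeFS endpoint. The property names below are the standard Hadoop S3A ones; the endpoint, credentials, repository, and paths are illustrative. The client route would instead keep the data traffic going directly to the object store and send only metadata calls to lakeFS.

```python
from pyspark.sql import SparkSession

# Gateway mode: the standard S3A connector talks to lakeFS as if it were S3,
# and lakeFS proxies the data to and from the underlying object store.
spark = (
    SparkSession.builder.appName("lakefs-gateway-sketch")
    .config("spark.hadoop.fs.s3a.endpoint", "https://lakefs.example.com")  # illustrative
    .config("spark.hadoop.fs.s3a.access.key", "<lakefs-access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<lakefs-secret-key>")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

# Branch-aware paths now work with the unmodified S3A client.
df = spark.read.parquet("s3a://example-repo/main/collections/foo/")
```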

So yeah, this is it in a nutshell, real quick: why you should do CI and CD for data, why we should be managing the data the way we [00:24:00] manage our code, and how you can do it with lakeFS. I’ll be happy to take questions if you have any.

Speaker 2: Thank you, Einat. I think there are plenty of questions in the chat, so let’s try to tackle a few of them. The first question is: what is the difference between lakeFS and project Nessie?

Einat Orr: Excellent question. I was hoping for that question. Project Nessie indeed tries to solve the same problem. [00:24:30] It focuses on analytics and, more than that, on open table formats. Nessie now supports Iceberg. And I know from looking at the Delta Lake group that they’re working hard on actually supporting Delta Lake as well. But the way it works technically relies on the log files of those [00:25:00] open table formats.

lakeFS takes a different view of things: we are completely format agnostic. So we can work with Iceberg and Delta Lake, but we can also work with any other format of data that you have, simply because we took a different technological approach to the problem. I think that is the main difference, but the need and the idea that both projects are trying to answer are [00:25:30] similar.

Speaker 2: Thank you. We have more questions. Let’s see. Let me start. Isn’t Databricks’ Delta table solving the schema enforcement, reliability, and data quality problems?

Einat Orr: Definitely, Delta Lake has the [inaudible 00:25:51] that you could use, and then it would enforce your schema if you require it to do so. But it works per [00:26:00] table. All the aspects that we have discussed that relate to cross-collection consistency, or being able to actually move in time in a consistent way across all your datasets, are something that Delta Lake doesn’t do. If you are only using Delta Lake, yes, it would solve the validation [inaudible 00:26:29] the schema. But it [00:26:30] wouldn’t solve a lot of the other things that you also get via lakeFS. And of course, if you are using other formats then you would still need to validate those formats.

Speaker 2: Got it. Thank you. Aren’t you conflating data and code? Bad data usually comes from bad ETL code.

Einat Orr: Well, not necessarily. Of course that is one of the options, but in some cases there’s just a problem in the data collection rather than the code, [00:27:00] and, depending on the source of the data, something changes in the world. For example, if you were collecting data from your application on phones and there was an upgrade in one of the operating systems, suddenly some of your reports don’t work. That’s just one example of a problem that could happen. So not all data issues are a result of bad code, but yeah, of course we would also like to address the problems of bad [00:27:30] code. And when we have a process, or lifecycle management, for how we test this code, we should be able to test the code over production data, maybe run a new ETL in parallel to production, and do it very easily. So we want to have a sandbox that is really identical to production, and we want to do it in a quick, atomic way, because that would allow us to better test the code and increase the quality of our results.

Speaker 2: [00:28:00] Okay, great. The next question is don’t you accumulate vast amounts of metadata if you keep data lake history, say for six months?

Einat Orr: So, we are managing our metadata very efficiently. Unfortunately, I didn’t have time to go into it, but if you go into our documentation there is an excellent explanation of why we are very efficient in collecting the metadata. [00:28:30] So the amounts of metadata that we create are not huge; they really depend on the needs of the organization. If you do actually need access to all your history for six months, you would be accumulating metadata for six months, and again, that would not be a very heavy amount of metadata. And the metadata with lakeFS, as you can see in the architecture slide, is actually saved back [00:29:00] to the object storage. So even if there are large amounts of metadata in very, very large data lakes, it is saved back into the object storage, so it doesn’t incur too much cost or any other problem.

Speaker 2: Right. Actually we may not have time. We are almost towards the end of the session, but I’ll take one more question, which is related. [00:29:30] Will lakeFS support metadata for unstructured data? For example, PDFs.

Einat Orr: Yes. As I said, it’s format agnostic. But if you’re talking about the content itself of the PDF, no, because we are looking at it as an object within the object storage. If you would want to do something that is related to the content of the files themselves, [00:30:00] you can always add that capability by running, on top of our diff of the files, logic that diffs the content of the files. But if it is only about accessing metadata, it can be done through lakeFS. There is a question here asking if it supports Azure? Yes, we support [inaudible 00:30:22]. [crosstalk 00:30:26]

Speaker 2: Awesome. I think we addressed most of it except one, [00:30:30] which maybe we’ll take quickly. How does a lakeFS diff between branches look? Will it show differences within the files, or just which files changed on which branch?

Einat Orr: Which files changed. It is actually built now in a very structured hierarchy, assuming you have one within your data lake, but it shows you which objects [00:31:00] or which files changed. If you would want to go in and understand the difference between the files themselves, again, as I said, it is very easy to just run compute over that, pull the diff, and see what the diff is within the files. It’s not something that we have currently implemented because it’s not very clear how users would want to do it. It’s kind of a business decision, so we left it for our users to implement.
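For instance, a hedged sketch of “running compute over the diff yourself” in Spark: read the same path from both references and use exceptAll to see the row-level difference. The repository, branch, and path names are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("content-diff-sketch").getOrCreate()

# The object-level diff (e.g. lakectl diff or the UI) tells you which files
# changed between branches; for the content-level difference, read the same
# path from both references and compare the rows yourself.
on_main   = spark.read.parquet("s3a://example-repo/main/daily_counts/")
on_branch = spark.read.parquet("s3a://example-repo/experiment-1/daily_counts/")

added_rows   = on_branch.exceptAll(on_main)    # rows present only on the branch
removed_rows = on_main.exceptAll(on_branch)    # rows present only on main
added_rows.show()
removed_rows.show()
```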

Speaker 2: [00:31:30] Got it. Thank you. I think we are at the end of the session. If you have more questions, the speakers have their own Slack channels, feel free to send them their questions. Thank you for attending. And thanks, Einat. Thanks for the great presentation.

Einat Orr: Thank you.

Speaker 2: Thank you. Bye.

Einat Orr: Bye.