May 3, 2024

The Next Big Challenge: Control over the Shared Lakehouse

The lakehouse vision is a highly distributed, decoupled data store shared by many teams across the business, each operating within their own tools and practices. Yet we've approached lakehouse integration with vertically integrated solutions that lack interoperability and mutual consistency.

Most users prefer to use a single tool (e.g. Trino, Snowflake, Dremio etc.) for all their touchpoints with the lakehouse, including creating, updating, optimizing and querying tables. As a result, multiple users sharing a lakehouse often means an amalgamation of different tools. Since cross-tool operation remains a weak link, lakehouses often suffer from functional silos and inconsistent performance. Multiple engines cannot update the same table without significant effort duplication and resource waste, and multi-table transactions are complicated when they are managed by different tools. For instance, when Trino is responsible for updating a table, Snowflake is only given read access to it, and vice versa. Trickier still is data lifecycle management—what happens if one user chooses to expire all table snapshots, while another relies on time-travel for fixing issues?

In this session, we’ll discuss the next big challenge in lakehouse architecture: shared table management and observability. What is the secret to building a catalog-, committer-, and query engine-agnostic, interoperable lakehouse with consistent performance?


Transcript

Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.

Santona Tuli:

Yeah, so my name is Santona Tuli, and I'm going to talk about what I think is the next big challenge now that the lakehouse is maturing, which is optimization and management of a shared lakehouse across many different tools. A little bit more about me: I've been working with data for something like 12 or 13 years, starting with massive particle collision datasets, doing some NLP as a machine learning engineer, and then general data science, analytics, and product strategy at data tooling companies. That's my dog Ridge; she's contemplating the quark-gluon plasma on my boat.

Okay, so for today's talk, we're going to go over the lakehouse vision and how we got to the current state of the infrastructure. Then we'll talk about the vertically integrated solutions we've been using, discuss some of the challenges you face when there are multiple different tools interacting with a lakehouse, and finally I'll propose an interoperable lakehouse design.

Shared Storage Vision

So the shared storage vision is certainly not new. We've been wanting a unified storage layer for all of our different teams and applications (BI, ML, integrating some of our data back into the product, making sure our data stores are auditable), and through our iterations we've gone through warehouses, lakes, and now we're at the lakehouse. Shared storage is certainly not the only motivation; we're also seeking better performance and more cost effectiveness. But it is one of the things we've been pursuing on this journey to the lakehouse, and the lakehouse is getting us there. I'm sure all of you are familiar with the architecture of Iceberg: we've got the data layer and data manifests, we've got metadata and snapshots, and then the catalog with pointers to the various tables. With this tree-based architecture, our different business functions can query into the same data lake independently, sometimes even querying the same tables when we need to collaborate. And when I say query, I mean interacting with the lakehouse, not just retrieving data back, but also manipulating it.
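
To make that tree concrete, here is a minimal sketch of walking it with PyIceberg. The catalog name, connection properties, and table identifier are all illustrative, not from the talk.

```python
from pyiceberg.catalog import load_catalog

# Resolve the catalog, then a table: catalog -> table -> metadata -> snapshots -> manifests -> data.
catalog = load_catalog("lake", **{"type": "rest", "uri": "http://localhost:8181"})
table = catalog.load_table("analytics.orders")

print(table.metadata_location)            # catalog points at the current metadata file
for snap in table.snapshots():            # metadata tracks the table's snapshots
    print(snap.snapshot_id, snap.timestamp_ms, snap.manifest_list)  # each snapshot points at a manifest list

# A read plans against one consistent snapshot and prunes files using the metadata tree.
rows = table.scan(row_filter="id > 100").to_arrow()
```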

Our Expectations From the Lakehouse

So this works. It's good. It's getting us there. But because of how we got here, there are some expectations we have of the lakehouse, justifiably, that we need to reconcile with how we're doing things today. In the warehouse, we were used to reading and writing data from multiple different clients at the same time, and the warehouse managed everything that happened in the background. It was doing its own thing; we weren't really exposed to it. Certainly, no one bats an eye if I and five other data engineers are querying the same table or view at the same time. The difference now is that instead of multiple clients, we're trying to use multiple different tools that are independent of each other. And at the same time, we expect database-like qualities. As I said, we want to query. But beyond querying, we want to be able to create, drop, and alter tables. We want to insert, update, and manipulate rows. We want to do permissions provisioning, granting access, et cetera. And we expect atomicity of an individual transaction in the lakehouse, including the ability to roll back to a previous state. It's a lot to ask of a data infrastructure. It's delivering, but it is a lot.
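
As a rough sketch of those database-like operations expressed against an Iceberg table through Spark SQL: the catalog name `lake`, warehouse path, table, and snapshot ID below are illustrative.

```python
from pyspark.sql import SparkSession

# Assumes the Iceberg Spark runtime jars are on the classpath; names and paths are placeholders.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.lake", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.lake.type", "hadoop")
    .config("spark.sql.catalog.lake.warehouse", "s3://my-bucket/warehouse")
    .getOrCreate()
)

# Create, insert, update, alter: the same DDL/DML we expect from a warehouse.
spark.sql("CREATE TABLE lake.analytics.orders (id BIGINT, status STRING, ts TIMESTAMP) USING iceberg")
spark.sql("INSERT INTO lake.analytics.orders VALUES (1, 'open', current_timestamp())")
spark.sql("UPDATE lake.analytics.orders SET status = 'closed' WHERE id = 1")
spark.sql("ALTER TABLE lake.analytics.orders ADD COLUMN region STRING")

# Roll the table back to an earlier snapshot (the snapshot id here is made up).
spark.sql("CALL lake.system.rollback_to_snapshot('analytics.orders', 1234567890123456789)")
```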

Vertically Integrated Tools

So we've been building these vertically integrated tools that our different teams, based mostly on personal preference, are leveraging to interact with the lakehouse. Let's say there are four different tools, sometimes called query engines, or processing engines more broadly, interacting with my lakehouse. Each of these engines has a connector to the lakehouse and a model of the source data, which it uses to plan and optimize its queries and then coordinate the workers to actually do the work. And this is happening separately in each engine. So even when they're touching the same tables, a lot is being decided and done by the processing engine. In practice, when I execute a statement for one of these business functions through my processing engine, it figures out the how, and then it goes and does the execution, and that execution can touch all the different layers of my lakehouse. It's not just interacting with the catalog and returning something; it actually has the ability to change the metadata, the snapshots, the manifests, and the data files.

The challenge arises when we have multiple users, and therefore multiple engines, interacting with multiple tables in a collaborative, spread-out manner. For example, when multiple engines are working on the same table, we have concurrent reads and writes to deal with. And data consistency across all of these engines is extremely important, of course. Not negotiable. But it is difficult, and it can be inefficient when no single engine can stand out as the leader, as the voice of authority, the way the warehouse did in its era. What the warehouse said went, as far as how things were going to be executed. Now you've got different processing engines that can each do all the things you need throughout the data lifecycle, but no one is in charge; they're competing against each other in some sense. We also have to be able to support ACID transactions across multiple tables if our vision is a shared lakehouse that represents the entirety of my business for all my business functional units. An example of a single transaction that touches multiple tables is CDC, where I'm replicating a whole database that might have hundreds of tables underneath it and committing all of that data every so often into my lakehouse. It's a very common use case where multiple tables are being updated at the same time, and you have other engines trying to access, or even update, that data at once.

So Iceberg, of course, has a way to deal with this, right? It distributes the responsibility of ownership and maintenance across the different engines as much as possible, and the way it does that is through optimistic concurrency, which is achieved through snapshot isolation. This is the idea that each writer and reader assumes it's working in isolation. In the reader's case, that just means the reader will see a consistent snapshot while it's doing its work. In the writer's case, it takes the latest snapshot, makes all its changes, writes out a new snapshot, and then tries to commit it. If it finds itself unable to do that commit because something else has come in and changed the snapshot, that commit will fail. Then it grabs the latest snapshot, including whatever change the other engine made, and tries the process again, and if that fails, tries again. This can get slow when there's high concurrency of events, for example with a streaming data source where there are frequent updates, because of all the retried commits; it's always a trade-off between performance and how serializable you want your architecture to be. And occasionally it can lead to deadlocks, where maybe two engines are trying to update the same table, and every time one tries, it's stalled by the other, and so on and so forth. It's very rare, but that's the sort of situation you can run into when you don't have a voice of authority and multiple things are happening at once.
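
Here is a conceptual sketch of that optimistic commit loop, not Iceberg's actual implementation; the `table` object and its `current_snapshot_id` and `compare_and_swap` methods are hypothetical stand-ins for what a catalog-backed commit does.

```python
import random
import time

def commit_with_retries(table, make_changes, max_retries=4):
    """Optimistically commit: write against the snapshot you saw, then swap the pointer."""
    for attempt in range(max_retries + 1):
        base = table.current_snapshot_id()        # snapshot the writer assumes it owns
        new_snapshot = make_changes(base)         # write new data/manifest files off to the side
        if table.compare_and_swap(base, new_snapshot):
            return new_snapshot                   # no one else committed in the meantime
        # Another engine committed first: back off, re-read the latest snapshot, retry.
        time.sleep(random.uniform(0.1, 0.5) * (attempt + 1))
    raise RuntimeError("commit failed after retries: too much concurrent write activity")
```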

So getting it right is tricky. Getting the right concurrency control usually requires some domain expertise, and it should be done at the table level. Let's say I'm on the analytics or BI team: rather than thinking about the engine, I should have some context around what the table's properties need to be. What is its retention timeframe? How should concurrent writes be handled? So it really should be at the table level, not the engine level, which is what we've become accustomed to. I do want to mention that catalogs play a key role in managing snapshot isolation, so this is not an unsolved problem, but there are different catalog options, and not all catalogs implement the same procedures, or implement them in the same ways.
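
Concurrency and retention knobs do already live on the table in Iceberg. A small sketch, using real Iceberg table properties but an illustrative table name and values, with the `spark` session from the earlier sketch:

```python
# Tune commit retries and snapshot retention per table, not per engine.
spark.sql("""
    ALTER TABLE lake.analytics.orders SET TBLPROPERTIES (
        'commit.retry.num-retries' = '10',
        'commit.retry.min-wait-ms' = '100',
        'history.expire.max-snapshot-age-ms' = '432000000',
        'history.expire.min-snapshots-to-keep' = '20'
    )
""")
# 432000000 ms = 5 days of snapshot history retained for time travel.
```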

Data Lifecycle Management

So again, Iceberg provides a lot of functionality for optimization and data lifecycle management. This includes the ability to compact small files, delete old snapshots, clean up garbage, and prune manifest files, and it has a really good way of using data filters on partitions so that you're not necessarily scanning all the data, because you know ahead of the data file scan which files might be relevant and which might not. All of these things are fantastic, but because Iceberg is a table format specification and not an engine, it is rightfully not authoritative about which procedure you should follow or which function you should use. It's not enforcing anything for you. It's telling you: these are the protocols I support, these are the defaults, these are some other options, and you can pick and choose. So what happens is the engines go and pick and choose; that's how we configure the different options. Let's say my BI team representative goes and configures their processing engine against the Iceberg lakehouse with certain properties, and then my ML team does the same thing for their processing engine. Again, it's at the engine level, not the table level.
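
For reference, these maintenance hooks are exposed as Spark procedures; the table name, cutoff timestamp, and catalog name are illustrative, and `spark` is the session configured earlier.

```python
# Compact small data files into larger ones.
spark.sql("CALL lake.system.rewrite_data_files(table => 'analytics.orders')")

# Expire snapshots older than a cutoff, along with files only they reference.
spark.sql("CALL lake.system.expire_snapshots(table => 'analytics.orders', older_than => TIMESTAMP '2024-04-01 00:00:00')")

# Delete files that no snapshot references anymore.
spark.sql("CALL lake.system.remove_orphan_files(table => 'analytics.orders')")

# Rewrite manifests so metadata stays small and prunable.
spark.sql("CALL lake.system.rewrite_manifests('analytics.orders')")
```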

So in a shared lakehouse, how do we deal with data lifecycle management at the table level? Here's how we don't want to do it. We don't want to do it at the team level in an ad hoc manner, where I run a query against a table, it's taking too long, and I think, oh, that's running too slow, maybe I should go compact this table now. Or, oh, I don't need these snapshots anymore, I'm good from here forward, so I'll just delete the old ones. You shouldn't unilaterally make those decisions ad hoc, because your decisions impact the other teams sharing this infrastructure, this lakehouse. These tasks should be scheduled and executed at the optimal cadence for each table, and each table's requirements are going to be unique. For example, if a table is only being updated every five hours, you don't want to be compacting it every five minutes. But there are certainly tables you would want to compact small files for every five minutes, if you have a streaming source, for example. Even the concurrent write retry interval, how long you should wait to try again if your write failed because of a competing one, is going to vary widely by table. The retention period, how long to keep data in a given table, is going to vary based on the kind of data. And even if you can agree on general best practices for your lakehouse across all of these teams, which is a near impossibility as soon as you get to enterprise grade with lots of teams, actually maintaining and implementing those policies at scale, across all of those different engines, gets difficult.
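
A hypothetical sketch of what per-table policies, decided collaboratively and executed centrally rather than ad hoc by whichever engine feels slow, might look like. The policy fields, table names, and cadences are all invented for illustration.

```python
from datetime import timedelta

POLICIES = {
    # Streaming source: compact aggressively, keep a short snapshot history.
    "analytics.clickstream": {
        "compact_every": timedelta(minutes=5),
        "snapshot_retention": timedelta(days=2),
    },
    # Batch-updated table: compact rarely, keep history much longer.
    "finance.invoices": {
        "compact_every": timedelta(hours=12),
        "snapshot_retention": timedelta(days=90),
    },
}

def run_maintenance(spark, catalog="lake"):
    """One maintenance pass; a real scheduler would trigger each table on its own cadence."""
    for table, policy in POLICIES.items():
        spark.sql(f"CALL {catalog}.system.rewrite_data_files(table => '{table}')")
        retention_ms = int(policy["snapshot_retention"].total_seconds() * 1000)
        spark.sql(f"""
            ALTER TABLE {catalog}.{table} SET TBLPROPERTIES (
                'history.expire.max-snapshot-age-ms' = '{retention_ms}'
            )
        """)
        # With no explicit cutoff, expire_snapshots honors the table's retention properties.
        spark.sql(f"CALL {catalog}.system.expire_snapshots(table => '{table}')")
```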

Interoperable Lakehouse

What's the solution? Here's what I think: we need to make the lakehouse interoperable, more interoperable across different engines. The way to do that is by wrapping my layers in a maintenance and optimization layer, for lack of a better term. This system, or engine, whatever it is, would be in charge of maintaining the data files, manifests, and metadata files and updating the snapshots. I'll go into more detail on the next slide about what sort of functions I'm looking for. But the idea is that you have a maintenance and optimization engine that's separate from the processing engines the different teams are using to access the data. So what do I mean by maintenance and optimization? One of those things is small file compaction. It shouldn't happen at the individual engine level; I should have a lakehouse policy, per table, for how often to compact small files.

Another is dynamic clustering, or repartitioning of data. Iceberg offers a lot of hidden partitioning options, which make data access very efficient if you know how to do it right. You shouldn't have to do it as a user, as an individual data engineer; it should be a system that can intelligently and dynamically repartition your data into the best representation, based on the table's usage. Then there's frequent update consolidation. This is an important one for streaming data sources, and I think about streaming data sources a lot. When you have frequent updates, in a short amount of time you might get a whole bunch of them, and if you applied them one at a time, each as a full transaction, that creates a lot of waste: you create a manifest file for each update, and it's unnecessarily complex. Instead, we can consolidate a bunch of updates within a certain window of time into one update file and then apply that to the data. Data sorting is a fairly basic one, but it also shouldn't happen differently as dictated by different engines; rather, it should be a set policy for how you want to sort different kinds of data in your lakehouse.
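
Partitioning and sort order can already be declared on the table itself, which is the spirit of making them lakehouse policy rather than per-engine choices. A brief sketch using Iceberg's Spark SQL extensions; the table and column names are illustrative, and `spark` is the session configured earlier.

```python
# Hidden partitioning: partition writes by day of the event timestamp.
spark.sql("ALTER TABLE lake.analytics.clickstream ADD PARTITION FIELD days(event_ts)")

# Declare the write sort order once; engines that honor it produce well-clustered files.
spark.sql("ALTER TABLE lake.analytics.clickstream WRITE ORDERED BY user_id, event_ts")
```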

Table retention is another one. What is my table retention policy? I should be able to make that decision collaboratively and then implement it through this maintenance layer. And finally, file and metadata cleanup. Again, there are great ways to identify which files need to be cleaned up, but the actual execution should be handled by this maintenance layer. So with this horizontally integrated engine, when I run a statement from any of my engines, it gets passed through that engine, but the maintenance and optimization layer is what determines the how. It's really doing all the scan planning, et cetera, and then passes that back to the processing engines, which can then do their execution through and through, no problems. That's it. I ran through that a little quicker than I intended to, but that just means more time for discussion. That's the proposal for an interoperable lakehouse.