Subsurface LIVE Summer 2020
Data Lineage and Observability with Marquez
Data is increasingly becoming core to many products. Whether to improve recommendations for users, get insights on how they use the product, or use machine learning to improve the experience. This creates a critical need for understanding how data flows through systems. Data pipelines must be auditable, reliable and run on time.
Tracking lineage and metadata is the underlying foundation that enables many use cases related to data. It provides understanding of dependencies between many teams consuming and producing data and how constant changes impact them. It enables governance and compliance and generally helps you keep your data running.
Marquez is an open source project, and part of the LF AI, which instruments data pipelines to collect lineage and metadata and enable those use cases. It provides context by making visible dependencies across organisations and technologies and enables lineage governance and discovery.
Julien Le Dem, Co-founder and CTO, Datakin
Julien Le Dem is the CTO and co-founder of Datakin. He co-created Apache Parquet and is involved in several open source projects including Marquez (LF AI), Apache Pig, Apache Arrow, Apache Iceberg and a few others. Previously, he was a senior principal at WeWork; a principal architect at Dremio; a tech lead for Twitter’s data processing tools, where he also obtained a two-character Twitter handle (@J_); and a principal engineer and tech lead working on content platforms at Yahoo, where he received his Hadoop initiation. His French accent makes his talks particularly attractive.
Julien Le Dem:
Okay. It is 1:05 PM and it's time to get started. So we've got another fantastic technical session here for you all today. Thanks for joining us. We've got Julien Le Dem from Datakin. He's going to give us a real interesting talk on data lineage. So without further ado over to you Julien.
Julien Le Dem:
Hi, everyone. So this is Julien, I'm the CTO and Co-founder of Datakin. You may have heard me speak about Parquet or Arrow in the past and today I'm going to talk about Marquez, which is a project now part of the LF AI Foundation to collect lineage and metadata and understand what is going on with your data pipelines. So, today in the talk, first I'll talk about the problem we're solving. Why do you need metadata and what's metadata? And then I'll go over what Marquez is. Why we built it and then the details on how it works in particular taking the example of airflow. And I'll chat a little bit about the Marquez community.
So first, why Metadata? So the main goal... I think nowadays you hear people talking about DataOps, talking about DataMesh and there's a movement or really to be more agile, to have each team in a company to own their own analytics, to own their job, to own their machine learning and really you build an ecosystem of teams depending on each other and people consuming data, producing data, producing dashboards and depending on each other. As this ecosystem grows and grows, you have more interdependencies and then you start having issues due to the complexity of all those dependencies. And every time someone changes something, they break something. And so, as those ecosystem grow, you kind of lose the agility you add by having people instead of centralized teams that do certain things. Or you lose this agility of people owning their own area, right. Because every time something changes something, they break something, and so they tend to not change anything like this ecosystem tends to ossify and then not change anymore.
And so, we're going to talk about what we can do to improve that. Right. So you have those interdependent teams and they understand very well their area but they don't understand that well how other teams work or how change are happening or teams may not be aware of who's depending, who is impacted by the change they're making, and so there's lots of friction that happens at the team boundaries. That creates breakage and then data pipelines break and then it takes a long time to figure out what's going on when you lack context.
So, some of the limited context and question you want to ask is what is the data source? Where is this data living? What is the schema? How is it evolving over time? Who is the owner? Who is responsible for that thing? Who's making sure that these dataset is updated? How often is it updated? Where is it coming from? Who is using that data? What has changed? Right. Who is going to be impacted by a change? How do I understand if I'm going to impact someone when I do a change that I think is necessary? And so I've stolen this... You have this Maslow's hierarchy of needs, which is a common theme that is reused in presentation. It's talking about before you want to achieve happiness, right? You have underlying goals that are important to you. First, you need to be able to eat, to sleep, to have a shelter to stay. Once you have those basic things, then you can look into, how do I leave a better? How do I improve? And all those things. And so, there's similar things for data.
When you look at your data, the first thing it needs to be available, right? You have to put your data in a place or have access to it so that you can start doing analysis or you can start extracting inside of... Build feature around it. And second, the data needs to be updated in a timely fashion. It needs to be available but then whenever it changes and whenever there's new data you want you to have it in a timely fashion. So there's freshness. And third is data quality. You want to make sure the data you're getting in timely fashion is actually accurate, right. There's no problem in the instrumentation or in the transformation or some misunderstanding that makes your data bad quality and give you wrong results to what you're expecting to achieve. And that's kind of the baseline. That's your water line.
Once you have your head above water that's when you can start getting to the next step of, how do I optimize my business? Like as an example, you can say, "How do I make our systems better? How do we make things cheaper? How do we make things more efficient extracting insights for your data?" And then once you have this, that's where you can start thinking about, "Oh, how do we have new business opportunities? How would we get further than making what we already have better?" So it's kind of this hierarchy of needs but the bottom line of this is before you get value out of your data, you need to get your head above water. Right. You need to get out of frequent breaking changes in your data and maybe suddenly your dashboard is not being updated anymore and maybe someone changed your schema somewhere or some database is not being refreshed anymore or something else, right?
You want to get visibility and fix all those things and once this is a well oiled machine and it's working then you can look into, how do we improve? How do we make things better? How do we get value out of these things? But often those bottom layers are the ones that hinder the ability to extract value from your data. And so that's where... Tah-dah. I'm going to do the intro of Marquez. Marquez is an open-source project. We started at WeWork as part of building the data platform there. And it's an open-source project to collect all the metadata about all the lineage, about all the jobs and all the runs in your data ecosystem. Like everything that, when it's running. What was the version of the code? What was the version of the input? What was the version of the output? When it started. When it ended. What were the parameters? And how they all dependent on each other and build this entire lineage graph of all the interdependencies within your organization.
And so, it sits somewhat at the boundary between data operation, data governance, data discovery. Right. There's this aspect of understanding everything that exists, understanding where the data is going and where it should not be going. And these data operations of understanding if things are running, if they're running in a timely fashion, if they're taking time, if they break often and what has changed. And so this project is grounded, so to speak. So, a lot of the concept in there are borrowed from the grand paper, from the UC Berkeley, which is... If you look at the beginning of the paper, there's this notion of, it's not just big data, it's big metadata. It's not just about whether the data set is big that's the problem, it's that you have many data sets and many transformation and many people interacting with it and that becomes very complex. And you have to deal with this complexity.
So this is an example of what data platform, if you have one or what it could be looking like. It starts from ingesting the data and then you store it somewhere. Ingestion would work typically in two ways, either you replicate some operational database to get some states or you send events through something like Kafka and collect events about what's happening. And then typically, you have some archival of the streaming data in an addressed storage and you can use a streaming framework like Flink or Kafka Stream or Storm or other things to process that data in real time.
You also usually use a scheduler to have batch processing like some hourly, daily jobs. That will process this incoming data and build dashboard, do machine learning, transform the data, curate the data and make it usable. And typically you have also a BI stack, that may or may not be the end value of this DataOp and a bunch of dashboards may be using Power BI or Tablo or Superset or other things to analyze your data. And the point here is, end to end, you want to understand how the data goes through, how it percolates through this environment and if it's happening in a timely fashion, right. So that all those things from the injection to the BI, to the machine learning jobs, and the models being produced, you want your instrument to collect metadata and understand how all this is happening and having visibility, being able to observe what's happening and when it changes especially.
The Marquez model bulge down to a generalization of a few concepts. You have the concept of job and you have a concept of dataset. And the lineage graph is a bi-graph of always jobs consuming datasets and producing datasets. So you build this bi-graph but then what we are very careful about, it's not just about jobs in datasets, it's also about understanding every run and really building that lineage graph at the run level. So any run you know when it started, when it ended, what was the version of the input it consumed and what was the version of the output it produced and you know what parameters were passed to it. And so we're really building that lineage graph at the run level, which means that we know exactly everything that has changed between a run that has failed and a run that was successful, right. So when something goes wrong you can look at everything that has changed between the last time it was successful and the first time it failed.
These generic model applies to batch processing, stream processing and something that you would not necessarily call a job but you can name for those generic entities of things that process things and data that stays. And so we have some generalization of... A dataset then can be a DB table. Maybe it's a table in your data warehouse or it's a set of files in your distributed file system like history or it could be a topic in Kafka as a stream. And so you have the generic model and then you have some specialization so that we can collect metadata that is specific about certain type of dataset and certain time of jobs because we may have more detailed metadata.
In particular, for example, if you're using something like Iceberg or Delta Lake, you want to capture the actual version of the data. Since those are version stores, that way you know exactly what version of a data set, what's consumed by that job at that time when that run ends. And so, in that sense, you can go look at the data as it was when this job run because you captured the version in the metadata. So that's where the model and the design benefits being, you know all the version of the job, how they evolve, maybe they have more inputs and outputs of code change of the job and that resulted in updating the scheme of the output.
This is really important in versioning everything and keeping track of what has changed and not just what the current state of things because when something breaks... If you're investigating and something looks fishy, you want to know if it was always the case or if it's changed recently and that's correlated with the problem. And other things that are very useful, if you knew exactly the state of the system as of before it failed, you can roll back to that state. In particular, if you're using something like Iceberg or Delta Lake, you can roll back the dataset to the version before the bad job run and created bad data.
So, how metadata is collected. It's very simple, push-by-system. The idea is you can start from the scheduler. So, I'll be using Airflow as an example because this is the most popular open-source scheduler out there but you could do the same thing with Luigi or Oozie or other schedulers you might be using. And this is an open-source project with an open API so if you have your own custom scheduler, you can make your own integration as well. And basically, whenever the job runs, we add an integration to the scheduler and it's going to call the API to say, "This particular job has started with those parameters, this version of the code and reading those inputs." And when it's finished it's going to send another event to say, "The job is complete." Or if it's failed, it's going to call this API to say, "The job has failed." This is a simple REST API and the goal is to really define a standard in how we collect metadata and lineage information at run-time to instrument our job and achieve observability.
And so, there's simple REST API you would integrate your ETL, your batch, your stream, dashboard and Marquez as these modular framework, when there's this core model when you store the data. And then basically, it's plugable. The idea is to be modular and you have your core presentation of the data that is collected. And then we can add multiple features around it based on that and to make it a little bit loosely coupled set of features so that it's easier to evolve over time and to be an optional extension to it.
And so, we go a little bit deeper in the internal layers. You have the core database and the metadata API on top of it. As part of this API, there are two major parts. One is the lineage collection, which is when the job starts and when the job ends and collecting metadata in lineage information. And the second is the core API, which is, how do I look at the job that exists? What are the inputs? What are the outputs? It's exploring that metadata. And on the client side, similarly, you have the integrations, which is how do we collect data? And you have the Marquez UI that lets you look at the current lineage graph.
And to give you a little bit of context of what we're doing at Datakin, is in Marquez you can have a listener to get notified for these data as it comes in and what we do is we build a graph representation of it and we build a bunch of tooling to help with this lineage analysis like troubleshooting, understanding dependencies, who depends on what? Understanding everything that has changed. How do I build the diff of the entire environment when something breaks or something becomes slower or the data becomes bad and understand everything that has changed and correlated with the problem and seeing. And other aspects are, you could look also like correlated failures. Maybe someone updated the version of Spark and now all the jobs are failing between the previous version and the new version. Right.
Then you can start doing these kind of analysis by extending the model. Right. So you have the lineage collection aspect but then you can leverage this data to do other things around the core of the project or other things people might be doing, typical application are governance, compliance. You can think about GDPR or CCPA and understanding where your user private data is going and is it being consumed by the right system or not? Or where do I need to go delete data if I have a request for data deletion. So, those are examples of other things you can build around these core Marquez module
To go a bit in the details, this is an example of how you collect metadata, so the actual payload of the REST core. The first step is... Okay, this is the source it's coming from, so this is your database. It could be your Redshift, your Vertica your Snowflake, your Dremio or you [inaudible 00:17:51] straight back it, when you start Delta Lake or Iceberg data or it could be a Kafka broker that contains Kafka topics. So first is the source, then you have the dataset. The dataset is an actual dataset, so in a warehouse it's a table, in Kafka it's a topic. It could be a folder in a stream. The last element is the job, right. And the job is the thing that consume dataset, it has inputs and outputs. It has drawing version. So capturing... If the job is collecting the GitHub URL with the GitShot, you know the current version of the job that's running?
And all those things are version, right. And so what we do with have this notion of namespace that let us have unique names for datasets and jobs and that's how you refer to them. The job is consuming these data set, which has a unique name with a namespace and that refers to these data source as being the container for it. And so when something changes, right. You may have the V1 of the job and it's consuming one data set and for now it's not producing any data. And then you push the V2 and now it's updating its output and the graph shape changes, right? So it could be... Another example, if you update your job and now it's reading from an extra table or now you update your job and the schema of the output has changed. So all those things would be version and you'd be able to see what was the schema before and after.
And so, I take the example of Airflow to make it a little more concrete. So, in Airflow you will have multiple DAGs. So the DAG is a notion of job in Airflow and the DAG is made of tasks. So in the case of the Marquez integration, each task will be integrated in Marquez and will capture when it starts, when it ends. And you would argue that in Airflow you have this notion on graph already, except it's only within the DAG and it's not really showing you any data dependency, right. So you see how tasks depend on each other but really the underlying reason that a task depends on another task is because a task is consuming the output of the previous task, right. So, you have an underlying data dependency that will become visible in Marquez. And if your DAG definition is not consistent with it, it will become abused in the Marquez UI.
So we also track inter-DAG dependencies. Some organizations like to have multiple Airflow instances to make more people more independent, so it's going to all stitch all that together to make one graph of all the dependencies. The way does it, there's a built-in SQL parser for when you SQL to extract the lineage from that, it connects the current version of the DAG as defined by the GitShot, for example, which is the most common as well, and extract metadata like parameters and all that. And so, you add the Marquez integration to your Airflow and now is going to call the Marquez REST API, to create those notion and track the lineage as the jobs are running. So every time a job starts it's going to call, every time a job is successful or fail is going to call the Marquez API.
And so, there are two ways to integrate. So the simple way if you're just trying it out in your DAG is to, instead of importing the Airflow Dag class, if you're familiar with Airflow is to import the Marquez Airflow DAG, which is just Marquez extending the Dag class to add the instrumentation to it. And the other way if you want to globally integrate that in your Airflow in Python, you can substitute an import and say that whenever people import the Airflow DAG, import the Marquez DAG instead and suddenly everything is instrumented.
And then Marquez has this notion of extractor, which is a mechanism depending on what operator you're using to extract the lineage information because this is operator specific. And so over time there'll be more and more operator integration, we call extractors that extract the metadata from the operator in collected lineage information. And so, depending on the operator, we can extract the data source the data is coming from, what table is being written to, what table is it reading from and so on. Right. And as all things, based on the namespace create unique identifier for jobs and how we collect the information from run over run how things have evolved. And so I mentioned it, if you have two different DAGs in Airflow then the fact that one DAG is producing some data and another DAG is consuming it, is going to connect those DAGs in Marquez and you're going to have visibility about everything that depends on each other.
I just wanted to give some quick context about Datakin. So, Marquez has the integration and the API. And so, we contribute to Marquez obviously, adding more integration. And just like you have OpenTracing, OpenTelemetry as a standard for tracing independently off the backend, you can see Marquez as that for data lineage and metadata. And then Datakin is just leveraging that data enabling better access. So Marquez standardizing metadata collection, runs, the parameters, versions, inputs, outputs, and Datakin then involves better understanding of operational dependencies, impact analysis, what is going to be impacted by this change? Troubleshooting. How do I know everything that has changed since the last time it worked, right? How do I correlate all the changes with the failures and get really quickly to the bottom of the problem?
And so, now a quick pass at the community. Marquez is posted on GitHub. You can see the Marquez project tag on GitHub. It has joined the LF AI Foundation, with the obvious goal, if you wants to define a standard for metadata collection, you need to be a neutral, not controlled by any company entity project. Right? So the goal is really, as a community to grow adoption, to show that the goal is to make a shared standard. And it's the kind of thing that, the more people are using it and the more it's valuable to everyone. And have a clear decision mechanism... Public decision mechanism, clear mechanism to become a maintainer, and have a clear code of conduct in the projects.
So, the LF AI, if you're not familiar, it's a set foundation of The Linux Foundation, it's a sister foundation of the CNCF. The CNCF is being around Kubernetes in cloud and all that and LF AI is more about machine learning and data. And it's a growing foundation with more and more project joining it. So you can join the project. We have six sub-repose for the different component of the core Marquez, the UI, the integrations. There are a number of contributors already. It was started at WeWork but with contributors are generally collaborating with people at Stitch Fix and a few other companies and it's a growing project now as part of the LF AI Foundation. Well, thank you. And now I think I'm right on time for taking a few questions.
Perfect. Just making sure. Yes. Excellent. Thank you for that. Let's go. We've got one question queued up right here from Santel. So Santel, I'm about to add you, let's see if this works. And meanwhile, Julien, Flavien posted a question it's just inside the session announcements tab. Looks like there's another question there as well. So I may take a look at that and we'll see if Santel is able to join or not.
Julien Le Dem:
Yeah. So Flavien, thank you for the question. Yes. There is definitely a vision of Marquez working well with Amundsen and you'll see that Marquez and Amundsen may become more closely integrated in the future. One way to see it is, Amundsen is more the data discovery path and usually it requires that you push lineage information into it. So the way we see it is to have a... We have a listener for Datakin you can have a listener in Marquez for Amundsen, that would make it easy to push whatever lineage we collect with Marquez into Amundsen and probably have some cross-linking in between the two. So for yet, right now it's not been beyond the proof of concept step but it's something we'd be eager to see happening. We're friends with Amundsen folks and they do a good job. I think Marquez is more focused on the core, on how do we collect lineage? And Amundsen has a lot more on data discovery, that type of use case and how we know all the data that exists and so there's clearly a good partnership to have here.
Hey, Julien. This is pretty cool. We're actually looking at something similar internally. So I had a question for you on data versioning. How do you guys handle versioning for things like streams, where you're continually getting more and more data added onto it? Do you just... I guess, yeah. How do you handle that?
Julien Le Dem:
So like I said, we try to have a unifying model of jobs and dataset. Right. I mean, we have a unifying model like that, so jobs can be batch or stream in a different sort. But the other thing we do is specialize in job code. So depending on the type of job you have different metadata. Because obviously for batch, you capture those dependencies like the... We want to capture partition level and dependencies at this level. And at the streaming level it's very different. So you have this notion of run and then when it's a batch process, the run is every time the job starts and end, that's a run. For a streaming job... A streaming job doesn't actually run continuously, right. So we run from a point on until we stop it and then you may stop it to update it, maybe you change the code and then you wipe and then you start it again.
Julien Le Dem:
And so, the notion of run then matches those things. But so you would have a notion of run. And then, one thing we do is whenever a run stops and it kind of creates a version of the dataset. So for streaming, you're not going to keep track... The goal is also the metadata has a reasonable role. So we can keep track if the job has changed and so at least capture that information. Other information that we capture is the schema has changed so if you're using something like our schema registry... I mean, schema registry [inaudible 00:29:35] And the fact that the schema has changed. And so if schema registry can keep track of versions of the schema and the instrumental value stream. And so we can store that and then we print it out. The screening aspect of Marquez is something that is still more to be developed.
So this is kind of like the beginning of the thinking. If you wanted to push it, you could actually, when you stop the job on the check points usually you fill your states. Typically, it's a list of partitions that are interim and then what is the offset on each of them? So you could imagine a way of storing just that information that captured exactly when you stopped reading, when you stop the job and say that is the vision that is connected to the line that just ended, if you wanted to set that. So this is not something that exists in markets today but you could imagine adding this data in there. The goal is to work with the core model and then it's extensible. So per-type of data set, per-type of a job, you can have more metadata that we need to do that in particular and that would make sense and help to capture that more precisely.
Okay, cool. No, I think that makes sense. I mean, it's a complicated topic, so we don't have a good answer for it either, I was hoping you-
Julien Le Dem:
Yeah. But I think one of the goals of Marquez is to have a precise modeling of how it actually works and when the actual versions are so that you capture the state of your system. And like the Holy grail, for example, in the batch side is when a job has failed or actually the worst case is the job has not failed but it produced wrong data and a bunch of other jobs I've been triggered from that and they produce more and more wrong data. Is knowing exactly what needs to be rerun and how do we roll back to the previous state? So if you capture the entire state and you have things like what was the version of the dataset before it ran and you can walk back everything to where it was before and you know exactly everything that needs to be a rerun.
So it's kind of the same ID and so we spend less time on streaming for now because... But this is very welcomed and this is what we want to from this community. If people want to focus on the streaming area, they're very prone to join the conversation and pushing this area for Marquez.
Great. So I'm going to jump in with that. We're out of time. So thanks to everybody. Thank you, Julien. Really appreciate. Great question and take the conversation to Slack. So I just put it into the chat here, jump over there and Julien will be there and of course that community will continue to exist even after the event. So thanks all.
Julien Le Dem:
Thank you very much. And I had a great time, have a good day.