Subsurface LIVE Winter 2021
Data Lineage with Apache Airflow
With Airflow now ubiquitous for DAG orchestration, organizations increasingly depend on Airflow to manage complex inter-DAG dependencies and provide up-to-date runtime visibility into DAG execution. But what effects (if any) would upstream DAGs have on downstream DAGs if dataset consumption was delayed? In this talk we introduce Marquez, an open source metadata service for the collection, aggregation, and visualization of a data ecosystem’s metadata. We will demonstrate how metadata management with Marquez helps maintain inter-DAG dependencies, catalog historical runs of DAGs, and minimize data quality issues.
Willy Lulciuc, Software Engineer, Datakin
Willy Lulciuc is a Software Engineer at Datakin. He makes datasets discoverable and meaningful with metadata. Previously, he worked on the Project Marquez team at WeWork. When he's not reviewing code and creating indirections, he can be found experimenting with analog synthesizers.
Hello, everybody. And thank you for joining us for this session. My name is Louise and I’ll be your moderator today. I wanted to go through a few housekeeping items before we start. First, we will have a live Q&A after the presentation. We recommend activating your microphone and camera for the Q&A portion of the session. Simply use the button at the upper right, and you can share your audio and video, and you [00:00:30] will automatically be put in a queue to ask your question. So with that, I’d like to welcome our speaker, Willy Lulciuc, who’s a software engineer at Datakin. Willy, over to you.
Great. Thanks for the introduction. And also, thanks everyone for attending my session. Today we’re going to be talking about data lineage, but really how it relates to Apache Airflow. I think we’ve all written DAGs and they’ve failed. And what we end up seeing is that there are dashboards [00:01:00] that rely on those ETL jobs or just workflows that populate dashboards on a frequent basis, like daily or weekly. And a tool like Marquez will help you, I think, troubleshoot those a bit easier with the data lineage that Marquez stores on the back end and with the UI that allows you to explore and visualize the dependencies between the DAGs themselves.
So, before I start I’ll introduce myself. Hey, I’m Willy [00:01:30] Lulciuc. I’m a software engineer at Datakin. We are a startup that’s focused on data lineage in the data ops space. I’m a co-creator of Marquez. So before Datakin I worked at WeWork on the data platform side and worked on Marquez and built out the service and open-sourced it as well. And you can follow me on Twitter if you’d like. My Twitter handle is W-S-L-U-L-C-I-U-C.
The agenda for the talk. [00:02:00] We’ll cover a few things. We’re going to go and do a quick intro into Marquez. Talk about some of the APIs, the data model, and also some of the abstractions that it introduces. We’ll also go into the Marquez Airflow library. Talk about the code and also the metadata that’s extracted when your DAG runs.
We’ll also go into a demo. I’m pretty excited about that. We’ll actually kick off some DAGs and see the lineage be visualized for the UI and explore a little bit in terms of what those DAGs are. And also some [00:02:30] of the links between the two, between the nodes in the graph.
Then we’re going to be doing a quick intro as well into OpenLineage. So that’s a really exciting standard around collecting metadata from different systems. And then we’ll go into future work as well.
A quick background in Marquez. Like I mentioned before, it was open sourced at WeWork from day one. So if you look at the commit history you’ll see that we open sourced it with [00:03:00] the idea that it’s a standard that was missing or a metadata service that was missing. And we ended up I think having a lot of sessions on the data model. So we pulled from our background in previous jobs that we had and realized, you know what, there hasn’t been a really good solution around a metadata service. So we decided to open source Marquez and work on that when we built out the data platform at WeWork.
So it was a really core component. There was Kafka streams that we were writing to, services [00:03:30] that have operational data stores, warehouses, all that information was stored in the Marquez back end. And as of 2019, it’s part of the LF AI and Data Foundation, so we donated the code to the foundation.
Then we pulled background information, background for the Marquez Project from our past experiences. We were pretty excited to actually learn about this paper called Ground, which actually talks a lot about the [00:04:00] importance of versioning. So if you are going to process data, so if you have input datasets and output datasets, what is the intermediate process? Really what’s the version of code that produced that dataset? So it talks about the importance of really having a link to code and versioning the schema. So this paper goes into all of that. So if you want a background in terms of what the space is all about, I’d highly recommend this paper that came out of Berkeley.
[00:04:30] So if you look at a Venn diagram, in the data processing space you’ll have data governance. So really understanding who’s touching your data, what data might have PII information. You also look at the data discovery component. So really cataloging your datasets and jobs. So if you joined the company on day one and you wanted to start putting together a quick dashboard or understand what are the core datasets for your company, you want to be able to do a search and see results.
[00:05:00] But for this talk we’re going to be focusing on data lineage, which is really understanding as your data goes through your system how is it transformed? What are the downstream dependencies? Where are the jobs that rely on those datasets to feed a lot of your company initiatives?
At a really high level, we can take a look at the Marquez Service, which is just a metadata service that ETL batch and stream workflows communicate with through the REST [00:05:30] API. And at the core, it’s really managing three entities: sources, datasets, and jobs. With sources, it’s really understanding things like where is your data actually located? Is it in Postgres? Is it in Redshift? We also begin to look at the datasets. So metadata around the schema, who owns that dataset and how is that actually linked to the source is really critical. Because if you look at the jobs, as a data engineer you’re going to write a job and you’re going to want to know what datasets are going to be [00:06:00] part of your input.
Then we’re going to look at the modular framework. Marquez itself is also modular. So we’ve actually done quite a bit in terms of understanding how we make the code a bit more extensible. So there’s a component around data governance so you’re able to tag datasets and jobs as having PII. You’re also able to look at the lineage itself. So we’re working on a lineage service that allows you to query the [00:06:30] lineage that’s stored on the back end. And then also dataset discovery and exploration. So there’s a search API that allows you to search for datasets. A lot of what we talked about in that Venn diagram, but also links back to that paper. We built a lot of that in Marquez.
And then at a high level we could take a look at how things are broken up. So at the root you have APIs and libraries. So the integrations are what this talk’s [00:07:00] about, specifically the Airflow integration. Then we’re also working on Spark, and as you can imagine you could extend that to Flink and a few other services as well, to start sending metadata to Marquez’s back end.
We also broke it up into services. So Core is really our core model. And then if you look at the storage component, there’s the Postgres instance that Marquez uses to store the metadata collected from different systems, different jobs and so forth. [00:07:30] Then you have the lineage API, which allows you to query the jobs and datasets and really get an understanding of how all of those are dependent on each other.
And also the search aspect. And Marquez itself, though it catalogs the datasets and jobs, it’s not really a data catalog. But it’s really important that at a minimum when you build out the lineage graph you understand where your inputs are coming from and also some [00:08:00] of the downstream dependencies. Let’s say there’s a job that’s failing and you search for the datasets that are your input. You can actually begin to start tracing to see who owns that job and what job is actually producing the dataset that you rely on and is causing your job to fail.
There are three main entities within the data model that we store on the Marquez back end. You can look at [00:08:30] the one core component as we talked about, which was sources. So you have MySQL, Postgres and a few others, then the datasets. So a source will have one or more datasets. And with the job entity you have different types. So you have Batch, Stream, and Service.
I think this is where things get really interesting with Marquez is that as you write jobs they’re not static. The code is not static so you could have [00:09:00] constant deployments of your ETL workflows. So every time you change your code Marquez has its own versioning functionality on the back end, which will version jobs and datasets transparent to the caller. So if you have a job and you modify the code, Marquez will actually detect that based off the metadata that you send and it’ll create a new job version.
And for every job version there is a run. And that’s how we begin to understand [00:09:30] what datasets are produced by a job. So you could view a run as an instance of a job version which has datasets as inputs, which are actually mapped to versions, and the output datasets themselves, which are versioned as well. And the way we version datasets currently is simply that if a job completes we update a last modified date, because for the most part we can assume that if the job completes, the [00:10:00] dataset itself has been modified. But there’s a lot more work we’re doing there.
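As a rough illustration of the versioning idea described above, the following Python sketch derives a version identifier from the metadata a caller sends, so that changing the code location produces a new job version. This is not Marquez's actual algorithm; the function name `job_version`, the hashing scheme, the namespace, and the example URLs are all assumptions made for illustration.

```python
import hashlib
import json


def job_version(namespace, name, inputs, outputs, location):
    """Return a deterministic version ID for a job's metadata (hypothetical scheme)."""
    payload = json.dumps(
        {
            "namespace": namespace,
            "name": name,
            "inputs": sorted(inputs),
            "outputs": sorted(outputs),
            "location": location,  # link to the code that defines the job
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Same job, but the second call points at a newer commit of the code.
v1 = job_version("wework", "room_bookings_7_days", ["room_bookings"], [],
                 "https://example.com/repo/blob/commit-1/job.py")
v2 = job_version("wework", "room_bookings_7_days", ["room_bookings"], [],
                 "https://example.com/repo/blob/commit-2/job.py")

print(v1 == v2)  # False: changed metadata yields a new job version
```

Because the identifier is computed from the metadata itself, the caller never has to ask for a new version explicitly, which is one way to read "transparent to the caller".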
And in terms of dataset versions, there’s actually three components. You have DbTables, File System and Streams. The one that I think has most evolved is the DbTable dataset version. So really in that aspect we track to see if the schema of the DbTable has changed if you add a column, remove a column, [00:10:30] or you change the type. All that’s versioned on the back end, and we’re working on APIs to begin to list versions of the datasets.
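The schema tracking described here (added columns, removed columns, changed types) can be sketched as a comparison of two column-to-type mappings. This is only an assumed illustration of the idea; the helper names `schema_changes` and `needs_new_version` and the example columns are hypothetical and not taken from Marquez.

```python
def schema_changes(old_fields, new_fields):
    """Compare two {column_name: column_type} mappings."""
    old_cols, new_cols = set(old_fields), set(new_fields)
    return {
        "added": sorted(new_cols - old_cols),
        "removed": sorted(old_cols - new_cols),
        "retyped": sorted(
            col for col in old_cols & new_cols
            if old_fields[col] != new_fields[col]
        ),
    }


def needs_new_version(old_fields, new_fields):
    """A new dataset version is warranted if any kind of change is present."""
    return any(schema_changes(old_fields, new_fields).values())


before = {"booking_id": "INTEGER", "room_number": "INTEGER", "booked_at": "TIMESTAMP"}
after = {"booking_id": "BIGINT", "room_number": "INTEGER", "location": "VARCHAR"}

changes = schema_changes(before, after)
print(changes)
print(needs_new_version(before, after))
```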
So what are some benefits? I think a big one is debugging. So you can imagine the use case I talked about earlier where you’re a data engineer, you’re working on a brand new pipeline and you released the first version of the code. If you have dataset versions as output [00:11:00] you’ll be able to see the downstream jobs that depend on that. But not only that, you can actually trace the versions that they depend on.
So the one thing that we’d like to really convey with the notion of versioning is that the job itself is an interface: you can think of the output as the interface that other jobs depend on. And what are the versions of those datasets that it produces? So you can almost think of it as with APIs, where you have some versioning, but how do we take that same notion [00:11:30] and push it to workflows?
And then the other benefit is backfilling. So we’ve all had to backfill and it’s usually not a fun experience. And what you’re able to do with this model is do full and incremental processing. So the one thing that the model currently doesn’t support, but it’s on our roadmap, is dataset partitions. So you can imagine if there’s corrupt data, or the quality of the data for some reason drops, and only a few partitions [00:12:00] of that dataset need to be backfilled, you can do incremental processing. But if, let’s say, the entire dataset is corrupt and you need to start from really the beginning of time, you could do a full processing on that dataset.
How is metadata collected? Really it’s push-based, so you just integrate either a Python client or Java client, and begin pushing metadata to the REST API. [00:12:30] So that includes dataset metadata and job metadata, as well as the run metadata. So the one thing that we collect are the run args. So if your job executes we look at the args that were passed into the job. So you could have flags maybe that trigger certain parts of the code. You could have your nominal start time and nominal end time that could be part of the run args. So there’s a lot that you can do there.
[00:13:00] With the understanding of how Marquez stores the metadata on its back end, what I’m going to do is walk you through the sequence of steps that also occur in the Marquez Airflow integration. And I’m going to walk you through also the parts of metadata that I think are more interesting when we start collecting, start making those API calls and start sending metadata to the Marquez API.
So [00:13:30] the first thing that you’ll need is a source. So you give it a type. In this case it’s going to be Postgres, and we’re going to give the source a name, and it’s going to be analyticsdb, but also provide a connection URL. In this case, that is just helpful for debugging. It’s not necessarily used for connecting to the actual source itself. So we’re not going to store passwords or anything like that. It’s just a good way to begin to catalog the sources within your org.
[00:14:00] And the other part is the description. So we feel that all metadata should come with a description because a lot of this is going to be read by humans. So it’s really helpful to get that extra context. So in this case we just have a source that contains all the office room bookings. So we’re going to use a WeWork example since I previously worked there. And so we’re going to go ahead and create that source.
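The source metadata described above (type, name, connection URL, description) can be pictured as a small record. The Python sketch below is an assumption-laden illustration: the key spellings, the `POSTGRESQL` type value, the example URL, and the `validate_source` helper are hypothetical and not quoted from the Marquez API. It also checks that the connection URL carries no password, in line with the point that credentials are not stored.

```python
from urllib.parse import urlsplit

# Illustrative source metadata; key names and values are assumptions.
source = {
    "type": "POSTGRESQL",
    "name": "analyticsdb",
    "connectionUrl": "postgresql://localhost:5432/analyticsdb",
    "description": "Contains all office room bookings.",
}


def validate_source(meta):
    """Require every field, including a human-readable description, and no password."""
    required = ("type", "name", "connectionUrl", "description")
    missing = [key for key in required if not meta.get(key)]
    if missing:
        raise ValueError(f"missing source fields: {missing}")
    if urlsplit(meta["connectionUrl"]).password is not None:
        raise ValueError("connection URL must not embed a password")
    return meta


print(validate_source(source)["name"])
```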
And the next part is the dataset. So, after you’ve defined a [00:14:30] source then you can start registering datasets to that source. So here we have a DbTable, so we give it a type. We give it a name. So, Room Bookings. We also give it a physical name. We also then link the source. So we provide the source name as well as the namespace and some of the fields. So I think the important parts here are the type, the name, and also the fields that come along with it. And here we’re just going to have a dataset that has [00:15:00] all the room bookings for each office.
And the one thing I’m going to highlight is the name and physical name. So the logical name is really the name that Marquez is aware of in terms of the dataset. So this is the name that you say, “Hey, Marquez, name this dataset using this name.” And the physical name actually maps to the physical source of how it’s actually named in the source itself. So one example could be you [00:15:30] have an S3 bucket that could have room bookings, but really the physical name is the full path to the bucket itself. And this begins to introduce the idea of data abstractions. So within your workflow you could reference the dataset by name, but the physical name itself, Marquez could do that look up for you and at the same time provide some of the connection details for the source.
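The logical versus physical name distinction can be sketched as a lookup: a workflow refers to a dataset by its logical name, and a registry resolves the physical name plus the connection details of the linked source. The in-memory dictionaries, key names, field list, and the `resolve` helper below are hypothetical stand-ins for what a metadata service would return, not Marquez's actual API.

```python
# Hypothetical in-memory registries keyed by name.
SOURCES = {
    "analyticsdb": {
        "type": "POSTGRESQL",
        "connectionUrl": "postgresql://localhost:5432/analyticsdb",
    },
}

DATASETS = {
    "room_bookings": {
        "type": "DB_TABLE",
        "physicalName": "public.room_bookings",
        "sourceName": "analyticsdb",
        "fields": [
            {"name": "location", "type": "VARCHAR"},
            {"name": "room_number", "type": "INTEGER"},
        ],
        "description": "All room bookings for each office.",
    },
}


def resolve(logical_name):
    """Map a logical dataset name to (physical name, source connection URL)."""
    dataset = DATASETS[logical_name]
    source = SOURCES[dataset["sourceName"]]
    return dataset["physicalName"], source["connectionUrl"]


physical_name, connection_url = resolve("room_bookings")
print(physical_name, connection_url)
```

With this kind of indirection, a workflow only needs to know the name `room_bookings`; where the table physically lives can change without touching the workflow code.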
Next is going to be the job. [00:16:00] So here, we’re going to define a batch job that does room bookings. It’s called room bookings seven days. So it’s just going to look at the past seven days of room bookings and do some calculations there, with the location and also the namespace that it’s associated with.
So if we go back and look at the important links, so we look at how a dataset is linked to a source and also how the datasets are linked with the job itself. So once you define your source, your [00:16:30] datasets, then you could actually start using them within the job. And the most important, I think a critical part here is also ownership. So it’s important to know who owns the datasets and who owns the jobs so that way you know who to contact. And Marquez on the back end keeps track of that ownership as well as if the ownership changes. So let’s say this job and dataset was owned by data science but tomorrow it could be owned by analytics or another team, Marquez also keeps [00:17:00] track of the ownership changes.
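To make the job and ownership points concrete, here is a small Python sketch of a batch job record that references its input dataset by name, together with an append-only ownership history. The key names, the example URL, the dates, and the `OwnershipLog` class are assumptions for illustration only; the talk does not describe how Marquez stores ownership changes internally.

```python
from datetime import date

# Illustrative job metadata; key names and values are assumptions.
job = {
    "type": "BATCH",
    "name": "room_bookings_7_days",
    "inputs": ["room_bookings"],
    "outputs": [],
    "location": "https://example.com/repo/blob/main/room_bookings_7_days.py",
    "description": "Calculations over the past seven days of room bookings.",
}


class OwnershipLog:
    """Append-only record of who has owned an entity and since when."""

    def __init__(self):
        self._entries = []  # list of (owner, since) tuples, oldest first

    def assign(self, owner, since):
        # Only record a new entry when the owner actually changes.
        if not self._entries or self._entries[-1][0] != owner:
            self._entries.append((owner, since))

    def current(self):
        return self._entries[-1][0] if self._entries else None

    def history(self):
        return list(self._entries)


owners = OwnershipLog()
owners.assign("data-science", date(2021, 1, 1))
owners.assign("data-science", date(2021, 1, 15))  # same owner, not recorded again
owners.assign("analytics", date(2021, 2, 1))

print(owners.current())
print(len(owners.history()))
```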
If we go quickly through here, we’ll see one on the left side, you’ll see the first version of the job. And on the right side, you’ll see the current lineage graph. So here we just have a single input. It’s owned by data science and the name of the dataset is room bookings. And let’s say now we have an output. So before the job was just sending an email, but now it’s actually not only sending an email, but it’s also doing some room booking [00:17:30] aggregates as well. So on the right you’ll see there’s a new representation of the lineage graph. So the circle represents the jobs and the squares themselves represent datasets. So if you’re here, you’ll have three nodes, two being datasets and one being the job itself.
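The change in the lineage graph between the two job versions can be sketched by deriving nodes and edges from a job's inputs and outputs. The function `lineage_graph` and the snake_case names are hypothetical; the sketch only reproduces the counting described above (one dataset plus one job at first, then three nodes once the job also writes an output).

```python
def lineage_graph(job_name, inputs, outputs):
    """Return (nodes, edges) for one job; nodes are (kind, name) pairs."""
    job_node = ("job", job_name)
    nodes = {job_node}
    edges = []
    for dataset in inputs:
        nodes.add(("dataset", dataset))
        edges.append((("dataset", dataset), job_node))  # dataset feeds the job
    for dataset in outputs:
        nodes.add(("dataset", dataset))
        edges.append((job_node, ("dataset", dataset)))  # job produces the dataset
    return nodes, edges


# First version: reads room_bookings and only sends an email (no output dataset).
nodes_v1, edges_v1 = lineage_graph("room_bookings_7_days", ["room_bookings"], [])

# Second version: also writes room booking aggregates.
nodes_v2, edges_v2 = lineage_graph(
    "room_bookings_7_days", ["room_bookings"], ["room_booking_aggregates"]
)

print(len(nodes_v1), len(nodes_v2))
```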
So next we’re going to go through the Marquez and Airflow integration. What it currently supports is metadata extraction at the task level. And that’s really the important part because operators [00:18:00] are really what do the extraction. So we look at the task life cycle, so understanding the run states. We look at the parameters that are passed into the task itself, or really that’s part of the DAG. We also then built out the link to code, so really versioned code, kind of going back to that Ground paper, understanding the importance of, as your code evolves, what are the derived datasets from that code?
[00:18:30] The other component is the lineage. So the tasks themselves will begin to be visualized within the Marquez UI. And so as tasks run we register those as jobs and then begin to visualize the metadata on the UI. And it does come with built-in components. So there’s the SQL parser that we use to tokenize the SQL and register the input tables as well as the output [00:19:00] table, and we have a link to code builder. Right now GitHub is supported, but you can imagine we could have one for GitLab. And the other component is that it ships with metadata extractors.
So at a high level, if you go from left to right you’ll see that when a DAG runs the integration will send metadata to the Marquez REST API, and begin to start populating that model. So registering [00:19:30] the job, versioning the job itself, capturing the runs and also understanding what input versions the job has, but also what output versions the job produces.
An important thing is that the Marquez Airflow library is open sourced. If you wanted to check it out, contribute extractors, you can totally do so. All of that is open sourced. It’s also enabled globally at the task level. [00:20:00] So when a task runs, like I mentioned, we extract quite a bit of metadata as well as the SQL. So if the task executes SQL, it’s also stored on the Marquez back end and linked to a version. And really all you have to do is just extend the Airflow DAG. So normally you’d do from airflow import DAG; you just have to modify your code to do from marquez_airflow import DAG, and set a few environment variables [00:20:30] that configure the back end and point the integration at the Marquez API.
So the way that we extract metadata is, here you can see that Airflow ships with a Postgres operator, but we’re also doing quite a bit of work on BigQuery. So if you’re running on GCP you could begin to extract metadata on your BigQuery workflows. So the idea is you have the operators that map to [00:21:00] an extractor, and that’s really what begins to extract the metadata from the operators. So for every operator that Airflow has we are working on also providing an extractor.
So if we walk through the operator metadata that’s collected, so here in the context of an Airflow DAG, we see that we first collect the source. So here it’s going to be called analyticsdb, which is the Postgres connection ID. Then we’re going to tokenize the SQL [00:21:30] and we begin to look at what tables you’re inserting into. So here we just have a task in what’s called the new room bookings DAG. And that’s what we’re going to be registering as our job. So after the extractor runs you’ll see that all this metadata is then put into an API call and then sent to the Marquez back end.
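The operator-to-extractor mapping and the SQL step can be sketched roughly as follows. To keep the example self-contained it does not import Airflow: `PostgresTask` is a stand-in dataclass, not the real operator, and the regular expressions are a deliberately naive substitute for the SQL parser the integration actually uses. The staging table name and all function names are hypothetical.

```python
import re
from dataclasses import dataclass


@dataclass
class PostgresTask:
    """Stand-in for an Airflow task that runs SQL against Postgres."""
    task_id: str
    postgres_conn_id: str
    sql: str


def extract_postgres(task):
    """Naively pull the source, input tables, and output tables out of a task."""
    outputs = re.findall(r"\binsert\s+into\s+([\w.]+)", task.sql, flags=re.IGNORECASE)
    inputs = re.findall(r"\b(?:from|join)\s+([\w.]+)", task.sql, flags=re.IGNORECASE)
    return {
        "source": task.postgres_conn_id,
        "inputs": sorted(set(inputs)),
        "outputs": sorted(set(outputs)),
    }


# Each operator type maps to the extractor that understands it.
EXTRACTORS = {PostgresTask: extract_postgres}


def extract(task):
    """Look up the extractor for this task type; None if unsupported."""
    extractor = EXTRACTORS.get(type(task))
    return extractor(task) if extractor else None


task = PostgresTask(
    task_id="new_room_bookings",
    postgres_conn_id="analyticsdb",
    sql="""
        INSERT INTO public.room_bookings (location, room_number)
        SELECT location, room_number FROM public.room_bookings_staging
    """,
)

metadata = extract(task)
print(metadata)
```

A regex like this breaks on subqueries, CTEs, and constructs such as `EXTRACT(... FROM ...)`, which is a good reason to use a real SQL tokenizer in practice.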
In terms of managing the inter-DAG dependencies, so this is what [00:22:00] your normal view is. So a lot of you may have worked with the Airflow UI, and what you’ll see is that you have to click on the DAG itself to see what the dependencies are. You could also use sensors to create these dependencies between the DAGs themselves, but it’s really hard to visualize. There’s actually some companies that extend the Airflow code, and that’s actually pretty common. I’ve seen it where one company has an entire DAG in order to capture the dependencies. And [00:22:30] that’s actually pretty hard to manage.
And really all we’re looking for is that these two DAGs depend on each other through this common table. So you’ll have public room bookings that inserts just room bookings. So you’ll have a location, what time the room booking was, but also what room number. And between these two DAGs you’ll see that one task is writing to that table and the top room bookings DAG, which is just kind of computing what are the top room bookings [00:23:00] maybe within a region or within a city, relies on that table.
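The idea that two DAGs depend on each other through a common table can be sketched by matching writers to readers. The task records, the snake_case names, the output table `public.top_room_bookings`, and the helper `dag_dependencies` below are hypothetical; the point is only that once reads and writes are known per task, the inter-DAG edge falls out without sensors.

```python
def dag_dependencies(tasks):
    """Return sorted (upstream_dag, downstream_dag, table) triples."""
    writers = {}
    for task in tasks:
        for table in task["writes"]:
            writers.setdefault(table, set()).add(task["dag"])

    dependencies = set()
    for task in tasks:
        for table in task["reads"]:
            for upstream in writers.get(table, ()):
                if upstream != task["dag"]:  # ignore dependencies within one DAG
                    dependencies.add((upstream, task["dag"], table))
    return sorted(dependencies)


tasks = [
    {"dag": "new_room_bookings_dag", "reads": [], "writes": ["public.room_bookings"]},
    {"dag": "top_room_bookings_dag", "reads": ["public.room_bookings"],
     "writes": ["public.top_room_bookings"]},
]

deps = dag_dependencies(tasks)
print(deps)
```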
All right. So what I’m going to do is walk you through a quick demo. So what I’ve done here is I’ve already kicked off a few DAGs, but the DAG that we’re going to look at is this ETL delivery seven days. And what it’s going to do is just load deliveries for the week. And here the example’s for food deliveries because [00:23:30] I think it’s pretty common nowadays to order food.
So the first thing we’re going to do we’re going to look at the code. What I’m going to do is highlight that the Marquez Airflow DAG is being used. So just swapping it out from the Airflow one. We’re also going to look at… There’s a number of different dependencies that it relies on before the DAG kicks off. So with the example, it’s going to use Postgres pretty heavily so there’s going to be a Postgres operator that creates this delivery seven days, which [00:24:00] we’re going to be writing into. But at the same time we’re going to do some inner joins on a few tables that this task relies on.
So if I go to the Marquez UI, we’re going to be looking for restaurants because I think that’s a pretty fun one. Just really quick. So I typed in restaurants. You’ll begin to see the metadata that’s already been collected. So I’ve clicked on public restaurants. And I zoom [00:24:30] in a little bit here. You’ll see that the schema itself is automatically captured. So one of the key things in the extractors for Postgres, but also some of the other extractors that we’re working on is that the schema is pulled directly from the database.
And here we’ll see that we have an ETL restaurants ETL task, and that’s part of the food delivery namespace. And what we ended up doing with the Airflow integration [00:25:00] is that we append the DAG name and then the task name. So we use a dot right there. And we can also see that it’s run a few times, so it completed twice. But if the job failed that would also be shown, and it shows you the last time it ran. But also the location: if you click on this it’ll actually take you to the source code.
So, what I’m going to do is I’m going to enable the ETL delivery seven days, and then I’m going to also enable [00:25:30] a few downstream tasks that depend on it. I’m just going to do some quick refreshing and it should complete really shortly. Probably one more task here. [00:26:00] So if I type restaurants… The one thing I didn’t show is it also registered the different sources. So, if you wanted to see, here the connection ID that we used is food delivery DB. So if I go back to the restaurants [00:26:30] example and we look at the graph again, the DAG is completing, but as it’s completing it’s also building out a new lineage graph.
So here we can see that the DAG that we executed has some downstream dependencies, but for that ETL restaurants job, it also now has a lot more in terms of visualization. [00:27:00] You can see that there’s a lot more jobs and datasets that are part of the lineage graph. And if you look at the downstream dependency, this ETL delivery seven days depends on restaurants. And now you have a better view of the dependency graph as well.
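Exploring downstream dependencies in a lineage graph like the one in the demo amounts to a graph traversal. The sketch below runs a breadth-first search over a small hand-written adjacency map; the node names are snake_case guesses at the demo's jobs and tables and are assumptions, as is the `downstream` helper. It does not query Marquez.

```python
from collections import deque

# Hypothetical lineage edges: each node points at what it feeds.
EDGES = {
    "job:etl_restaurants": ["dataset:public.restaurants"],
    "dataset:public.restaurants": ["job:etl_delivery_7_days"],
    "job:etl_delivery_7_days": ["dataset:public.delivery_7_days"],
    "dataset:public.delivery_7_days": [],
}


def downstream(start):
    """Return every node reachable from start, in breadth-first order."""
    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for neighbor in EDGES.get(node, []):
            if neighbor not in seen:
                seen.add(neighbor)
                order.append(neighbor)
                queue.append(neighbor)
    return order


affected = downstream("job:etl_restaurants")
print(affected)
```

If the restaurants job fails or is delayed, everything in `affected` is what a data engineer would want to check next.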
So, I think I’m running a bit over, but what I’m going to do is quickly introduce OpenLineage. I just want to say that a lot of this work really can be very custom and as different systems evolve [00:27:30] new versions come out. So in this case it’s scheduled for Airflow, we also then have to introduce new code to handle the integration. And that’s really cumbersome.
What we want to do is really push for a standard that pushes the metadata, the lineage metadata coming from these systems to the system itself to make it really common across these different toolings to collect metadata. So if you’re going to be writing to Marquez or DataHub or Amundsen, [00:28:00] or a number of others, we want a standard to be present so that way lineage metadata is just collected a lot easier.
And I’m not going to get into this one too deeply but the idea is that when a run starts you have all the metadata around the run, the job, the datasets themselves. So it’s an open standard around specifying how to collect metadata, lineage metadata, and the metadata itself is collected at the job level. So understanding what’s the [00:28:30] run ID, what are the inputs and outputs for that run? And also what is the schema for each dataset?
And the one thing I’ll note is that it’s a standard that’s in progress, and the way we’re looking at extending it is through facets. So, that’s where you can add your custom metadata. And we’re not doing this alone. There’s a number of teams that are interested in contributing to the standard. So you have dbt, DataHub, Iceberg, and it’s a pretty exciting field.
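The shape of such a run-level event, with facets as the extension point for custom metadata, can be sketched as a plain dictionary. The key names below are loosely modeled on the description in the talk and should be treated as assumptions rather than as quotations from the OpenLineage specification, which was still in progress at the time; `run_event` and the `team` facet are hypothetical.

```python
import json
import uuid
from datetime import datetime, timezone


def run_event(event_type, namespace, job_name, inputs, outputs, run_facets=None):
    """Build an illustrative run event: run ID, job, inputs, outputs, facets."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4()), "facets": dict(run_facets or {})},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": list(inputs),
        "outputs": list(outputs),
    }


event = run_event(
    "START",
    "food_delivery",
    "etl_delivery_7_days",
    inputs=[{
        "name": "public.restaurants",
        # A schema facet attached to the dataset (illustrative field list).
        "facets": {"schema": {"fields": [{"name": "id", "type": "INTEGER"}]}},
    }],
    outputs=[{"name": "public.delivery_7_days", "facets": {}}],
    run_facets={"team": {"owner": "data-science"}},  # custom, org-specific facet
)

print(json.dumps(event, indent=2))
```

Keeping custom metadata inside named facets means a consumer that does not understand a facet can ignore it without breaking on the rest of the event.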
[00:29:00] So the one thing I’ll end with is just the roadmap itself. We’re going to be releasing Marquez Airflow with OpenLineage support, versioning APIs, and GraphQL. And then we’re doing a lot of work with Apache Spark to begin collecting metadata. And in the medium term, the APIs that we’re looking to add support for are lineage and search, and of course adding the facet concept [00:29:30] to our core models to better align with OpenLineage.
So with that, that’s all I got. But if you’re interested in Marquez, go to marquezproject.ai where you can learn more about the project, as well as the roadmap. And if you’re interested in this or if you think the metadata space is cool, want to work on APIs that do lineage analysis, definitely reach out to us at Datakin, so email@example.com. And with that do [00:30:00] we have any questions?
Louise: Thanks, Willy. We’re just about at time, but let’s see if we can get a couple of quick questions in if the tool will allow us. So I’m going to queue up the ones that we have on video here. So Sudhir Maura, I think is up first. Let’s see if he’s still online. Nope. All right. We have some questions in the chat. So question, can Marquez work with commercial tools like Informatica or DataStage?
[00:30:30] That’s a good question. Metadata could certainly be pushed to it. [inaudible 00:30:36] has an API, so if there is integration work that others want to do it certainly could be done. But out of the box there’s no support there. But again, it’s pretty flexible with the Python and Java clients that we have to begin sending that metadata into Marquez’s back end.
Okay. Next question. Can you add custom metadata and where is metadata stored?
[00:31:00] Yeah. So custom metadata is that facet concept. So, if you wanted to add metadata that’s specific to your org to jobs or to datasets, it’s that facet concept, which is something we’re working on and part of our roadmap. And it’s going to be stored in Postgres. So this entire model that I discussed is in Postgres. It’s going to be in a table. So we’re working on modeling that out. So if that interests you, please continue the discussion [00:31:30] either offline or even in the [inaudible 00:31:32] through an issue.
Okay. I think we’ll answer just one more question and then we’ll have to take it over to the Slack. That question is, can source and dataset types be extended to proprietary storage systems?
Absolutely. Yeah. The one thing right now, it’s a bit hard to extend, but there’s a [inaudible 00:31:54] that we have that enables a lot more flexibility in the source types but also the dataset types. It’s a [00:32:00] really good question.
Okay, great. Thanks, Willy. Greatly appreciate the presentation. That’s all the time we have for questions today. So again, go over to the Slack channel. You just search for Willy’s name in the available list of channels, and you can continue asking him questions there. Before we leave I do want to ask you to please fill out the super short Slido survey right at the top of your screen. You can see it up next to the chat. The next sessions are coming up in about [00:32:30] three more minutes, and of course, the expo hall is open. Thanks everyone and enjoy the conference.