Orchestrating Data Validation Workflows with Prefect

Session Abstract

As data pipelines become increasingly complex and interconnected, workflow management systems are being used to schedule and monitor tasks. Prefect is an open source workflow management system designed for large-scale data processes.

In this talk, we’ll show how to get started with Prefect to orchestrate data validation workflows to preserve data integrity before it’s used in downstream processes.

Video Transcript

Kevin Kho:    Okay. So, hey everyone. My name is Kevin Kho. I’m an open-source community engineer at Prefect. So, my day-to-day job is mainly supporting our community through Slack. Today, I’ll be talking about Orchestrating Data Validation Workflows with Prefect.

So for our talk today, first of all, I’ll give a brief introduction of [00:00:30] Prefect and why you would use our workflow orchestration tool. And then, we’ll see how Dremio and Prefect can work together in building ETL solutions. And then lastly, we’ll build an end-to-end pipeline that combines Dremio, Prefect, and another library called Great Expectations that does data validation. And with that, let’s get started with an introduction to Prefect.

So Prefect is an open-source workflow orchestration [00:01:00] tool that is Python-based and designed to handle the modern data stack. We currently have 6,500+ GitHub stars, and our Slack community, which I’m in charge of, is approaching 7,000 members. When I hear Prefect users talk about Prefect at other conferences or other venues, they always mention that the Slack community has been very helpful for them. There are a lot of discussions about data engineering. [00:01:30] And even if you’re not interested in using Prefect today or tomorrow, we do encourage you to join our Slack community. Even if you just want to chat about Prefect in comparison to another workflow orchestration tool, or you just want to chat about ETL or data pipelines, we’re always happy to chat with you.

Prefect has a native Dask integration which allows it to parallelize tasks on Dask clusters seamlessly without having to write Dask code yourself. [00:02:00] And Prefect has two offerings: Prefect Cloud and Prefect Server. So when we talk about Prefect, the Python API, we’re talking about Prefect Core, and you would write your logic in Prefect Core using Python. And then you would use one of these orchestration backends, Prefect Cloud or Prefect Server. Prefect Server is our open-source version. You can go to GitHub, clone it, spin it up on your local infrastructure. [00:02:30] And Prefect Cloud is our managed version of it. If you start using Prefect Cloud, all users get 10,000 task runs for free. So it’s entirely free to get started, and that’s more than enough to actually run some production workflows.
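
For reference, here is a minimal sketch of what the Dask integration looks like in code, assuming Prefect 1.x; the scheduler address is a placeholder, and leaving it out makes Prefect spin up a temporary local Dask cluster instead.

```python
from prefect import task, Flow
from prefect.executors import DaskExecutor  # lives in prefect.engine.executors in older releases

@task
def transform(x):
    # stand-in unit of work; Prefect submits each task run to Dask for you
    return x * 2

with Flow("dask-example") as flow:
    doubled = transform.map(list(range(10)))  # mapped task runs execute in parallel on the cluster

# placeholder scheduler address; DaskExecutor() with no address starts a local cluster
flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")
flow.run()
```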

So with that, let’s see how Prefect would normally fit with Dremio in an ETL stack. If we look at Dremio as a data lake query engine, we’re normally [00:03:00] pulling data from data lakes, or in this case S3 buckets, and then we’re piping it out to a BI tool, in this case Tableau. Prefect would normally be in charge of orchestrating these upstream dependencies, where we have some specific compute, in this case a Lambda function and a Databricks job, that is populating the data in these data lake buckets. So you’d use a tool like Prefect to orchestrate this entire [00:03:30] pipeline. It’s not limited to upstream work, though, and in today’s presentation I’ll show you an example of how Prefect can also orchestrate things downstream of Dremio.

And if you look at the picture of that pipeline, with the compute in the Lambda function and the Databricks job, these pipelines tend to be complicated and there’s a lot of room for failure. So for example, an API query might just intermittently go down, or the [00:04:00] machine running the compute may go down. Data may come in malformed. And all of these problems are solvable by writing more code, right? If an API fails, you can code your own retry. If data comes in malformed, you can code your own checks. But the more you code against failure, the more you find that you’re doing work that detracts from what you originally set out to do. And at Prefect, we [00:04:30] collectively call this negative engineering.

So whenever you’re writing code that safeguards against failure, when you’re writing code to handle failure in the form of notifications or failure messages, or even taking other code paths in the event of failure, these are all things that we call negative engineering. And at Prefect, our mission is to eliminate negative engineering. So when Prefect was founded, our founder talked to several data [00:05:00] engineers, and you’ll hear this frequently: for data professionals, up to 90% of their job is in the form of negative engineering. If we can reduce this 90% down to 80% or 70%, we can effectively double or triple developer productivity. So at Prefect, we want to eliminate negative engineering.

So Prefect, like any other workflow orchestration tool, allows you to deploy your workflows [00:05:30] and schedule them. These are the basic features that Prefect offers. In Prefect, we call our pipelines or workflows flows. These are like Python scripts. And then a task would be a modular unit of work inside those flows. So in this case, a task would be more like a function and the flow would be more like a script. And then we have these basic features. We support parameterized flows and retries of tasks. We store secrets for you that [00:06:00] you can use inside your flows. And then we have conditional logic. If a task fails, do you want downstream tasks to still run? Or maybe you want some other route to be taken. Maybe you want the workflow to change based on the values of tasks that ran upstream. So you can use conditional logic to build your flow. And then lastly, of course, we have scheduling.
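
To make those features concrete, here is a rough sketch of a flow that uses a parameter, retries, a stored secret, and a schedule, assuming Prefect 1.x; the secret name, the parameter default, and the task bodies are illustrative.

```python
from datetime import timedelta

from prefect import task, Flow, Parameter
from prefect.client.secrets import Secret
from prefect.schedules import IntervalSchedule

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract(source: str):
    api_key = Secret("API_KEY").get()  # illustrative secret name stored with Prefect
    # ... call the source with the key; the task is retried up to 3 times on failure ...
    return {"source": source, "rows": 0}

@task
def load(record: dict):
    print(f"loading {record}")

schedule = IntervalSchedule(interval=timedelta(hours=1))  # optional: run every hour

with Flow("basic-etl", schedule=schedule) as flow:
    source = Parameter("source", default="s3://my-bucket/raw")  # parameterized flow
    load(extract(source))
```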

Here’s a picture of the UI, and I’ll show this also later [00:06:30] if I have time to demo. Here, the green are the flows that have succeeded. Red are the flows that failed. Gray are the flows that got canceled. And on the right side here, the yellow side, these are the flows that are scheduled in the future. So this is my dashboard and it’s interactive. You can click these previous flow runs, see the logs, why they failed, which specific task failed, and what [00:07:00] error messages were associated with it. This is an interactive dashboard, and this is what you get with Prefect Cloud or Server.

So now let’s talk about Dremio and Prefect together. When I was preparing this talk, I found the Dremio narwhal, whose name is Gnarly. We have our own mascot too. So meet Marvin. Marvin is the Prefect mascot. [00:07:30] Now, when we talk about using Dremio and Prefect together, the first case I showed earlier was when you have upstream data engineering. And this is the most standard way that you’d use these two tools together, where Prefect would be in charge of orchestrating the upstream processes that populate the data that goes into the data lake. And then you would use Dremio as the query engine and pipe that into the relevant BI tools that you’re interested in.

[00:08:00] And then, let’s talk about how we would orchestrate downstream jobs also. So in this case, we have an S3 bucket, and Dremio can query that and create a table using a CTAS. And then, you can use a Databricks job. So for this specific scenario, what can happen here is Dremio would query a couple of different data sources, merge them together, and write them out. And then that could be the training set for our machine [00:08:30] learning model training. So, Prefect would run this Dremio query and then run this Databricks job to fetch the data at that location, and then train our machine learning model. Or in this specific case, per the title of today’s talk, we’re going to query data with Dremio, pipe it to Great Expectations to make sure that data is in the form that we want, and then do some further Python tasks downstream.

Note here that [00:09:00] unlike the previous example, we’re querying the Dremio data and we’re passing it in memory to Great Expectations. And from Great Expectations, we’ll pass it in memory to whatever Python code is downstream. So one of the strengths of Prefect is that you don’t need to persist your data between tasks; you can seamlessly pass data from task to task. And this is one of the reasons why we say Prefect was specifically designed for the modern data stack.

[00:09:30] And with that, let’s get into using Prefect for data validation. But before I go there, I want to mention that at Prefect, we have a task library. Prefect is basically writing Python code, so as long as there’s an API for whatever you want to do in Python, you can make a task out of it. You’ll notice we have a Dremio task here and we have Great Expectations tasks already. I didn’t actually use them for this demo. But [00:10:00] even people who are less familiar with Python, what we find is that a lot of them come in and, with the task library, they’re able to get up and running with their ETL jobs. For example, some people query from PostgreSQL and upload to S3, or run a Docker container, or send a Slack message. So because of these pre-configured tasks, you can get up and running pretty quickly without having to code these [00:10:30] from scratch. And I’ll be demoing the Slack task in today’s talk.
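
As a rough illustration of how a pre-built task from the library is used, here is a minimal sketch with the shell task standing in for the Dremio, Great Expectations, Postgres, Docker, and Slack tasks mentioned above (Prefect 1.x assumed).

```python
from prefect import Flow
from prefect.tasks.shell import ShellTask

shell = ShellTask()  # pre-configured task from the Prefect task library

with Flow("task-library-example") as flow:
    # task arguments can be supplied when the task is created or, as here, when it is called
    listing = shell(command="ls -l /tmp")

flow.run()
```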

And with that, let’s go into data validation. When we talk about data validation, we are making sure that the data meets a certain quality, right? So if we have a machine learning model and unexpected inputs come into our pipeline, the model will still run. Outputs will still be generated. And we won’t have any errors. [00:11:00] But the quality of those outputs may be very bad, and maybe embarrassing in some cases. So we want to prevent these silent errors from happening. We also want to make sure that data is not deviating from what the model was originally trained on. We want to make sure that there are no systemic changes. For example, all of the minimum and maximum values or all the categorical values are still what we expect them to be. So we would include data validation in our pipeline [00:11:30] to make sure that data is in the form that we want it to be in.

And Great Expectations is the most popular data validation framework at the moment. There is support for Pandas, Spark, and SQL. And on the right here, I just have a code snippet of how you would use Great Expectations. But I should mention that Great Expectations is a very robust library with a lot of different features. This is [00:12:00] the most introductory usage. If you were to actually put it in production, you would have to dive into more advanced concepts. So in this function, I imported Great Expectations as GE, and then I’m taking the data frame and converting it to a GE data set, which inherits from the Pandas data frame. So it has all of the methods that Pandas has, like groupby, apply, all of that.

[00:12:30] So this data frame is pretty much a Pandas data frame, except that it has additional methods, right? And these additional methods are what we call expectations. So in this case, we want to expect the column values to be between a minimum and a maximum. And this is the taxi data set from the Dremio demo. So when we talk about passenger counts in taxi rides, the minimum should be above zero and the maximum shouldn’t be anything too ridiculous. [00:13:00] In this case, I just gave a very generous maximum of 12. And then I have this second expectation, which is just the vendor ID, and I put in the unique values of the vendor. Underneath, this is building the expectation suite. And now I can just do df.validate, and this will run these expectations and return the results.
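
Here is a compressed sketch of the snippet being described, assuming the older Pandas-dataset style of the Great Expectations API; the column names and the vendor ID set are assumptions based on the taxi data set.

```python
import great_expectations as ge
import pandas as pd

def validate_data(df: pd.DataFrame):
    # wrap the Pandas frame so it gains the expectation methods
    ge_df = ge.from_pandas(df)

    # riders per trip should be at least 1 and not unreasonably large
    ge_df.expect_column_values_to_be_between("passenger_count", min_value=1, max_value=12)

    # assumed set of vendor IDs; the talk uses the unique values found in the data set
    ge_df.expect_column_values_to_be_in_set("vendor_id", [1, 2])

    # run the expectation suite built above and return the results
    return ge_df.validate()
```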

Great Expectations is very popular because [00:13:30] one of the things it does well is also data documentation, where after you profile your data or after you’ve run validations on your data, you can render those into documentation and then share this with the rest of the team. So this is a much easier, much nicer view into the validations that we’re running. You can easily see if the data is what you expect it to be.

So [00:14:00] here, we start our data validation script. In the first function here up top, I am connecting to my Dremio server. I copied the code from the Dremio documentation and just used this function, which uses Apache Arrow Flight. I provided the host name, the port, my credentials, and I’m going to provide a query. And then, this second function is what we discussed earlier with Great Expectations. So this is the data validation. [00:14:30] And in order to bring this into Prefect, all we need to do is wrap this task decorator around each of these functions. At the top here, I’m just showing that because this is loading data from a database, sometimes this can fail. So here, I’m setting max_retries equal to three with a retry delay of 10 seconds. But you don’t need to do that. Down here, I’m just using the task decorator without any inputs. And now, these can be used [00:15:00] inside a Prefect workflow, or flow as we call them.
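
Here is a condensed sketch of those two functions with the task decorators applied, assuming Prefect 1.x and the Arrow Flight client pattern from the Dremio docs; the host, port, credentials, and query handling are placeholders.

```python
from datetime import timedelta

import pandas as pd
from prefect import task
from pyarrow import flight

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def fetch_data(query: str) -> pd.DataFrame:
    # placeholder host, port, and credentials, adapted from the Dremio Arrow Flight example
    client = flight.FlightClient("grpc+tcp://localhost:32010")
    token = client.authenticate_basic_token("dremio_user", "dremio_password")
    options = flight.FlightCallOptions(headers=[token])

    # ask Dremio to plan the query, then stream the result back as Arrow record batches
    info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)
    reader = client.do_get(info.endpoints[0].ticket, options)
    return reader.read_pandas()

@task  # no retries needed here
def validate_data(df: pd.DataFrame):
    ...  # Great Expectations logic from the earlier sketch
```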

So now we have a Prefect flow. This is what the code would look like. We have a flow block, and in the first line of this block, we are fetching the data with a given query and putting that in a data frame. And then we’re passing this data frame to the validate data function. And then we can do flow.run. And there are a bunch of interesting [00:15:30] things here. Number one, we can pass the data between tasks automatically without having to persist it. If you use other workflow orchestration tools, you frequently have to persist it in the upstream task and then load it downstream. Second is that it’s a very Pythonic API. It still feels like we are writing just regular Python functions. A lot of the time in other frameworks, you have to [00:16:00] explicitly set upstream and downstream dependencies. We provide that option too, but we also do a lot automatically for you so you don’t have to.
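
The flow block itself might look roughly like this, reusing the fetch_data and validate_data tasks from the sketch above; the query and table name are illustrative.

```python
from prefect import Flow

with Flow("dremio-validation") as flow:
    # the data frame is handed from task to task in memory; nothing is persisted in between
    df = fetch_data('SELECT * FROM "nyc-taxi-trips" LIMIT 10000')  # illustrative query
    results = validate_data(df)

state = flow.run()  # runs the whole flow as a plain Python process, with Prefect's logging
```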

And the third thing here is that this flow.run call can run the whole Python script without having to set up infrastructure. So again, other tools require you to set up a server, or a database at the very least. In order to run your flows in Prefect, flow.run [00:16:30] basically just runs the Python script with additional logging. So it’s very easy to prototype before you move things into production. And then, after you’re ready to put it into production, you would just use flow.register, feed in the project name, and that would register it with your backend, Prefect Cloud or Prefect Server.
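
Moving from prototype to production is then a one-line change; the project name here is taken from the demo later in the talk.

```python
# prototype locally
flow.run()

# register with your Prefect Cloud or Prefect Server backend when ready for production
flow.register(project_name="Dremio")
```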

And with that, I’m going to do a quick demo here. Now would be a good time to ask questions, also [00:17:00] because this may take a bit to run. But what I’m going to show you is that I have the… This is the flow that I was talking about in the PowerPoint. Now we have a query. I’m just grabbing five columns from the Dremio test data set, grabbing 10,000 rows. So this will grab the data. We’re going [00:17:30] to validate it with Great Expectations. Great Expectations returns a dictionary of results, which you’ll see later. Here, we’re primarily interested in whether it was a success. And if it was a success, we return true; if not, false.

And then here in this line, I just added… This is an ifelse statement. So we’re going to check if the value was true or false. And if it was false, if our data validation failed, I am going [00:18:00] to send a Slack message to myself. And this Slack task came from the task library. I just imported it, right? So we can send a message through Slack with very minimal configuration. A lot of this is the message itself. And then this is the only configuration I have, the webhook secret, and Prefect will take care of sending that.
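
Here is a rough sketch of that conditional branch, assuming Prefect 1.x; the secret name, message text, and success-branch task are assumptions, and fetch_data and validate_data come from the earlier sketches.

```python
from prefect import task, Flow
from prefect.tasks.control_flow import ifelse
from prefect.tasks.notifications import SlackTask

@task
def check_success(results) -> bool:
    # Great Expectations returns a result with an overall "success" flag
    return results["success"]

@task
def log_success():
    print("Validation passed; continuing downstream work.")

# the message is set up front; the webhook URL is read from a Prefect secret (name assumed)
send_alert = SlackTask(
    message="Data validation failed for the Dremio taxi data set.",
    webhook_secret="SLACK_WEBHOOK_URL",
)

with Flow("validation-with-alert") as flow:
    df = fetch_data('SELECT * FROM "nyc-taxi-trips" LIMIT 10000')
    passed = check_success(validate_data(df))
    ok = log_success()    # branch taken when validation succeeds
    alert = send_alert()  # branch taken when validation fails
    ifelse(passed, ok, alert)
```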

So I registered already. I registered this flow in Prefect Cloud. So I have [00:18:30] this Dremio project, and under the Dremio project, I have this validation flow. And all I have to do is click the quick run. And locally I have this agent running. So when you use Prefect, you supply your own compute. You would have your own process running, which is the agent, and the agent is always checking Prefect Cloud for flows. The agent picked this up. It turned blue because it’s now running, [00:19:00] and it fully ran, which is why it’s green. So it actually successfully ran. So I could go in here, see the state of the task runs. I could see the logs. But I will show you this…

So here is the Slack message we sent. This message came from Prefect, and the validation failed. And then [00:19:30] here, it contains all the information. So if you remember, we had this expectation that the number of taxi riders must be above zero. Here we see the values that broke that expectation. We have a lot of zeros, so there are a lot of taxi rides with no riders. And then we have this 208 also. And then we also have a bunch of other unexpected values. And then it also gives you statistics of how many, [00:20:00] like what percent of records failed. So here, we had 2.96% of our records fail, and then it gives a bunch of those examples.

So with that, I think-

Speaker 2:    Kevin, there’s a couple of questions. Let me know when we can connect with them.

Kevin Kho:    Okay. I’ll wrap up. And then, yeah. So just to wrap up before questions: what makes Prefect different as a workflow orchestration [00:20:30] tool? Number one, it’s very easy to pass data around. It’s very easily extensible because everything’s in Python. We, like Dremio, don’t believe in vendor lock-in. A lot of our users use us for cross-cloud execution or for combining different tools together, and Prefect is a very general-purpose framework, so it allows that. I didn’t get to it today, but we also have dynamic alteration of the workflow graph, or what we call the [00:21:00] DAG, where we can map over tasks, execute them in parallel, and then reduce them later.
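
For context, mapping in Prefect 1.x looks roughly like this, with illustrative task bodies; the mapped runs fan out (in parallel on Dask, for example) and a downstream task reduces them.

```python
from prefect import task, Flow

@task
def get_files():
    return ["a.csv", "b.csv", "c.csv"]  # illustrative list, discovered at runtime

@task
def process(path):
    return len(path)

@task
def combine(sizes):
    return sum(sizes)

with Flow("map-reduce") as flow:
    files = get_files()
    processed = process.map(files)  # one task run per element, generated dynamically
    total = combine(processed)      # reduce step collects the mapped results

flow.run()
```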

And last, I want to talk about the hybrid model which… There’s not enough time to dive into it, but I just wanted to mention it because this is important. Prefect does not keep track of any of the data that you are working with or any of your Python code. All we’re doing is keeping metadata around it. And then when the flow runs, we’re just sending the instructions to that agent to execute it. So it’s [00:21:30] very compliant with a lot of privacy regulations.

I have a bunch of resources here in my slides; the Great Expectations documentation is here as well. And last, I want to mention that we are hiring for several positions. If you want to contact me about career opportunities, or even just about Prefect or anything else, feel free to email me at kevin@prefect.io. Again, you’re all invited to join our Slack channel. It’s a very active community. [00:22:00] We’d love to see you there. And then check out our careers page if you’re interested. Thank you.

Speaker 2:    Thank you, Kevin. We have a couple of questions from two attendees, from [Bedrasha 00:22:11] and Julia. So I’ll connect Bedra first. Go ahead, Bedra.

If you can please connect your audio, you can ask your question, Bedrasha.

Yes, you can ask your question now. [00:22:30] Can you unmute yourself Bedra?

In the interest of time, I’ll also connect the other attendee, Julia. [00:23:00] And whoever gets to connect to the audio first can please ask the question.

Kevin Kho:    Actually before this, I was at the last session too, and I saw that people had problems connecting in the Q and A as well there.

Speaker 2:    Oh, yes. Okay.

Kevin Kho:    I [00:23:30] see a question in the Q and A from Robert. I’ll answer this. Is there any way to specifically identify the exact records that failed outside of the parameters, so those records can be resolved using Prefect?

This is, I would say, on the Great Expectations side. I believe you would have to… It gives you examples of records that failed, but then you would have to go in yourself to find [00:24:00] those records. I’m not 100% sure, but I do think it works like this, where you have to go in and pull those records yourself. So I think how this would work is that a validation pipeline would be flagged as failed. You would get a notification and then there’s something you have to do. It would give you clues, but it’s something you have to do yourself: go into your data sets and whatever generated those records so that you can figure out how to fix them. It might be a bit outside of workflow orchestration.

Speaker 2:    [00:24:30] And for the attendees who could not connect to ask your question, there’s information in the chat window on how you can connect with Kevin on Slack to ask your questions even after this session.

There’s another question. Is the event log available?

Kevin Kho:    Yes. So Prefect [00:25:00] has… Maybe I should just share my screen. I have it open already. Here, I am looking at the log. Yeah, I see it. I’m looking at the logs. So these are the logs of the tasks. And then you can add logs on top of this if you use the Prefect logger, or use some other logger and attach it to Prefect.
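
For reference, adding your own log lines from inside a task uses the logger that Prefect places in context (Prefect 1.x).

```python
import prefect
from prefect import task

@task
def my_task():
    logger = prefect.context.get("logger")  # logger Prefect provides during a task run
    logger.info("This message shows up in the task run logs in the UI.")
```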

Speaker 2:    [00:25:30] You can post your questions in the chat window because it looks like a couple of people had issues connecting. So we are monitoring the chat if you have any questions.

Kevin Kho:    Robert. Oh, okay, he just said thanks. Thank you, Robert. Okay.

Speaker 2:    Looks like [00:26:00] that’s all the questions we have for the time being. As I mentioned before, you can connect with Kevin on the Slack community. The information is in the chat window. And if we didn’t get to your question, that’s how you can get it answered. And we would appreciate it if you can leave us feedback using the [00:26:30] survey on the right side of your screen. The next sessions will be coming up in the next five minutes. But the [inaudible 00:26:39] is open, so we encourage you to check out the booths there. You can get some demos and maybe even some giveaways. So thank you everyone for attending this session. And we have a couple of minutes, so if you still have questions, you can ask Kevin, or Kevin, if you have some closing comments.

Kevin Kho:    [00:27:00] No, just thank you everyone for attending. Feel free to join our Slack community, of course, or message me. But yeah, I appreciate you all joining this presentation. Thank you.

Speaker 2:    Thank you everyone.