Subsurface LIVE Summer 2020
Building an Efficient Data Pipeline for Data Intensive Workloads
Moving data through the pipeline in an efficient and predictable way is one of the most important aspects of modern data architecture, particularly when it comes to running data-intensive workloads such as IoT and machine learning in production. This talk breaks down the data pipeline and demonstrates how it can be improved with a modern transport mechanism that includes Apache Arrow Flight. This session details the architecture and key features of the Arrow Flight protocol and introduces an Arrow Flight Spark data source, showing how microservices can be built for and with Spark. Attendees will see a demo of a machine learning pipeline running in Spark with data microservices powered by Arrow Flight, highlighting how much faster and simpler the Flight interface makes this example pipeline.
Ryan Murray, OSS Developer, Dremio
Ryan Murray is a principal consulting engineer at Dremio. He previously worked in the financial services industry, doing everything from bond trading to leading data engineering. Ryan holds a Ph.D. in theoretical physics and is an active open source contributor who dislikes when data isn’t accessible in an organisation. Ryan is passionate about making customers successful and self-sufficient. He still dreams of one day winning the Stanley Cup.
All right. Let's get going. Hello, everyone. Allow me to just do a very quick introduction. We've got Ryan Murray here from Dremio. He's got a great talk for you. So let's dive right in and get going.
Hey guys. Thanks a lot, Jason. Yeah, I guess first off, I'd like to thank everyone for coming. It's pretty awesome to see so many people showing up to this, especially on the web like this, and obviously, thanks for the invite to the organizers. I think the lineup at this conference is pretty amazing. I'm excited to be part of it.
I don't have much time, so let's dive right in. As you can see, the title is Building an Efficient Data Pipeline for Data-Intensive Workloads. What I really want to talk about today builds on some of what Wes was talking about. How can we use Apache Arrow Flight to do really cool, really interesting, really new things that have previously been hard or downright impossible to do? I'm sure a lot of you were in Wes's talk, so I don't want to spend too much time on what Arrow is and what Flight is, but I thought it'd be good to level set for the people who weren't in that talk and for people who just want a bit more clarification.
So, simply put, Arrow is a columnar data format for high-performance data analytics. What does that mean exactly? I think there are two main things that make Arrow so special and worth talking about. The first is the columnar format, which is really what allows us to do high-performance calculations on very large data sets. The second is that it's a lingua franca. When everyone's using Arrow, everyone can talk Arrow and everyone can share data. That reduces the time spent writing all the different communication layers, and it means that every tool I build and open source, you can use. We can really build a whole set of tools around Arrow.
So getting a bit more specific, what is Arrow? I like to think about what Arrow is and what it isn't. As I started off saying, at its base it's really just a specification that outlines how to store data in memory. The key component is the columnar format. The specification is designed to make this data very efficient for analytical processing on CPUs, GPUs and even some of the newer, more interesting hardware architectures. So we can think about doing really interesting things on FPGAs; I think there are a couple of people who've been doing FPGA work with Arrow. So it's really asking: how do these new architectures process data, and how can we best represent our data to take advantage of that?
Once we have that specification in place, we can start building a set of libraries and tools around it. And again, as Wes alluded to, there are a lot of people building a lot of different tools in a lot of different languages. So people can manipulate and exchange Arrow buffers very easily, regardless of what platform they're on. Finally, it's a set of standards for making analytical data transportable, meaning how do we share this data between processes and between consumers, and that's really where Arrow Flight comes in.
Just really quickly, what it isn't: it's not an installable system. You don't go to the Arrow website, download it, install it and run it. Similarly, it's not a cache or a data grid or anything like that, though you can use Arrow as a Lego brick to build a system like that. It's also worth noting that it's not really designed for record-at-a-time streaming applications. The columnar layout that gives Arrow its power makes appending or inserting rather cumbersome, and the columnar format doesn't add any value if you're, say, pushing around a single record. So when you're designing new systems, you want to be moving around blocks of Arrow data.
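A minimal, stdlib-only sketch of the "move blocks, not single records" advice: the hypothetical helper `to_column_batches` groups row-shaped records into column-oriented batches of a fixed size before they would be shipped. It illustrates the idea only, is not Arrow's API, and assumes every record has the same fields.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator


def to_column_batches(rows: Iterable[dict], batch_size: int) -> Iterator[Dict[str, list]]:
    """Group row-oriented records into column-oriented batches (hypothetical helper)."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        # Pivot the chunk: one list per column, all of the same length.
        yield {name: [row[name] for row in chunk] for name in chunk[0]}


rows = [{"session_id": i, "clicks": i * 10} for i in range(7)]
batches = list(to_column_batches(rows, batch_size=3))
print(len(batches))  # 3
print(batches[-1])   # {'session_id': [6], 'clicks': [60]}
```

Sending three batches instead of seven single records is the point: each batch carries per-column runs of values that a columnar consumer can work on directly.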
I just want to take a quick look at what we mean by Arrow's columnar data structures. We have two diagrams here: how a traditional database or data lake would hold data, and how Arrow holds data in memory. Traditionally you hold things row-wise. As I said, that makes appending, inserting or moving data very fast, but it's very hard to, say, compute the average of session ID. You have to read the first element in row one, then calculate where row two starts, read that element, and so on through the data structure. You end up scanning the entire data structure just to read those four or five values. The power of the columnar format is that all of those session IDs are in a contiguous block of memory.
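To make the two diagrams concrete, here is a small stdlib-only sketch of the same four records stored row-wise and column-wise. The column names and values are made up, and Python's `array` module stands in for an Arrow buffer; it shows the layout difference only, not Arrow's performance.

```python
from array import array

# Row-oriented: each record keeps all of its fields together.
rows = [
    (101, "alice", 3.5),
    (102, "bob", 1.0),
    (103, "carol", 2.5),
    (104, "dave", 4.0),
]

# Averaging session_id row-wise means visiting every record and picking out field 0.
row_avg = sum(r[0] for r in rows) / len(rows)

# Column-oriented: each field lives in its own contiguous, typed buffer.
session_id = array("q", [r[0] for r in rows])  # 64-bit ints stored back to back
user = [r[1] for r in rows]
duration = array("d", [r[2] for r in rows])

# Averaging now touches only the session_id buffer.
col_avg = sum(session_id) / len(session_id)

print(row_avg, col_avg)  # 102.5 102.5
```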
That's where you're able to take advantage of the way GPUs work, and the way modern CPUs work with pipelining, SIMD and so on. For example, SIMD calculations require your data to be contiguous. So for a computation on session ID, you can use one SIMD instruction instead of, say, four or 10 or 18, or however many scalar instructions on the CPU.
So that's a quick Arrow primer. Let's move on to Arrow Flight. I think the important thing about Arrow Flight is that it delivers on Arrow's interoperability promise to its fullest extent. We're now able to use an RPC mechanism to exchange data between consumers, between servers and clients and so on. We can do that in parallel, and we can do it with large data sets. That has traditionally been hard, whether you're moving data with a REST API, something custom, Thrift, or JDBC or ODBC. A lot of effort is spent moving data from one place to another; I think the stat I saw in Wes's talk was something like 80% of the time is spent serializing and deserializing data.
With Arrow Flight, by contrast, we take the data and put it directly onto the network buffer. We don't have to translate the data into something the network can understand, so we skip that step. On the client side, the client reads directly off the network buffer into its in-memory representation, so again there's no translation. That really speeds things up: we're only looking at the network time to move data between two systems, so a large data set is really only constrained by the network.
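A rough stdlib analogy for skipping the translation step, assuming both ends agree on the value type and byte order. One path renders each value to text and parses it back, as a REST- or CSV-style exchange would; the other ships the column's raw bytes and lets the receiver reinterpret them in place with `memoryview.cast`. Real Flight sends Arrow buffers over gRPC; this only illustrates why no per-value decoding is needed.

```python
from array import array

values = array("q", [7, 42, -3, 1_000_000])

# Text-style path: every value is rendered on one side and parsed on the other.
wire_text = ",".join(str(v) for v in values).encode()
decoded_text = [int(tok) for tok in wire_text.decode().split(",")]

# Buffer-style path (analogy): send the column's raw bytes unchanged...
wire_raw = values.tobytes()
# ...and the receiver views those same bytes as 64-bit ints, with no parsing loop.
view = memoryview(wire_raw).cast("q")

print(decoded_text == list(view))  # True
print(view[1])                     # 42
```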
And again, because it's Arrow, it's fully interoperable, which is what we wanted when we first introduced Arrow, and it's cross-platform, another very important thing for the Arrow community. It's also built for security: it comes with SSL and authentication built in. And most importantly, it's built to be parallel.
This is a pretty interesting technique, and we'll take a look at how the parallel streams work. But think of a traditional setup, like a JDBC driver in Spark. In Spark you have to build all of your offsets, keep track of all the accounting for all of your rows, then spin off a dozen, two dozen, a thousand, however many individual JDBC connections, bring those results back from the JDBC server and compose them again in Spark.
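For contrast, this is roughly the bookkeeping a client does for a partitioned JDBC read. The helper name and range-splitting rule are hypothetical, loosely in the spirit of Spark's JDBC options (`partitionColumn`, `lowerBound`, `upperBound`, `numPartitions`) but not Spark's actual implementation.

```python
from typing import List


def offset_partition_queries(table: str, column: str, lower: int, upper: int, n: int) -> List[str]:
    """Hypothetical sketch: split [lower, upper) into n ranges, one query
    (and, in practice, one JDBC connection) per range."""
    stride = (upper - lower) // n
    queries = []
    for i in range(n):
        start = lower + i * stride
        # The last range absorbs any remainder so no rows are missed.
        end = upper if i == n - 1 else start + stride
        queries.append(f"SELECT * FROM {table} WHERE {column} >= {start} AND {column} < {end}")
    return queries


qs = offset_partition_queries("events", "id", 0, 1000, 3)
for q in qs:
    print(q)
```

The client has to know a partition column and its bounds up front; with Flight, the server hands back the split as a list of endpoints instead.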
What Flight allows you to do is handle all of that in a single step, as a native part of the Flight protocol. If we call a dataset in Flight a Flight, then a Flight is composed of a set of streams. Each of those streams has a Flight endpoint. A Flight endpoint is really just an opaque ticket, which you can redeem on a particular server for a particular part of the dataset, along with a consumption location.
Currently, the Flight protocol is built on gRPC, so that consumption location is going to be an IP address or a DNS name or something similar. But you can imagine swapping out the gRPC layer underneath and putting just about anything in. The endpoint's location is really more of a general concept of where you want to get the data from. This lets you take advantage of locality and make sure your clients are as close as possible to your servers.
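A simplified model of the structures just described, written as plain dataclasses. `pyarrow.flight` has real classes with similar names (`FlightInfo`, `FlightEndpoint`, `Ticket`, `Location`); these stand-ins are not that API, and the host names and port below are invented.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass(frozen=True)
class Ticket:
    token: bytes  # opaque to the client; only the issuing server interprets it


@dataclass(frozen=True)
class Location:
    uri: str  # e.g. "grpc://host:port" while gRPC is the transport


@dataclass(frozen=True)
class FlightEndpoint:
    ticket: Ticket
    locations: Tuple[Location, ...]  # where this ticket can be redeemed


@dataclass
class FlightInfo:
    schema: List[str]  # simplified: just column names
    endpoints: List[FlightEndpoint] = field(default_factory=list)


# Hypothetical dataset split into two streams held by two different executors.
info = FlightInfo(
    schema=["session_id", "clicks"],
    endpoints=[
        FlightEndpoint(Ticket(b"part-0"), (Location("grpc://executor-1.example:8815"),)),
        FlightEndpoint(Ticket(b"part-1"), (Location("grpc://executor-2.example:8815"),)),
    ],
)
for ep in info.endpoints:
    print(ep.ticket.token, ep.locations[0].uri)
```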
Looking at it in practice, we can see what would happen on a Dremio cluster with parallel Flight enabled. Here we have our coordinator, and we're going to call GetFlightInfo, which could carry a SQL statement, or a pointer to a data set or something. That returns the schema information and a list of endpoints. Those endpoints, as I said, hold your location information and your ticket.
So now, as a client, I have all the locations and all the tickets I need to build this data set, and I can go out and start asking individual Dremio executors for that data. This endpoint concept is nice and flexible. If I'm just a simple single-threaded Python client, I can retrieve the endpoints serially: ask for the first endpoint, then the second and third, bring back that data and concatenate it locally. If I'm a multithreaded application, I could fire off all those DoGets in parallel. Or if I'm a distributed system like Spark, I can distribute those DoGets to different executors in the Spark cluster. So there's a lot of flexibility in how we get the data back, and we can really take advantage of the parallelism inherent in distributed systems.
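A stdlib sketch of the serial and multithreaded client patterns, assuming a hypothetical `do_get` function that redeems one ticket. The dictionary of fake executors replaces real network calls; in the Spark case, each ticket would instead become one task on an executor.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Hypothetical stand-in for executor-held data: ticket -> that executor's rows.
FAKE_EXECUTORS: Dict[bytes, List[int]] = {
    b"part-0": [1, 2, 3],
    b"part-1": [4, 5],
    b"part-2": [6, 7, 8, 9],
}


def do_get(ticket: bytes) -> List[int]:
    """Hypothetical DoGet: redeem one ticket and return that slice of the dataset."""
    return list(FAKE_EXECUTORS[ticket])


def fetch_serial(tickets: List[bytes]) -> List[int]:
    # Single-threaded client: redeem endpoints one after another, then concatenate.
    result: List[int] = []
    for t in tickets:
        result.extend(do_get(t))
    return result


def fetch_parallel(tickets: List[bytes], workers: int = 4) -> List[int]:
    # Multithreaded client: issue all DoGets at once; map() keeps ticket order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parts = list(pool.map(do_get, tickets))
    return [row for part in parts for row in part]


tickets = [b"part-0", b"part-1", b"part-2"]
print(fetch_serial(tickets))    # [1, 2, 3, 4, 5, 6, 7, 8, 9]
print(fetch_parallel(tickets))  # same rows, fetched concurrently
```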
So that's basically how Arrow Flight works. Now I want to talk about some of the things we could start thinking about doing with Flight. What comes with Flight today is what I think of as the Flight starter pack: the primitive operations that let you interact with an RPC server using Arrow Flight. We can also think of a set of expansion packs, which are effectively lists of operations and add-ons to a Flight server that allow it to fulfill a specific service.
Going back to the starter pack, the primitives built into Arrow Flight as of, I don't know, two versions ago are DoGet and DoPut: simply get data from the server and push data to the server. Recently, we introduced DoExchange, a bidirectional exchange that can be repeated: I can do a DoGet, then two DoPuts and another DoGet, or mix them up interchangeably.
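A toy, in-memory model of the three data primitives, to show their shape rather than their wire protocol. `ToyFlightServer` is hypothetical (in pyarrow, a real server subclasses `pyarrow.flight.FlightServerBase` and overrides methods such as `do_get`, `do_put` and `do_exchange`), and the running-total reply in `do_exchange` is an invented example of a bidirectional conversation.

```python
from typing import Dict, Generator, Iterable, List


class ToyFlightServer:
    """Hypothetical in-memory model of DoGet, DoPut and DoExchange (not pyarrow's API)."""

    def __init__(self) -> None:
        self.datasets: Dict[str, List[list]] = {}  # name -> list of batches

    def do_put(self, name: str, batches: Iterable[list]) -> None:
        # Client -> server: upload a stream of batches under a name.
        self.datasets.setdefault(name, []).extend(batches)

    def do_get(self, name: str) -> Generator[list, None, None]:
        # Server -> client: stream the stored batches back.
        yield from self.datasets.get(name, [])

    def do_exchange(self) -> Generator[list, list, None]:
        # Bidirectional: every batch the client sends gets a reply batch back
        # (here, the running total so far), and the two sides can keep alternating.
        total = 0
        reply: list = []
        while True:
            batch = yield reply
            total += sum(batch)
            reply = [total]


server = ToyFlightServer()
server.do_put("numbers", [[1, 2], [3]])
print(list(server.do_get("numbers")))  # [[1, 2], [3]]

exchange = server.do_exchange()
next(exchange)                # prime the generator
print(exchange.send([1, 2]))  # [3]
print(exchange.send([10]))    # [13]
```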
Another very flexible option is DoAction. This can be basically anything the server wants. You can define any number of actions, and they can be as complicated or as simple as you want. Some simple examples are altering session parameters or setting server properties upon connection. It's flexible enough that you could, say, reformat your hard drive and reboot your computer, if that was your thing.
And finally we have the list operations, ListActions and ListFlights, which list the services available on the server, the actions, the data sets and so on. That's a simple, low-level way of starting to discover what an Arrow Flight server can do.
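A sketch of how DoAction and ListActions fit together as a name-to-handler dispatch table. The class and action names (`set_session_param`, `get_session_param`) are hypothetical, chosen to match the session-parameter example above.

```python
from typing import Callable, Dict, List, Tuple


class ActionServer:
    """Hypothetical sketch of DoAction/ListActions dispatch (not pyarrow's API)."""

    def __init__(self) -> None:
        self.session: Dict[str, str] = {}
        # action name -> (description, handler that takes the action body)
        self._actions: Dict[str, Tuple[str, Callable[[bytes], bytes]]] = {
            "set_session_param": ("Set key=value for this session", self._set_param),
            "get_session_param": ("Read a session parameter", self._get_param),
        }

    def _set_param(self, body: bytes) -> bytes:
        key, _, value = body.decode().partition("=")
        self.session[key] = value
        return b"ok"

    def _get_param(self, body: bytes) -> bytes:
        return self.session.get(body.decode(), "").encode()

    def list_actions(self) -> List[Tuple[str, str]]:
        # Discovery: tell clients which actions this server supports.
        return [(name, desc) for name, (desc, _) in self._actions.items()]

    def do_action(self, name: str, body: bytes = b"") -> bytes:
        if name not in self._actions:
            raise KeyError(f"unknown action: {name}")
        _, handler = self._actions[name]
        return handler(body)


srv = ActionServer()
print([name for name, _ in srv.list_actions()])
srv.do_action("set_session_param", b"timezone=UTC")
print(srv.do_action("get_session_param", b"timezone"))  # b'UTC'
```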
Now, what can we build with these primitives? We can build some of these expansion packs. One I've been thinking of is an ETL expansion pack. Think of how Airflow works: Airflow models an ETL pipeline as a directed acyclic graph of transformations. What if, rather than doing these transformations by reading from disk and then writing back to disk, they were linked together as a set of Flight services? I think of them as Flight microservices: very small, very simple things that do one thing and do it well, which decouples the system quite a bit. We can evolve the different pieces at different speeds, deploy different parts of the system at different times, and write them in different languages depending on what's most useful for us.
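A minimal sketch of the ETL-as-microservices idea, assuming each hypothetical stage (`enrich`, `drop_small`) is a small service that consumes and produces a stream of batches. Here the hand-off is an in-process generator standing in for a Flight hop; nothing is written to disk between stages.

```python
from functools import reduce
from typing import Callable, Iterable, Iterator, List

Batch = List[dict]
Stage = Callable[[Iterable[Batch]], Iterator[Batch]]


def enrich(batches: Iterable[Batch]) -> Iterator[Batch]:
    # Hypothetical enrichment service: derive a new column.
    for b in batches:
        yield [{**r, "revenue": r["price"] * r["qty"]} for r in b]


def drop_small(batches: Iterable[Batch]) -> Iterator[Batch]:
    # Hypothetical filter service: keep only rows with revenue >= 10.
    for b in batches:
        yield [r for r in b if r["revenue"] >= 10]


def pipeline(stages: List[Stage], source: Iterable[Batch]) -> Iterator[Batch]:
    # Each stage consumes the previous stage's stream directly, standing in for a
    # Flight transfer between services instead of a write/read through the data lake.
    return reduce(lambda stream, stage: stage(stream), stages, iter(source))


source = [[{"price": 2, "qty": 3}, {"price": 5, "qty": 4}], [{"price": 1, "qty": 20}]]
out = list(pipeline([enrich, drop_small], source))
print(out)
```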
If we're able to turn our directed acyclic graph of ETL components into a set of these Arrow-linked microservices, we eliminate a lot of the time spent writing to and reading from disk and instead just transfer data directly across the network. Similarly, we can use this microservice concept for machine learning. We can use microservices to distribute our feature engineering or our enrichment and transform jobs. We can think about training the model by farming out a set of parameters in a grid search to different Flight services. And we even have the option of using Flight for inference: the client sends a matrix to the server, says "predict this," and gets a vector of predictions back, as opposed to doing that via REST or something more traditional.
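A sketch of farming out a grid search, assuming a hypothetical `train_and_score` service per parameter set. A thread pool stands in for remote Flight services, and the toy scoring function is invented so the example has a known best point.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product
from typing import Dict, Tuple


def train_and_score(params: Tuple[float, int]) -> float:
    """Hypothetical remote training service returning a validation score.
    A real one would receive its training data over Flight and fit a model."""
    learning_rate, depth = params
    # Toy score surface that peaks at learning_rate=0.1, depth=6.
    return -((learning_rate - 0.1) ** 2) - 0.01 * (depth - 6) ** 2


grid = list(product([0.01, 0.1, 0.3], [4, 6, 8]))

# Fan the grid out to a pool of stand-in "services" and collect the scores.
with ThreadPoolExecutor(max_workers=3) as pool:
    scores: Dict[Tuple[float, int], float] = dict(zip(grid, pool.map(train_and_score, grid)))

best = max(scores, key=scores.get)
print(best)  # (0.1, 6)
```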
And finally, I want to talk a bit about what a SQL service would look like. We've been talking for a while now about Arrow Flight's potential to replace JDBC and ODBC. We can finally get past the roadblock that JDBC and ODBC give us of having to translate all of our data before and after sending it across the network. By building the SQL service as a set of actions on Flight, we can start to emulate the patterns and properties we would need from an Arrow Flight service to actually replace something like ODBC or JDBC.
First, we need actions to help us explore our catalog, much like JDBC and ODBC let you explore theirs: getting parts of namespaces, looking at the whole database, treating an Arrow Flight service as a database that has a set of tables on it. Similarly, we need statements: preparing SQL statements and executing SQL statements. These are actions that take SQL and return, for example, handles to prepared statements. We also need much more support for session parameters and all the extra bits and bobs that a proper JDBC or ODBC service would need. It's not until we actually want our results that we return to the simple Flight primitives: we execute our SQL statement, get a list of Flight endpoints, and then do a DoGet on each of them to return our results. Now we're able to mimic something like JDBC, and do it with the parallel DoGets we worked so hard to build.
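A hypothetical sketch of layering a SQL service on Flight-style primitives, following the flow just described: catalog exploration and statement preparation as actions, execution returning one ticket per partition, and results via DoGet. The class, method names and the `SELECT * FROM <table>` restriction are all invented for illustration; this is not a real Flight SQL API.

```python
import uuid
from typing import Dict, List


class ToySqlFlightService:
    """Hypothetical SQL-over-Flight sketch; only 'SELECT * FROM <table>' is understood."""

    def __init__(self, tables: Dict[str, List[tuple]], partitions: int = 2) -> None:
        self.tables = tables
        self.partitions = partitions
        self.prepared: Dict[str, str] = {}  # statement handle -> table name
        self.tickets: Dict[bytes, List[tuple]] = {}

    def action_get_tables(self) -> List[str]:
        # Catalog exploration, exposed as a DoAction.
        return sorted(self.tables)

    def action_prepare(self, sql: str) -> str:
        # Statement preparation, exposed as a DoAction that returns a handle.
        prefix = "SELECT * FROM "
        if not sql.upper().startswith(prefix):
            raise ValueError("toy service only supports SELECT * FROM <table>")
        table = sql[len(prefix):].strip()
        if table not in self.tables:
            raise KeyError(table)
        handle = uuid.uuid4().hex
        self.prepared[handle] = table
        return handle

    def get_flight_info(self, handle: str) -> List[bytes]:
        # Execution maps onto GetFlightInfo: one ticket per partition of the result.
        rows = self.tables[self.prepared[handle]]
        tickets = []
        for i in range(self.partitions):
            t = f"{handle}:{i}".encode()
            self.tickets[t] = rows[i::self.partitions]
            tickets.append(t)
        return tickets

    def do_get(self, ticket: bytes) -> List[tuple]:
        # Results come back through the plain DoGet primitive.
        return self.tickets.pop(ticket)


svc = ToySqlFlightService({"trips": [(1, "a"), (2, "b"), (3, "c")]})
print(svc.action_get_tables())  # ['trips']
h = svc.action_prepare("SELECT * FROM trips")
rows = [r for t in svc.get_flight_info(h) for r in svc.do_get(t)]
print(sorted(rows))  # [(1, 'a'), (2, 'b'), (3, 'c')]
```

Because execution returns several tickets, the DoGets could just as well be issued in parallel, which is the advantage over a single JDBC result stream.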
Then we can think about our machine learning pipeline. Say we have a big Spark ML job that gets data from our cloud data lake through a cloud data lake engine, whether that's Dremio or something else; we can use our parallel DoGets. As we'll see in a benchmark in a little while, when pulling data into Spark you can pretty much saturate an EC2 instance's network, so it's very quick to pull data out of the cloud data lake into Spark. Then, from Spark, we can farm out a lot of our transformations, enrichments and feature engineering jobs to a set of microservices. Maybe a microservice just multiplies a vector by two, or maybe it goes off to separate databases to enrich the data; there are plenty of more complicated things we can do. But since they're all opaque and isolated, it's very easy to evolve this as a productionized system.
And finally, once the model is trained and ready to go, our client can start asking the Spark ML engine to make predictions for us. Traditionally this has been done via REST, and that's not a problem, since these aren't very big datasets, but I think it nicely illustrates how flexible the Flight system is that we could use it all the way from the client to the back end where we're actually training the model. There are a lot of really interesting applications you can think of there.
And finally, here's an outline of how I was thinking an ETL pipeline might work. Say we have a set of ETL jobs, represented by these purple spheres. These are simple operations: again, enriching, transforming, aggregating data and whatnot. Rather than have something like Airflow control all of these individual jobs with their intermediate results stored on the data lake, we can use the DoPut primitive in Flight to move the data down the stream, fanning out calculations, bringing them back in, aggregating them, all that kind of stuff.
At this point, we would obviously want to add checkpoints as well. Some of the intermediate stores we do want, so we can drop data onto the cloud data lake at the right points for those checkpoints. I think this brings a lot more simplicity to a complex ETL pipeline, and it's certainly going to add a lot of speed.
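A stdlib sketch of fan-out, fan-in and a selective checkpoint. `fan_out` and `run_with_checkpoint` are hypothetical helpers: the round-robin split stands in for DoPut-ing batches to different services, the partial sums are computed locally rather than remotely, and a JSON file in a temporary directory stands in for the data lake.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, List


def fan_out(batches: List[List[int]], workers: int) -> List[List[List[int]]]:
    # Round-robin batches across workers, like DoPut-ing each to a different service.
    shards: List[List[List[int]]] = [[] for _ in range(workers)]
    for i, batch in enumerate(batches):
        shards[i % workers].append(batch)
    return shards


def run_with_checkpoint(name: str, ckpt_dir: Path, compute: Callable[[], int]) -> int:
    # Only selected stages persist their result, so a failed run can resume here.
    path = ckpt_dir / f"{name}.json"
    if path.exists():
        return json.loads(path.read_text())
    result = compute()
    path.write_text(json.dumps(result))
    return result


batches = [[1, 2], [3, 4], [5], [6, 7, 8]]
# Fan out, compute a partial result per worker, then fan back in.
partial_sums = [sum(sum(b) for b in shard) for shard in fan_out(batches, workers=2)]

with tempfile.TemporaryDirectory() as d:
    total = run_with_checkpoint("total", Path(d), lambda: sum(partial_sums))
    # A second call is served from the checkpoint instead of recomputing.
    again = run_with_checkpoint("total", Path(d), lambda: -1)

print(partial_sums, total, again)  # [8, 28] 36 36
```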
Those are some of my ideas about how we can build interesting Flight services. Now I want to talk about something more concrete that we've actually built and are using: the Flight data source for Spark. For those of you who aren't aware, Spark released Data Source V2 a couple of versions ago, which overhauled the way a database interacts with Spark. Before, getting data from a database into Spark involved knowing a lot of really intricate details about Spark. Now there's a really simple interface you can build on. We're able to use the columnar batch support in the V2 Data Source to keep the Flight data in Arrow buffers and pass those Arrow buffers to Spark's engine. It also supports pushing down some primitive SQL operations like filters and projections. So if we send a complicated SQL statement from Spark to Dremio, most of that calculation is actually pushed down into Dremio, resulting in a smaller dataset being returned to Spark.
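A sketch of what filter and projection pushdown amounts to, assuming a hypothetical filter shape `(column, operator, literal)`. Supported comparisons are rendered into the SQL sent to the Flight service; anything else is handed back for Spark to evaluate after the scan, similar in spirit to how Data Source V2's filter pushdown returns the filters it could not push. The table and column names are invented.

```python
from typing import List, Tuple, Union

Value = Union[int, float, str]
Filter = Tuple[str, str, Value]  # (column, operator, literal): hypothetical shape

SUPPORTED_OPS = {"=", "<", "<=", ">", ">="}


def render_literal(v: Value) -> str:
    # Quote strings SQL-style (doubling embedded quotes); numbers use repr().
    if isinstance(v, str):
        return "'" + v.replace("'", "''") + "'"
    return repr(v)


def build_pushdown_sql(table: str, columns: List[str], filters: List[Filter]) -> Tuple[str, List[Filter]]:
    """Return (SQL sent to the Flight service, filters Spark must still apply itself)."""
    pushed: List[Filter] = []
    residual: List[Filter] = []
    for flt in filters:
        (pushed if flt[1] in SUPPORTED_OPS else residual).append(flt)
    # Projection pushdown: ask only for the columns Spark actually needs.
    cols = ", ".join(f'"{c}"' for c in columns) or "*"
    sql = f"SELECT {cols} FROM {table}"
    if pushed:
        sql += " WHERE " + " AND ".join(f'"{c}" {op} {render_literal(v)}' for c, op, v in pushed)
    return sql, residual


sql, residual = build_pushdown_sql(
    "trips", ["vendor", "fare"], [("fare", ">", 10), ("vendor", "LIKE", "A%")]
)
print(sql)       # SELECT "vendor", "fare" FROM trips WHERE "fare" > 10
print(residual)  # [('vendor', 'LIKE', 'A%')]
```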
Finally, we have much more flexibility over partitioning in this new Data Source. We can leverage the parallelism in both Spark and Dremio, or another Flight service, by partitioning the dataset on Flight tickets. And once this is on your Spark cluster, it's relatively easy to use: you just send SQL off to it and get the response back.
Finally, some quick benchmarks of this running in real life. We set up a four-node EMR cluster against a four-node Dremio cluster, both using the same size machines. We created a data set inside Dremio, issued a query from Spark to Dremio over the parallel Flight connection, and did a nontrivial calculation on the result, just to make sure Spark wasn't optimizing away the fetch and was actually fetching all of the data.
In this table, we can see the different data sizes, from 100,000 rows up to a billion rows, and the time each approach took: JDBC, a serial Flight connection, a parallel Flight connection, and parallel Flight again with eight nodes. Pointing out just the top and bottom numbers: the top numbers are all relatively similar. That basically comes down to 100,000 rows not being large enough to show the benefit of parallel Flight. There's more setup overhead, and the cost of translating the data in the JDBC layer isn't as noticeable with fewer rows.
With a billion rows, on the other hand, we can really see how performant this can be. With parallel Flight on eight nodes, we're able to transfer a billion rows, something like 50 gigabytes of data I think, in under 20 seconds on this AWS cluster. That ends up being something in the vicinity of five gigabits per second. With these nodes having 10 gigabits per second of bandwidth, that's pretty impressive to me: we're over halfway to saturating the AWS bandwidth, which is pretty exciting. We're not spending all of our time translating data instead of calculating or transforming it, which again is the whole point of Arrow Flight.
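A quick check of the throughput arithmetic, using only the approximate figures quoted above (about 50 GB in about 20 seconds) and decimal units. The talk does not say exactly how the "vicinity of five gigabits per second" figure was normalized; dividing the aggregate evenly over four nodes is one reading that matches it, and that is an assumption.

```python
def gbit_per_s(gigabytes: float, seconds: float) -> float:
    # 1 byte = 8 bits; decimal units (1 GB = 10**9 bytes) throughout.
    return gigabytes * 8 / seconds


aggregate = gbit_per_s(50, 20)  # approximate figures quoted in the talk
print(aggregate)      # 20.0 Gbit/s across the whole transfer
print(aggregate / 4)  # 5.0 Gbit/s per node, if spread evenly over four nodes
print(aggregate / 8)  # 2.5 Gbit/s per node, if spread evenly over eight nodes
```

Since the transfer took under 20 seconds, the true rate is somewhat above these figures.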
So that's it for me. Here are some links if you want them, and I'm happy to answer any questions. Thanks, everyone.
All right. Fantastic, Ryan. So let's jump into Q&A. I'm going to start putting people in and let's see if this works. Here comes Herman, or at least he's supposed to. Hopefully Herman gets added here in a second. Not yet. Not sure why. Sorry, Herman. If you're hearing me, post your questions in Slack. We'll try Alberto. Here we go.
Yeah. My question is how Arrow Flight could change the landscape for a machine learning platform like AWS SageMaker, for example. In my experience right now, the interface is just S3, and I was wondering if Arrow Flight could improve things.
I think so, yeah. That's the whole point of Arrow, right? If something like SageMaker were to add support for Arrow Flight, then anything that can build a Flight service could send data over Arrow Flight to SageMaker. I think [inaudible 00:24:56] is being sent over Flight, it would be a really nice, really compact, really fast transfer.
Yeah. Thank you. Great question. I just tried to add in , but I don't see him coming in yet. Okay. I'm going to try Connor. Here we go, Connor. No luck so far. All right. For other folks, if you have questions, make sure to click that button in the upper right to enable your audio video. So that didn't work. Let's try Sebastian. It could be that folks are also dropping out. I'm not sure. This also just could be network issues. Okay. Nothing from Sebastian. , I'm going to give you a try. Here we go.
I'll be sticking around on Slack for everyone who wants to ask a question here as well.
Yeah, definitely go to Slack. I mean, even if we were able to get everyone to pop up here... That didn't work. I'm going to try David. Even if we were to get everyone to pop up here, it's still not enough time to answer all the questions, of course. So we want people to head over to Slack. But yeah, it's definitely... People are not popping into this thing. I'm not sure why that is. Sorry, folks. All right. I'll try [inaudible 00:26:48]. I mean, it's good that we're seeing some other people like raising their hand, but they're just not getting added in. Okay. Ali, if you're hearing me, hopefully you're like, "All right, ready to go." Nope.
It didn't work. All right. We've got two more to try. And then I think it's over to Slack. Kyle. Kyle, I'm trying to add you. Here comes Kyle.
All good. Thanks, guys. I just had a quick question for you, Ryan. You were talking about using Flight as a substitute for Airflow, just in terms of data transfer. I know with Airflow, you write things to a file, the next job picks it up, reads from the file and goes from there. If you were to use Flight as a replacement, how do you do job management, structure your overall DAG, and make sure things are reproducible and whatnot?
Yeah, I probably should have been clearer on that. I wouldn't try to replace Airflow. I think a Flight plug-in for Airflow would probably be more sensible: rather than having each job constantly write to and read from disk, you would wrap the Airflow job inside a Flight service, which could then pass its output on to the next step.
Okay. That makes more sense. So it just handles the communication between the jobs.
Yeah, exactly. Yeah. I'm not going to go out and rebuild Airflow.
You never know, making sure.