

Subsurface Summer 2020

Apache Arrow: A New Gold Standard for Dataset Transport

Session Abstract

This talk will discuss the role that Apache Arrow and Arrow Flight play in disrupting previous approaches to creating data services that transport large datasets. We'll look at the technical details of why the Arrow protocol is an attractive choice and share specific examples of where Arrow has been employed for better performance and resource efficiency. We'll also discuss the implications for the upcoming generation of data systems.

Presented By

Wes McKinney, Director, Ursa Labs

Wes McKinney is an open source software developer focusing on analytical computing. He created the Python pandas project and is a co-creator of Apache Arrow, his current focus. He authored two editions of the reference book Python for Data Analysis. Wes is a member of The Apache Software Foundation and a PMC member for Apache Parquet. He is the director of Ursa Labs, a not-for-profit development group focused on data science tools for Python and R powered by Apache Arrow, built in partnership with RStudio. Previously, he worked for Two Sigma, Cloudera, and AQR Capital Management, and he was co-founder and CEO of the startup DataPad.


Webinar Transcript

Wes McKinney:

All right. Thanks, everyone, for being here today. This is a relatively short talk and it's prerecorded, so I will be live at the conference to answer questions after the talk is over. This talk is about Apache Arrow, which is an open-source project I've been involved with for several years and in particular, about Flight, which is a subproject within Apache Arrow. Thanks very much to the Dremio folks for having me here at this conference.

Many of you may know me as the creator of the Python pandas project. Over the last five years, I've been involved with Apache Arrow and also Apache Parquet. I'm a member of those PMCs, and that's been my focus over the last five years. Pandas development has been entirely community-owned and led for about the last seven years. I operate a group called Ursa Labs, which is a not-for-profit development group focused on developing Apache Arrow for data science use cases. So we're really working hard, not only to grow the Arrow community and to build, support, and maintain the open-source project, but also to focus on building out features to empower data science applications with faster and more efficient Arrow-based data transfer and data processing.

So the topic of my talk today is data and data transfer, and in particular a problem that a lot of systems engineers encounter: there's a lot of waste associated with moving data from where it is stored to where it is processed, or between two steps in a processing pipeline, which might even be on the same host. So I want to talk a little bit about what is meant by waste in moving data and how we can begin to think about that.

So what are some of the most common sources of waste? One of the most common is the time that the CPU spends doing data serialization. That means converting from one data format to another; for example, files in a format like CSV or Parquet must be parsed and loaded into memory into some data structure, and that data structure is often the runtime in-memory data structure where the data is processed. If you profile a lot of data applications, it's very common to see that serialization and deserialization take up 80%, 90%, or even more of a workload's wall clock time. When people go to analyze what exactly is taking so much time and costing so much money in a large-scale data workload, the effects of serialization are often underestimated. It's something that really surprises people.

Another source of waste is the developer time that is invested in building the glue code to deal with importing and exporting data into applications. The amount of code and developer time spent on converting between data formats and dealing with all of the details involved in getting data in and out of applications can often absorb as much or more time as building the actual analytical application logic. The developers of data systems also spend a lot of time thinking about the data transport protocol and the details, all the way down to the TCP level, of how datasets move from one point to another.

So you may be familiar with some common data transfer protocols, famous database protocol standards like JDBC and ODBC. A number of different database systems have their own database protocols. There's the Postgres protocol, which a lot of database systems will emulate so that you can use existing Postgres clients to access them. MySQL has its own protocol. If you use Apache Hive, it has its own protocol. Sometimes people design their own protocols: they may use a remote procedure call framework like Apache Thrift and then define a custom serialization format, which may be one of the data serialization formats that's part of the RPC framework, like Thrift or Protobuf. Or they may use some other standard-ish data format like XML or JSON. Back in the day, XML was a popular, and I guess still is a popular, serialization format for moving data between applications, but one that can be rather expensive to import and export.

So the problem of inefficiencies in database protocols, and transport protocols in general, has been pretty well known for many years, and here's a paper from VLDB a few years ago, "Don't Hold My Data Hostage." It analyzes the amount of time that is spent in the course of a query, considering not only the query execution time but also the time associated with moving all of the results to the client and dealing with all of the deserialization details. You see that often the query execution time can be rather small, and there's all of this work spent on both the server and client side: converting the results of the query into the intermediate format used by the database protocol, and then deserializing from that format in the client into the data structures that the client is using.

There's also the problem that many different systems produce data, and often they have their own protocols. On the client side, you may have clients that are written in different programming languages, using different data structures and different processing frameworks. Often, what you end up with is what I call a combinatorial explosion of building pairwise data connectors. So in order to get data more efficiently out of a particular server, you may end up, in the course of optimizing your application, building a custom connector. Myself, I've spent a great deal of time building custom data connectors for loading data into Python for use in pandas, for example. You can imagine how complicated this diagram gets when the number of different protocols and services increases and the number of different clients being served also increases. So there's a huge amount of work associated with this, and code that has to get written and maintained.

So in recent times, particularly with the emergence of scalable cloud blob storage and data lakes, rather than implementing a data connector for every client and server, the blob store is used as the intermediary for getting data into applications. So if you're using Apache Spark or some kind of database system which has access to your blob store, you may write out the results, if they are large, to a Parquet file or to another file format, and then you load the results from your blob storage in that file format. Given that we have pretty efficient ways to read and write Parquet files, that can often be a much more efficient way of getting data out of a system, and one that's a lot simpler to maintain.

It may not be the most efficient, of course, because you're having to produce this intermediate file format, and not only produce the file format but write and commit it to the blob storage. Now, there are other reasons why you may want to write results to blob storage, if you need to persist them or if they need to be consumed over a period of time by multiple clients. But if you're producing an ephemeral result from a database-type system, this process is, in many cases, certainly not the most efficient one in an ideal world.

When you look at database systems, there's another common problem, particularly in distributed database systems, where the result sets that are produced by different nodes in a distributed execution framework are all transferred internally by the system to a coordinator node, which then relays the results to the client. So you end up with a particular chunk of data being served to the client through a series of multiple hops. It may be that the database system has its own internal serialization that's more efficient than the one being served to you through the database protocol. But still, there's an inefficiency here in the indirection of the data on its way to you, the one who requested the data with a query.

So what we would like to see is more and more systems providing a way to stream the results directly from the nodes in a distributed cluster that are producing the result sets. Not only that, but these requests can be made and the results streamed to clients in parallel rather than in serial, which is often the case with the coordinator in a database system. So you can think of this coordinator as being a bottleneck, and I call this the coordinator bottleneck problem in database systems.

So if we're thinking about what we would like to see in making data transport not only faster and more efficient but also simpler for developers: we'd like to minimize or eliminate as much as possible these serialization costs of converting between different data formats. We'd like to spend less time building custom glue code and custom connectors that are optimized for a particular pair of client and server. We'd like to have really high-quality libraries available off the shelf for making requests to data services. And if it makes sense for an application, we'd like the point-to-point transfer between client and server to be as efficient as possible and, preferably, to avoid these sorts of coordinator bottlenecks where data has to be moved around and piped in serial fashion to the client.

So this brings us to some of the motivations that were part of the genesis of the Apache Arrow Project, which we've now been working on for about four and a half years. So there were a group of open-source developers who got together and said, "We really need to define some language-independent standards for moving around data and processing it really fast." One of the goals was, of course, solving these transport problems that I've just been discussing in the talk.

The language-agnostic factor's really important when you consider how much big data technology during the 2000s and 2010s was built in Java, and so you had a lot of technology that was really designed for the needs of Java developers but may not work very well for languages like C++, Python, Rust, Go, and others. So we've been developing those standards and also creating what we think of as a batteries-included development platform for building applications which use the standards developed by the Arrow project. What's really exciting about Arrow for me is the fact that we have this fruitful collaboration between the database systems world and the data science ecosystem, which historically has not seen that much collaboration.

Very quickly, where the project is at: we've been working for about four and a half years. The developer community has been growing rapidly. We have 11 different programming languages represented in the project, and downloads and installs have similarly been growing very fast. So we have a very healthy developer community that is growing more and more with each month.

One of the primary efforts early in the project, which is related to this data transport problem, is Arrow's columnar memory format. It's not just a matter of having a way of representing data that doesn't require serialization on the sender and receiver side. That data format needs to be efficient for processing, so that if you're building an analytical application, the first thing you do is not convert the data to some other format that is more efficient to run queries on. So we spent a lot of time, and the Apache Drill folks did a bunch of foundational work, researching the fully-shredded columnar format that forms the seed of Arrow's design. We wanted a columnar format that works well for both flat and nested data and that supports both random access and scan-based workloads very well. Often, databases are doing mostly scan-based workloads, but in data science you see a lot of random-access workloads. It needs to be efficient for processing on modern CPUs and GPUs, and be a good data structure for using hardware acceleration techniques like SIMD.
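
For example, here is a minimal sketch in PyArrow of a table that mixes flat and nested columns; the column names and values are invented for illustration.

```python
# A small, hypothetical example of Arrow's columnar model holding both
# flat and nested data in the same table.
import pyarrow as pa

table = pa.table({
    "id": pa.array([1, 2, 3], type=pa.int64()),          # flat column
    "tags": pa.array([["a"], ["b", "c"], []]),           # nested list<string> column
    "point": pa.array([{"x": 0.0, "y": 1.5},
                       {"x": 2.0, "y": 3.0},
                       {"x": 4.5, "y": 6.0}]),           # nested struct column
})

print(table.schema)
# Each column lives in contiguous buffers (validity bitmap, offsets, values),
# which works for full scans and also for O(1) random access by row index.
print(table.column("tags")[1])
```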

So it's been a little over four years, and we just made the 1.0 release of Arrow, which declares the columnar format and messaging protocol to be stable. So it's closed to breaking changes, and we have backward and forward compatibility guarantees for the protocol. That's very exciting for us, and we think it will accelerate adoption of the columnar format in applications. Our goal is to go from this world of custom connectors and all of this serialization to a common medium that can be efficiently transferred and can be left as is in applications and processed. We're kind of in the midst of what I would call an awkward transition from the old world, where systems would develop their own memory representations and implement all of their own algorithms and data serialization, and all of that custom code would be specific to that application. Now we can think about building more Arrow-native applications, where we presume that the import and export protocol is Arrow and that we're primarily using Arrow internally as our runtime data representation for doing query processing and analytics.

So as part of the Arrow columnar format, we created a binary messaging protocol, which arranges chunks of tables, effectively, as a byte sequence. It's stream-based, so you can send a large dataset as a sequence of small chunks. It's suitable for use with shared memory. So if you write a large dataset to shared memory, you can read it back without having to copy the respective memory buffers into RAM; you can use memory mapping to execute algorithms directly on memory-mapped data, which enables a lot of interesting out-of-core workloads.
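
A minimal sketch of that idea with PyArrow, assuming an arbitrary local file path: the table is written in the Arrow IPC file format and then memory-mapped back, so the resulting buffers reference the mapped file rather than fresh copies in RAM.

```python
import pyarrow as pa

# Build a table (the column contents here are invented).
table = pa.table({"x": list(range(1_000_000)),
                  "y": [float(i) for i in range(1_000_000)]})

# Write the table as Arrow record batches in the IPC file format.
with pa.OSFile("/tmp/example.arrow", "wb") as sink:
    with pa.ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table)

# Memory-map the file and "rehydrate" it: the arrays reference the mapped
# pages directly, so no per-value parsing or bulk copy is required.
with pa.memory_map("/tmp/example.arrow", "r") as source:
    mapped = pa.ipc.open_file(source).read_all()

print(mapped.num_rows)
```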

The kind of secret sauce that makes all of this work is our message format for arranging the chunks of Arrow data on the wire. We have a metadata descriptor at the head of each message that describes the structure of the message, basically the location of each column in the chunk of data. We don't really deserialize the data; I think of it as sort of rehydrating it: we create data structures that reference the respective memory locations in the message body. That enables us, once we receive the body through TCP or memory map the body, to create a data structure that references those memory locations, and then the data is ready for use.
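
To make that concrete, here is a small illustration using PyArrow: a record batch is serialized to a single IPC message buffer and then reconstructed from it, with the restored arrays referencing the message body rather than being parsed value by value. The column contents are made up.

```python
import pyarrow as pa

batch = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array(["x", "y", "z"])],
    names=["a", "b"],
)

# One buffer containing the metadata header plus the message body.
buf = batch.serialize()

# "Rehydrate": build a batch whose arrays point into the message body.
restored = pa.ipc.read_record_batch(buf, batch.schema)
print(restored.to_pydict())
```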

So we've already seen pretty robust uptake of just the Arrow serialization protocol in database systems. Snowflake Computing has used the Arrow format to speed up their client-server protocol. We've seen the same thing with Google BigQuery, and so it's exciting to see some big applications getting benefits from using Arrow just for data movement. There are some other things in the protocol which are useful for extensibility and performance. We just recently added LZ4 and ZSTD compression to the protocol, so if you have a need to make the messages smaller because your network is slow, you can do that. We have dictionary encoding, which allows you to compress the size of data with a lot of repeated values. And if you have data types that aren't part of the built-in set of Arrow types, you can create user-defined data types.
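
Here is a sketch of those two features through the PyArrow IPC API; the column data is invented, and the stream is written to an in-memory buffer just to show the mechanics.

```python
import pyarrow as pa

table = pa.table({"city": ["NYC", "NYC", "SF", "NYC", "SF"] * 1000})

# Dictionary-encode a column with many repeated values.
encoded = table.column("city").dictionary_encode()
print(encoded.type)   # dictionary<values=string, indices=int32, ...>

# Write an IPC stream with ZSTD-compressed buffers (LZ4 works the same way).
sink = pa.BufferOutputStream()
options = pa.ipc.IpcWriteOptions(compression="zstd")
with pa.ipc.new_stream(sink, table.schema, options=options) as writer:
    writer.write_table(table)

compressed = sink.getvalue()
roundtripped = pa.ipc.open_stream(compressed).read_all()
print(len(compressed), roundtripped.num_rows)
```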

So that brings me to the main topic today, which is Flight, a framework for building high-performance data services that are Arrow-native. We chose to use Google's gRPC framework for building the project. You use Flight to implement your clients and servers; you don't need to have explicit knowledge of gRPC to use it. Those clients and servers send and receive the Arrow protocol natively, and it's highly customizable, so the intent is for you to use it to implement your services. We've dealt with all of the low-level details of the interactions with gRPC to avoid unnecessary serialization and memory copying and to get the most efficient end-to-end data transfer possible.

So we wanted to solve some of these problems that we've seen in database systems, in particular the coordinator bottleneck problem, where result sets end up being streamed serially through a single node in the cluster, so that you can instead have parallel streaming from multiple nodes in a cluster. We also want people to be able to use the framework without having to interact with the low-level details of gRPC and Protocol Buffers, which are part of the implementation. That being said, it is just a gRPC service, so if you don't know anything about Arrow and you just have gRPC, you can still interact with a Flight service.

So when you think about getting a large dataset in parallel with Flight, it might be, depending on the topology of your system, that you issue a request like, "Tell me how to obtain this dataset," to one node in the cluster, which serves as the planner. It gives you, effectively, a flight itinerary telling you where the components of the dataset are, and then you can issue multiple GET requests in parallel to the nodes that contain the data. The data is then streamed back to you in whatever fashion you wish.
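
A hedged sketch of that flow with the PyArrow Flight client is below; the hostnames, port, and command bytes are made up, and for brevity the endpoint fetches run sequentially where a real client would issue them concurrently.

```python
import pyarrow as pa
import pyarrow.flight as flight

# Ask the planner node how to obtain the dataset.
planner = flight.connect("grpc://planner.example.com:8815")
descriptor = flight.FlightDescriptor.for_command(b"SELECT * FROM events")
info = planner.get_flight_info(descriptor)

# The FlightInfo is the "itinerary": each endpoint names the location(s)
# holding a piece of the dataset and a ticket to redeem there.
pieces = []
for endpoint in info.endpoints:
    node = flight.connect(endpoint.locations[0])
    pieces.append(node.do_get(endpoint.ticket).read_all())

table = pa.concat_tables(pieces)
```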

The kinds of commands that are available in Flight are customizable, and the intent is for you to define custom commands that encode the sorts of requests that you need in your system. So it's entirely agnostic to what sorts of requests your clients may need to make. We've spent quite a bit of time thinking about the low-level interactions with gRPC. In particular, we want Flight clients that use Protocol Buffers libraries as is to be able to serialize the Flight messages. But if you are using one of the optimized Flight implementations, we've done work to avoid any unnecessary memory copies in the deserialization that occurs when a payload of data comes over gRPC. So if you're using Flight in C++ or Java, it avoids a lot of serialization overhead in gRPC by overriding the default Protobuf deserialization.
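
As a small, hypothetical example of custom commands: the bytes inside a command descriptor or an action are opaque to Flight, so a service can define whatever request vocabulary it needs. The endpoint, the JSON command shape, and the "drop_table" action below are all invented.

```python
import json
import pyarrow.flight as flight

client = flight.connect("grpc://localhost:8815")   # assumed service

# A custom GET request: only the server knows how to interpret these bytes.
command = json.dumps({"table": "events", "columns": ["ts", "user_id"]}).encode()
info = client.get_flight_info(flight.FlightDescriptor.for_command(command))

# A custom side-channel RPC via a Flight action.
for result in client.do_action(flight.Action("drop_table", b"events")):
    print(result.body.to_pybytes())
```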

So we've seen some experiments; here is one from a paper from the CMU Database Group with a research database system, where they implemented Flight as an alternative to the Postgres wire protocol and kept all of the cold storage of the database system in Arrow format, with the intent of being able to do SELECT * and export large amounts of data to clients at very high speed.

Now, while I have time left, I'd like to do a quick demo of building a simple Flight service in Python and showing what kind of performance throughput we can get on a pretty large dataset. So I'll go over to my Jupyter notebook. I'm going to have to speed through the details pretty quickly here. This is using the Python PyArrow library. I import all the things, and then I have an implementation of a simple server in Python that acts as a data cache, so you can send a dataset to the server to be stored under a particular name. Then you can request the dataset back in a different client, which could even be on a different machine. I'm going to run everything on localhost, so you aren't seeing the impacts of network throughput, but I think it does a good job of showing the kind of performance that Flight is capable of when network speed is not an issue.

So I have the RPC handlers for the server. I'm going to skip those details, and you can check them out in the Jupyter notebook. I have the low-level client interfaces for the client. So I'm going to spin up a server locally and create a client. I'm using it without compression, so I'm not making PUT or GET requests with compression. The way this works is I create a simple Arrow table, and then I can send the table to the server to be cached. I can then ask the server, "What tables do you have?" So I sent table one; now it shows I have table one. I'll send a few more tables, and now we have four tables there. Then I can call get_table, and I pull the data back over Flight from the server into memory.
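
For readers following along, here is my own simplified sketch of that kind of data-cache service with PyArrow Flight; it is not the notebook's code, the port and table name are arbitrary, and the list-tables handler is omitted for brevity.

```python
import threading
import pyarrow as pa
import pyarrow.flight as flight

class DataCacheServer(flight.FlightServerBase):
    """Stores uploaded tables in memory, keyed by name."""

    def __init__(self, location="grpc://0.0.0.0:8815"):
        super().__init__(location)
        self.tables = {}

    def do_put(self, context, descriptor, reader, writer):
        # Cache the uploaded stream under the name in the descriptor path.
        name = descriptor.path[0].decode()
        self.tables[name] = reader.read_all()

    def do_get(self, context, ticket):
        # Stream the named table back to the client.
        return flight.RecordBatchStream(self.tables[ticket.ticket.decode()])

server = DataCacheServer()
threading.Thread(target=server.serve, daemon=True).start()

client = flight.connect("grpc://localhost:8815")
client.wait_for_available()

# PUT: cache a table under a name.
table = pa.table({"a": [1, 2, 3]})
writer, _ = client.do_put(flight.FlightDescriptor.for_path("table-1"), table.schema)
writer.write_table(table)
writer.close()

# GET: pull it back over Flight.
round_tripped = client.do_get(flight.Ticket(b"table-1")).read_all()
print(round_tripped.num_rows)
```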

So now I will read a much bigger dataset. This is a Parquet file that's over 100 megabytes. I'm going to concatenate 10 copies of it to make a much larger table, and then I'll serialize it in the Arrow format to show how big the entire dataset is. So it is about 1.7 gigabytes, and now I will cache the table in the server. That takes a little over 600 milliseconds to send to the server to be cached in memory. So now the FEC table is there. Then I call get_table to get the table back, which takes about the same amount of time. I think it's really impressive to be able to push or pull 1.7 gigabytes of data in about 600 milliseconds end-to-end over TCP, with all of the dehydration and rehydration details of the Arrow format taken care of, so all that data is received in memory and ready to go for processing.

Now, suppose that you had a larger dataset and you want to use compression like ZSTD to make the data smaller on the wire. If your network is slow, then this may give you better throughput. So I'll turn on ZSTD compression and run through the same actions again. I'll make my big dataset, and I'm going to serialize it in memory to show you how big it is in ZSTD-compressed form. So it's a little under half a gigabyte. Now I send the data to the server; it takes a little over a second because, obviously, we're having to compress the data with ZSTD. So there it is in the server, and now I'm going to request the data back, and it takes less time to receive through GET. I will look into why these performance numbers are different; I would expect them to be about the same since there's compression going on in both directions. But either way, I think it's really promising, and I'm excited to see more applications implementing Flight for their client-server protocols.

So thanks for listening to my talk. I look forward to hearing your questions, and we look forward to seeing you in the open-source project.

