May 2, 2024

The Apache Arrow Revolution for Data Solutions

Apache Arrow revolutionizes the way data solutions are built. It has never been easier to interconnect different data tools, enjoying zero-copy reads for lightning-fast data access. Apache Arrow powers an extensive list of open- and closed-source projects you might already be using: PySpark, Pandas, Polars, Dremio, Snowflake, Hugging Face, and more.

We will deep dive into Apache Arrow to understand why it’s conquering the data world.

Topics Covered

Lakehouse Analytics
Performance and Cost Optimization

Sign up to watch all Subsurface 2024 sessions


Note: This transcript was created using speech recognition software. While it has been reviewed by human transcribers, it may contain errors.

Yerachmiel Feltzman:

Hi, everyone. It’s a pleasure to be here today, and thanks for joining us. Let me start by listing a number of data solutions. And please pay attention if you know at least two of them. Okay, let’s start. Apache Spark, ClickHouse, Delta Lake, Dremio, Hugging Face Datasets, Pandas, Polars, DataFusion, Snowflake, BigQuery, Ray, Databand, Lance, Kùzu, TensorFlow, AWS Athena, DuckDB. That is just a subset of the solutions using or supporting Apache Arrow in one way or another. So, just to get a feeling for how trendy Arrow is nowadays, let’s look at the downloads-per-month graph of one of its implementations: the Python implementation, PyArrow. At 100 million downloads per month, that’s not something I, as a data engineer, can just ignore. So I got really curious. Yeah, that curious. Having been a data engineer in the data ecosystem for a while, once I start hearing the same name over and over and over again, I usually get super curious about what it is. So I decided to take the time to learn about this project, to at least get a good high-level understanding of what it is, what it isn’t, and its ecosystem. So today, I will share with you what I learned while researching this amazing project.

My name is Yerachmiel Feltzman. I know it’s a little bit hard to pronounce. I am originally from Brazil, Rio de Janeiro, but I live in Israel. I am currently working as a senior big data engineer at Tikal. We are a hands-on tech consultancy company. Like probably most of you, I love to learn and I love to share. And from time to time, I share stuff I learn on Medium. So feel free to read my Medium and give some feedback.

Apache Arrow

So let’s come back to us being curious about what Apache Arrow is. We just saw how trendy it is, and that is not just by chance. Other than being trendy, it’s also compelling to different types of developers by letting you pick your language of preference. Here, you can see 12 different languages implementing Arrow’s specification. Although I must note that as Arrow is an ongoing, live project, different specific implementations probably have different maturity levels. So I encourage you to go check the implementation matrix on Arrow’s official website after this presentation. Moreover, when evaluating a new open source project, we usually want to know who is behind it, correct? In the case of Arrow, I don’t think we have to worry much about who is behind it. As we can see, the project is composed of people from well-known companies, as well as some independent professionals. Among them are creators of highly used data tools and platforms like Pandas and Dremio. So we don’t have to worry about this point.

So why is Arrow worth considering? It has strong community support. It lets you pick your language of preference. And it is already powering so many good projects. In fact, it has become a de facto part of our data landscape. However, I must admit that at the beginning, I was confused about what it really is. And I think I was not alone. In 2016, at an Arrow meetup, someone asked, how does Arrow achieve fast processing? Jacques Nadeau, co-creator of Apache Arrow, then answered: Arrow itself does not achieve fast processing. It is a representation which is optimal for fast processing. So what is Apache Arrow? It is a specification with an ecosystem of protocols and implementations. That’s the best way I found to define the project at its current phase, after learning what it isn’t. It isn’t an implementation; it has several implementations. It is a spec. It isn’t a processing tool, although different implementations have added compute methods. For example, the PyArrow implementation we just saw has added compute methods, but that is not what it is at its core. And, as with everything in life, it is not a silver-bullet solution. It is laser-focused on analytics on large chunks of data.

And from my understanding, it has two main goals. One, to allow better data analytics processing, and two, to help better data sharing. How does it help better data processing? By defining a language-agnostic in-memory data structure for tabular data, and by designing this specification to enable fast computation in memory. And how does it optimize for sharing? By defining a series of protocols to share the same common data format, which is the Arrow data. And sharing can be local memory in the same process, different processes on the same machine, or machines connected through the network. So let’s dive in.

The Arrow Project

The basic foundation of the Arrow project is its in-memory format specification, which is optimized for in-memory analytics. That’s a classic diagram you can find in almost every piece of content about Apache Arrow. Consider we have four rows in memory. Pick a language. It can be Java, Python, doesn’t really matter. How would these rows usually be stored in memory? Usually, records are just stored in a contiguous way, where each row comes after another, and that’s the traditional way. Honestly, this makes a lot of sense if we are, for instance, iterating over objects, and for each iteration, we want to operate on the whole object. However, what would be the optimal layout for analytics? A column-oriented layout, where each column is put together. It makes sense because in analytics, we operate a lot on columns. For example, take an aggregation like a group-by where we want to sum column A. We are only interested in column A for the sum operation. Or take a predicate, like where the session ID equals a given value. We are interested in operating primarily on the session ID column, to only then move on to the remaining rows.
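The two layouts the diagram contrasts can be sketched in plain Python (no Arrow needed; the column names are illustrative):

```python
# Row-oriented: each record is stored together, one after another.
rows = [
    {"session_id": 1, "col_a": 10},
    {"session_id": 2, "col_a": 20},
    {"session_id": 3, "col_a": 30},
]

# Column-oriented: each column's values are co-located.
columns = {
    "session_id": [1, 2, 3],
    "col_a": [10, 20, 30],
}

# Summing column A in the row layout touches every whole record...
row_sum = sum(r["col_a"] for r in rows)

# ...while in the columnar layout we scan one contiguous sequence,
# which is friendlier to CPU caches and vectorized execution.
col_sum = sum(columns["col_a"])

assert row_sum == col_sum == 60
```

The results are the same, but only the columnar version keeps the values of column A next to each other in memory, which is exactly what analytics workloads want.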

That’s the whole idea of columnar storage. We want to have the column data co-located so we can operate on the column as a batch. That’s even the premise of most OLAP systems, in contrast to OLTP systems. OLTP systems, which stands for Online Transaction Processing, usually care about rows, while OLAP systems, analytic processing systems, usually care about columns.

Apache Parquet Project

You probably also know about the Apache Parquet project, which is a well-known open source columnar storage format for disk: it stores data on disk, co-locating each column’s data close to itself. It takes advantage of this exact same concept. This allows, for example, Parquet readers to optimize for projection pushdown (selecting only specific columns), predicate pushdown (filtering only required rows), parallel reading, et cetera. I think a good analogy is to say that Apache Arrow is for memory, while Parquet is for disk. Both store data in a columnar layout, Parquet on disk, and Arrow in memory.

Let’s see an example of using the in-memory format of Arrow: Polars. It’s a dataframe library. It’s like a younger brother of Pandas, it’s written in Rust, and it’s well-optimized for analytics in memory. If you go and check the Polars source code, you’re going to see that every column is a set of contiguous memory allocations. This column memory structure allows Polars to efficiently parallelize operations on columns. For example, it can possibly run a sum calculation on column A in one thread, while processing an average calculation on column B in another one concurrently. And guess how they implemented this memory layout? By using Arrow underneath, which is, well, a columnar structure. So a deeper look at the Polars source code shows that each Polars column is a series, and series are actually composed of Arrow arrays. And once two systems use a common data format layout as their internal representation, those two systems can easily interoperate, simply because they share the same common internal memory representation.

They understand each other. Let’s think about how revolutionary this is for the entire data ecosystem. Without a standard columnar data format, every database, language, and tool has to implement its own internal data format, and this generates a lot of waste. Because, you know, moving data from one system to another involves costly serialization and deserialization. Also, common algorithms usually have to be rewritten for each internal data format. However, having one common columnar data spec eases a lot of these problems.

Let’s take the following example. Suppose we have a task. Our task is to read a bunch of XML files, process them, probably aggregate somehow, and write them back to disk in the ORC format. That’s a given; we have to deal with it. Let’s say we want to choose from one of the following three Python libraries: Pandas, Polars, or PyArrow. By the time I checked, Pandas had an XML reader, but not an ORC writer. PyArrow had an ORC writer, but not an XML reader. And Polars was the only one with lazy computation and a query plan optimizer. So, what if we want to use the best of each? We want to read XML with Pandas, process with Polars, and use PyArrow to write ORC files. How would we do that without wasteful memory rearrangements between libraries? Just pipeline them. Why? Because all three use Arrow-backed memory. So they can, one, easily interoperate, and, two, possibly do this with zero-copy reads, simply because the different libraries can just access the same memory buffer. They understand the same memory representation without having to copy and convert much when transferring from one library to another.

That sounds super promising. But I have to admit that when I first saw an example like this, I was skeptical. Low-level memory management is not my expertise; I’m a data engineer. So I needed to see some sort of low-level example of memory sharing to feel confident that what was promised is being delivered. Let’s say, at least one proof that zero-copy reads are really happening under the hood. So, to start, I found one Polars unit test. And this test does exactly a conversion between two libraries that use Arrow-backed memory: it tries converting from a PyArrow array to a Polars Series, and it asserts that both use the same exact buffer address. I know what you’re thinking. It is just one unit test. But if we go to Arrow’s official implementation Git repositories, we will see that they have integration tests between different language implementations. There is a whole suite of tests that aims to ensure Arrow implementations are interoperable with each other. Cross-language integration tests, if you wish. For example, Arrow C++ as a producer will write in the Arrow IPC format. Arrow Java as a consumer will then read it. And the test will validate that the in-memory representations of the two are equal.

Inter-Process Communication

So, we have an example of interop in the same process. What about inter-process communication for Arrow-backed systems? To allow that, the Arrow ecosystem has the Arrow IPC protocol, which has a messaging format that helps stream Arrow record batches from one process to another. For example, the PyArrow implementation has APIs for reading from and writing to OS files, memory-mapped files, memory buffers, on-the-fly compressed streams, sockets, et cetera. And one excellent example of a successful use case for the Arrow IPC protocol is Apache Spark. It is an excellent example because it is between two different languages, Java and Python. Apache Spark is a well-known distributed data processing tool. The cluster workers run on the JVM, while the driver code that we write can be written in Python if we use the PySpark interface. So, we write code in Python, but the code runs on the JVM in the workers. Since developers are writing Python code, sometimes they want to use pandas. And historically, Spark has allowed developers to convert from Spark dataframes into pandas dataframes by using the toPandas method.

And the way it was implemented had Spark doing roughly the following steps. First, make the JVM workers serialize the distributed JVM data into something Python can understand, which is to pickle the JVM data. Since pickle is the Python serialization protocol, the Python driver will then receive those pickles, deserialize them into a list of tuples, and then give them to the pandas library so it can convert them to NumPy arrays. Some time ago, this implementation was rewritten to allow using Arrow IPC as the protocol for dataframe exchange. In the Arrow-based toPandas implementation, workers serialize JVM data to Arrow streams, the Python driver gets them almost as is, and moves them to the pandas library as is, letting pandas create the pandas dataframe from them. And by doing that, the new implementation, one, highly reduced the serialization and deserialization steps, given that the bytes are sent in a common layout. And two, it allows pandas to try zero-copy reading the data received from the PySpark driver, since both are in the same process. Okay.

Arrow Flight Protocol

Let’s move on now to a more client-server approach in Arrow’s ecosystem. Arrow Flight is the RPC, remote procedure call, protocol in the ecosystem. It is based on gRPC, and it provides a high-performance wire protocol for transferring large volumes of data between clients and servers. If I got it correctly, it was first created by Dremio, and later donated to the open source community. So let’s explore the basics of the Arrow Flight protocol. The first step of an Arrow Flight client will be to get metadata, information about the data it wants streamed to it. The main pieces of metadata are the schema and the endpoints. Once the client has the data schema and knows where the data is located, that is, the client knows what the data looks like and where to get it from, it can then reach out to those endpoints to actually get the data. The endpoints will at that point stream Arrow batches to our Flight client. And again, as in other use cases, having a language-agnostic way of sharing tabular data makes life easier. For example, the client could be in Java and the server in C++, or the client in Rust and the server in Python. And we can just stream data in a columnar fashion between two different languages, client and server, and they both understand each other, without having to deserialize much after it moves through the wire. One interesting thing to note is that the Flight protocol was designed with data and metadata decoupling in mind, as we saw: first get the metadata, then get the data. And by that, it opened up the possibility to scale out the server, the Flight server, horizontally relatively easily. What’s maybe even more interesting is the possibility to easily scale the client horizontally.

For example, I found two projects implementing this architecture to connect Spark clusters as clients to Flight servers. The Spark driver would be the client coordinator. It would then get the metadata from the server coordinator and delegate the actual data fetching to its executors. So: distributed client, distributed server, super interesting, super scalable. Also, I can think of even a multi-cloud, multi-region architecture, for example. We could have the client coordinator assign specific workers closer to the server endpoints: “Hey, the data is on cloud A, region B. You are the closest worker. Go get the data stream from that location.” It opens up a lot of possibilities. And I have a good feeling that more use cases will come to production.

Apache Arrow in Today’s Data Landscape

So today, we learned about Apache Arrow, which plays an important role in today’s data landscape. As a project, it has strong community support. It lets you pick your language of preference. It is already powering so many good projects. We understood that Arrow itself does not achieve fast processing; it is a representation which is optimal for fast processing. This means that it is a specification with an ecosystem of protocols and implementations. It has two main goals. One is to boost processing, to achieve faster processing, and two, to allow better data sharing. We also saw several production-grade, real-life use cases of data solutions making good use of Apache Arrow. Polars, for example, is achieving efficient in-memory processing by using Arrow as its in-memory columnar internal representation. Libraries like PyArrow, Polars, and Pandas can easily interoperate in the same process and possibly achieve zero-copy reads by accessing the same memory buffer. We also saw Spark, which is leveraging the language-agnostic format of Arrow to exchange data between the JVM workers and the Python driver using the IPC protocol. By doing that, it reduced serialization and deserialization steps and probably achieved better performance.

Also, we saw Dremio as an example of how servers and clients can share large chunks of data, can stream large chunks of data over the network. It does it by making use of the Arrow Flight RPC protocol, which is also easy to scale out horizontally. I’m sure the future is bright. It’s a bright horizon. As data engineers, day after day, we see new data projects, solutions, and tools being created to make our lives easier, and I think Apache Arrow is definitely one of them, one of the good ones. Thank you for listening and learning together with me about how Apache Arrow is revolutionizing the data landscape.