Apache Arrow Integration with Spark




Okay. Hi, I'm Bryan Cutler, a Software Engineer at IBM. I work for the Center for Open-Source Data and AI Technologies. It's a mouthful; we call it CODAIT for short. It was previously called the Spark Technology Center, but since then we've expanded to include more AI-driven endeavors. We're just down the street, on Howard.

A little bit about me: I mainly focus on open source development there. I'm a committer for Apache Arrow and for Apache Spark, with a focus on Python, machine learning, working on optimizations, and improving the overall experience for users. Feel free to hit me up on GitHub; there's my username.

Today, I want to go over some work I was involved in, integrating Apache Arrow with Spark. There were other people involved too. I'll go over some of those details, talk a little bit about the problems we were trying to solve in Spark and how Arrow was a big help in solving them, go over some specifics about where in Spark it's being used now, and, finally, talk about some current and future developments.

In Spark, we started hearing a lot of people complain about their Python code running slow. This deals largely with the Spark dataframe API, which is pretty much a wrapper around the Java implementation of everything. So, when this came out in Spark 2.0, the data processing was very fast, as long as the user kept the data in the JVM. But as soon as they needed to go outside, to a Python process, there would be a huge bottleneck, and things would slow to a crawl.

A lot of people didn't understand why performance was so different from running the same kind of UDFs in Scala, but there are a couple of reasons for this big bottleneck. One, when transferring data to Python, Spark would use the pickle format, which is not known to be terribly efficient and is a kind of bulky serialization format.

Furthermore, it forced Spark to iterate over the data and serialize each individual scalar value. This led to a lot of looping when Spark needed to serialize, deserialize, and then process all this data. Anybody that uses NumPy and Pandas knows that you want to avoid that, because it's bad.
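To make the per-record cost concrete, here's a toy sketch (not Spark's actual serializer code) contrasting the old pickle-a-row-at-a-time pattern with a single batched serialization call:

```python
import pickle

# Hypothetical rows, standing in for Spark Row objects.
rows = [{"id": i, "value": i * 0.5} for i in range(5)]

# The old path: pickle each record individually, producing one
# serialized blob per row and a Python-level loop on both sides.
per_record = [pickle.dumps(r) for r in rows]
decoded = [pickle.loads(b) for b in per_record]

# A batched alternative: one serialization call for the whole list,
# avoiding the per-row loop -- conceptually what a columnar format
# like Arrow buys you, on top of a much more compact layout.
batched = pickle.dumps(rows)
decoded_batch = pickle.loads(batched)

assert decoded == rows
assert decoded_batch == rows
```

The point is the shape of the work, not pickle itself: one call per value versus one call per batch.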

Looking at Arrow, it provided good solutions to these problems. It provided a common format between Java and Python, which would allow us to convert our data into the Arrow format in the JVM and avoid this serialization issue, then send it over to the Python process, where it could be consumed directly and processed right off the bat.

Arrow also provided IPC mechanisms for reading and writing over sockets, which is how Spark transfers data already, so that was great. It supported pretty much all the major types that Spark would need to work with in Python. A lot of this initial work dealt with transferring data from the Java process into a Python process.

Okay. I'll go over a few areas where this was integrated. The first place we used Arrow was in the toPandas conversion. So, given a Spark dataframe, you want to collect it to your driver-side application, which basically brings all the data over to Python.

This started off being very inefficient, because it initially does a complete collect of all the data in the pickle format, which produces individual records. It builds all these records up into a huge list in Python, then, in a very inefficient way, creates a Pandas dataframe from those records.

One way Arrow helped with this is that we could convert the data on the Spark executor side, which allowed us to do it in parallel. Then, when it came time to collect all that data, we just brought a whole bunch of Arrow batches into the driver application. We could combine them together, and Arrow already supported converting the result into a Pandas dataframe.

This was introduced in Spark 2.3. It's not enabled by default, so if you want to use this optimization, you have to be sure to include the configuration that I have here. Once that was introduced, it gave a pretty good speedup for collecting to a Pandas dataframe.
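For reference, a minimal configuration sketch of the flag being described (the Spark 2.3 property name is `spark.sql.execution.arrow.enabled`; the app name and dataframe here are just placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-demo").getOrCreate()

# Arrow-based conversion is off by default in Spark 2.3;
# this is the configuration being referred to above.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = spark.range(1 << 20).toDF("id")
pdf = df.toPandas()  # now collects Arrow batches under the hood
```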

Another area, which is kind of related, goes in the opposite direction: given a Pandas dataframe, you want to create a Spark dataframe from it. This is also very well suited to the Arrow format. Previously, this was a major slowdown and wasn't really usable in Spark at all; even a small amount of data would run so slowly that it was almost useless to try to send it through Spark.

But with the addition of Arrow, you can create a huge dataframe, very efficiently send it to Spark, and carry on with your Spark processing. Part of the reason this was so slow before is that, when ingesting Pandas data, Spark had no idea what the data types were. It did not try to look at Pandas types or NumPy types, so it treated everything as a Python object.

Even if you tried to specify a schema, it still needed to loop through all the data, do verification checks, and run through some cleansing to produce clean data for Java, then clean data for Spark, to make sure everything was in a format Spark could easily accept. With the addition of Arrow, basically none of that was needed: starting off with a Pandas dataframe, we could very quickly produce Arrow record batches, send those to the JVM, and the JVM could then send them out, in a parallel manner, to the cluster.

So, it was a huge speedup and a great use of Arrow. Working with UDFs was also a major concern for users. There are a couple of types of UDFs available in Spark right now. The most common is probably the scalar type, which is very similar to a standard UDF. All these vectorized UDFs, also known as Pandas UDFs, interoperate with Pandas data of various types.

So, depending on what kind of UDF you create, you might have slightly different types of inputs. A scalar Pandas UDF takes columns as input, which arrive as Pandas Series, and you'll want to output a Pandas Series from it.
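A minimal sketch of that Series-in, Series-out contract; the function name is made up, and the Spark registration is shown in comments since it needs a running Spark session:

```python
import pandas as pd

# The function body operates on whole Pandas Series, vectorized --
# no Python-level loop over individual values.
def cubed(s: pd.Series) -> pd.Series:
    return s ** 3

# Plain-Pandas check of the series-in / series-out contract:
result = cubed(pd.Series([1, 2, 3]))
assert list(result) == [1, 8, 27]

# In Spark 2.3+, the same function would be registered roughly as:
#
#   from pyspark.sql.functions import pandas_udf
#   from pyspark.sql.types import LongType
#
#   cubed_udf = pandas_udf(cubed, returnType=LongType())
#   df.select(cubed_udf("id"))
```

Each Arrow batch Spark sends over becomes one Series passed to the function, which is what makes the vectorized execution possible.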

Part of the reason this area was a huge slowdown before Arrow is that, not only were there serialization issues going one way, but data gets sent from the JVM over to Python, which does the processing record by record, then builds a whole list of Python records and needs to serialize that back to the JVM. So, Arrow really helped out in this area. Not only does it give a performance boost, but using Pandas to express your user-defined functions is also a huge plus: it allows you to run these functions in a vectorized manner and utilize all the nice hardware features that go along with that.

The second type of UDF is called the grouped map type. This was introduced by Li Jin, at Two Sigma, and it's a super useful addition. Here's a little example of how it's used. This is slightly different, in that you write your UDF to take a Pandas dataframe as input, and your output is also a Pandas dataframe.

What's useful about this is that you can take your Spark SQL data and do an efficient groupby on some column, and all of that will be done very quickly by the Spark internals on the JVM. Then it will transfer these groups, in batches, over to Python, using Arrow. Each batch you process will be one complete group, and you'll be able to process your data as one complete Pandas dataframe.
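A sketch of the DataFrame-in, DataFrame-out contract using plain Pandas to mimic what each executor sees per group (the function and column names are made up; the Spark registration is shown in comments):

```python
import pandas as pd

# DataFrame in, DataFrame out: the function sees one complete group.
def subtract_mean(gdf: pd.DataFrame) -> pd.DataFrame:
    return gdf.assign(v=gdf["v"] - gdf["v"].mean())

pdf = pd.DataFrame({"id": [1, 1, 2, 2], "v": [1.0, 3.0, 5.0, 7.0]})

# Plain-Pandas equivalent of applying the UDF to each group:
out = pd.concat(subtract_mean(g) for _, g in pdf.groupby("id"))
assert list(out["v"]) == [-1.0, 1.0, -1.0, 1.0]

# In Spark 2.3+, the same function would be applied roughly as:
#
#   from pyspark.sql.functions import pandas_udf, PandasUDFType
#
#   @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
#   def subtract_mean(gdf): ...
#
#   df.groupby("id").apply(subtract_mean)
```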

This is really nice for Pandas users who are already familiar with these kinds of groupby-apply semantics. One caveat, though, is that you do need to be wary of how your groups are skewed, because each group will be collected into one batch, and that batch will need to fit in memory.

Okay. Those are basically the areas where Arrow is being used in Spark right now. Currently, I have some ongoing work trying to use the Arrow stream format for the toPandas collection and for creating a dataframe from Pandas. The UDFs already use the stream format, but when toPandas and creating a dataframe from Pandas were first introduced, for whatever reasons, the Arrow file format was what we needed to use.

Working with the stream format, I'm hoping it will be a little less memory intensive and a little faster performance-wise. Also, Li Jin of Two Sigma is working on adding windowed UDFs, which use the same kind of mechanism but allow you to specify different windows in Spark SQL and, again, process data using Pandas over various windows.

I'd also like to see Spark add the ability to externalize this Arrow data. Right now, it's just being used to hand off data from Java to Python, and vice versa. But I think it would be a great way for Spark to interact with other systems, to be able to externalize Arrow data explicitly.

That's about all I have. Thanks a lot. Just to plug something: we are hiring at IBM. So, if you're interested in this kind of work and want to be involved with anything with a machine learning or AI focus, we'd love to hear from you.