Improving Python and Spark Performance and Interoperability with Apache Arrow



Julien Le Dem:

I'm Julien. I'm an architect at Dremio, and formerly I worked at Twitter where I co-created Parquet, and I'm an Apache member and I'm on several PMCs, including Apache Arrow, which is our topic today.

So to give you a quick agenda, I'm going to start by talking about the current state and limitations of PySpark UDFs. Then I'm going to give you an overview of Arrow, how the project started and what it's for, and then Li is going to talk about the improvements realized in PySpark using Arrow and the future roadmap of things yet to come.

So first, the reason for PySpark is that there are a lot of things that are much easier to express using Python than the built-in Spark features. For example, you can do weighted means, correlation, moving averages, a lot of things that are much easier to express in Python.

And so the way a PySpark UDF works is ... you express your Spark pipeline using Python, and you can use Python lambda expressions inside of it, and they are going to be run inside the Python runtime, once [inaudible 00:02:17]. And there are two main types of expression. You can have simple row UDFs that take one value and evaluate an expression like x+1, or more complex ones that take multiple columns in. And you can also do group UDFs, where you work on a list of values, and that's the more interesting case we're looking at today.

So the row UDF, it's similar to what you do in Spark with the map operator and passing a function. In this example, df.withColumn, this is a PySpark dataframe. We're creating a new column, v2, and we create it by applying the UDF defined as this lambda expression x:x+1 to the column v1. And the big downside of this is that it's 68 times slower than doing the same thing in Scala, for a bunch of overheads we're going to talk about.
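To make the semantics concrete, here is a minimal sketch (plain Python and NumPy, not the actual PySpark internals) of what a row UDF like `lambda x: x+1` amounts to: the worker invokes the lambda once per row, whereas a vectorized approach applies one operation to the whole column at once.

```python
import numpy as np

def eval_row_udf(values, f):
    """Row-at-a-time evaluation: one Python function call per value,
    which is where much of the interpreter overhead comes from."""
    return [f(v) for v in values]

values = [1.0, 2.0, 3.0]
udf = lambda x: x + 1

# Row-at-a-time: len(values) separate Python-level calls.
row_result = eval_row_udf(values, udf)

# Vectorized: a single operation over the whole column.
vec_result = (np.array(values) + 1).tolist()

assert row_result == vec_result == [2.0, 3.0, 4.0]
```

The function names here are illustrative; the point is only the difference between per-row invocation and whole-column evaluation.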

The group UDF, it's similar to a row UDF, except you want to do it on a list of values. So it's similar to doing a groupBy followed by a map operator in Scala, and you could use it for example to compute the weighted mean by month. You group by month and then you apply a function that will do the weighted mean.

The problem is, it's not supported out of the box, so there's a lot of boilerplate to pack/unpack values into lists, and there's a lot of overhead to that. And that translates to poor performance, because those groups have to be materialized when possibly you'd want to do partial aggregation and then combine them. And there's a lot of serialization/deserialization in converting the data structures that Spark understands to the data structures that Python understands.

So we're going to look at an example of data normalization. So here you take a list of values, subtract the mean from each value, and you divide by the standard deviation.
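As a minimal sketch of that formula, using NumPy as a stand-in for the actual computation:

```python
import numpy as np

def normalize(v):
    """Subtract the mean from each value, divide by the standard deviation."""
    v = np.asarray(v, dtype=float)
    return (v - v.mean()) / v.std()

result = normalize([1.0, 2.0, 3.0])

# The normalized values have mean 0 and standard deviation 1.
assert abs(result.mean()) < 1e-9
assert abs(result.std() - 1.0) < 1e-9
```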

So to apply this simple math operation, you need to write all that. And I'm not going to go into the details, but if we zoom in on the useful bits, you can see we do a groupBy on month, and then we define a function called 'normalize', in which you recognize the expression we were defining earlier. And then in this last line, we apply the normalize function and create a new column for it.

And everything else is boilerplate code to make the group UDF work in the [inaudible 00:04:54].

As for performance, we compared it to a baseline. So this groupBy().agg(collect_list()) is the Scala baseline in Spark to materialize those lists. Evaluating the normalization on top should be a small cost, where here we pay 60 times the cost because there's a lot of serialization/deserialization happening.

So the problem is, there's a lot of packing/unpacking of nested rows, there's inefficient data movement because of serialization/deserialization, and Li is going to show you in detail what that looks like. And the scalar computation model adds the overhead of boxing and the interpreter.

So that's where I'm going to give you an introduction to Arrow, to give you some background on why Arrow can help in this situation. So first, Arrow started from a common need for an in-memory columnar representation. A lot of those open-source SQL-on-Hadoop projects like Impala or Drill or Presto, or other projects like Kudu, which is a columnar storage engine, are all looking at the literature in academia like MonetDB and Victor [inaudible 00:06:21], and they're the next step in making all those things much, much faster. And it's all happening in all those projects, but the idea here is, hey, if we can agree on what this columnar in-memory representation looks like, then there's a lot of benefit in removing all the overhead of converting from one representation to another, from one system to another.

So building on the success of Parquet, we put all those people together in a room, and we said hey, let's agree on what it's going to look like and start an Apache project: define the format and share it. First you can share the implementation, and second, you have the benefit that now it's a standard, so there's a lot of work that doesn't need to happen, because we don't need to convert from one format to another.

So the goal of the project is to have a well-documented, cross-language compatible in-memory representation. So you can use it from Java to C++ and seamlessly communicate from one process to another through it. It's designed to take advantage of modern CPUs, pipelining, [inaudible 00:07:29], etc. And it's embeddable and interoperable.

So here I have a sample of execution engines or processing frameworks on the top, and storage layers on the bottom. You have the costs ... Like if you look at the case of Spark and Python, there's a lot of overhead in making them communicate, because one is a native process in Python, and Spark is running on the [inaudible 00:07:57]. So most of the time, you find the lowest common denominator for communicating between the two, and you have a lot of overhead just serializing/deserializing stuff. And second, there's a lot of duplicate effort, because you need to figure out how to integrate each of those, and you need to find this common layer for each of those things every time. So there's lots of duplication, lots of complexity, and lots of work involved.

Now, once everyone has agreed on the in-memory representation and uses Arrow for communication, then integrating between any two of those is much, much simpler. There's a lot of effort that's not necessary anymore, and the non-negligible advantage is there is zero cost in serialization/deserialization, because it's exactly the same representation on both sides. Arrow is designed so that you can copy the buffer directly from memory to the wire and back to memory without any operation. So there's no CPU cost involved, and the serialization cost goes away.

So to give you a quick example of what it looks like, we have some JSON data here. And to simplify ... I didn't show the bitsets that are used to capture nulls, so if you have a null you would have a bit that says zero, versus one if it's defined. For fixed-width types like integers, you can see the age column, we can just put four bytes after four bytes after four bytes; it's fixed width, it's one vector. For variable widths, like the name here, you have the values all one after the other in one vector, and then you have an offset vector which points to the beginning of each value. And this offset vector mechanism is composable, so if you have a list of variable-length values like phones, then you have an offset vector that represents the beginning of each value, and you have another offset vector that points to the beginning of each list. So you can have arbitrarily nested data structures represented in that way, and you have this nice columnar representation.
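As an illustrative sketch (plain Python lists and strings, not the actual Arrow buffers), the variable-width layout described above can be modeled as one concatenated values buffer plus an offset vector; the example names are made up.

```python
names = ["joe", "mark", "angela"]

# Variable-width column: all values concatenated into one buffer...
values = "".join(names)            # "joemarkangela"

# ...plus an offset vector pointing at the start of each value,
# with a final offset marking the end of the last one.
offsets = [0]
for n in names:
    offsets.append(offsets[-1] + len(n))
# offsets == [0, 3, 7, 13]

# Reconstructing value i needs only relative offsets, no pointers.
def get(i):
    return values[offsets[i]:offsets[i + 1]]

assert [get(i) for i in range(len(names))] == names
```

Because the offsets are all relative to the start of the buffer, the two buffers can be copied as-is with no pointer fix-up, which is the property the talk relies on for zero-cost serialization.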

So now, when you send it over the wire, you have just a simple data header that says hey, here are the buffers, and here's their size. And then you just copy those vectors directly to the wire. Notice that because the offset vectors are all relative, you have offset index zero, index five, and so on, there's no translation necessary. There are no absolute pointers like you would have if you had an object tree in memory in regular languages. So you can just copy them directly to the wire and back, and there's no CPU transformation involved. You don't consume CPU transforming or re-creating those pointers.

So this representation is designed to maximize CPU throughput. One thing is, modern processors don't just execute one operation after the other, they stagger them: they start the next instruction before the previous one is finished. And for that, they have branch prediction algorithms, so every time there's an 'if', or a data-dependent branch, they need to start executing the next instruction before they know which one is the next one, so they pick one and they guess, so that it can go faster. Otherwise they would lose a lot of cycles waiting for the previous instruction to be finished. So vectorized execution is actually focusing on very tight loops on one column at a time. I'm sorry, I don't have time to go into details. But vectorized execution is taking advantage of pipelining to get maximum [inaudible 00:11:36] of the CPU.

SIMD instructions are single instruction, multiple data. Similarly, it's about telling the processor, hey, do this same operation on four values at a time. And because we put all the values next to each other, it's really easy to do that. We're going to work on that whole vector at a time. And cache locality is, again, the processor goes much faster than it can fetch data from main memory, so to avoid this problem it keeps some limited cache on the processor itself that goes much faster. But every time it needs to fetch more data into it, it has to wait, and this creates a lot of latency. In vectorized execution, because you're focused on one column, or a few columns at a time, you actually have to fetch more data a lot less often, and it's much more efficient.

And scatter/gather I/O is this mechanism of just pointing the network layer at the memory and saying, take these buffers and send them to the network, or take those buffers and put them back in memory. And you take advantage of the most efficient copying of data, which doesn't require any CPU for transformation.

So we have some numbers, thanks to Wes McKinney from Two Sigma. We put out a bunch of blog posts with some measurements of the PySpark and Arrow-Parquet integration, giving some numbers. So these slides are available; you'll be able to go look at it.

Some information about our releases. So since we started, in green you have the number of commits or JIRA tickets resolved, and in blue you have the number of days. We have been trying to make releases more often, and try to release every month now. And so you can see there is a lot of activity that has been happening.

And now, I'm going to invite Li to talk about the improvement to PySpark using Arrow. And probably you can introduce yourself again.

Li Jin:

Hello, everyone. My name is Li Jin. I'm a software engineer working at Two Sigma Investments. Two Sigma is a technology company that applies data science to the fields of finance and insurance. I'm currently building a Python-based analytics platform using PySpark, so here's some work we have done to improve PySpark using Arrow.

So first, I wanna just quickly illustrate how a PySpark UDF currently works. On the graph here we have two processes. On the left side, there's the executor process running on [inaudible 00:14:03]; on the right side is the Python worker process running on a Python runtime. During a UDF evaluation, the executor here will stream rows in batches to the Python worker, and upon receiving rows, the Python worker simply invokes the UDF on a row-by-row basis and sends the results back. So this is a quite simple architecture, and it works currently.

However, as Julien mentioned earlier, there are two issues with this approach. The first is, we're doing a lot of serialization and deserialization in Python, which is not very efficient. And also, the way we invoke the UDF, basically in a scalar computation model in a Python for loop, is not very efficient either.

In this slide I wanna deep dive into one profile we took during one UDF evaluation, to show a little bit of where the overhead is. The first thing I wanna show here is that it took a total of about two seconds to compute about two million doubles for x+1. Here the profile shows four seconds; this is because the profiler in Python actually adds significant overhead. So just keep in mind the actual runtime is about two seconds. And to translate that to [inaudible 00:15:23], that's about eight megabytes per second in processing this simple computation, which isn't very fast.

The second thing I wanna highlight here is the second red box, which accounts for over 90% of the total runtime of this computation, and this is all spent in the serialization and the lambda function overhead. So this is less than ideal, and we want to improve it.

So here are some changes we made to PySpark to vectorize the row UDF. Basically, what we have done is we have implemented a module to transform rows back and forth to Arrow record batches. And we send Arrow record batches over the wire to the Python worker. And upon receiving the data, the Python worker now uses a UDF which is a transform on a Pandas dataframe, to enable vectorized operations.
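A minimal sketch of the difference on the worker side, using Pandas directly (the wire protocol and the Arrow conversion are omitted here):

```python
import pandas as pd

s = pd.Series([1.0, 2.0, 3.0], name="v1")

# Before: the worker calls the Python lambda once per row.
row_based = s.map(lambda x: x + 1)

# After: the UDF is a transform on a whole Pandas Series,
# so the loop runs in native code instead of the interpreter.
vectorized = s + 1

assert row_based.equals(vectorized)
```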

So I wanna just quickly talk about the Pandas dataframe as the interface, if it's not already obvious. First, Pandas is a fast, feature-rich library, widely used by Python users. Pandas dataframes already exist in PySpark, via the toPandas function. And Pandas dataframes are compatible with most popular Python libraries, such as NumPy, StatsModels, etc. And last but not least, Pandas supports zero copy to and from the Arrow data structures, which is very important for performance reasons here.

So now I wanna show a comparison of the scalar version of the UDF and the vectorized version of the UDF. The first thing I wanna show here is the total runtime goes from about two seconds down to less than a hundred milliseconds. That's almost a 20x speed-up on the Python worker.

The second thing I wanna show here is that all of the overhead we've seen before, the over 90% of the time spent in serialization and lambda functions, is all gone now. You cannot find it anymore in the vectorized version.

And finally, I wanna show that even the I/O part is faster in the vectorized version, because the I/O is now also vectorized, which means fewer system calls, and the result is faster I/O.

With all that, we also measured end-to-end runtime, comparing the default version, which is row-based, and the Arrow version, which is vectorized. Here, again, we're doing x+1 for a dataframe of doubles, and we're doing this on a single thread in Spark local mode. And as we can see, we achieve a pretty consistent 4.5x speed-up across different data sizes. The speed-up isn't quite the 20x shown earlier in the Python worker, because there's extra time spent in converting Spark rows to and from Arrow batches, which brings the speed-up down to about 4.5x. But still, this is quite promising.

The next thing I wanna talk about is supporting group UDFs. As Julien showed earlier, doing a transformation on groups in PySpark right now is quite complicated, so we wanna make it better. First I wanna talk just quickly about this common pattern in data analysis called 'split-apply-combine'. This is a simple yet powerful pattern which basically breaks the problem into smaller pieces, operates on each piece independently, and finally puts all the pieces back together. This is a common pattern supported in a lot of systems, such as SQL, Spark, Pandas, R, etc.

So in PySpark, split-apply-combine basically works like this. PySpark provides a lot of functions for you to split your data differently. For instance, you can call groupBy, or you can use the window function to put the data into different groups. In the apply step, basically you can invoke any of the [inaudible 00:19:45] Spark functions, window functions, or you can do a scalar UDF. Here are just some quick examples: you can do mean, standard deviation, or you can collect the data into a list and process it later. And the combine step is done inherently by Spark. So what we wanna do is keep the split and combine steps unchanged, and add support for the group UDF in the apply step.

To do that, we introduced a new function, apply, on the result of groupBy. This function takes a UDF which is a transform on a Pandas dataframe. It treats each group as a Pandas dataframe, applies the UDF on each group, and finally assembles the data back in Spark.

Here is just a quick illustration of how it works. On the left side, we have three partitions with different rows. First we do a groupBy, which is a full shuffle of the data into different groups. And then, within each partition, we invoke this lambda function on each group, using a Pandas dataframe as its interface. And finally, we get the data back together in Spark.
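The behavior of this groupBy-then-apply flow can be sketched with plain Pandas, which is also what the UDF sees for each group; the shuffle and Spark-side reassembly are not modeled, and the column names are illustrative.

```python
import pandas as pd

df = pd.DataFrame({
    "month": [1, 1, 2, 2],
    "v":     [1.0, 3.0, 10.0, 30.0],
})

def normalize(pdf):
    # The UDF receives one group as a Pandas dataframe and
    # returns a transformed dataframe of the same shape.
    v = pdf["v"]
    return pdf.assign(v=(v - v.mean()) / v.std())

# Split by month, apply the UDF per group, combine the results.
result = df.groupby("month", group_keys=False).apply(normalize)

# Each group is normalized independently, so every group's mean is 0.
assert abs(result["v"].groupby(df["month"]).mean()).max() < 1e-9
```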

To show how to use this, we go back to the previous example of data normalization. Again, we want to subtract the mean from each of the values and divide by the standard deviation.

So this is a comparison of the two implementations, using the current API and using the group UDF API we implemented. On the left side is what you can do now, which is exactly the same code as Julien showed earlier, which involves a collect_list, a row UDF, an explode. So it's a quite complicated computation. And on the right side is what you can do with the group UDF, which is pretty simple, easy to read and write. And also, we are able to achieve a 5x speed-up by using the group UDF, because it's using Arrow for the serialization and Pandas for the vectorized computation.

Finally, I wanna talk about some limitations we found along the way. The first limitation is that these approaches require conversions between Spark rows and Arrow record batches. This is because, inherently, Spark has a row-based memory layout, so it works on rows, and the Arrow data structure is inherently column-based. And in order to convert between these two, we need to spend some extra CPU cycles to copy the data from one format to the other. Hopefully this can be improved once we have access to a common columnar format within the Spark internals.
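The conversion cost can be pictured as a transpose. Here is a toy sketch of turning row-oriented records into column vectors and back, which is the extra CPU work mentioned above (the real conversion also builds Arrow buffers, validity bitmaps, and so on):

```python
rows = [(1, "a"), (2, "b"), (3, "c")]

# Rows -> columns: each field gets gathered into its own vector.
columns = list(zip(*rows))          # [(1, 2, 3), ('a', 'b', 'c')]

# Columns -> rows: the inverse transpose on the way back.
back = list(zip(*columns))

assert back == rows
```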

The second limitation we found is that in the groupBy case, it's actually pretty hard to do local aggregation. If you still remember, in the illustration earlier we have to shuffle the data first and then apply the UDF function on it. This is difficult due to how PySpark inherently works. If you're interested in the details, there is this [inaudible 00:22:56] discussing aggregation functions in PySpark, so you can take a look afterward.

So that covers basically all we have done to show some improvements. Next I wanna talk a little bit about future roadmaps. So first, on the Arrow side, we expect to implement Arrow RPC over REST, which is a standard way to retrieve and store data in Arrow format. Secondly, we want to implement Arrow IPC, which stands for inter-process communication. This is basically a shared data structure to allow even faster data transfer between languages, because you can just pass the memory pointer from one language to the other. Because the Arrow data structure is language-agnostic, both languages can understand it perfectly. So this is actually pretty novel work that's happening. Also, there have been discussions about Arrow integration with more systems that we would like to see, for instance Drill and Kudu.

And on the PySpark side, we're gonna keep working on this [inaudible 00:24:06], which covers the faster UDFs using Pandas and Arrow. We want to support the Pandas UDF function with more PySpark functions, for instance groupBy aggregation and window functions.

Here is the API prototype of how things might look. This is not final, but just to give you a taste. In the first example here, I'm showing you can do a groupBy aggregation with the Pandas UDF. In the UDF you define how you wanna compute the weighted mean for each group, which is using the numpy average function, and you should be able to just use this with the PySpark groupBy aggregation function. The second example I wanna show here is that you can implement a custom rank function using the numpy rank function, and you can use it with the window function by calling the rank UDF over a window. So this hasn't been implemented yet, but we're working on finalizing the API and will implement this work in the near future.
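Since the PySpark API itself is not final, the weighted-mean idea can be expressed today with Pandas and NumPy; this is only a sketch of the semantics the prototype describes, and the column names are made up.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "month": [1, 1, 2, 2],
    "v":     [1.0, 3.0, 10.0, 30.0],
    "w":     [1.0, 3.0, 1.0, 1.0],
})

def weighted_mean(pdf):
    # One group in, one aggregated value out, using numpy's average.
    return np.average(pdf["v"], weights=pdf["w"])

result = df.groupby("month").apply(weighted_mean)

# month 1: (1*1 + 3*3) / (1 + 3) = 2.5; month 2: plain mean 20.0
assert result[1] == 2.5
assert result[2] == 20.0
```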

So with that ... as a community, we want help to make PySpark better. So if this work seems interesting to you, please get involved, watch this [inaudible 00:25:21]. Or you can give us some feedback, talk to us, discuss your [inaudible 00:25:26] cases. We're very excited to hear about that. Also, if you're interested in the Arrow project, here's some information you can use to watch the project. It has been pretty active, as Julien showed earlier.

So with that, we want to thank a couple of people here: Bryan Cutler from IBM and Wes McKinney from Two Sigma Investments, for helping build this feature. We also wanna thank the Apache Arrow community, the Spark Summit organizers, and Two Sigma and Dremio for supporting this work. With that, I want to move to questions. Thank you very much.

Speaker 3:

Thank you Li and Julien for the great work in the community and Apache Arrow. We've got about four minutes. If you have any questions we have one mic over there and the other mic is with me so I'll start to run around and add some steps to my FitBit, so go ahead.

Speaker 4:

Hey, guys. Great presentation. Quick question, are you using all this efficiency, is this for trading for like, less than a second? Are you holding assets for a very short amount of time or is this more for value investing and doing a whole bunch of research and then using that information to purchase for the long-term?

Li Jin:

This isn't particularly for low latency; this is just for interactive use cases.

Speaker 3:

Any other question?

Go ahead.

Speaker 5:

I was wondering is there a roadmap for doing more any kind of columnar compression, that kind of stuff, in Arrow? Thanks.

Li Jin:

Julien, you wanna answer that?

Julien Le Dem:

If there's any what, sorry?

Speaker 5:

On the roadmap is there ... What kind of support columnar compression will you have in Arrow?

Julien Le Dem:

What kind of support we have for columnar compression. So currently there's support for dictionary encoding, and that's the main support for columnar compression there is in Arrow. The accent is on everything that is very [inaudible 00:27:31] efficient, so dictionary encoding is something that's great because it helps with aggregation; it can go much faster on the dictionary IDs rather than the variable-length values, for example. We don't have more advanced or complex compression techniques yet or in the near future. And one of the goals was to agree on a limited set of features; if you put in too many features it slows down adoption. But dictionary encoding is a big one.

Speaker 3:

We've got time for one more question. Going once, going twice. Alright, give a big hand to Julien and Li.

Julien Le Dem:

Thank you.