Apache Arrow: In Theory, In Practice




Jacques Nadeau:

Thank you so much. Hello, everybody. I will be talking through Apache Arrow. I'm going to start with just a quick overview of what it is, what it's trying to do, some of the things that are a part of it, and then I'm going to go into some pretty detailed thinking about how we have used Arrow. And some of the things we've learned in terms of how to solve certain engineering problems when you're working with Arrow, and some of the solutions we came up with.

So, hopefully, it'll be valuable to you. I've been in meetings back to back all day, so I apologize if it's not as crisp as it could be if I'd done it maybe several hours earlier than this.

Anyway, he said who I am. I am involved in a lot of Apache projects. I also work at Dremio, which is an open-source technology that allows you to interact with lots of different kinds of data, and accelerate how fast you can interact with that data. You can go check it out if you want to, but this talk's really about Arrow.

So, I'll start out with Arrow in theory. So, the Arrow project ... It's actually pretty amazing. It's only about a year and a half old. [Wes 00:01:32] and I were actually working on it for maybe six or eight months before that, talking to different people to try to figure out what it could be. Trying to get together a group of people that all shared the vision of having this common way of interacting with data.

It's really focused on columnar in-memory analytics. It's trying to solve different kinds of ways of doing columnar in-memory analytics. That could be advanced analytics, that could be descriptive analytics. But really, how do you do that in a reasonable way? The committer and contributor list ... I actually haven't updated this in forever, because there's a bunch of other people involved, like the Enigma guys, like the Two Sigma guys.

Anyway, when we first started out, bunch of different people that are committers on a bunch of other important open-source projects are all involved in Arrow. Some of them less so, some of them more so. But we basically tried to build this as very much a community standard.

So I'm sure you're all familiar with open-source. I kind of look at open-source as there's two kinds of open-source. There's open-source that's very corporate-sponsored open-source, typically driven by mostly one corporation. Then there's open-source that's very community focused, where there's no single organization that owns it, and, in fact, you're trying to build a very common consensus. And Arrow very much fits in the latter category. There isn't any one person who controls it, although Wes does a crazy amount of commits. So he's very influential in it. But the passion is to try to make something that is very much shared.

And so, basically, we wanted to implement a well-documented, cross language compatible, in-memory representation for data. And we want it to be designed such that it was very efficient for processing, so when you're processing this data representation, you could take advantage of modern CPU characteristics.

And so, to think about this at a very high level, there's kind of a chicken and egg that's going on here, which is that Arrow is about two things. It's about processing efficiently, and about sharing data at very low cost, or at zero cost, potentially. Okay? And the challenge is ... How do you get two systems to communicate with a common format? Well, you could have them each convert into that common format at the edge, right? But then you're still paying serialization and deserialization costs.

So the idea with Arrow was, is that if the format is very, very efficient for processing purposes within a single system, then that provides a really tangible benefit for that system to adopt it. And you can see that happening with Wes in some of the stuff he's doing on pandas, for example, where he's starting to adopt Arrow as an internal format for improved performance of processing.

But the second benefit that happens is, as multiple systems adopt that representation of data, then all of a sudden those systems can share the data at very, very low cost, and possibly, in shared memory, at zero cost. And so that's really what Arrow's about.

So it's designed to be ... It's a library. It's not a system that you install. It's a library used by many different types of systems. And it's designed to be embedded in execution engines, analytics engines, storage layers, whatever the case may be.

What is the actual representation itself? Well, it's what I would describe as a shredded nested data structure. And so each different field inside of a complex data structure is in a separate piece of memory, and potentially multiple pieces of memory. It's designed specifically to be random accessible.

So if you think about formats that are on disk, typically, formats on disk ... You have to read them linearly. So as you're working through individual records, if you want to get to the 100th record, you have to read the first 1 through 99. Or maybe you can skip through the first 50, but you have to read the next 50 in order to get to the right value because the offsets are not guaranteed. In Arrow, that's not true. Arrow was designed specifically to be random accessible. You always know where a particular record is in a particular memory location. So you can calculate that in your code in a very straightforward way, so you always know where to get data.

And it's designed to basically take advantage of the CPU pipelining, SIMD, cache locality, super-scalar operations, all of those things, by how we structure the data in memory.

It's also designed so that it's very easy to take it and move it to an I/O, whether that's socket or disk. So you can take it and basically flatten it down to disk or flatten it off onto a socket very, very efficiently, and use the scatter/gather capabilities in the kernel.

So, I talked about the fact that this is about processing and sharing. So, if you think about how systems work today, basically, most systems have different ways of representing data. And so, whenever you're moving between any two systems, you have to basically convert the data from the internal representation of data to a version that it uses on the wire, and then sends the data across the wire. Then you suck the data through some kind of cell-level API to interact with it on the other side, and convert it into a different format. Right? And so you actually have copy and convert on the sending side, most likely, and copy and convert on the receiving side. And so, every time you're moving in between systems, that becomes very, very expensive.

And so, why this is a big problem is, is if you think about where we are today versus where we were, say, 20 years ago. 20 years ago, people did analytics in monolithic systems. I might do all my stuff in Quark, or with some plugins of some machine learning stuff, or something like that. Today, there are lots and lots of different tools. But what's happening is because the cost of moving data between different contexts is so high, you still kind of get drawn into this approach of, "Let's do it more monolithically." When really, what you want to have is a very loosely-coupled ecosystem.

And so, the passion with Arrow, there, is that you can get to this image on the right-hand side, which is all about, "Hey, let's [inaudible 00:06:52] these systems all use similar representations of data, and be able to share those at very, very low cost." So I could start an application in, say, a particular storage format, move it to one processing engine, move it to a second processing engine, move it to another storage system, possibly move back into a processing system, whatever the case may be. And doing all of those things at low cost, so I could pick a best approach to how I'm dealing with data analytics.

The other thing is that it actually provides a potential benefit ... And this is something that we're getting to. You can't use Arrow today to do this. If you think about processing of a dataset today in memory, most likely, you might pin the files in memory, or something like that. And then every time someone wants to interact with the data, they basically read the file in memory, and then turn it into an internal format for processing purposes. Right? If the data is in a representation that is processable without change, then that means that, potentially, many different applications or many different users can interact with the same data at the same time. And that, ultimately, we hope, will mean that you can have substantially larger [hot 00:07:53] datasets, because you only have to have one copy in memory instead of a copy for every user and for every application.

So, the other part of Arrow is about common processing. So, yes, we want a representation that's very efficient to process, but it's also nice for someone to be able to pick it up and start being able to process using certain types of algorithms without a lot of extra implementation. Right? So, as it is right now, it's more ... More of the code, right now, is focused on the implementation of the representation, writing to and reading from the representation. But what we're trying to do is start to have, "Hey, what's a fast way to apply a predicate to this representation?" "What's a fast way to sort it?" "What's a fast way to drop it into a hash table?"

Because one of the things that people may not know is, is that working with columnar data has its own challenges. And that's one of the things I'll talk about in the second part of the talk. And so, having some sort of initial tools, some primitives that someone can use to start working with Arrow, start to implement something and say, "Wow, when I use this representation, I can solve this problem in a very, very fast way," and that's an easy way for them to start adopting it.

In terms of the data types available in Arrow, Arrow is a complex representation. It's basically all the standard things that you think of. So there's a bunch of scalar types around dates, times, decimals, integers, strings, binary, category ... I think there's a couple I'm missing, there, that you've added. What's that?

Jacques Nadeau:

Yeah, there's a few. Anyway.

But most of the common types that you'd think about, as well as complex capabilities around those: arrays, structs, maps, as well as a union type, which allows you to have a heterogeneous [feed 00:09:27].

This is an oversimplification, but basically, if you think about a stream of Arrow data, okay? And this is a slide I've been using for a long time, so some of these things might have changed a little bit. But the stream of Arrow data is really around starting with schema negotiation. So you and I need to agree with what schema I'm going to be providing to you in my Arrow stream. And so this is going to be like, "Oh, here's the field names. Here's their types. Here's the structure of the data." And then after you do that communication, it's a very highly efficient representation of data.

And so, you basically can send one or more dictionary batches, and we've actually modified it so you can send them interspersed, as well. But when the slide was written, you could only send them in the front. Basically, the dictionary batch says, "Hey, for this field, I know I told you that this is the structure. I'm going to send you the values for that structure, but then the data's actually going to be integers." Okay? And we don't do bit packing yet, but we probably will soon. But at the moment, those are integer packs.
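
As a rough illustration of what a dictionary batch buys you, here is a minimal sketch in plain Python. It is not the Arrow library's API: the function names and the sample city values are made up for the example. The distinct values are sent once, and the data batches then carry only integer indices into them.

```python
def dictionary_encode(values):
    """Split a column into (dictionary of distinct values, integer indices)."""
    dictionary = []   # distinct values, in order of first appearance
    index_of = {}     # value -> position in the dictionary
    indices = []      # one integer per original value
    for value in values:
        if value not in index_of:
            index_of[value] = len(dictionary)
            dictionary.append(value)
        indices.append(index_of[value])
    return dictionary, indices


def dictionary_decode(dictionary, indices):
    """Rebuild the original column from the dictionary and the indices."""
    return [dictionary[i] for i in indices]


# Hypothetical sample column.
cities = ["NYC", "SF", "NYC", "NYC", "SF"]
dictionary, indices = dictionary_encode(cities)
decoded = dictionary_decode(dictionary, indices)
```

The indices here are ordinary Python integers; a real implementation would store them in a fixed-width integer buffer.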

Jacques Nadeau:

And you can have more than 64k records, although it's impossible to manage your memory. But you can have it.

But, basically, the pattern ... Some of this is from ... I work a lot in the Java library. Wes works in the C++ library. There's small variations there, but the format is actually very clear. And so, at least in the Java library, normally you can't have more than 2 billion values at a leaf level inside of any particular batch of records.

So, anyway, so, basically, when you're communicating with Arrow, it's very simple. "Hey. Here's what the schema is." Maybe a dictionary batch, maybe not. And then, "Here's a bunch of data batches." And the data batches are designed so that you actually have a very clear structure of data. And I'll get to that in a second.

What does the actual format look like? Well, it seems ... It's probably really obvious once you see it. Right? And so, here we have an example of a person's array that has two people inside of it. Most of those things are scalar values. A string of name, integer of age, as well as an array of phone numbers which are strings. And so what that does is it maps to three representations, here. And this is a slightly simplified version of the representation, because we don't cover nullability, here, and I'll talk about that in a minute. But if you just think about it in terms of the data itself, right?

So, the name ... You're going to have two different buffers of memory. The first is going to be an offset buffer which describes the start and endpoint of each value in that buffer. Okay? And then the second is the actual data associated with those names. Okay? And so those line up very, very, very nicely in a simplified manner, and you can say, "Okay, if I want to ... " And this is where the random accessible comes in, which is, "If I want the first value," if we're zero indexing, "I want the first record's value for the 'name' field," I can go to the offset one of the "name" field, then offset two of the "name" field, and I know that the data in the data buffer is going to be from three to seven for that value. Right? And so, if I wanted to do a length operation, I can just do it against the offsets. If I actually want to do some kind of data operation, I have to interact with the data buffer. Okay?
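
The offset-plus-data layout described here can be sketched in a few lines of Python using only the standard library. This is an illustration, not Arrow library code: the helper names and the two sample names are assumptions, chosen so that the second value occupies bytes three to seven as in the description above.

```python
import struct


def build_string_column(values):
    """Return (offsets, data) buffers for a list of strings."""
    data = bytearray()
    offsets = [0]
    for value in values:
        data += value.encode("utf-8")
        offsets.append(len(data))
    # One more offset than there are values, each a 4-byte little-endian int.
    return struct.pack(f"<{len(offsets)}i", *offsets), bytes(data)


def value_length(offsets, i):
    """Length of value i, computed from the offsets alone."""
    start, end = struct.unpack_from("<2i", offsets, 4 * i)
    return end - start


def get_value(offsets, data, i):
    """Random access: two offset reads, then one slice of the data buffer."""
    start, end = struct.unpack_from("<2i", offsets, 4 * i)
    return data[start:end].decode("utf-8")


offsets, data = build_string_column(["joe", "jack"])  # hypothetical names
second_length = value_length(offsets, 1)   # touches only the offset buffer
second_value = get_value(offsets, data, 1)  # bytes 3 to 7 of the data buffer
```

Note that the length is answered without reading the data buffer at all, which is the point made above about operating against the offsets.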

The second buffer, much simpler. Let's say this is a 4-byte integer. Those 4-byte integers are just end to end in that representation. The third one, more abstraction, here, where basically, you have one more level of indirection. So it's a list of strings. So if you look to the right, the two right things look just like the structure for "name". Right? Which is basically just a name structure. What we've added on top of that is another structure which describes the start and end point of the lists. Okay?
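
The extra level of indirection for a list of strings can be sketched the same way. This is a simplified model, with assumed helper names and made-up phone numbers, and with offsets kept as plain Python lists rather than packed buffers. One offset array delimits the strings, and a second one on top delimits which strings belong to each record's list.

```python
def build_list_of_strings(rows):
    """Return (list_offsets, string_offsets, data) for a column of string lists."""
    list_offsets = [0]    # record i owns strings list_offsets[i]..list_offsets[i+1]
    string_offsets = [0]  # string j owns bytes string_offsets[j]..string_offsets[j+1]
    data = bytearray()
    for row in rows:
        for s in row:
            data += s.encode("utf-8")
            string_offsets.append(len(data))
        list_offsets.append(len(string_offsets) - 1)
    return list_offsets, string_offsets, bytes(data)


def get_list(list_offsets, string_offsets, data, i):
    """Fetch the list for record i: list offsets first, then string offsets."""
    first, last = list_offsets[i], list_offsets[i + 1]
    return [
        data[string_offsets[j]:string_offsets[j + 1]].decode("utf-8")
        for j in range(first, last)
    ]


# Hypothetical phone numbers for two people.
phones = [["555-0100", "555-0101"], ["555-0199"]]
list_offsets, string_offsets, data = build_list_of_strings(phones)
second_person = get_list(list_offsets, string_offsets, data, 1)
```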

And so, basically, these are the base components, the atoms, that you would use to build most of the data representations. Obviously, there are different widths, like it's a 4-byte integer or an 8-byte integer, it's a single precision or a double precision floating point, or whatever. But this is the base concept.

And so, generally speaking, we also typically add one more buffer to each of these, which is a validity buffer, which is one for "this is a [inaudible 00:13:24] value," zero for "this is not". One of the things to note for that is, is that if the validity buffer says it's a null, there is no guarantee of what the other data says in the other buffers.
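
A minimal sketch of a validity buffer next to a 4-byte integer data buffer, in plain Python. The helper names and sample ages are assumptions. Bits are packed least-significant-bit first, which is how the Arrow format lays out validity bitmaps. The slot behind a null holds an arbitrary value, which is why readers must check the bit first.

```python
import struct


def build_validity(values):
    """Pack one bit per value: 1 for present, 0 for null (LSB first)."""
    bitmap = bytearray((len(values) + 7) // 8)
    for i, value in enumerate(values):
        if value is not None:
            bitmap[i // 8] |= 1 << (i % 8)
    return bytes(bitmap)


def is_valid(bitmap, i):
    return (bitmap[i // 8] >> (i % 8)) & 1 == 1


def get_int(validity, data, i):
    """Read value i, consulting the validity bit before trusting the data."""
    if not is_valid(validity, i):
        return None
    return struct.unpack_from("<i", data, 4 * i)[0]


ages = [29, None, 41]  # hypothetical ages, with one null
validity = build_validity(ages)
# The null slot is filled with an arbitrary placeholder (-1 here).
data = struct.pack("<3i", *[a if a is not None else -1 for a in ages])
```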

So, if you think about a record batch and how it's constructed, it's actually a very simple thing. So the data itself is basically all of the buffers that are associated with that record batch, and [inaudible 00:13:49]. And so this is all data, basically the green, the yellow, and the orange, or whatever that last color is, peach? Whatever it might be. Are all data. And then above them, they have what's called the data header. And the data header describes the offsets into the data. Okay? And it is structured in a consistent manner across all record batches in a FlatBuffers representation.

And so you always know in the data header ... There's no effort to decoding the data header. If I want the age buffer, I know exactly where to go in the data header. So I can pre-compile whatever I want once I know what the schema is, and work against all these data batches without having to do any extra work. Because I know exactly where all the data is. Okay?

Now, one of the important things to note here is that there are two patterns of how you might share this data. One is that you're going to share it on something that's local. And so, in that case, there is no requirement that these things be contiguous. Right? And so if you have two applications that are possibly working in shared memory, then these buffers could potentially be all over memory, and that data header will simply tell you where they are. Okay?

But, if you were doing something that is more likely to be shared across memory spaces, like, "I'm sending this thing on a socket," then, generally speaking, this will all be contiguous as a single message with the data header sitting on top of it.
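
To make the "data header on top of contiguous buffers" idea concrete, here is a deliberately simplified sketch. It does not reproduce the real Arrow wire format, which uses FlatBuffers metadata; the fixed `struct` layout, the function names, and the sample buffers are all assumptions for illustration. The header records an (offset, length) pair per buffer, so a reader who knows the schema can jump straight to any buffer.

```python
import struct


def pack_batch(buffers):
    """Build one contiguous message: header of (offset, length) pairs, then body."""
    header = struct.pack("<i", len(buffers))
    body = bytearray()
    for buf in buffers:
        header += struct.pack("<qq", len(body), len(buf))
        body += buf
        body += b"\x00" * (-len(body) % 8)  # pad each buffer to 8-byte alignment
    return header + bytes(body)


def get_buffer(message, index):
    """Locate buffer `index` from the header alone and return a view of it."""
    (count,) = struct.unpack_from("<i", message, 0)
    body_start = 4 + 16 * count  # 4-byte count plus 16 bytes per buffer entry
    offset, length = struct.unpack_from("<qq", message, 4 + 16 * index)
    start = body_start + offset
    return memoryview(message)[start:start + length]  # a view, not a copy


# Hypothetical buffers: name offsets, name data, ages.
name_offsets = struct.pack("<3i", 0, 3, 7)
name_data = b"joejack"
ages = struct.pack("<2i", 29, 41)
message = pack_batch([name_offsets, name_data, ages])
age_view = get_buffer(message, 2)
```

In the shared-memory case described above, the same header idea applies, except that the entries would point at buffers scattered around memory rather than at positions inside one message.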

All right. So, quickly, Arrow components. I kind of break it into three things. This is not formal. It's not what the project says, but it's the way that I kind of think about it. Is, is that there are core libraries, which is the most basic components of Arrow, to get started using Arrow. And then there are some within-project integrations. And then there's a bunch of things that are using Arrow that are outside of the project itself, just basically using it as a tool.

And so, the core components to me are really composed of the different libraries for the different languages, so there's bindings for a bunch of different languages. I think that there's more than we have here. But there's bindings for a bunch of languages. We're adding them all the time with the ability that you can interact with an Arrow representation no matter what language you're in.

One of the key things that we focused on is, is that one way that we look at this is that there are two worlds. They're two really important, distinct worlds, the JVM world and the not-JVM world. And, usually, they don't play very well together. And so one of the things that Wes and I have focused a lot on is trying to make sure that those play very well together. And Wes and his team at Two Sigma have actually done a huge amount of work there on that. To make sure that, "Hey, if I want to move between those two worlds, I can do it very efficiently."

On top of those core libraries, which allow you to do reading and writing, and that kind of thing, there are several other applications that are inside the project. So, Plasma was recently contributed. Came from the Ray project at Berkeley, and it's a shared memory caching layer that supports using Arrow representation inside the shared memory.

Feather is a really cool implementation that Wes did early on in his Arrow stuff. It's basically a very fast way of writing to disk and reading from disk. Basically, an Arrow representation for short-term persistence. So moving data between different processes.

These other two are things that we're talking about now in the community, and trying to figure out the right way to implement. One is what I call ArrowRest. That's not the formal term for it. But it's the idea that we need to have a very straightforward way to communicate data. And it might be over REST, or using gRPC, or whatever. But it's a way for us to be able to communicate Arrow streams that everyone knows how to do. So I can start to implement, in, say, my product at Dremio, an API that then any other tool could interact with, because they know it's a standard ArrowRest API.

The other one is what I'm calling ArrowRoutines, which is the set of libraries that allow you to work with Arrow efficiently and get started very, very quickly.

So, moving up to the next layer. So, who's using Arrow today? Well, there's a lot of different places. So, pandas, as I mentioned earlier. Wes is using that extensively internally to improve performance of a bunch of operations. There's the GPU Open Analytics Initiative, which is a mouthful. And libgdf. But it's basically a GPU data frame that a number of different people are using to basically use Arrow as the way to represent data in GPU memory.

The Parquet project uses it extensively. If you read and write from Parquet in C++, you are using Arrow as the intermediary there.

Spark, actually ... There was a blog post, I think yesterday or the day before, from Databricks. A bunch of people at Two Sigma worked very hard, and actually, some people at IBM and a couple of other places, worked very hard at integrating Spark and PySpark capabilities to allow us to serialize data from Spark context into Python much, much more quickly. And that just got dropped into ... I think it merged just a month ago. But it'll formally be in the 2.3 release, which is, I think, at the end of the year.

And then, lastly, the project that I work on ... We built everything on top of Arrow. So we are an OSS project. You can check us on GitHub. But we actually do a bunch of different things on top of Arrow. And I'll talk a little bit about some of the algorithms in the second part of my talk, about how we use Arrow.

So, Arrow in practice. So, at Dremio we built an execution environment for working with data that's built entirely on top of Arrow, so the only internal representation of data is an Arrow representation of data. And we call the engine Sabot. Which, if you don't know, is a shoe. But it's also a shoe for a bullet, or a small cross-section bullet in a large barrel to make it go faster. Which we think of as a fast way of making Arrow.

Anyway. So, couple of quick ... So, I'm going to go through five or six different technical problems. Right? Problems you're going to hit every day when you're doing this kind of thing, and how we solve each of them. And so this is mostly in the context of the JVM implementation of the Arrow library, but a lot of these lessons exist no matter what language you might be working on.

So I'll go through these. The first one I'm going to start talking about is memory management. And so, when you're working with Arrow, you're basically having to work with these buffers all the time. How do you solve memory management? Well, we're on a JVM, and so one of the problems with JVM is they don't ... So, as a note, in Java, we always keep Arrow off heap, because we want to be able to share it with other applications. And so, we did a bunch of memory management stuff. It's built on top of Netty's jemalloc implementation, and, basically, what we did is we built a way to create a tree of allocators. So that, for different pieces of work, you can clearly identify the boundaries of that work. You can define how much memory is allowed to that piece of work, what is reserved for that piece of work, so that you can then very clearly understand where memory is being used. Because you may have many, many pieces of work that are operating at the same time and in the same context.

It also includes the ability to detect leaks. So if you see where you are failing to release memory, it's very clear, because we're not relying on a garbage collection for this stuff. And it's also designed to be very powerful in terms of ... I actually got that on the next slide.

So, it's also designed to be very clear about ownership semantics. And so if you're moving Arrow memory through a data pipeline, you're going to want to know who owned that memory last. And so the allocator hierarchy has a very clear concept of transferring ownership between allocators. It's built into the accounting system, as well, so you can very clearly see who's using what when, and make sure that you know that you're maintaining the levels that you need to maintain so that you can make decisions about, "Oh, shoot, I am running out of memory, I need to spill to disk," or whatever the case might be.
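
The allocator tree, the leak check, and the ownership transfer can be modeled in a small Python sketch. This is a toy accounting model, not the Java Arrow allocator: the class names, method names, and limits are all hypothetical, and it tracks byte counts only. Each allocation is charged to the allocator and to every ancestor, so limits hold at every level of the tree.

```python
class OutOfMemory(Exception):
    pass


class Allocator:
    def __init__(self, name, limit, parent=None):
        self.name, self.limit, self.parent = name, limit, parent
        self.used = 0

    def new_child(self, name, limit):
        return Allocator(name, limit, parent=self)

    def _chain(self):
        node = self
        while node is not None:
            yield node
            node = node.parent

    def _reserve(self, size):
        # Check every level first so a failure leaves the accounting untouched.
        for node in self._chain():
            if node.used + size > node.limit:
                raise OutOfMemory(f"{node.name}: limit {node.limit} exceeded")
        for node in self._chain():
            node.used += size

    def _release(self, size):
        for node in self._chain():
            node.used -= size

    def allocate(self, size):
        self._reserve(size)
        return Buffer(self, size)

    def close(self):
        # Leak detection: closing with outstanding memory is an error.
        if self.used != 0:
            raise RuntimeError(f"leak: {self.name} still holds {self.used} bytes")


class Buffer:
    def __init__(self, owner, size):
        self.owner, self.size = owner, size

    def transfer_to(self, target):
        # Simplification: shared ancestors are briefly charged twice.
        target._reserve(self.size)
        self.owner._release(self.size)
        self.owner = target

    def release(self):
        self.owner._release(self.size)
        self.owner = None


root = Allocator("root", limit=1000)
scan = root.new_child("scan", limit=600)
sort = root.new_child("sort", limit=600)
buf = scan.allocate(400)
buf.transfer_to(sort)  # ownership moves down the pipeline
usage_after_transfer = (scan.used, sort.used, root.used)
```

After the transfer, closing `scan` succeeds, while closing `sort` before `buf.release()` would raise, which is the kind of explicit leak signal described above.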

The second experience that we had is, is that ... And this is the one that I was joking with Wes about before. Is, is that if you're working with a lot of workloads at the same time and the same context, you start dealing with all sorts of memory problems with this kind of stuff. Arrow is great at being very, very fast for processing, but it also is large. So the reason that things are smaller on disk is that we use compression, and we don't support random accessibility, and various other things. And so when you do some things to make it really good for processing, then it can be larger in memory than it is on disk. It typically is larger in memory than it is on disk.

And so, in our system, we actually do constrain record batches to up to 4,000 records. But it's actually very rarely even 6,400 records, and the reason is that you want to figure out, "What is the optimal batch to be able to move through the system as a unit of work?" Right? And so, in our system, we are more of a SQL system, and so when we're moving data between different operations, we want to make that unit of work small enough that we can make scheduling decisions on which thread should be working on what when. We want to be able to spill individual batches to disk very quickly. We want to do batch-level sorts very, very quickly. And so, we are constantly sizing the batches for one thing, which is an efficient unit of work.

Well, actually, for unit of work, if you're not ... Spilling is a case where you actually want to keep it smaller, because you want to be able to drop fast. You don't want to have to wait for a big spill of a huge amount of data if you're trying to pipeline something. But many of the other use cases, you actually benefit from a larger batch of records, because you can work with more at once before you go into some other less optimized code.

But what happens is, is that as the width of your table increases, or the number of fields, if it's a complex object, increase, you start to use huge amounts of memory for even a single batch. Right? If you're going to send a single batch across an RPC connection between two nodes, you don't really want that to be a hundred-megabyte batch. Right? You also don't want to spill a hundred-megabyte batch. It's going to take too long, and mess up your pipeline.

And so, what we ultimately figured out that we needed to do was adaptively size the length of the batch based on the width of the table. And so, you basically look at the number of fields in the batch as well as the average size of things like the variable width fields, like strings. And then, based on that, we actually make decisions inside the engine on what size we should size these batches so that we can continue to maintain a very good pipeline. And so, typically, we actually run somewhere between 127 and 4,095 records in a batch, because that's a good size. So 4,095 is closer to dealing with 10, 15 columns, whereas something like 127 might be dealing with ... One of the customers we're working with has got over a thousand columns that they're dealing with in a single pipeline.
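
One way to picture adaptive batch sizing is as a clamp around "target bytes divided by estimated row width". The sketch below is an assumption about the shape of the calculation, not Dremio's actual formula: the 1 MiB target, the 4-byte offset overhead per variable-width field, and the function names are all made up. Only the 127 and 4,095 bounds come from the talk.

```python
def estimate_row_width(fixed_widths, avg_variable_widths):
    """Estimated bytes per record across all fields."""
    # Assume each variable-width field costs its average size plus a 4-byte offset.
    return sum(fixed_widths) + sum(w + 4 for w in avg_variable_widths)


def batch_record_count(row_width, target_bytes=1 << 20,
                       min_records=127, max_records=4095):
    """Pick a record count so a batch stays near target_bytes, within bounds."""
    if row_width <= 0:
        return max_records
    return max(min_records, min(max_records, target_bytes // row_width))


# Narrow table: ten 4-byte integer columns.
narrow = batch_record_count(estimate_row_width([4] * 10, []))
# Wide table: a thousand 8-byte columns.
wide = batch_record_count(estimate_row_width([8] * 1000, []))
# Very wide table: a thousand 8-byte columns plus 200 strings averaging 50 bytes.
very_wide = batch_record_count(estimate_row_width([8] * 1000, [50] * 200))
```

With these assumed numbers, the narrow table hits the upper bound and the very wide one hits the lower bound, matching the range quoted above.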

And so, anyway, it's this kind of balancing act. And it's something that we thought, "Oh, it wasn't going to be so bad." But then you start bringing in customers that have a thousand columns that they're trying to push through a system like ours. And you're like, "Okay, I need to adjust this down, because my batches are getting just much, much too large." Which causes several things, right? It causes problems with fragmentation. Your allocator may be doing special allocations for that. It causes problems in terms of the spilling pipeline. It causes problems in the network pipeline. And so it just causes [inaudible 00:24:53] problems when you're trying to deal with memory management in general, in terms of who's using what memory.

So, the next thing is that when we were trying to solve RPC communication, as I talked about before, this Arrow batch is basically a bunch of things structured together. This is basically the same thing you saw before. And so, what we do there is we actually leverage gathering writes when we drop it on to the socket. And so if you see this, basically, we can write the structured message in a buffer. And then we take that buffer, link it together with all the other buffers, and then let the kernel write all of that to the socket at once. So it's a very efficient way to send the data.
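
A gathering write hands the kernel a list of separate buffers in one call, so the header and the data buffers never have to be copied into one contiguous block first. The sketch below shows the idea with Python's `socket.sendmsg`, which is available on Unix-like systems; it is not the Netty-based implementation described here, and the helper names and sample bytes are assumptions.

```python
import socket


def send_gathered(sock, parts):
    """Send several buffers with gathering writes, retrying after partial sends."""
    views = [memoryview(p) for p in parts if len(p)]
    total = 0
    while views:
        sent = sock.sendmsg(views)  # one system call for many buffers
        total += sent
        # Drop buffers that went out completely, then trim a partially sent one.
        while views and sent >= len(views[0]):
            sent -= len(views[0])
            views.pop(0)
        if views and sent:
            views[0] = views[0][sent:]
    return total


def recv_exact(sock, size):
    """Read exactly `size` bytes, or fewer if the peer closes early."""
    received = bytearray()
    while len(received) < size:
        chunk = sock.recv(size - len(received))
        if not chunk:
            break
        received += chunk
    return bytes(received)


# Hypothetical structured header followed by two data buffers.
header = b"BATCH"
buffers = [b"\x00\x00\x00\x00\x03\x00\x00\x00\x07\x00\x00\x00", b"joejack"]
left, right = socket.socketpair()
with left, right:
    sent = send_gathered(left, [header, *buffers])
    received = recv_exact(right, sent)
```

The receiver sees one contiguous stream, even though the sender never concatenated the pieces in user space.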

And so, basically ... We actually, internally, at the moment, we still have the structured [inaudible 00:25:34] and Protobuf in some cases, and we've adopted FlatBuffers in some locations. Because when we started, we actually didn't have Arrow. But we'll ultimately get to FlatBuffers for everything on that, and do the same one that's in the spec. And, basically, we built a custom Netty protocol for this that allows us to support this concept of what some people call the "sidecar". We call it a "data body". Which basically means that I'm going to send a structured message which is relevant to, like, "Hey, this is more information about what I'm sending you." Then I can always send this giant data message, which is the actual thing that's the main package.

So just a couple more things, and then I'll hand it over to Wes. When we're doing filtering and sorting, one of the things to think about when you're interacting with an Arrow representation is that it takes a lot of ... One of the things that's not faster in an Arrow representation is when you want to eliminate records. Okay? And the reason is, in a rowwise representation, I can just delete the record, and then copy the next record. And I copy records a record at a time. That's nice about a rowwise representation.

In a columnar representation like Arrow, I've got to copy a cell at a time. And you can do that pretty efficiently, but it's still way less efficient than doing a row at a time. And so, one of the things that you do is, in some cases, you defer the copying of data until as late as possible. And so one of the things that we do to do that is, for example, if you're applying a predicate, we generate what we call a selection vector 2, or selection vector 4, actually even ... We have now a selection vector 6. And these are, basically, masking vectors that tell us what the offsets are that are valid in the original dataset, rather than copying them out into compacted datasets.

And so we use 2 to generally operate on a single batch, and that's where the 65,000 comes from. We use 4 to operate on a collection of batches that are fairly large in size, and so 2 bytes are for the offset within the batch, and 2 bytes are for which batch in the set. We used to only do 2 and 4, but as we added the ability to adapt the batch sizes based on the width of the data, we then had to add a 6-byte masking vector, which allows us to use 4 bytes for the batch index, because you have far more batches, because the batches are actually smaller.
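
The selection vector idea can be sketched with Python's `array` module. This is an illustration with assumed names and sample data, not the engine's code, and which half of the 4-byte entry holds the batch index is an assumption. A 2-byte vector records which positions in one batch survive a predicate. A 4-byte entry combines a batch index with a position inside that batch, so a sort can reorder entries without moving any cells.

```python
from array import array


def filter_to_sv2(column, predicate):
    """Record the positions that pass the predicate instead of copying values."""
    sv2 = array("H")  # unsigned 2-byte entries, so at most 65,536 positions
    for i, value in enumerate(column):
        if predicate(value):
            sv2.append(i)
    return sv2


def sv4_entry(batch_index, record_index):
    """Assumed layout: high 2 bytes pick the batch, low 2 bytes the record."""
    return (batch_index << 16) | record_index


def sv4_decode(entry):
    return entry >> 16, entry & 0xFFFF


# Filtering one batch: only the positions are written, the data stays in place.
ages = [29, 17, 41, 12]
adults = filter_to_sv2(ages, lambda age: age >= 18)

# Sorting across batches: sort the 4-byte entries, not the underlying cells.
batches = [[5, 12, 7], [3, 20]]
sv4 = array("I", (sv4_entry(b, r)
                  for b, batch in enumerate(batches)
                  for r in range(len(batch))))


def lookup(entry):
    b, r = sv4_decode(entry)
    return batches[b][r]


sorted_sv4 = sorted(sv4, key=lookup)
sorted_values = [lookup(entry) for entry in sorted_sv4]
```

A 6-byte variant would follow the same pattern with a wider batch index.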

One of the things that we do to solve filtering ... We actually use that for sorting, as well, so you basically [inaudible 00:27:50] against the masked data, rather than trying to move the individual cells.

Example of what we did is ... For rowwise algorithms, basically we have a hash table implementation that's also doing aggregation. And so, this is an example of something where you're kind of struggling with the columnar representation versus the rowwise representation. You want to sort these things into a hash table, but they're actually individual cells. And so, we originally were doing it cell at a time. But it was a fairly complicated ... [inaudible 00:28:23] code, but it's differently complicated. It was fairly complicated originally, but it also wasn't that performant.

And so what we actually did is we came up with a fairly efficient way to what we call "pivot" the hash table key into a partial rowwise representation that's built inside the hash table. So that we can pivot the data, insert it, and then pay for resize, insertion, and equality, and hashing costs in a much more efficient manner, and then un-pivot it after we're doing something like an aggregation.

And so what this looks like is that you've made aggregation tables in an Arrow representation. Those are columnar, but you maintain the hash table in a rowwise representation that has, basically, two parts to it. A fixed part, and a variable part. The fixed part has a very well-known offset, and you don't have to go to the variable part unless you've confirmed that the fixed part is actually the same.
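The pivot idea can be sketched roughly as follows. This is an illustration under several assumptions: a Python `dict` stands in for the custom hash table, the key has one 32-bit integer column and one string column, and the names `pivot_keys` and `sum_by_key` are hypothetical. The fixed part packs the integer key plus the length of the variable part; the accumulators stay columnar.

```python
import struct

def pivot_keys(int_col, str_col):
    """Pivot two columnar key columns into rowwise (fixed, variable) keys."""
    rows = []
    for n, s in zip(int_col, str_col):
        variable = s.encode("utf-8")
        # Fixed-width part at a well-known layout: int key + variable length.
        fixed = struct.pack("<ii", n, len(variable))
        rows.append((fixed, variable))
    return rows

def sum_by_key(int_col, str_col, values):
    table = {}   # pivoted rowwise key -> ordinal in the accumulator column
    sums = []    # columnar accumulator, one slot per distinct key
    for key, v in zip(pivot_keys(int_col, str_col), values):
        ordinal = table.get(key)
        if ordinal is None:
            ordinal = len(sums)
            table[key] = ordinal
            sums.append(0)
        sums[ordinal] += v
    # Un-pivot the keys back into columns; the sums are already columnar.
    out_int, out_str = [], []
    for fixed, variable in table:
        n, _length = struct.unpack("<ii", fixed)
        out_int.append(n)
        out_str.append(variable.decode("utf-8"))
    return out_int, out_str, sums

result = sum_by_key([1, 1, 2, 1], ["a", "a", "b", "c"], [10, 20, 30, 40])
```

Comparing tuples in Python checks the fixed part before the variable part, which loosely mirrors the rule that the variable part is only consulted once the fixed part matches.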

And so, that actually worked pretty well for a hash table implementation. And then the aggregations, you can actually spill those out of the pipeline at the end. Especially if you're doing something like sum, or something like that, where the intermediate representation is the same as the end representation. So you can spill out the aggregations directly, but then you have to do an un-pivot on the data that's in the hash table.

I have an example, here. I'll send out the deck. I don't think you can read the example. But, basically, there's at least one interesting pattern that we do here. And so, when we first started trying to explore this kind of representation, we focused very much on whether or not there's ... "Is this a nullable type or a non-nullable type?" Because the idea is that, "Hey, if I know this was a non-nullable type, I can make my code much more efficient."

But what we found was that, at least in many of the use cases that I was interacting with, people generally mark things as nullable types even if there are rarely nulls. Okay? And so, for example, with Parquet, and historically, with Parquet, most of the engines would never even write a non-nullable type into Parquet. I'm not even sure if today the C++ library supports it.

Jacques Nadeau:

It does. Okay. Well, there's an example. But a lot of them didn't. And so, basically, you'd see this stuff where it was basically annotated as nullable, but then the nulls are very, very rare.

And so what we decided to do, and we do this in several places, is that, rather than try to manage both nullable and non-nullable data at a schema level, we actually manage it at a word level. And so we will, in many cases, look at 64 values, see whether or not they're valid, or not valid, or partially not. And then we have separate code at that level. And so that allows us to take advantage of it. Even if you annotated the data as nullable, we're actually just doing the observation to say, "Hey, this data actually has no nulls in it, so we can skip over that piece of code." And that, at least in my experience, at least in the code that we worked on, has vastly simplified our code and actually improved performance.
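A minimal sketch of the word-level check, assuming the validity bitmap is held as a list of 64-bit integers with least-significant-bit ordering (bit i of word w covers value w*64+i). The helper names `build_validity` and `sum_word_level` are hypothetical, and summation is just a stand-in for whatever per-value work is being done.

```python
ALL_VALID = (1 << 64) - 1

def build_validity(values):
    """Build 64-bit validity words; a set bit means the value is not null."""
    words = [0] * ((len(values) + 63) // 64)
    for i, v in enumerate(values):
        if v is not None:
            words[i // 64] |= 1 << (i % 64)
    return words

def sum_word_level(values, validity_words):
    """Sum non-null values, choosing a code path per 64-value word."""
    total = 0
    for w, word in enumerate(validity_words):
        chunk = values[w * 64:(w + 1) * 64]
        if word == 0:
            continue                      # all 64 values are null: skip
        if word == ALL_VALID:
            total += sum(chunk)           # no nulls: no per-value checks
        else:
            for i, v in enumerate(chunk): # mixed word: check each bit
                if (word >> i) & 1:
                    total += v
    return total

values = list(range(64)) + [None] * 64 + [1, None, 3]
total = sum_word_level(values, build_validity(values))
```

When nulls are rare, nearly every word takes the fast path, even though the schema still says the column is nullable.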

The other example I was going to do ... I think this is the last one I have. Yeah. I think it's the last one I have. Is when you're shuffling data in Arrow, it starts to become a little bit of a problem here as well, because Arrow is really nice for a lot of things, but buffering ... It's kind of expensive. And so if you're going to try to shuffle a bunch of data between two different nodes, then you actually need to maintain a batch for each of the target locations that you're sending the data, and as batches get bigger, then these batches actually ... Over the cluster. And so if you're dealing with ... So in some of our larger scenarios, we might be dealing with 500 nodes with 25 or 50 threads per node, interacting across two ... Basically doing an N-to-N-way shuffle.

And so what happens is that, with the Arrow representation, you're like, "Oh, shoot, my buffers for the shuffle are extraordinarily expensive." And so we basically did two things here in terms of the ... One thing in the pattern, and one thing in the performance. And so, originally, we were actually doing the shuffle in what I would describe as a more rowwise approach. The algorithm was rowwise, but the representation was columnar. And so we were seeing not that great of performance, and we also had this problem with too much memory.

And so the first thing we did, is we actually started to support muxing and demuxing inside of the shuffle. Okay? And so, what we would do is we'd say, "Hey, there might be K nodes, and there's N threads total across all the K nodes. Why don't we take each node and mux to a single [inaudible 00:32:44] before we actually do the partitioning for the shuffle?" And, initially, we were like, "Well, that's going to be concerning, because all of a sudden one thread is doing the work of many threads."

But what we found is that by changing the pattern of the actual code to a fully columnar shuffling pattern, where we actually go through all the values, identify all their target values, and then come up with a pattern of exactly how far we can copy down before we actually need to flush any of the things, and basically do that a column at a time ... If you see here, we basically generate a bucket vector. We then, field by field, go through and do the bucket vector transposer shuffle, and then we can flush those things out. And so we can still do that gallery right at the end, and we do all this work in the middle. But this was a way that we solved both the buffering problem and didn't suffer from the fact that we were bringing this all back into a single thread.
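The column-at-a-time pattern can be sketched roughly as below. This is a simplification under stated assumptions: columns are plain Python lists in a dict, the bucket is computed with a modulo on an integer key as a stand-in for a real hash partitioner, and the flush logic (working out how far to copy before a target batch fills) is left out. The name `columnar_shuffle` is hypothetical.

```python
def columnar_shuffle(columns, key_name, num_targets):
    """Partition a batch into num_targets batches, one column at a time."""
    # Step 1: compute the bucket vector once for the whole batch.
    buckets = [k % num_targets for k in columns[key_name]]

    # Step 2: field by field, scatter each column according to the buckets.
    out = [{name: [] for name in columns} for _ in range(num_targets)]
    for name, column in columns.items():
        for bucket, value in zip(buckets, column):
            out[bucket][name].append(value)
    return out

batch = {"id": [1, 2, 3, 4, 5], "name": ["a", "b", "c", "d", "e"]}
parts = columnar_shuffle(batch, "id", 2)
```

The inner loop touches a single column at a time, in contrast to the rowwise approach, where each row is routed and all of its cells are copied before moving to the next row.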

So, another example of code ... One last example. This one's quick. So one of the nice things I kind of noted earlier is that one of the common patterns that we see when working with data is it might have a list of things inside of a record. And one of the things that people commonly want to do is they want to unroll that list and turn it into separate records because they want to work with it more relationally. And so one of the nice things that we can do with the Arrow representation is that we can unroll the list by basically dropping the offset vector and re-treeing the data. We actually never have to rewrite the struct vector.
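A small sketch of unrolling a list column, assuming Arrow-style offsets where row i owns child values `offsets[i]` through `offsets[i+1]`. The function name `unroll` is hypothetical, and dropping parent rows whose list is empty is an assumption about the desired semantics. The child values are returned as the same object, untouched; only the sibling parent column is expanded.

```python
def unroll(parent_column, offsets, child_values):
    """Turn one record per list into one record per list element."""
    repeated_parent = []
    for row in range(len(offsets) - 1):
        count = offsets[row + 1] - offsets[row]
        repeated_parent.extend([parent_column[row]] * count)
    # The child vector is reused as-is: the offsets are simply dropped.
    return repeated_parent, child_values

order_ids = [10, 20, 30]
offsets = [0, 2, 2, 5]                 # order 20 has an empty list
items = ["a", "b", "c", "d", "e"]

flat_ids, flat_items = unroll(order_ids, offsets, items)
```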

Now, the one thing that we do, at least in our case, is that we do sub-divide the inner vectors if the lists are too long, so that we can stay within our target batch size. So you'll see that I may have taken those three vectors, and then I turned them into individual sub-vectors. And so we actually have ... At least, in the Java library, we have this concept of what's called a split transfer. Which is the ability to subdivide a vector, but only rewrite the minimum amount that's required to start that as a new vector.

And so in a normal case, like an integer vector, basically the only thing you have to do is make sure that you move any of the validity vector if you don't have the correct offset. In something like a variable-width vector, you have to rewrite the offsets themselves.
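To make the two cases concrete, here is a rough Python sketch, not the Java library's implementation. It assumes a bit-packed validity bitmap with least-significant-bit ordering and Arrow-style offsets for variable-width data; `split_validity` and `split_variable` are hypothetical names. In a real implementation the unchanged buffers would be shared rather than sliced into copies.

```python
def split_validity(bitmap, start, length):
    """Validity bits for rows [start, start+length) as a new bitmap."""
    if start % 8 == 0:
        # Byte-aligned: the existing bytes can be reused without shifting.
        return bytes(bitmap[start // 8:(start + length + 7) // 8])
    out = bytearray((length + 7) // 8)
    for i in range(length):               # unaligned: bits must be moved
        src = start + i
        if (bitmap[src // 8] >> (src % 8)) & 1:
            out[i // 8] |= 1 << (i % 8)
    return bytes(out)

def split_variable(offsets, data, start, length):
    """Split a variable-width vector; offsets are rebased to start at 0."""
    base = offsets[start]
    new_offsets = [o - base for o in offsets[start:start + length + 1]]
    new_data = data[base:offsets[start + length]]
    return new_offsets, new_data

validity = bytes([0b10110101, 0b00000011])
unaligned = split_validity(validity, 2, 6)
aligned = split_validity(validity, 8, 2)

offsets = [0, 3, 3, 8, 10]                # values: "abc", "", "defgh", "ij"
new_offsets, new_data = split_variable(offsets, b"abcdefghij", 2, 2)
```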

So, anyway, that's what I had. A bunch of things coming. We'd love for you all to join the community.