March 2, 2023
9:35 am - 10:05 am PST
Fast Data Processing with Apache Arrow
Using Rust, Apache Arrow, and table formats, data can be efficiently processed closer to the hardware and without any pauses. This session will explain the pros and cons of Apache Arrow for data processing and compare the performance with Apache Spark — the “standard” in terms of distributed processing of big data. We will discuss the advantages of the Rust language, including Rust Arrow and the tools available, the missing pieces, and performance comparisons.
Note: This transcript was created using speech recognition software. It may contain errors.
Andrei, I’m a senior software engineer at Adobe. I’m part of the Adobe Experience Platform data lake team, working in big data and distributed processing. I try to contribute to open source projects like Iceberg, Delta Lake, and Hyperspace, and recently I’ve been enjoying working with Polars and DataFusion. Today I’ll be discussing the purpose and benefits of Apache Arrow,
Apache Arrow and how it can provide a performant option to process data sets. The agenda is the following: we’ll get a bit into the Apache Arrow format and its available implementations, and discuss the differences between them. We’ll switch to query engines that make use of the Arrow format and look at two of them, both written in Rust: DataFusion and Polars. We’ll dive into some queries, see how similar or different they are between the two, and compare with Apache Spark. Lastly, we’ll discuss the gaps and where to use which. So let’s jump into it. First, what exactly is Arrow? Apache Arrow is an in-memory data format that was developed to provide a common data representation for big data analytics systems. The format provides a standardized way of storing and exchanging large data sets, which is especially important in the world of big data, where data is often generated from many different sources and systems.
One key benefit of Apache Arrow is that it’s built for in-memory usage. Think about the efficiency of a C++ array type, but on steroids, with all kinds of inner types and nested structures. Arrow has these types well defined and built for memory efficiency; being built like this, zero-copy access can be achieved and serialization and deserialization minimized. Another very important benefit of the Apache Arrow format is its use of columnar storage, that is, an efficient way of storing data in columns rather than rows. This has several advantages for big data analytics, including faster data access and improved compression. By storing data in columns, Apache Arrow allows for more efficient data retrieval and analysis, which is critical for big data applications. Another two related and important aspects of Apache Arrow are cross-platform compatibility and its language-agnostic nature. Arrow provides a common data representation that can be used across different programming languages such as Java, Python, R, C++, et cetera.
This means that data can be easily exchanged between systems regardless of the programming language they use. Apache Arrow is a structured format, which means there are well-defined types that should be used to leverage performance when operating on them: operations on integers are optimized differently than operations on strings or byte arrays. This structured aspect helps minimize the overhead of serialization, deserialization, type transformation, and memory footprint. It’s also an open source project, which allows a large and active community of developers to contribute to its development and advancement. The Apache Arrow format has a strong ecosystem: it is integrated with many popular big data analytics tools and technologies, including Apache Spark, Apache Hive, et cetera. This allows developers to easily integrate Arrow into their big data analytics pipelines and take advantage of all these benefits. Now let’s get a little bit more into this columnar format and try to understand its benefits.
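As a concrete taste of the format before we look at layouts, here is a minimal sketch of building typed, in-memory columnar data from Rust with the arrow crate (arrow-rs). The column names are made up for illustration, and exact APIs can shift between arrow-rs versions:

```rust
// Build two typed, contiguous Arrow columns and group them in a RecordBatch.
use std::sync::Arc;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    // Each column is a strongly typed, contiguous in-memory array.
    let ids = Int32Array::from(vec![1, 2, 3]);
    let names = StringArray::from(vec!["a", "b", "c"]);

    // The schema pins down the well-defined type of every column.
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]);

    // A RecordBatch is the unit that engines and Arrow IPC exchange,
    // which is what makes zero-copy sharing between systems possible.
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(ids), Arc::new(names)],
    )?;
    assert_eq!(batch.num_rows(), 3);
    Ok(())
}
```

The same batch can be handed to any Arrow-aware consumer, in any language, without converting the data.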
So on the top row we have an example of a data set stored in the already well-known row format: groups of entire rows are stored in pages inside files. In this simple example, the first two rows go into file one, the next two into file two, and so on. On the bottom we have a columnar format stored in files. As you can see, the whole column one goes into file one, part of column two into file two, and so on. We can see the difference in how the data is laid out on storage, but now let’s look at a couple of queries to understand how they work with each of these formats. The first query is the well-known SELECT * FROM table WHERE predicate. What this will do is load a file, in this case file one, that has two rows.
We’ll keep the row that matches the predicate and drop the second one; in this case, one single file opened. The same query in the columnar format case shows that it’ll load three files, file one, file two, and file four, and drop the majority of the data, keeping one field from each file; in this case, three files opened. Note that the columnar format is not that efficient in this case. Looking at the second query, where we do an aggregation, things look different. The row-based format will have to load a lot of files, file one through file four, and then drop the majority of the data; in this case, lots of files are opened, and to be more accurate, all files are opened. The same query on the columnar format shows that it’ll load only the files containing the column on which we do the aggregation: no data dropping, only the needed values are loaded.
We can see that for use cases where full rows are needed, the best storage is the row-based one, while for use cases where aggregations are predominant, the columnar format is desired. Now, there are other benefits of the columnar format. Because fields with the same type are close to one another, mostly in the same file, they become easily accessible, with no need to switch between different types. O(1) random access time is another important benefit that comes with this format: because the in-memory columnar data is stored sequentially and has well-defined types with known sizes, an access knows exactly where to go to fetch the data. Single instruction, multiple data (SIMD) vectorization is also possible with the columnar format: sets of field values can be processed in parallel on modern processors. And then we have zero-copy access, which means less serialization and deserialization and less memory consumption.
Now, there are quite a few formats that are columnar, like Parquet or ORC, and there is the Arrow Feather format. So yeah, it’s pretty widely used. In terms of implementations, let’s have a look at two languages, C/C++ and Rust, and see where we stand with Arrow. There are DuckDB and other databases that have their own implementation of the Apache Arrow format in C++, and there is also the official Arrow library in C++, which includes the Acero streaming execution engine. On the other side, in Rust, we have the Apache Arrow DataFusion query engine, which makes use of arrow-rs, and the Polars data frame library, which uses arrow2. From this point, we’ll be focusing on Rust and Arrow, leaving behind C/C++ and the other general aspects. So let’s first go to the official implementation of Apache Arrow in Rust, called arrow-rs.
Currently this implementation is at version 34, and looking at that big number we can see that heavy development is happening on this repository. Some of the main modules and libraries available with arrow-rs are Arrow Flight, a high-performance data transfer framework that utilizes the Apache Arrow memory format, and the object_store library, a focused, easy-to-use, high-performance asynchronous object store library for interacting with stores like S3, Azure Blob Storage, Google Cloud Storage, et cetera. arrow-rs supports all the major file formats, like CSV, Parquet, JSON, Avro, ORC. Moving on to the other implementation, called arrow2: arrow2 is a transmute-free Rust implementation of the Apache Arrow format, written by Jorge Leitão. The project focuses on simplifying the use of Apache Arrow by removing the need for data conversion steps.
That’s why it’s called transmute-free. It is also optimized for single instruction, multiple data vectorization, featuring more types than arrow-rs, and it’s memory safe thanks to Rust. arrow2 has libraries for accessing CSV, JSON, and Parquet formats, and it supports the Arrow Flight RPC protocol. Development is going fast in this implementation, and it is used in Polars; possibly in the future it’ll be used in DataFusion too, because I have seen some work going in that direction. Now, you may wonder why Rust was chosen in these projects. Rust gives memory safety: from its conception, a core pillar of Rust is memory safety. In fact, that’s why Rust was built, to have C++ speed with memory safety. The concepts of borrowing and ownership are core elements in Rust.
Concurrency benefits from the same borrowing and ownership concepts: together with the rules on mutable and immutable references, concurrency issues are eliminated or minimized. Performance is very similar and close to C++ code, with no garbage collection downsides. It is cross-platform: Rust code can be compiled for so many targets, starting from embedded microcontrollers, through bigger processors, up to the Apple Silicon processors that are new right now. So Rust is a very powerful language and has started gaining authority and traction in big data distributed processing due to these advantages. Now let’s switch a little bit towards the engines. First we’ll look at Arrow DataFusion. DataFusion is an open source query engine designed to be fast, efficient, and modular, and it is based on the Arrow format.
Its modular and accessible architecture makes it easy to add new functionality and features. This enables users to customize the engine according to their specific needs, and sometimes to use parts of it in their products or flows; for example, InfluxDB’s latest engine is based on DataFusion. Because DataFusion is built on the Apache Arrow columnar memory format, it provides high performance. The columnar format allows efficient memory utilization and enables DataFusion to process data much faster than traditional row-based processing engines. DataFusion provides built-in connectors for various data sources, including CSV files, Parquet, and Apache Kafka; this also comes from the Arrow format. It enables users to easily query data from different sources without having to write custom connectors. Last but not least, besides the DataFrame API, it provides a complete SQL engine that supports a wide range of SQL queries, including joins, subqueries, filters, and aggregations.
It also provides support for standard SQL syntax and SQL functions, making it easy for users to work with the engine. Now let’s look at some code and how to use DataFusion with the DataFrame API and SQL. First we need to create a session context, which is very similar to what Spark does; all further actions will be tied to the context. Then we read a Parquet file with read_parquet and select only some columns by specifying the collection of columns. In the end we apply a filter and show the results. Below is the same query, but expressed with SQL. The same first step of creating a session context is present. Next, we need to register the Parquet file as a table inside the session context with a specific name, in this case my_parquet_table, and we can later use that name in our SQL queries.
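The flow just described might look roughly like this in Rust. The file path and column names are placeholders, and the exact method signatures may differ between DataFusion versions:

```rust
// Sketch of DataFusion's DataFrame API and SQL flows over a Parquet file.
// "data/my_table.parquet", "col1", "col2", and "my_parquet_table" are
// hypothetical names used only for illustration.
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // The session context, similar to Spark's session; everything hangs off it.
    let ctx = SessionContext::new();

    // DataFrame API: read Parquet, project two columns, filter, show.
    let df = ctx
        .read_parquet("data/my_table.parquet", ParquetReadOptions::default())
        .await?
        .select_columns(&["col1", "col2"])?
        .filter(col("col1").gt(lit(1)))?;
    df.show().await?;

    // SQL: register the same file as a named table, then query it by name.
    ctx.register_parquet(
        "my_parquet_table",
        "data/my_table.parquet",
        ParquetReadOptions::default(),
    )
    .await?;
    ctx.sql("SELECT col1, col2 FROM my_parquet_table WHERE col1 > 1")
        .await?
        .show()
        .await?;
    Ok(())
}
```

Both paths produce the same logical plan; which one to use is mostly a matter of taste.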
Then we define the query, similar to any SQL-compatible database, and we show the results. As you can see, this is pretty close to what we do in Spark and other engines, and it’s pretty neat to have both the DataFrame API and SQL available in DataFusion. These are just simple examples of how DataFusion can be used, but it does support complex queries like aggregations and joins, and I’ll show you that a bit later. But for now, let’s look at Polars, the new kid on the block. Polars is a library that provides data manipulation and analysis for tabular data sets. Polars is built on top of the arrow2 library. It is fast, and we will see a little later in some slides that, arguably, it holds the crown on performance. It is very easy to use, and it has Python bindings as well.
Performance-wise, for the majority of queries Polars has higher efficiency than DataFusion, but it depends from query to query, and I’ll show you that a little bit later. Besides the DataFrame API, similar to DataFusion, it provides a SQL engine that supports a set of SQL queries, including joins, filters, and aggregations. Currently it’s a bit less developed than DataFusion’s, but the community is working on it. Polars can use multiple types of data sources, including CSV and Parquet, and, as with DataFusion and other engines, this enables users to query data from different sources without the need to write custom code. Now let’s see some queries; in fact, let’s see the same queries that we ran on DataFusion, in Polars. First, let’s load the data. We do that lazily with LazyFrame’s scan_parquet method; this only defines the dataset and does not go to the disk.
Next we select our columns, and while we are providing the list of those columns, we can define the filter right there; as you can see: col1, filter, col1 greater than one. After that we collect the result. The second example is the same use case expressed as SQL. The same lazy loading of the Parquet file needs to happen. Then we create a SQL context, similar to DataFusion and Spark, and register the data frame in that context as a table, giving it a specific name. Next, we define and execute our query and collect the resulting rows. So now let me run some Rust code, and I’ll show you some aggregations and joins on both DataFusion and Polars; I have an example at the end that will be a surprise. Here is a Rust project with some examples; we’ll leave the simple ones aside. I will start with query two, which does an aggregation: we try to get a min, an average, and a max value of a column of a table. Let me run this.
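The Polars flow described a moment ago, the lazy scan, the select with an inline filter, the collect, and the SQL variant, might be sketched as follows. Paths, column names, and the table name are hypothetical, and Polars APIs have moved quickly, so details may differ by version:

```rust
// Sketch of Polars' lazy DataFrame API and SQL flows over a Parquet file.
// Requires the polars crate with its "lazy", "parquet", and "sql" features.
use polars::prelude::*;
use polars::sql::SQLContext;

fn main() -> PolarsResult<()> {
    // Lazy scan: only defines the dataset, nothing is read from disk yet.
    let lf = LazyFrame::scan_parquet("data/my_table.parquet", ScanArgsParquet::default())?;

    // Select columns with the filter defined inline, then collect to execute.
    let df = lf
        .clone()
        .select([col("col1"), col("col2")])
        .filter(col("col1").gt(lit(1)))
        .collect()?;
    println!("{df}");

    // SQL variant: register the lazy frame as a named table, then query it.
    let mut ctx = SQLContext::new();
    ctx.register("my_table", lf);
    let out = ctx
        .execute("SELECT col1, col2 FROM my_table WHERE col1 > 1")?
        .collect()?;
    println!("{out}");
    Ok(())
}
```

As with DataFusion, nothing runs until collect is called, which gives the query optimizer the whole plan to work with.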
The tables that I’m using are provided and downloaded from the Open Data Blend platform. They are free of charge, free to download and use, and they contain anonymized data. So it ran over a dataset that has about one gigabyte of data, about 80 million rows, and the result took about eight seconds. Now let’s switch to the SQL one, which is this simple one: select the min of this column, and the average and max of that. Let it run. It should take the same amount of time and bring the same results, and yeah, it takes about eight seconds or so.
Yeah, as I was saying about these datasets: I have two of them, one big, the other smaller. Now let’s move to a more complex query; let me switch to this one, query three. I’ll run only the SQL version, but I can show you the DataFrame version as well. What we do is a join of two subqueries. We want to pre-optimize: we take only 10,000 values of the big table, and we filter by a predicate before doing any join, because a join is very complex and very heavy lifting. We do that here, and then we take the same approach on the right table. And then we do an inner join on those two tables. Let’s run this.
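A hypothetical reconstruction of the shape of that query, not the speaker’s actual code: both sides of the join are limited and filtered in subqueries before the inner join, so the join itself sees far fewer rows. Table names, the predicate, and the driver_vehicle_key join column are made up from context:

```rust
// Sketch of the "limit and filter before joining" pattern via DataFusion SQL.
// All identifiers here are placeholders for illustration only.
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet("big_table", "data/big.parquet", ParquetReadOptions::default())
        .await?;
    ctx.register_parquet("small_table", "data/small.parquet", ParquetReadOptions::default())
        .await?;

    // Each subquery shrinks its side before the expensive inner join runs.
    let sql = "
        SELECT *
        FROM (SELECT * FROM big_table
              WHERE driver_vehicle_key > 0
              LIMIT 10000) a
        INNER JOIN (SELECT * FROM small_table
                    WHERE driver_vehicle_key > 0
                    LIMIT 10000) b
          ON a.driver_vehicle_key = b.driver_vehicle_key";
    ctx.sql(sql).await?.show().await?;
    Ok(())
}
```

The point of the pattern is simply that a join over two pre-trimmed inputs is far cheaper than joining full tables and trimming afterwards.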
This should take a small amount of time because we do that pre-optimization. And yeah, it took about 150 milliseconds, which is pretty fast. The DataFrame version is here, and it’s pretty similar: we read a Parquet file, we limit, and we apply the predicate on the driver vehicle key, the same on the second dataset, and then we just join data frame one with data frame two; that’s all we do. Now let’s switch to Polars, and I’ll show you the first query, the same query with the aggregation: min, average, and max. We run this example. The same tables, the same datasets, are used in both cases. In this case, as you can see here, it takes about 2.5 seconds, instead of, let me find it, 8.3 seconds in the DataFusion case.
Now let’s switch to the last query, query three. I want to show you something here, and I’ll let it run; in the meantime, I’ll explain why it’ll take some time. In query three, I try to do the same query that I did in DataFusion, but Polars doesn’t support subqueries, so I haven’t been able to do that pre-optimization of taking only 10,000 rows. What happens is it’ll take the whole one-gigabyte table and join it with the other table, and that takes longer. At the end we limit to 10,000, but that’s after the join. So yeah, that pre-optimization is not available in Polars yet.
I’ve been running this a few times; it takes about 40 seconds, 39 to 40 seconds, and this is because of that missing pre-optimization that I wanted to use. Yeah, 42 seconds. The last example that I want to show is reading data from a Delta Lake table with DataFusion. We have here this dimension vehicle table; I know it may be small, I don’t know how to increase the font, but it has a Delta log and the Parquet files here. To use it with DataFusion, we just call open_table, which provides a table, and we register that table in the context and give it a name. After that, we can use SQL on the Delta Lake table. Let me switch there and run it.
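The open_table step just described might be sketched like this, assuming the deltalake crate (delta-rs) built with its datafusion feature so the Delta table can serve as a DataFusion table provider. The path and table name are placeholders:

```rust
// Sketch of querying a Delta Lake table through DataFusion via delta-rs.
// "data/dimension_vehicle" is a hypothetical local Delta table path.
use std::sync::Arc;

use datafusion::prelude::*;
use deltalake::open_table;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // open_table reads the _delta_log and resolves the current snapshot.
    let table = open_table("data/dimension_vehicle").await?;

    // Register the Delta table as a named DataFusion table and query it.
    let ctx = SessionContext::new();
    ctx.register_table("dimension_vehicle", Arc::new(table))?;

    ctx.sql("SELECT * FROM dimension_vehicle LIMIT 10")
        .await?
        .show()
        .await?;
    Ok(())
}
```

From DataFusion’s point of view, the Delta table is just another table provider, so the SQL side is unchanged.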
Yeah. Now, the integration between DataFusion and Delta Lake tables has some bugs. It’s new: some queries fail, some queries break. But yeah, it’s nice that we can improve on this and have it running. I’ll switch back to the presentation now and go to the last slides that I have. Let’s discuss performance. Polars beats everything in terms of performance; second to Polars is DataFusion, and then I would say Spark. The file sizes that were used, the payloads, ranged from 500 megabytes to 50 gigabytes of data. These charts and performance tests were provided by the H2O.ai database benchmarks. Beyond 50 gigabytes of data, I would say we should go straight to Spark at this moment, because Spark has horizontal scaling and is truly distributed across multiple nodes, but both Polars and DataFusion are promising.
Here is the comparison in terms of features. Spark is distributed and has both vertical and horizontal scaling. Polars and DataFusion don’t have horizontal scaling because they’re not distributed, but DataFusion does have a companion project called Ballista that tries to make it distributed; it’s very hard to get running, though, and very complex to maintain. I’ll go to the next slide, because it’s more interesting: gaps, and when to use which of these engines. As for gaps, both DataFusion and Polars lack integrations with various cloud storages and table formats. Now, some work has been done in those areas, and DataFusion is more advanced there: I’ve been able to read with DataFusion straight from Azure Blob Storage, and also, as you saw, from a Delta Lake table. The other limitation is single-node execution: neither of them is distributed, and they are not enterprise-ready yet.
So features that we see in Spark are not available in Polars nor in DataFusion. Now, when to use which one? If you have a large amount of data, above 50 gigabytes, go with Spark; you can distribute the workload across multiple nodes. If you have under 50 gigabytes, you can choose either Arrow DataFusion or Polars. If you want a wide range of SQL features, go with DataFusion, because that’s nicely implemented there. Under 50 gigabytes and working in Python, you can go with Polars, though again, some queries still have issues. But yeah, that’s how I would choose between them. At the end, there are some links: the examples that I presented will be available publicly on GitHub, so you’ll be able to take them and use them, and the Open Data Blend datasets are also publicly available.