Conquering Slow, Dirty and Distributed Data with Apache Arrow and Dremio

At the 2018 Data Science Summit, Dremio CEO Tomer Shiran spoke about Dremio and Apache Arrow, outlining how projects like Pandas use Arrow to achieve high-performance data processing and interoperability across systems.




Tomer Shiran:

So I was told to do this in English, which is actually convenient for me. I'm Israeli but I've been living in the US for the last nine or so years. So most of my presentations are in English these days. I asked them ... Hope you guys don't mind, I asked them to turn down the temperature here so nobody would fall asleep. So it's a little bit cold. It's actually the coldest room I've ever presented in.

So the topic of this presentation is really how we deal with large volumes of data, data that may be distributed in multiple systems. Maybe it's not ready for analysis, it's dirty, and things like that. Just a little bit of background: I cofounded Dremio about two and a half years ago; prior to that I was one of the first employees at a company called MapR, VP of product there. We'll talk a little bit about Apache Arrow in a second. It's a project that we started at Dremio about a year and a half ago. It's since become an industry standard in terms of how data gets represented in memory for analytics. It's now a foundational component in everything from Spark, GPU databases, InfluxDB, the next version of MATLAB, the next version of Pandas, and so forth.

So what are the two motivating factors here? One, data science on modern data is incredibly hard. It's no longer a world where I can have my data in one CSV file or one Oracle database, and just point my tools at that data source and everything is great, right? We have cloud storage like AWS S3. We have NoSQL databases like Elasticsearch and MongoDB. We have Hadoop clusters, and data distributed across so many different places that don't necessarily say, here's a high performance SQL interface, go at it, right?

So that makes it really hard for companies. I was meeting recently with heads of data at Airbnb, Twitch, and a few other Bay Area companies, and they're saying that in today's world, you almost need a data engineer for every consumer of data in their organization. So for every data scientist or business analyst, you really need a full-time data engineer. And even those companies, who are willing to throw a million dollars a year at some of these folks, can't get to a two-to-one ratio. So we have a problem. We can't hire our way out of the complexity of data infrastructure.

At the same time, most organizations, or most users, want self-service. My three elementary school-aged kids use Google all the time. So they go online, they ask a question, they get an answer. And then we come to work with years of experience in data, and it takes us weeks and months to get answers to questions. I think all of us now are so used to this amazing experience in our personal lives with data, and then it's extremely frustrating in our professional lives that we can't just ask whatever questions we want and get an answer.

What it typically looks like is this. We have our data scientists, who are hungry for data. We use things like Python and the whole stack in that world, things like Pandas, TensorFlow, etc., users of R, Jupyter notebooks, etc. And then at the bottom here we have data in various places. Sometimes that's one data lake, sometimes S3 or Hadoop. In other cases it's spread across many different systems. So what typically happens is that data gets ETL'd, with a lot of custom development, into a staging area. That can be a data lake like S3, Azure Data Lake Store, Azure Blob Store, Hadoop, etc.

So a lot of work, but then we try to query the data in that environment, and it's too slow, right? Just running a SQL engine on top of a data lake doesn't give us that interactive response time that we all want. So we end up ETLing the data into these data marts. So maybe we use Redshift, or we use an on-prem store like Teradata (maybe that's more in the US), SQL Server, Oracle, and so forth. And then even that's not fast enough. So what do we do? We create cubes, and we pre-aggregate the data in the data warehouse itself. We create aggregation tables, and then we extract the data into CSV files and into BI servers and things like that. And then finally we get the performance that we want.

But the challenge here with this picture is, first of all, just the complexity: the number of technologies, the amount of effort to keep this pipeline working. And then the number of copies of data we end up with, and all the costs and security risks that come with those dozens of disconnected copies of data all over the place. But more interesting to me is the fact that when this is what the stack looks like, self-service is really just a dream. The user at the top can't do anything on their own. The less technical they are, the more that's true. They're really dependent on an engineer to do every single thing they want to do. Data engineering is constantly trying to keep up with the requests: Can you run this query for me? Can you do this for me? Can you get me this data? Can you bring these two things together? Can you join this? And so forth.

So we decided to create an open source project about three years ago. We started working on that, spent about two years building it, really focused on solving this problem, creating a much more elegant solution. So fundamentally, Dremio is something you can download; you can go to GitHub and see the source code. It hooks up to different data sources; oftentimes that's one data lake. So if you have all your data in something like S3 or Hadoop, that's a classic use case. And then the tools at the top that people want to use for their analysis, whether it's Python, R, Jupyter, or maybe some BI tools like Tableau.

The idea with this project is really to serve the needs of two different constituents in the organization. One is the data scientists and the business analysts, and one is the data engineers. On the side of the data scientists, it's really about providing this integrated self-service experience, where people can interactively discover and explore the data. They can also curate new datasets. So we have a built-in data preparation interface where users can create new virtual datasets without creating any copies of data. Basically they're creating views, and they can collaborate and share them with each other.

In terms of making the life of the data engineer simpler, we've developed a number of interesting technologies here. One of them is data reflections. Basically it's our ability to create data structures, similar to indexes or cubes, that live behind the scenes and can accelerate queries. So you can get orders of magnitude better performance than just having a SQL engine. And then providing what we call a vertically integrated query engine: a query engine that can leverage these data reflections to provide the 100x performance speed-up that you would want for interactive analysis. It can also analyze the raw data faster than many distributed engines, and it can also push down queries into the underlying sources. So if you have some of your data in systems like Elasticsearch, we can translate query plans into Elasticsearch's query language, into MongoDB's query language, into Oracle's SQL dialect, and so forth. That doesn't apply to everything; if your data's in S3, then we developed a really fast Parquet reader to, for example, consume Parquet files from S3.

So the experience to the user, I'll show you a demo in a second here. We like to think about it as similar to Google Docs for data. What I mean by that is, if you think of the Google Docs experience, or Office 365, where people can collaboratively create new documents and share them and so forth, we want to provide that experience to the data scientists, where they can create new datasets. They can share them with their colleagues. They can build on top of each other. You can browse the lineage graph that shows what dataset was created from what other dataset. And all of this happens without creating copies. So at the end of the day, these are just views underneath the hood. If you're technical, you can write SQL; most of the people in this room can. You can actually define these new virtual datasets through SQL as well.

And then there's a visual interface for curating data. So when you want to cast a type, or extract a zip code from an address, or do things like that, sometimes it's easier to do it visually than to have to come up with the right regular expressions and things of that nature.

In terms of the performance aspects, how do we take a scenario where you maybe have terabytes of data, maybe petabytes of data, and still have that interactive response time, where you can use a Jupyter notebook and create a bar chart, and get a response two seconds later? So a few things make that possible. One is an open source project we created called Apache Arrow; I'll talk about that in a second. The second thing is data reflections. These are the materializations of data that we can create in the data lake, and automatically leverage to accelerate queries. And then it's the native push-downs: the ability to push down processing into the underlying sources, if those sources have the data indexed, like Elasticsearch, or things of that nature.

So before I jump into a demo here, I just want to explain a little bit about Apache Arrow, which is the foundation for this. So like I said, about 18 months ago we created an open source project called Arrow. Jacques, who's my co-founder, is the PMC chair of that project. What Arrow really was designed for: we realized there was a gap in the industry in terms of having a standard way to represent data in memory for analytics. So we basically developed a specification and some libraries, and we worked with Wes McKinney, I don't know if you guys know him. He's the creator of Pandas, and wrote the O'Reilly book on Pandas. At the time he was working at Two Sigma, and we worked together on this new specification and set of libraries for columnar in-memory data. Really focused on doing it in a way that's very optimized for modern Intel CPUs and their instructions. So vectorized processing, that's the ability for the CPU to do the same operation on multiple values at the same time. And also for Nvidia GPUs, leveraging GPU acceleration when that's available.

It's designed to work with any programming language, and also to support not just relational data but also nested structures like maps, lists, and so forth. And then all development is done through a consensus, open source development model. So I think last time I checked we had about 102 contributors from various different organizations. So different programming language implementations have been contributed by different companies, or different open source projects. This is a small subset of some of the projects that are now built on Apache Arrow, just a few examples here. The integration between Spark and Python is now based on Arrow. That ability to go from a Spark data frame to a Pandas data frame, using the toPandas function in Spark, and vice versa, that's become about 20 to 50x faster in the latest version of Spark because of Arrow. And Pandas, which is kind of the foundational data science library in Python, that's now based on Arrow. In fact, Wes, outside of Dremio, is probably the top contributor to the project.

And that's also the foundation for other components that sit on top of Pandas. Dremio, of course, is based on it. H2O.ai's AI library, that's based on it. RISELab, if you're familiar, that's one of the labs at Berkeley, originally the lab that developed Spark; they actually built an Arrow cache inside of a project called Ray. And InfluxDB recently contributed the Go driver for Arrow. That's their way of developing a single interface on top of InfluxDB, and also exporting data from InfluxDB to other systems.

So the idea here was really to create a standard way to represent data in memory, and not only for that to be beneficial to an individual project, like many of these projects, but also to provide a foundation for the future, so that as projects start to talk to each other, because we're in a world of heterogeneous tools and environments, we can do much faster processing between systems, right? If you think about the past, we used ODBC and JDBC; that still is the standard way in which systems talk to each other, right? Or REST APIs. And that means going from a very high performance representation of data to serializing and deserializing every single record, passing it in an inefficient format over the wire, and so we end up spending 90 percent of our CPU time just on serialization and deserialization, and changing formats of data. So we wanted to completely change that and move to a world where you can move a batch of ten thousand records as a whole from one system to another, and all these different systems work on that same structure of data. And so that's really what we're doing here.

Just last week in London, I was meeting with one of our users, one of the large banks there. And they use this system called kdb+, a very fast time series database used in financial services and trading; specifically this was a foreign exchange trading use case. It was really interesting because Arrow structures are actually very similar to some of these high performance, specialized systems for in-memory processing.

So with that, let me dive in and show you what this looks like. So I'll show you a demo of Dremio. Hopefully this will come up; I'm dependent here on my VPN connection. Okay, so right now I'm connected to a Dremio cluster that's running in the cloud; this just happens to be running on four instances, basically four VMs, right? Normally, deployments tend to be bigger because people use it for larger volumes of data. And so what you can see here is the user interface. We're connected to a number of different data sources: this is Azure Data Lake Store, I have an Elasticsearch cluster, and some other data sources here. Normally our users would have maybe one data lake, maybe a few other sources, but this is a demo cluster, so we've connected lots of different things here. And I can browse data. Basically all the data, even if it's non-relational, gets exposed in the exact same way.

So this is a MongoDB database. If I click on 'business', I dive into this one collection inside of MongoDB, and you can see it looks just like a relational table. Every dataset in the system has coordinates, so Mongo.Yelp.Business is this one. Going back to the main screen here, we have spaces. Spaces are a place where we can create virtual datasets. And so if we're curating data, we're prepping the data, maybe for a project, or maybe for a specific use case, we can create a new space. So let's go ahead and create a space called Data Science Summit, DSS, and we'll pin that to the top here. Right now there's nothing in this space, of course, because I haven't done anything there. And then every user also has their own personal space. So as a user, if I want to upload a CSV or an Excel spreadsheet and join it with my data in S3, I can very easily do that.

So let's look at a very simple example. Let's say I'm an analyst working for the city of New York, and I want to do some analysis. Now, I remember the word MTA being part of the metadata, so I might use the catalog here and search for MTA, and that reminds me that I have this dataset here, the last five years of taxi trips in New York. So this is a dataset of about a billion records, 1.03 billion. Every record represents one taxi trip. So I have the pickup time, drop-off time, number of passengers, the trip distance, and so forth.

And at this point, I can do one of two things. I can start interacting with the data visually, so I might say, you know what? Let's drop this column. Let's make some changes, maybe share it with a colleague, so creating a new virtual dataset. Or I can jump right into an analysis. And so if I'm a BI user, I can click on, say, this Tableau button here, and I can start interacting with this data visually through a BI tool. In a second, I'll show you how we do that programmatically, more as a Python user. So here I'm a less technical BI user; maybe I know SQL. I can also do this through a SQL editor. And I can start aggregating, so I might do a count star on the number of records. And you see here, 1.03 billion records. So if I was using something like Spark or Presto, this would be about five to ten minutes per query. And you can see that the queries are much, much faster than five to ten minutes, right? They're coming back in less than a second.

So this is an aggregation by year. You can see the number of taxi trips has gone down from 2012 through 2014. That's probably because services like Uber and Lyft became more popular in New York. I can look at the tip amounts, so I might look at: what is the average tip amount for a taxi trip? And I'll change that to an average, okay? And so you can see that tip amounts have gone up from 2009 to 2014. Let's change the color so it's easier to see. Okay. And you know, I might say, let's aggregate by month instead of by year. So let's change this to a month. And you can see that tips are higher, taxi drivers get more tips later in the year than they do earlier in the year. Maybe people spend all their money on the holidays, and they become cheap come the new year. And in July, a bunch of tourists in New York, maybe they don't like to tip, right? And so I can do all these different kinds of analysis.

I can also do that exact same thing through a Jupyter notebook, right? So if I'm more of a Python user, you can see I can run this query here, which is extracting the month. I'll make this bigger so it's easier to see. So I'm extracting the month from the drop-off time, and I'm aggregating. I'm selecting that column as well, calling it month, and then counting the number of trips and the average tip amount, okay? So as I run this, you can see that it comes back very fast, right? Here's another example, loading the result into a Pandas data frame. Okay, and that's one example of a query that can be fast, where I'm doing aggregations.
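In the demo this aggregation runs as SQL against Dremio, but the shape of the analysis is easy to sketch locally with Pandas on toy data. The column names below (`dropoff_datetime`, `tip_amount`) are assumptions, not the demo's actual schema.

```python
import pandas as pd

trips = pd.DataFrame({
    "dropoff_datetime": pd.to_datetime(
        ["2013-01-05 10:00", "2013-01-09 22:15", "2013-07-21 08:30"]),
    "tip_amount": [1.00, 2.00, 3.00],
})

# Roughly: SELECT EXTRACT(MONTH FROM dropoff_datetime) AS month,
#                 COUNT(*) AS trips, AVG(tip_amount) AS avg_tip
#          FROM trips GROUP BY 1
by_month = (
    trips.assign(month=trips["dropoff_datetime"].dt.month)
         .groupby("month")
         .agg(trips=("tip_amount", "size"),
              avg_tip=("tip_amount", "mean"))
)
```

Against Dremio the same SQL would run server-side and only the small aggregated result would come back to the notebook.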

Another example is maybe I'm looking for all the taxi trips that occurred on a specific date. So I'm not looking to aggregate the data; I'm looking to select specific records. So here I'm looking at the day, the number of passengers, and the total amount paid to the taxi driver, for taxi trips that have this vendor ID and occurred on a specific date. But I'm only selecting the first ten, so I don't blow up my notebook here. So again, I run that, and I get a very fast response time. And the reason I'm able to get this kind of fast response time, compared to just using a distributed SQL engine, you know, Spark, Presto, Drill, Hive, etc., is because, A, we're using Apache Arrow to accelerate these queries and leverage the modern Intel CPU instructions for more parallelism in the CPU, and then the second reason is that we have these data structures called data reflections.

And so if I go here into the user interface, and all of this is available through APIs as well, of course, you can see the queries that have come in from the tool. So in this case, let me start with a simple example: let's say this query here that came in, where I was aggregating on the month. So this is the query that came, I think, from Tableau, right?

So it's this ugly kind of SQL. And you can see the runtime here was less than a second; it came from an ODBC client. And you can see it was accelerated by this reflection. So again, a reflection is a materialization of data that we maintain and update on a regular basis in the cache. And the cache, in this case, lives on S3. It can live on Azure Data Lake Store. It can live on HDFS or just on the local disks of the Dremio cluster. And I can click on that and see what is this aggregation that we're leveraging here. So in this case, you can see we're leveraging this aggregation that has the data pre-aggregated by the vendor ID, the pickup time, the drop-off time, the number of passengers, and a few other things.

And so the idea here with these reflections is that the optimizer has various different representations of data in the system, related to different datasets. Some of them may be partial aggregations. Others may be the raw data, perhaps partitioned in a specific way. And then the optimizer is able to automatically figure out, using various algorithms, which reflections we can leverage at query time to accelerate a query, right? So if you think about how Google is fast: they don't just use five thousand machines and scan all the web pages when you go and search the web, right? They've prepared various data structures, various models, various indexes. And so the idea here is to create different materializations of data, not necessarily the raw data, maybe aggregations of some of the columns, maybe some of the columns partitioned in a specific way. And then have an optimizer that can automatically figure out, when you submit a SQL query, how to leverage those reflections, right?
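The "partial aggregation" idea can be illustrated with toy data: a reflection pre-aggregated by (year, month) can answer a coarser per-year query by rolling up, without touching the raw records. This is a sketch of the concept, not Dremio's actual implementation; all names are made up.

```python
import pandas as pd

raw = pd.DataFrame({
    "year":  [2013, 2013, 2013, 2014],
    "month": [1, 1, 2, 1],
    "fare":  [10.0, 12.0, 8.0, 9.0],
})

# The "reflection": maintained ahead of time, much smaller than the raw data.
reflection = raw.groupby(["year", "month"], as_index=False).agg(
    trips=("fare", "size"), total_fare=("fare", "sum"))

# A per-year query is answered by rolling up the reflection,
# never scanning the raw records.
per_year = reflection.groupby("year").agg(
    trips=("trips", "sum"), total_fare=("total_fare", "sum"))
```

Note that sums and counts roll up cleanly; an average would be derived as total divided by count rather than averaged again, which is the kind of rewrite the optimizer has to reason about.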

So that's what we're doing here, and that's what makes these queries so fast when the user's actually running a query. So if I go here and show you a few other examples from this set of notebooks; these are just some setup functions for the notebook. What we're doing here is presenting all the data, the virtual datasets and the physical datasets, in one namespace. So from the standpoint of the Jupyter notebook, the Python user, or the Tableau user, or the Power BI user, Dremio appears like a relational database. So it basically sees us like you would see a relational database: you can submit a query, get the answers, right? And so you can see the schemas here. DSS is one of the schemas in this system. Okay? I'm going to skip through a few things here.

This is an example here where we're joining data across two different sources. So you can see here, we're looking at a join between this dataset, which is what we call a physical dataset, a table inside of Oracle, and a table inside of Postgres. We're looking at the departments and the employees, doing a join on the department ID, and then looking for which department has the highest salaries. Okay? So this is a join between two sources, and basically, the way you do that in Dremio is every data source, every dataset in the system has coordinates. It's a dot-separated path that allows me to specify it. That could be a directory of Parquet files or CSV files on S3, and it could be an Oracle table, and it could be a Postgres table or an Elasticsearch index. Right? At the end of the day, you just have to refer to it using that canonical set of coordinates. So as an example, we join these two things, and then we get the answer here.
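In Dremio the cross-source join is a single SQL statement over the two datasets' coordinates. The join logic itself looks like this, sketched with Pandas on made-up department and employee tables (the table and column names are hypothetical stand-ins for the Oracle and Postgres tables in the demo):

```python
import pandas as pd

# Stand-in for the Oracle departments table
departments = pd.DataFrame({
    "department_id": [1, 2],
    "name": ["Engineering", "Sales"],
})
# Stand-in for the Postgres employees table
employees = pd.DataFrame({
    "department_id": [1, 1, 2],
    "salary": [120_000, 130_000, 90_000],
})

# Join on department_id, then find the department with the highest total salary
joined = employees.merge(departments, on="department_id")
totals = joined.groupby("name")["salary"].sum()
top_department = totals.idxmax()
```

The point of the coordinates scheme is that the SQL version of this query doesn't care which physical system each side lives in; both are just dotted paths in one namespace.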

You can also leverage statistics, depending on the data source. For some data sources we maintain more statistics than others, like the number of records at a minimum, and things like that. Where we do have the statistics, we try to leverage them. I'd say right now the system's very good at that when it comes to data lakes, like your Hive tables and things like that, including in the cloud. It's okay at that on the relational side, but there's more work we're doing now to improve the connection to relational databases.

Other questions? By the way, I can hardly see anybody here, because of this light in my face.

Speaker 2:

I have a question about writing the data back. Do you need to write the data back to the data source itself, the original data source, or can you write it back to Dremio, and it will propagate it to the data source?

Tomer Shiran:

Yeah, so when you're using the system, normally we don't write the data back anywhere. The reflections themselves, those materializations of data that we use as kind of an underlying caching slash indexing slash cubing system, we can store those in the data lake, on S3, or HDFS, or something like that. Right? We don't normally write back into an Oracle database, let's say. We do allow you to export data. So, as an example, let's look here. Let's say I have this data source; I'll give you a very simple example. So, I had this data source of businesses. These are the businesses in the United States on Yelp; Yelp is a review website in the US. I might say, "You know, I don't need this ID column," and maybe, see the city here? Maybe I only need a subset of the cities for my analysis, for building a model. I only need Las Vegas and Phoenix for my model. So I'm going to do that. And you see 'categories' here? That's an array. I really don't want an array. I want to unnest that, so I can do a group by and figure out what's the most common business category.

I might say, "Let me unnest the categories. Instead of having an array, I want one category per record." I'm going to rename that. Basically, all the changes that I'm doing here are virtual. I haven't created any copy of data; basically, I've created a view. In fact, if you open the SQL editor, you can actually see those changes in SQL. You can see the 'city in Phoenix and Las Vegas' filter, and you can see the 'flatten' function here and the renaming of 'categories' to 'category', right? These are the things that I've done.
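The 'flatten' step performed here on the categories array has a direct Pandas analogue, `explode`. A sketch on toy Yelp-like rows (column names and values are assumptions for illustration):

```python
import pandas as pd

businesses = pd.DataFrame({
    "city": ["Las Vegas", "Phoenix", "Cleveland"],
    "categories": [["Food", "Bars"], ["Food"], ["Auto"]],
})

# Roughly: WHERE city IN ('Las Vegas', 'Phoenix')
subset = businesses[businesses["city"].isin(["Las Vegas", "Phoenix"])]

# Roughly: FLATTEN(categories) AS category -- one category per record
flat = (subset.explode("categories")
              .rename(columns={"categories": "category"}))

# Most common category, via a group-by count
top_category = flat.groupby("category").size().idxmax()
```

In Dremio the equivalent filter, flatten, and rename stay virtual, a view definition, rather than producing a new copy of the data as this local sketch does.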

So, if I save this now, and go and save this inside of the DSS space, and I call this, let's say, 'categories', right? I can now access this data set, but I don't have to export the data. Okay? This is a virtual data set, and I can go query it. Let's say I want to go here and query DSS dot, what did we call that, 'categories'? So let's say I want to run this query here. You can see I have this 'DSS.categories' as the new data set I just created in the DSS space, which we created at the beginning of the demo, and I can query that. I could go here and say, "You know what? I want the address. I really want to do analysis by zip code." So maybe I want to look at the zip code here. I can click on a zip code in one of these examples and click on extract. We try to recommend the regular expression that would be used to extract the zip code. If it doesn't work, you can flip the card here and provide that yourself. I'll rename that to 'zip'. If I save that now, I'm basically saving a virtual data set.

We version control all the changes so you can always go back in time. But if I go back to my, where was my Jupyter environment? So if I go and rerun this query again, you should see now we have this new column called 'zip'. This is a calculated field, right? It's a virtual column; I haven't materialized this anywhere. Now, if there was a data reflection, let's say, on that original data, the 'categories' data, even if that didn't include the zip code, that would still be okay. The reflection doesn't have to be the exact answer to the query that somebody's looking for. The optimizer will recognize that. We can build on top of that and do a cheap calculation rather than doing a more expensive calculation at run time.

Speaker 3:

I have a quick question on data sets...

Tomer Shiran:

That's a good question. So, how do we know which reflections to create in advance? I'd say there's where we are today and where we're heading. Where we are today is that users are allowed to vote. They can say, you know what? Let's say I'm a user using this 'DSS categories' data set and it's too slow; I can vote on this data set here, and then the data engineer, the admin of the system, can see what people are voting on. We can also make some basic recommendations today based on the cardinality of the columns, the cardinality of the fields.

What we want to do though, and that's where we're heading, is the ability to automatically figure out the right reflections to create. Think about it as an exercise in, we see all the history of queries, what people are doing. We understand them already very deeply at their relational algebra level, right? Then you might want to say, "You know what? I'm going to allocate ten terabytes of space on S3", let's say. "Please figure out what's the best bang for the buck given ten terabytes of space."

There's always a challenge. We talk to companies about that kind of an approach, and they're always like, "Well, but the CFO has a dashboard they log into once every day. It's only the CFO uses it." So, it's not going to be a common query that you're going to see, but it's the CFO, so it has to be fast, right? We will always have the ability to manually define these as well, because it's not always that the most common thing is the most important thing to the company either. Certainly, the next step is to make that much more automated in terms of learning from user behavior.

Speaker 4:

What is the consistency model between reflections and the original data?

Tomer Shiran:

Consistency model between original data and the reflection: so right now we allow you to define the reflection's update schedule as the same for a given source. So you can't define, for a given data source, two different data sets where one is going to get updated hourly and the other once every five hours; that avoids those types of problems. It's not atomic in any way, right? Even irrespective of that, it's not an atomic operation, so you do have to make sure that you're okay with that, as with any caching system, right? Things get updated when they get updated, but we do try to make sure that it makes sense.

I didn't mention, there's also a way ... So Dremio can update and maintain these reflections automatically for you, and we'll use our engine to create them and so forth, in either incremental or full updates. You also have the ability now, with the 2.0 release of the project, to create your own reflections in your own ETL tool. So you'd use Informatica, or Spark, or whatever you want to use, to create some data set of Parquet files, let's say. Then you can register that with Dremio and say, "This directory of Parquet files in S3, or this Hive table, represents this aggregation." We can then leverage that. Then it's up to you when you want to update it, how you want to update it. That's more if you have developers that can do that, or maybe you already have a pipeline that does these things.