March 2, 2023

9:35 am - 10:05 am PST

Supporting 1000s of concurrent interactive queries with Dremio Sonar

As the world moves to cloud data lakes, the potential for lakehouses to disrupt data warehouses is increasing, and with that comes the need for fast query engines. In this talk, we’ll explore a real-world low-latency, high-concurrency use case that Dremio Sonar was able to solve. We go over execution engines, query planning, C3, workload management, reflections, and runtime filtering: all the features that helped Dremio Sonar execute 1000s of concurrent interactive queries.

Topics Covered

Customer Use Cases
Lakehouse Architecture



Note: This transcript was created using speech recognition software. It may contain errors.

Lenoy Jacob:

Good morning, good afternoon, or good evening, wherever you are. Thank you for joining me today. Today I’ve got a real-world use case to show you folks. It is a high-concurrency, low-latency use case that one of my customers had, and I’ll walk you through what that use case is and all the features of Dremio Sonar that played a big part in the solution for this use case. All right, let’s begin. Let’s talk about what the customer was doing from a business standpoint first, right? And what we were helping them with, right? And then I’ll get into the actual testing details. So as part of the data lake platform initiative that they were building, the customer had a use case that needed to provide a dashboard with reporting and analytics to their end customers.

They wanted this to allow the end customers to query data that these end customers themselves own. So it is more of an embedded service to their end customers. And the idea was to embed Dremio into the platform so that these end customers could have query capabilities, right? So what did this entail? Well, since the datasets themselves were owned by the end customers, they were completely custom, right? A lot of the queries were dynamic and impossible to predict beforehand, right? And since this was a multi-tenant use case, which means you had more than one end customer, as you would expect, one customer’s dataset would be drastically different from another’s, right? And also, because one dataset can be completely different from another, the queries were also drastically different, right? In fact, the customers themselves did not have much idea about the datasets themselves, right?

But they did have some idea about the data types and, you know, what was in the dataset, right, as well as some idea of the measures and dimensions that could be used in a query. So these reports and dashboards were originally serviced by Elasticsearch. And what they told us is that this was becoming expensive and hard to maintain, so they started to build a data lake, and they had the ability to dump the data into Amazon S3 and use some query engine or other to query the data, right? They were already using Trino for other use cases in the company. And after a quick proof of concept with Trino, they wondered if Trino was actually the right choice for them and if there was anything better, right? So they began an evaluation. They were looking at Dremio to either replace or at least augment Trino.

All right. So let’s look at the proof of concept that they did with Dremio and Trino, right? This was a high-concurrency and low-latency use case. For the proof of concept, they provided 1,700 queries, initiated randomly at a rate of 30 queries per second. Now, keep in mind that this is 30 queries initiated per second, right? So if a query took a few seconds to complete, then you can imagine that there’ll be potentially hundreds or thousands of queries running against Dremio or Trino at any given time, right? They provided about 160 different datasets from 160 different customers, right? And the overall data scanned from a query perspective was about 40 gigabytes, post any data pruning done by either Trino or Dremio, right? And the data format was Parquet.
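To make the concurrency arithmetic concrete: by Little's law, the average number of queries in flight equals the arrival rate multiplied by the average time a query spends in the system. Below is a minimal Python sketch. The function name and the latency values are illustrative assumptions, not measurements from the proof of concept.

```python
def queries_in_flight(arrival_rate_per_s: float, mean_latency_s: float) -> float:
    """Little's law: average concurrency = arrival rate x average time in system."""
    if arrival_rate_per_s < 0 or mean_latency_s < 0:
        raise ValueError("rate and latency must be non-negative")
    return arrival_rate_per_s * mean_latency_s


if __name__ == "__main__":
    # 30 queries per second comes from the talk; the latencies are made up.
    for latency_s in (0.5, 5.0, 30.0):
        concurrent = queries_in_flight(30, latency_s)
        print(f"{latency_s:>5.1f} s per query -> about {concurrent:.0f} queries in flight")
```

The point of the sketch is that concurrency grows linearly with latency at a fixed arrival rate, so an engine that falls behind sees its backlog of in-flight queries climb quickly.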

So looking at the results, we can see that Trino used about 20 workers with 16 cores and 64 gigs of memory. And there were 30 queries randomly selected from the set of 1,700 queries that they provided and initiated against Trino and Dremio for about five minutes at a rate of 30 queries per second, right? And then they measured a few things. They measured the median, which is TP50, as well as the TP95, right? So TP95 is essentially the 95th percentile. And what that means is it is essentially the maximum time, across all the queries that were initiated, under which 95% of the queries completed, right? So Trino, for example, served 50% of the queries with the max time that a query took being 1.2 seconds.
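The TP50 and TP95 figures described above can be computed with a nearest-rank percentile. This is a minimal Python sketch; the helper name `tp` and the sample latencies are made up for illustration, and the talk does not say which percentile method the customer's harness actually used.

```python
import math


def tp(latencies_s, percentile):
    """Nearest-rank percentile: the smallest observed latency within which
    at least `percentile` percent of the queries completed."""
    if not latencies_s:
        raise ValueError("need at least one latency")
    if not 0 < percentile <= 100:
        raise ValueError("percentile must be in (0, 100]")
    ordered = sorted(latencies_s)
    rank = math.ceil(percentile * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


if __name__ == "__main__":
    # Hypothetical latencies in seconds, not data from the proof of concept.
    sample = [0.2, 0.3, 0.4, 0.5, 0.5, 0.6, 0.8, 1.0, 2.5, 3.0]
    print("TP50:", tp(sample, 50))
    print("TP95:", tp(sample, 95))
```

With only ten samples the TP95 lands on the slowest query, which is why percentile figures are usually reported over thousands of runs, as in the five-minute test described here.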

And for Dremio, that was half a second. And the 95th percentile for Trino was about five seconds, and for Dremio it was three seconds, right? And the key thing here, if you notice, is that Dremio achieved these results with half the hardware, right? With Dremio using just 10 Dremio executors as opposed to Trino’s 20 workers, right? This is actually a testament to Apache Arrow and the columnar execution that a query engine like Dremio can do using such a technology, right? So what were the features in Dremio that enabled such an aggressive use case with less hardware? So one of the configuration options that you will be able to set in Dremio is a feature called Cloud Columnar Cache, or C3, right?

C3 is an immensely useful feature for optimizing performance. It provides an opportunity to store locally, on the Dremio executors, the data that we fetch from the data lake storage, right? So this not only allows for faster initial reading, but also facilitates subsequent use in later queries, right? And the reason this is important is because data lake storage like S3 can be highly, highly variable in performance, right? Data lakes like S3 are, you know, theoretically infinite in nature, but as a consequence, they are kind of slow, right? So by creating a cache on the Dremio executors where we store the data, it kind of means that we can use that cache on the second, third, or fourth run of the query, right?

It’s also already right on the executors, so we don’t need to spend time, you know, re-fetching it from the data lake, right? Now, because C3 is stored on the executors, it can be as fast as the disks you attach to them, right? So what that means is, if you have got fast NVMe storage or SSD storage that can be attached to the executors, using C3 with these NVMe or SSD disks will vastly improve both the latency as well as the throughput, because we’re actually going to read from this highly optimized storage, and we are not going to have to connect to the data lake to get the data. So when you’ve activated C3 correctly, you can actually see in the Dremio query profile whether it’s being used or not, all right?
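The general idea behind an executor-local cache like this is a read-through cache in front of slow object storage. The Python sketch below illustrates that pattern only. It is not Dremio's C3 implementation: the class name, the block-level granularity, and the least-recently-used eviction policy are all assumptions made for the example.

```python
from collections import OrderedDict


class ReadThroughCache:
    """Toy local cache in front of a slow remote store (LRU eviction assumed)."""

    def __init__(self, fetch_remote, capacity):
        self._fetch_remote = fetch_remote  # callable: block_id -> data
        self._capacity = capacity          # maximum number of cached blocks
        self._blocks = OrderedDict()       # block_id -> data, oldest first
        self.hits = 0
        self.misses = 0

    def read(self, block_id):
        if block_id in self._blocks:
            # Served locally: no round trip to the data lake.
            self._blocks.move_to_end(block_id)
            self.hits += 1
            return self._blocks[block_id]
        # First read: fetch from remote storage and keep a local copy.
        self.misses += 1
        data = self._fetch_remote(block_id)
        self._blocks[block_id] = data
        if len(self._blocks) > self._capacity:
            self._blocks.popitem(last=False)  # evict least recently used
        return data


if __name__ == "__main__":
    remote_reads = []

    def fake_object_store(block_id):
        # Stand-in for a slow S3 GET; records each remote call.
        remote_reads.append(block_id)
        return f"data-for-{block_id}"

    cache = ReadThroughCache(fake_object_store, capacity=2)
    for block in ["a", "b", "a", "a"]:
        cache.read(block)
    print("remote reads:", remote_reads, "hits:", cache.hits)
```

Repeated reads of the same block hit local storage, which is the effect described for the second, third, or fourth run of a query.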

All right, the next thing was data reflections, right? Reflections are a very unique feature in Dremio and one that separates Dremio from any other SQL query engine out there, right? They are the primary means of achieving performance that might not be achievable in any other way, right? So if there’s a highly reusable query on a view, let’s say one that’s got a complex join or something that takes a lot of processing power and resources, and if you’ve got a hundred users running that same logic over and over again, then it just kind of makes sense to just materialize that view, right? Why waste computing power on the same logic over and over again, right? The cool thing is that Dremio picks out the best reflection for your query automatically, and we do query rewrites, as opposed to the user keeping track of, hey, which materialized view to use, right?

Dremio automatically rewrites your queries to basically pick out the best reflection for that query. And so if you’re querying a dataset in Dremio which is accelerated by a reflection, then Dremio does not need to retrieve any data from the original sources, and will instead retrieve the data to fulfill the query from the reflection store, right? One of the things that is important to note about reflections is that we have the ability to effectively use a reflection that doesn’t necessarily cover the whole query, right? So in this case, we might get some of the results from the reflection, and the remaining data can be retrieved from the original sources, right? There are a few different types of reflections. There are raw reflections, aggregate reflections, and external reflections. I won’t get into the details in this session, though.

You can find more information about them on our website. For our customer use case, though, we could not use reflections because of the dynamic nature of the queries and the datasets, because there was no way to know the queries beforehand to turn on a reflection. But if we could have turned them on, we believe we could have achieved much better results than what we got, right? Again, this proof of concept that we did was a testament to the raw power of Apache Arrow, right? And the columnar execution that we can do against that.
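Going back to how a reflection gets matched to a query: the idea of picking a materialization automatically can be sketched as a coverage check followed by a cost comparison. The Python below is a toy model, not Dremio's matching algorithm. The `Reflection` class, the subset-based coverage test, the assumption that measures can be rolled up, and using row count as the cost are all hypothetical, and the partial-coverage case mentioned in the talk is not modelled.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class Reflection:
    """Hypothetical aggregate materialization: pre-grouped by `dimensions`."""
    name: str
    dimensions: frozenset
    measures: frozenset
    row_count: int  # stand-in for the cost of scanning it


def pick_reflection(query_dims: Iterable[str],
                    query_measures: Iterable[str],
                    reflections: Iterable[Reflection]) -> Optional[Reflection]:
    """Return the cheapest reflection that fully covers the query, or None."""
    dims, measures = set(query_dims), set(query_measures)
    candidates = [
        r for r in reflections
        if dims <= r.dimensions and measures <= r.measures
    ]
    # None means: fall back to the original sources.
    return min(candidates, key=lambda r: r.row_count, default=None)


if __name__ == "__main__":
    available = [
        Reflection("by_region_day", frozenset({"region", "day"}),
                   frozenset({"sum_sales"}), 10_000),
        Reflection("by_region", frozenset({"region"}),
                   frozenset({"sum_sales"}), 50),
    ]
    chosen = pick_reflection(["region"], ["sum_sales"], available)
    print(chosen.name if chosen else "no reflection, scan the source")
```

The user only writes the query against the view; the selection step happens behind the scenes, which is the contrast drawn with tracking materialized views by hand.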

All right, elastic engines, right? Elastic engines are a way that Dremio organizes executors into different pools called engines, right? And you can match them to different workloads, right? Along with elastic engines, Dremio provides workload management capabilities, often referred to as WLM, which route queries to an engine that you can choose, right? So in this diagram, we have a variety of queries that first arrive on an incoming job queue. These queries are then distributed to the engine that they should run on based on the rules that you define, right? So with WLM, you can make decisions not only on, you know, how many queues there are, but also on how the queues are mapped to engines, which gives you the ability to basically dictate how many CPU resources and what priority you want to set for the various workloads, right?

So what you’re essentially doing is creating individual sub-clusters that are isolated in terms of resources, and they can be dedicated to workloads, right? And this, in effect, prevents the noisy neighbor problem, right? And this was huge in achieving what we did in the POC, right? And this is something that I’ve noticed many of my customers don’t make use of, right? Sometimes they do everything on a single 10-node cluster, even when a small split of the cluster into multiple engines, say, for example, one engine of seven nodes and another engine of three nodes, would drastically reduce the noisy neighbor problem and could achieve better performance, right? So here’s how to use WLM, right? WLM, or workload management, consists of a variety of rules that are evaluated one at a time in priority order, right?

So here in this first diagram, you see you start with rule one. If rule one is matched, then the query will be placed into an associated queue or allocated to a particular engine. Here, it’s called Q1. If rule one is not matched, then Dremio proceeds to evaluate rule two, and if rule two is matched, then the query will be allocated to Q2, and so on, right? As you may know, queries can be submitted from a variety of sources, such as JDBC clients, ODBC clients, Arrow Flight clients, or via [inaudible], et cetera, right? And Dremio keeps track of where these queries come from, and you can use that information to create the rules, right? You can also use query cost. And this was key in our POC, because since we did not know what type of queries were being initiated, or the query cost, or, you know, the complexity involved, what we did is we routed the more costly queries to a larger engine, right?
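The first-match rule evaluation and cost-based routing described here can be sketched in a few lines of Python. This is an illustration of the pattern, not Dremio's WLM rule syntax. The `Query` and `Rule` types, the queue names, and the cost threshold are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Sequence


@dataclass(frozen=True)
class Query:
    client: str   # where the query came from, e.g. "jdbc", "odbc", "flight"
    cost: float   # planner cost estimate (arbitrary units)


@dataclass(frozen=True)
class Rule:
    name: str
    matches: Callable[[Query], bool]
    queue: str    # queue (and therefore engine) the query is sent to


def route(query: Query, rules: Sequence[Rule], default_queue: str = "default") -> str:
    """Evaluate rules one at a time in priority order; the first match wins."""
    for rule in rules:
        if rule.matches(query):
            return rule.queue
    return default_queue


# Hypothetical setup: costly queries go to the larger engine, the rest to the smaller one.
COST_THRESHOLD = 1_000_000  # assumed value for illustration only
RULES = [
    Rule("expensive", lambda q: q.cost >= COST_THRESHOLD, "large-engine-queue"),
    Rule("everything-else", lambda q: True, "small-engine-queue"),
]

if __name__ == "__main__":
    for q in (Query("jdbc", 5_000_000), Query("flight", 1_200)):
        print(q, "->", route(q, RULES))
```

Because evaluation stops at the first match, rule order is the priority order: a broad catch-all rule placed first would shadow every rule after it.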

And we routed the smaller queries to a smaller engine, right? And you can also control how much memory, right? Basically the memory footprint and the CPU scheduling of your queries, right? You can limit your queues based on things like queue time, runtime, and memory use per node. You can control the CPU priority of your queues, right? You know, all of this was used to support this high-concurrency use case, right? So if you’re not using WLM today, please make sure you use it. Even simple engines or simple splits of the cluster can help reduce the noisy neighbor problem in such high-concurrency use cases, right? Another useful thing to understand is how Dremio does the planning of a query, right? So when a user submits a query, the Dremio optimizer and the query planner go through several steps prior to executing it, right?

The first thing it does is go through a parsing phase, where we convert the query into a relational algebra representation. And then we do this in an iterative manner, right? We go through an iterative planning phase. We identify if there are any reflections that are capable of covering that query, as well as, you know, basically planning out and seeing what the query would look like even without reflections, right? The query planner will essentially generate query plans and produce cost estimates for each plan. And ultimately it’ll pick out the lowest-cost plan from all the plans that it has generated, right? And all of this processing is essentially done on the coordinator node, right? Once we arrive at the most optimized plan, it is then submitted to the executors to be executed, right?

And all this happens in, like, a few milliseconds or so, right? The cool thing is, in Dremio we have a feature called the query plan cache that can cache these query plans, right? So the query plan cache is useful because when we plan the query for the very first time, we’ll put the most optimal plan into the cache. And if the same query or a similar query is submitted again, Dremio will search the cache to see, hey, if the plan has already been generated, and we can then use the cached version of it, right? This will kind of save a relatively large portion of a potentially expensive planning time, right? In the query profile of a job, you will be notified if the query plan was taken from the query plan cache or not, right? So you can identify whether queries are using it or not.
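The plan-then-cache flow can be illustrated with a small memoizing wrapper around a planner. This Python sketch is an assumption-laden toy, not Dremio's query plan cache: the cache key is just the trimmed SQL text (so matching of merely "similar" queries is not modelled), and the candidate plans and their costs are invented.

```python
class PlanCache:
    """Toy plan cache keyed on the query text."""

    def __init__(self, planner):
        self._planner = planner  # callable: sql -> chosen plan
        self._plans = {}

    def get_plan(self, sql):
        """Return (plan, from_cache)."""
        key = sql.strip()  # naive key; a real engine would normalize far more
        if key in self._plans:
            return self._plans[key], True
        plan = self._planner(sql)  # the expensive step we want to skip
        self._plans[key] = plan
        return plan, False


def toy_planner(sql):
    # Hypothetical candidates as (description, estimated cost) pairs.
    candidates = [("hash join, scan both tables", 120.0),
                  ("broadcast join, pruned scan", 15.0)]
    # Pick the lowest-cost plan, as the planner in the talk does.
    return min(candidates, key=lambda plan: plan[1])


if __name__ == "__main__":
    cache = PlanCache(toy_planner)
    for _ in range(2):
        plan, from_cache = cache.get_plan("SELECT region, SUM(sales) FROM t GROUP BY region")
        print(plan, "from cache" if from_cache else "freshly planned")
```

The `from_cache` flag plays the role of the indicator in the query profile: it tells you whether planning was skipped for a given job.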

All right, another feature that we have introduced into Dremio is runtime filtering, right? This is a feature that can optimize situations where you are performing a join between a large fact table and a small dimension table, right? So the diagram on the screen tries to describe the purpose and the benefits of runtime filtering, right? Let’s consider the top diagram first, right? Here we have a large fact table, and we’re going to join it with a smaller dimension table, and you use a specific filter condition, right? So without runtime filtering, Dremio will need to read the contents of the entire fact table, as well as the entire contents of the dimension table. It’ll try to perform the join in memory, and then it will filter out the records that did not match the filter criteria.

The bottom diagram tries to depict what will happen if we do have runtime filtering enabled, right? So in this scenario, the table reads begin in parallel again. However, as soon as the records are retrieved from the dimension table, the distinct join keys are pushed as a filter into the threads retrieving the data from the large fact table. So we only need to read the relevant data out of the fact table that’s required to complete the join, right? And this can mean vastly less data being returned to Dremio to process, right? And this can, you know, really improve query performance, right?
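The two diagrams can be mimicked with in-memory lists to show why pushing the join keys into the fact-table scan reduces the data handed to the join. This Python sketch is a simplification under stated assumptions: the row layout and function names are made up, dimension keys are assumed unique, and "rows read" counts rows passed from the scan to the join rather than modelling file or row-group pruning.

```python
def join_without_runtime_filter(fact_rows, dim_rows, dim_predicate):
    """Read both tables fully, join in memory, then apply the filter."""
    rows_read = len(fact_rows) + len(dim_rows)
    dim_by_key = {d["key"]: d for d in dim_rows}
    joined = [(f, dim_by_key[f["key"]]) for f in fact_rows if f["key"] in dim_by_key]
    result = [(f, d) for f, d in joined if dim_predicate(d)]
    return result, rows_read


def join_with_runtime_filter(fact_rows, dim_rows, dim_predicate):
    """Filter the dimension side first, then push its distinct join keys
    into the fact scan so only matching fact rows are passed on."""
    matching_dims = {d["key"]: d for d in dim_rows if dim_predicate(d)}
    join_keys = set(matching_dims)  # the runtime filter
    scanned_fact = [f for f in fact_rows if f["key"] in join_keys]
    rows_read = len(scanned_fact) + len(dim_rows)
    result = [(f, matching_dims[f["key"]]) for f in scanned_fact]
    return result, rows_read


if __name__ == "__main__":
    # Made-up data: 3 dimension rows, 300 fact rows spread evenly across them.
    dims = [{"key": 1, "country": "US"}, {"key": 2, "country": "DE"},
            {"key": 3, "country": "FR"}]
    facts = [{"key": (i % 3) + 1, "amount": i} for i in range(300)]

    def only_de(d):
        return d["country"] == "DE"

    slow, slow_read = join_without_runtime_filter(facts, dims, only_de)
    fast, fast_read = join_with_runtime_filter(facts, dims, only_de)
    print("same result:", slow == fast)
    print("rows read without filter:", slow_read, "with filter:", fast_read)
```

Both functions return the same joined rows; only the amount of fact data flowing into the join differs, which is where the performance gain comes from.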

All right, and the last feature that was relevant was Arrow Flight. I won’t go into much detail about this. There are other incredible sessions in Subsurface that you can attend to learn more about this. This is also available on the Apache Arrow website as well as the Dremio website. They’re all relevant sources for this. But at a very high level, right, Arrow Flight is a high-performance way to transfer data, right? And since everything in Dremio is done using the Apache Arrow memory format, right, we can transfer these Arrow buffers directly to a client without converting into another format, right? And since it’s built on gRPC, it’s pretty fast. It’s multi-threaded, as opposed to something like ODBC or JDBC, right? And this is instrumental for many extraction use cases, right?

So if you want to transfer data to a client, and let’s say you have, I don’t know, a data scientist use case, for example, right? A machine learning use case, right? A data scientist may need to extract a hundred million or 200 million rows or something like that to go build a machine learning model, right? So instead of waiting around for a single-threaded ODBC extraction job, a multi-threaded Arrow Flight extraction job can improve your productivity drastically, right? All right, so these were all the features that were relevant to the proof of concept that we did with this customer, and what was used to achieve such high performance.