Subsurface LIVE Winter 2021
Serverless Cloud Data Lake with Spark for Serving Weather Data
The Weather Company (TWC) collects weather data across the globe at the rate of 34 million records per hour, and the TWC History on Demand (HoD) application serves that historical weather data to users via an API, averaging 600,000 requests per day. Users are increasingly consuming large quantities of historical data to train analytics models, and require efficient asynchronous APIs in addition to existing synchronous ones that use Elasticsearch.
This session presents TWC’s architecture that uses a serverless cloud data lake running on top of Apache Spark and how that enables a highly elastic and economic way of serving weather history data. We will explain our concept of data skipping indexes that boosts performance by orders of magnitude compared to an out-of-the-box Spark setup, as well as significantly reducing cost. This enables TWC HoD to triple weather data coverage from land only to the entire globe, while at the same time reducing costs by an order of magnitude.
We will also review serverless cloud data lake architecture in general and elaborate on the composition of serverless building blocks such as serverless storage, serverless ETL, serverless SQL and serverless data pipeline orchestration. In addition, we will review a set of major enhancements, including built-in geospatial and time series functions and a built-in multi-tenant Hive Metastore.
Finally, we will highlight how TWC was able to adopt the serverless cloud data lake platform for new applications by rolling out a brand-new global data collection pipeline and data lake for COVID-19 data in just a few weeks.
Paula Ta-Shma, Research Staff Member, IBM
Dr. Paula Ta-Shma is a Research Staff Member in the Cloud & Data Technologies group at IBM Research - Haifa and is responsible for a group of research efforts in the area of hybrid data, with a particular focus on high-performance, secure and cost-efficient data stores and processing engines. She is particularly interested in performant SQL analytics over object storage and leads work on data skipping, which is now integrated into multiple IBM products and services. Previously, she led projects in areas such as cloud storage infrastructure for IoT and continuous data protection. Prior to working at IBM, Dr. Ta-Shma worked at several companies on database management systems, including Informix Software Inc., where she worked on Apache Derby. She holds M.Sc. and PhD degrees in computer science from the Hebrew University of Jerusalem.
Torsten Steinbach, Cloud Data Lake Architect, IBM
Torsten Steinbach has a long record working as a database architect. He led the IBM Db2 performance tooling and worked on the workload managers in IBM Netezza and IBM Db2. He also led the deep integration of machine learning into IBM's RDBMS. Over the past few years, Torsten built from scratch IBM's cloud data lake platform, which is heavily based on open source software such as Apache Spark and Apache Kafka.
[00:01:00] Hello everyone. Thank you for joining the session. We’ll get started in just a couple of minutes. (silence). Thanks [00:02:00] for joining, everyone. We’re going to wait about another minute for the rest of the attendees to get into the session, starting in about one minute. Just a reminder that we’ll do the live Q&A at the end of the session. [00:02:30] Okay. Well, here we are at the top of the hour and I’m very excited. My name is Melissa Biele. Some of you may have seen me earlier on the session with Jeff Ma. Right now, I’m excited to be the moderator for this session on Serverless Cloud Data Lake with Spark for Serving Weather Data, presented by Paula Ta-Shma and Torsten Steinbach, both from IBM.
But before [00:03:00] we get started with the session, I just want to cover a couple of housekeeping notes. You will see at the right side of the screen, you have your chat. That is one area where you are able to ask questions for the moderated Q&A at the end of the session. The other way to do that is to actually share your video and audio, for which you’ll see a button at the top right of your screen. If you select that button after the presentation, we’ll let you into the session on camera to ask [00:03:30] your questions live in our moderated Q&A.
The other thing that I want to remind people of is that you’ll see a tab over here that says Slido. It’s just a quick survey on the session and on the content. It takes about two minutes. So before you leave the session, we do ask that you fill out the Slido. And with that, I’m going to turn it over to Paula and Torsten. Paula and Torsten, I hope you have a great presentation. See you at the other end.
Thank you very much. Welcome. This is Torsten Steinbach, [00:04:00] working for IBM Cloud as an architect, where I’m leading the cloud data service efforts around the cloud data lake. I’m accompanied today by Paula Ta-Shma, who will lead you through the second part of our session today. She works for IBM Research and has contributed a lot to the capabilities that we have built into the data lake platform we’re going to show you today.
[00:04:30] I’m going to start by walking you a little bit through our take on a cloud data lake at IBM, and how we actually architect our data lakes, especially our reliance on Spark and how we make it serverless. That will be the first part. In the second part, we are going to walk you through a specific implementation, a real implementation that [00:05:00] we have built and taken into production for serving large volumes of weather data on demand.
And part of that is actually a major contribution that we have added to Spark for indexing, which we’re also proud to announce here today. At the end of the session, we’re going to announce the open sourcing of this indexing technology.
Okay, let’s start with the basic [00:05:30] definition and implementation of data lakes in the cloud. First, a conceptual overview of what we consider a cloud data lake to be. The cloud data lake is actually an on-ramp of big data to analytics. This can be data that is already stored somewhere, which you need to get into your data lake through ETL processes, or you use continuous ingest of telemetry data, click streams, IoT data, log data, and so on [00:06:00] into your data lake.
The data lake then provides you this whole tool chain, starting from very raw data that you first need to explore and understand, infer schema and so on, and then slowly you make your way up to prepare it, enrich it, but also optimize its layout. This is very important for a cloud data lake with disaggregated storage, because optimization of the layout, the format and partitioning, is critical for the performance of your analytics.
We also consider a data warehouse [00:06:30] to still play a role here, attached to a data lake for certain specific use cases where you have very high demands on response times. That’s why we also consider it critical that you have capabilities to transport and promote data, hot data, certain parts of the data, into a real data warehouse for those use cases. That’s something that is actually changing, which I’m going to touch on a little bit towards the end of my part.
One of the specialties that we focus on when we build data lakes and [00:07:00] define our platform is the serverlessness of the platform. That means, as a consumer, as a developer, as a user, you’re not actually involved in resource management in terms of hosts and things like that. It’s auto-provisioned and auto-scaled on demand based on your actual workload. And the billing is based entirely on what you actually run. What also comes with it is high availability, out of the box. [00:07:30] So you don’t have to [inaudible 00:07:33] multiple instances of this data lake or something like that.
We actually have three major components in the foundation that we use for our services to make a serverless data lake. There is serverless storage, which is object storage; we at IBM Cloud also have an S3-compatible object storage. There are serverless runtimes, where we have several layers: function as a service, which we call Cloud Functions, and container as a service on demand, [00:08:00] which is Code Engine in our cloud. Then we have serverless analytics, which is the service that I’m going to show you next. That is the core of our cloud data lake, which is called SQL Query.
What that service actually is, in fact, is a serverless Spark service. The rough architecture of the service looks like this. It’s really community Spark that we are running here, but we run this in a big farm of [00:08:30] Spark clusters that we provision. And not just Spark clusters, but even Spark applications, Spark contexts that are kept around in a pooling mechanism. So whenever [inaudible 00:08:43] comes in, we can immediately serve that.
We have sandboxing within this entire environment, using the SQL grammar itself as a sandbox, basically. With that, you can run anything on demand that Spark SQL can do. The entire Spark SQL syntax is supported, [00:09:00] community Spark, and we have added a couple of capabilities here to make this more complete in terms of the value proposition. We’ve integrated the Hive metastore out of the box, fully managed, as part of this service, so that it can do all the data management. We have implemented a sophisticated data skipping indexing feature. That’s what we’re going to take a look at later today.
We also have some functional capabilities like time series SQL [00:09:30] extensions and geospatial extensions, so the SQL language itself is extended to have native operations on time series data or location data. All of that runs on our data lake in the cloud, which is on object storage. We also have attached, basically as a source or as a target, a relational database, our own Db2 in our cloud. And Event Streams, which is Kafka as a service. I’m going to talk a little bit towards the end about what we can do there.
The capabilities [00:10:00] that this enables can be grouped into the four major boxes that you can see here. It starts with data ingestion, which the service supports, basically landing data into the data lake, into the storage of the data lake, the object storage. Then transformation of the data, which is obviously a big strength of Spark itself as a framework, right? So all kinds of data transformations, of the data itself but also of the layout: formats, reformatting, repartitioning, all of that. [00:10:30] Then the actual analytics, which is really queries, scale-out queries, generating reports and things like that. And data management basically encompasses everything that goes along with all of those, because that’s your metadata management, right? Table definitions, rolling partitions in and out, and also the index management is part of that.
That’s all exposed in a serverless fashion. I just want to give you a brief idea of how that looks. Our [00:11:00] key user interface looks like this. It’s a SQL experience where you can just run SQL statements, right? All of the capabilities are actually expressed through SQL syntax, as you can see in these examples. For the analytics part, it’s all of the OLAP [inaudible 00:11:23] that you can run. But it also covers ETL capabilities [00:11:30] like data transformation, for instance repartitioning into a hive partition schema on disk, or writing data from the data lake into, let’s say, a relational database like Db2.
You can see this is all SQL syntax that is used to express all of that. The same goes for data management, like indexing here, or your catalog management, right? [inaudible 00:11:59], [00:12:00] partition management of tables is actually all SQL. That’s why it’s a pretty lean interface, and the SQL syntax actually contains all the functionality. All of that is running Spark behind the scenes, serverless. We automatically scale it out for you, as we run this farm of Spark services and Spark applications. When we spin it up for you, we also auto-scale it in terms of the number of executors needed, depending on the data volume. It’s nothing that the user [00:12:30] has to do.
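As a small illustration of the hive partition schema just mentioned: a hive-style layout encodes partition column values in the object path, so later queries can prune whole partitions by path alone. This is only a sketch; the bucket name, columns and values below are made up and are not TWC’s actual layout.

```python
# Sketch of hive-style partitioning: each partition column becomes a
# key=value directory segment in the object's storage path.

def hive_partition_path(base, record, partition_cols):
    """Build the hive-style object path for a record (illustrative only)."""
    parts = "/".join(f"{c}={record[c]}" for c in partition_cols)
    return f"{base}/{parts}/part-0000.parquet"

# A hypothetical weather record, partitioned by year and month.
rec = {"year": 2021, "month": 1, "temp": 12.5}
print(hive_partition_path("cos://bucket/weather", rec, ["year", "month"]))
# cos://bucket/weather/year=2021/month=1/part-0000.parquet
```

A query filtering on `year` and `month` can then skip every directory whose path segments don’t match, without opening a single object inside it.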
That’s where we use the capabilities of Spark in a fashion where the user can just use it out of the box. For the sake of this presentation, I want to give you some idea of why we’re actually here. First of all, we are proud to share, of course, what we are doing here at IBM. But we also clearly see where Dremio can fit in, right? By relying on Spark, this really scales [00:13:00] out very well; batch performance especially is good. It has a lot of flexibility, for instance, also to work with real-time data and things like that. But interactivity is something where I think Dremio shines. So we have started to work with the Dremio user interface, the Dremio stack, in our [inaudible 00:13:26] cloud.
So this is now our Kubernetes service. This is a Kubernetes cluster [00:13:30] in IBM Cloud that I’ve just deployed here with a worker pool. And if I look inside, this is just standard Kubernetes here, and you can see I’ve used the Dremio helm charts, the standard Dremio helm chart, to set up a Dremio cluster. And then I can use Dremio to work on the same data in our data lakes, right? So this is really the cloud object storage of IBM Cloud, which is S3 compatible. That’s why it was easy to connect. [00:14:00] And I can just go here and run my queries in a very interactive way. That’s what we find very attractive and would like to explore further as we go along.
For instance, we would also like to enable our cloud data lake to show up here as something you can directly import the metadata from, like a data lake itself. But that’s something that we’re looking into. I just wanted to give you some idea of why we think Dremio [00:14:30] is an interesting community for us to get connected to. This is what we have built out, and it’s available in IBM Cloud. There’s a free plan, so you can just sign up for free and start playing with it. It’s really, really approachable.
As for what we’re doing this year, I want to give you a little bit of an idea [00:15:00] of our current focus. We are elevating real-time data to become the same kind of first-class citizen as the data in object storage. That means all of the capabilities that you have on data at rest in object storage, like running queries against it, doing data transformations, right? ETL-type transformations of data at rest, enriching it, cleansing it, and so on, all of that will be possible [00:15:30] to do on data in motion.
There you can run real-time queries and you can run transformations on the stream, right? From one topic to another one. And obviously you can also cross those boundaries very nicely, which is even more interesting: for instance, running a SQL job that lands data from the stream into object storage in a continuous fashion, into a certain format, partitioned in a certain way, and maybe doing some pre-filtering and pre-processing using the full syntax of SQL. [00:16:00] The underpinning of that will be common metadata, where we bring together [inaudible 00:16:06] with the Kafka schema registry, so this metadata covers both sides in a seamless fashion. That’s something we’re working on, and some of it will be available very soon. So stay tuned.
I want to take up this metadata topic because I think that’s another thing we consider highly important, [00:16:30] also for us to go into this lakehouse notion that you see being promoted more and more nowadays, which, as the name indicates, is a data lake that has more warehouse capabilities that you can use directly on the data lake. In our interpretation, the key enabler for that is metadata: more complete metadata, heavy infusion of metadata. So it’s not just schema metadata, but also consistent, ACID metadata [00:17:00] using table formats like Delta Lake and Iceberg; governance, where we at IBM have a strong portfolio that we want to put in here, with governance policies that can then be enforced; and indexing.
Indexing is one of the key innovations that we have done, that we [inaudible 00:17:17] this past year into our cloud. But as you will see later, that’s something we really want to contribute to the community, and that’s why we decided to open source it. It’s called Xskipper, and you’re going to hear about it [00:17:30] in the next minutes from Paula. Paula, maybe you want to take us through the-
Okay. I’m going to start off talking about a use case from The Weather Company. The Weather Company started with a simple mission: to keep people safe and informed with the world’s most accurate forecast. [00:18:00] Please move to the next slide. So today The Weather Company has evolved into a decision solutions company with data at its core, producing 25 billion forecasts daily.
The Weather Company History on Demand application provides access to a worldwide dataset of past weather conditions via a web API. It provides a global high-resolution [00:18:30] grid with geospatial and temporal search, it averages 600,000 requests per day, and it is typically used for machine learning and data analytics. A previous implementation of History on Demand had synchronous data access with several drawbacks. Firstly, it was expensive. It also had limited storage capacity, [00:19:00] and we limited ourselves to land-only weather coverage and only a subset of the available weather properties. Clients were limited to small requests. And it was slow at retrieving large amounts of data, which is becoming increasingly important now for machine learning workflows.
So we developed an asynchronous solution using a serverless [00:19:30] approach, which Torsten already talked about. This gives us a huge cost advantage, since we don’t need to pay for compute when there are no queries. Torsten already told you about SQL Query, which is serverless SQL powered by Spark, and it has a Hive [inaudible 00:19:49] and geospatial capabilities. For the rest of the talk, I’m going to focus on the data skipping aspects and how this dramatically boosts performance [00:20:00] as well as lowers cost.
So SQL Query has a very general data skipping capability, which avoids reading irrelevant objects based on data skipping indexes, and these contain aggregate metadata for each object. The indexes are created by users and they’re stored alongside the data in IBM’s cloud object storage. We support multiple [00:20:30] index types, including the standard min-max data skipping indexes, but we also introduce others such as value list, bloom filter and geospatial index types.
Our capability is extensible, so it’s easy for us to add new index types. To our knowledge, we’re the only framework which enables skipping for queries with user-defined functions. These UDFs could, for example, be functions from our [00:21:00] geospatial library. Under the hood, we have an extensible way to map UDFs to index types, and this is picked up by Spark’s Catalyst optimizer. This goes beyond alternative approaches for data skipping, which are typically based only on min-max and sometimes bloom filters, and don’t support UDFs.
Okay. So our code is an add-on library for Spark, which enables [00:21:30] the modified query execution flow you see here. On the left is the typical Spark SQL query flow. There’s a partition pruning step before reading the data from object storage. We introduce an additional pruning phase, which consults object-level metadata. This is done using the Catalyst optimizer and the session extensions API. Okay. Here [00:22:00] is a classic example of data skipping using a min-max index on the temperature column of a weather dataset. For each object, we store the minimum and maximum values of that column. When evaluating a query looking for temperatures over 40 degrees Celsius, Spark with our library will prune the list of relevant objects based on this metadata.
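To make that pruning decision concrete, here is a minimal pure-Python sketch of min-max skipping for a `temperature > 40` predicate. The object names and statistics are hypothetical, and this only illustrates the decision logic, not the actual SQL Query implementation.

```python
# Sketch of min-max data skipping: an object can only contain a row with
# value > threshold if its stored maximum for that column exceeds the threshold.

def min_max_skip(objects, column, threshold):
    """Return the objects that may contain rows with column > threshold."""
    relevant = []
    for name, stats in objects.items():
        col_min, col_max = stats[column]
        if col_max > threshold:  # otherwise the whole object is skipped
            relevant.append(name)
    return relevant

# Per-object metadata as a min-max index would store it (hypothetical values).
index = {
    "part-0000.parquet": {"temp": (-10.0, 35.0)},  # skipped: max <= 40
    "part-0001.parquet": {"temp": (20.0, 47.5)},   # scanned: may hold temp > 40
    "part-0002.parquet": {"temp": (41.0, 52.0)},   # scanned
}

print(min_max_skip(index, "temp", 40.0))
```

Note the decision is conservative: an object that survives pruning may still contain no matching rows, but a pruned object is guaranteed to contain none.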
In our example, the red objects are not relevant to the query and therefore don’t need to be scanned. [00:22:30] To build the index, we scan the temperature column once and store the index in parquet format. From then on, only the index needs to be accessed to make skipping decisions. Okay. Here is an example of a query looking for weather data in the Raleigh Research Triangle. Without any optimizations, this query would need to scan the entire dataset, even [00:23:00] though all the relevant data is in a small geospatial region. This is because Spark doesn’t know anything about IBM’s geospatial library functions, so it can’t skip any of the data. To address this, we built min-max indexes on the lat and long columns, as you see on the right.
To enable geospatial data skipping for queries like this, we map the ST_Contains UDF to min-max indexes on these columns. We also did this for the rest of the functions in the [00:23:30] geospatial library. This graph shows this query running on a TWC weather dataset containing 10 terabytes of parquet data, spanning five years. As I said before, without any optimization, these queries need to scan the entire dataset and didn’t even complete.
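The idea behind mapping a region predicate such as ST_Contains to min-max indexes on the lat/long columns can be sketched as interval intersection against the query region’s bounding box. This is an illustration only; the object names, coordinates and metadata layout below are invented.

```python
# Sketch: prune objects whose lat/lon min-max ranges cannot intersect the
# query's bounding box, so only objects overlapping the region are scanned.

def bbox_skip(objects, query_bbox):
    """Keep objects whose lat/lon ranges intersect (min_lat, max_lat, min_lon, max_lon)."""
    q_min_lat, q_max_lat, q_min_lon, q_max_lon = query_bbox
    relevant = []
    for name, stats in objects.items():
        min_lat, max_lat = stats["lat"]
        min_lon, max_lon = stats["lon"]
        # Two intervals intersect iff neither lies entirely beyond the other.
        if (max_lat >= q_min_lat and min_lat <= q_max_lat
                and max_lon >= q_min_lon and min_lon <= q_max_lon):
            relevant.append(name)
    return relevant

index = {
    "obj-a": {"lat": (35.0, 36.5), "lon": (-79.5, -78.0)},  # near the Triangle
    "obj-b": {"lat": (48.0, 52.0), "lon": (2.0, 8.0)},      # Europe: skipped
}
triangle_bbox = (35.5, 36.2, -79.2, -78.3)  # rough, hypothetical bounding box
print(bbox_skip(index, triangle_bbox))  # only obj-a survives pruning
```

Because every geospatial UDF in the library gets a mapping like this, queries keep their natural `ST_Contains(...)` form while the optimizer still prunes objects by bounding box.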
So the next thing we tried was rewriting the queries by adding the region’s bounding box as a hint to the Spark optimizer. [00:24:00] This exploits parquet’s built-in min-max indexes. This is the yellow curve you see here. So the yellow curve is already significantly optimized compared to the baseline. Then we added data skipping and Hive metastore support. This results in the blue curve. Averaging over all data points shown here, we get an order of magnitude speedup compared to the yellow curve, which, as [00:24:30] I pointed out, was already optimized using parquet’s data-skipping support.
Okay. So this translated to order-of-magnitude performance improvements and cost reductions for The Weather Company. It also enabled moving to global instead of land-only weather coverage, and supporting all 34 available weather properties. Users can now retrieve large amounts of historical weather data efficiently to train machine learning [00:25:00] models. We achieved a 40-times speedup in cases that we tested.
I’m now excited to announce, for the very first time, the open sourcing of Xskipper, our extensible data-skipping library. Xskipper works out of the box with Apache Spark, and it supports many open formats: parquet, CSV, JSON, Avro and ORC, and it also supports hive tables. [00:25:30] Out of the box, it supports min-max, value list and bloom filter index types. But you can define your own data-skipping index types, use your own novel data structures and apply them to your own use cases. The data can be domain specific. For example, geospatial data, like I showed for The Weather Company use case [inaudible 00:25:56], or it could also be genomic data, astronomical data, [00:26:00] and so on.
You can enable data skipping through your own UDFs by mapping them to conditions over your data-skipping indexes. You can achieve order-of-magnitude performance acceleration and beyond, just like I showed for The Weather Company use case. And this is even compared to formats with built-in data skipping like [inaudible 00:26:25]. So here’s a snapshot of our site, [00:26:30] where you can access the code and full documentation for Xskipper.
Okay. Please check this out and get involved. We welcome contributions, and here are three example areas of contribution. Firstly, support for new index types: new index types and UDFs can be added as plugins. We show how this is done by adding a regular [00:27:00] expression index as a plugin in a separate repo, and the docs and code show you how you can roll your own. The library supports Apache Spark, as I mentioned, but support for additional engines can be added in future.
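The plugin idea, mapping a UDF to a skipping rule over index metadata, can be sketched roughly as follows. This is not the actual Xskipper API; the registry, names and metadata shape are all hypothetical, and the regex rule simply mirrors the regular-expression plugin example mentioned above.

```python
# Sketch of a pluggable registry: each UDF name maps to a rule that decides,
# from one object's index metadata, whether that object can be skipped.
import re

SKIP_RULES = {}

def register_udf_rule(udf_name, rule):
    """Plug in a skipping rule for a UDF (illustrative, not the Xskipper API)."""
    SKIP_RULES[udf_name] = rule

def can_skip(udf_name, metadata, *args):
    rule = SKIP_RULES.get(udf_name)
    # Without a registered rule we must conservatively scan the object.
    return rule(metadata, *args) if rule else False

# Example plugin: a regex rule over a value-list index. The object can be
# skipped when none of its stored values match the pattern.
def regex_rule(metadata, pattern):
    return not any(re.search(pattern, v) for v in metadata["value_list"])

register_udf_rule("rlike", regex_rule)

meta = {"value_list": ["sunny", "cloudy", "rain"]}
print(can_skip("rlike", meta, "^sn"))   # True: no value starts with "sn"
print(can_skip("rlike", meta, "rain"))  # False: "rain" matches, must scan
```

The key design point is conservatism: any UDF without a registered rule simply disables skipping for that object, so plugins can only improve performance, never correctness.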
And finally, we’re data type agnostic and support all of Spark’s built-in types, as I mentioned before. But we’d also like to support table formats such as Iceberg, Hudi [00:27:30] and Delta. And in particular, we’re starting to discuss this in the Iceberg community. I’m sure you’ll have additional ideas of how to contribute, so please feel free to reach out to us on Slack after the talk, and also using GitHub discussions on our repo. Okay. Thanks everyone for attending, and thanks to all the team, particularly to [Guy Cosmo 00:27:57], for leading the Xskipper open-sourcing [00:28:00] efforts.
Okay. Thank you so much, Paula and Torsten. Torsten, if you could stop sharing your screen, we will go ahead and get started with the live Q&A. While I’m getting that going, I do see a couple of questions in the chat. Nina [Demla 00:28:19] wants to know: how is this different from parquet’s built-in indexes?
The difference is that we store [00:28:30] the metadata in a consolidated fashion. With parquet, the indexing information is stored in the parquet footers. That means that in order to access that information, you need to go and access every single footer of every single object. So although you read less data, you still need to access all the objects. Whereas we take all of that information out and put it, in a consolidated fashion, in a separate [00:29:00] parquet dataset. That’s for min-max. We also support additional index types that parquet doesn’t support, like value list, for example.
The next question I have is from Mike, and he wants to know: how different is it from adaptive query execution, which is part of Spark 3.0? Want me to ask that again?
[00:29:30] Adaptive query execution is a method of optimizing queries, but it’s not data skipping per se. What we’re doing is enabling skipping over objects in their entirety, in order to improve query performance. So it is a different method of optimizing queries and [00:30:00] can be used in a complementary fashion. [crosstalk 00:30:06].
Yeah. I would also say that they should be combinable very nicely. The benefits of the indexes should actually add up.
Paula Ta-Shma: Yes.
So Ryan Blue, one of our first technical speakers today, is asking: what is the difference between value list and parquet’s dictionary [00:30:30] filtering?
A value list is a list of the unique values in a column of an object. We store it in a consolidated fashion outside of the parquet data, so you would only need to access that. Aside from parquet, it also works [00:31:00] for all the other formats like CSV, JSON, ORC and so on. I hope that answers the question.
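To illustrate the value list answer: for an equality predicate, an object can be skipped when the sought value is simply absent from that object’s stored value list. A minimal sketch, with hypothetical object names, column and values:

```python
# Sketch of value-list skipping: keep only objects whose stored set of
# unique column values contains the value the query is looking for.

def value_list_skip(objects, column, value):
    """Return the objects that may contain rows where column == value."""
    return [name for name, stats in objects.items()
            if value in stats[column]]

# Per-object value lists, stored consolidated (hypothetical station codes).
index = {
    "part-0.csv": {"station": {"KRDU", "KGSO"}},
    "part-1.csv": {"station": {"EDDB", "EDDF"}},
}
print(value_list_skip(index, "station", "KRDU"))  # only part-0.csv is scanned
```

Unlike parquet dictionary filtering, this works the same way for CSV, JSON and other formats, because the value lists live in a separate consolidated index rather than in each file’s own metadata.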
Okay. Let’s do one more before we wrap up: [00:31:30] can an index be cached in memory, like a Spark table?
You can take data-skipping indexes and cache them in memory, in order to even further boost performance and avoid having to read the indexes from storage. Actually, the performance graphs that we showed don’t do that, so you could get an additional speedup by caching data-skipping indexes.
[00:32:00] I think that that’s probably the last question we can take right now. But just a reminder that Torsten and Paula will be in their Slack channel after this to answer additional questions. There are a lot more questions there in the chat. I pinned a link to that Slack channel up at the top. What I’d ask everyone to do now is go ahead and click on that Slido link over on the side of your screen, to fill out a quick survey of what you thought about [00:32:30] this session. I want to thank Torsten and Paula for their time today, and we’ll see you over in Slack. Thanks so much.
Paula Ta-Shma: Thank you.