Subsurface Summer 2020
How to Build an IoT Data Lake
The Internet of Things (IoT) is one of the driving forces for the increase in today’s data volume and diversity. In response, IoT platforms must enable users to leverage incoming data for immediate analysis/action as well as long-term historical analytics. For the latter, the data needs to be stored efficiently and made available for large-scale analytics.
Cloud data lakes are cost-efficient and scale almost infinitely. However, data lakes only provide low-level primitives that must be used appropriately to unleash their full potential.
This talk discusses both the challenges and solutions of using data lakes as the essential building blocks for long-term storage of IoT data, such as how data is moved from the IoT platform to the data lake, how data is organized, and how efficient querying by various consumers (e.g., IoT platform user interface, business intelligence tools and machine learning applications) is achieved.
Tim Doernemann, Senior Lead Software Engineer, Software AG
Tim Doernemann graduated with a degree in computer science from the University of Marburg, Germany, in 2006. During his doctoral program at the Distributed Systems Research Group of the University of Marburg he worked on various topics around scheduling and quality of service for high-performance computing, grid computing and cloud computing applications. Since 2012, he has worked as a developer and architect at the intersection of IoT and data analytics.
Michael Cammert, Senior Manager, Software AG
Michael Cammert graduated with a degree in computer science from the University of Marburg, Germany, in 2002. During his time as a research assistant at the Database Research Group of the University of Marburg, he worked on data stream processing. In 2007 he co-founded the complex event processing startup RTM Realtime Monitoring GmbH, which was acquired by Software AG in 2010. Since then, he has worked as a developer and manager on various projects, currently at the intersection of IoT and data analytics. In 2014, Michael received his Ph.D. from the University of Marburg.
Hello, everybody. Thank you for joining our session with Michael and Tim from Software AG: How to Build an IoT Data Lake. Just a reminder, we will be doing live Q&A after their presentation is over. We do recommend that you request to share your camera and audio for that live Q&A session, and don't forget that after the live Q&A is over, we will continue the conversation in our Subsurface Slack channel. With that, welcome, Michael and Tim, and take it away.
All right. Hello, everyone. Thanks for joining our session on how to build an IoT data lake. My name is Tim. I'm an engineer at Software AG and my colleague, Michael, and I will now talk about some of the challenges and lessons learned from building an analytics solution for our IoT customers around data lake technologies.
Hi, my name is Michael, welcome also from my side to our session, and without further ado, let's get into it. So what's this IoT we are talking about? If you are looking for a definition you can go to Wikipedia and you will find this text, but I will give it a slightly different take. The internet has always been composed of devices, but what's changed over time is that in the beginning they were very costly and usually humans were interacting with them whenever communication took place, like an email being sent or a webpage being requested. But over time the technology became cheaper and more widely available, and mobile devices became available.
And as of today, we have a huge number of devices connected to the internet, far more than there are people on the planet, and they are continuously sending data. So why are we humans doing this? What's in it for us? The idea is to do something with the data, or simply put, to generate value from data: getting great insights, doing business, things like that.
Now, it's not the kind of IoT we'll be talking about today, but a common consumer-facing field of IoT is home automation, with lots of devices out there, and that's really fun. I also have a lot of those devices, but bad things have happened in that area too. Today we are more on the business side of IoT, including industrial IoT and other use cases.
So just to give you some examples, there is fleet location tracking. This slide shows a truck, but we also have a big [inaudible 00:02:50] which equipped the whole fleet of a local train company with sensors, so that their cargo cars became IoT devices and are now sending information about their condition and so on.
We have smart waste containers that send how full they are, so you can empty them when they are full rather than on a schedule, which is much more efficient and environmentally friendly. Or, with the current pandemic, we were quickly able to build a social distancing application that allows our customers to give their employees a device which warns them when they are too close to another employee, or which allows tracking similar to what current Corona apps do. Or, what we will also be using in this talk, we have the example of painting robots, where our customers equip the robots painting cars in a car factory with sensors and continuously track what's going on.
So in this example, what are the data use cases? On the one hand, the customer wants to react really quickly to what's happening. If the pressure of the paint is wrong in that painting robot, they want to stop it before the car gets damaged, which can be expensive. This requires a reaction within milliseconds and is obviously not suited to any kind of batch processing. On the other hand, they also want to gain insights from years of production within the factory in order to optimize the settings for a new batch of cars to be produced in the future. This requires collecting huge amounts of data, and that better fits the topic of the day, which is data lakes.
So what's special about IoT data use cases? So the devices are out there and usually, they are continuously sending data and you have to continuously add that data to an operational store and at some point in time also to a data lake. Most customer installations grow all the time. Customers have new use cases. They have new devices. New devices not only increase the data volume, additionally, they usually also add new versions or completely new devices, which also needs us to react to new schemas and new challenges.
So how do you handle this? IoT platforms out there therefore usually don't have just one processing engine; they have an architecture like the one depicted in this diagram, where the devices send their data to the platform, where it is stored in an operational store for the near-term use cases. It is forwarded to a streaming analytics engine for real-time processing and very quick reactions. And it is also offloaded by an analytics engine into a data lake to be ready for long-term analysis, for example to feed your favorite BI tool for dashboarding, or to do machine learning in order to find rules which you can then execute in real time to make sure that your devices behave the way you want.
So how does this fit into our product? We offer the Cumulocity IoT platform, both in the cloud and on-premise, or, if you want to run it on a single machine inside the factory, also on the edge. As you can see, a lot of things come together here in order to manage the devices, make them available, and so on. What we will be talking about today is just the Cumulocity IoT data hub, which is available both in the cloud and on the edge. And with that, over to our architect, Tim, who had the vision for the product and has been its architect right from the beginning.
Thanks, Michael. So while I'm trying to share my screen ... All right. So let's now start talking about what we actually implemented. And this slide here shows a simplified view of our platform. It only shows the components that are required to understand what we did in terms of analytics. As you can probably see here right in the middle, we have integrated Dremio as the central component for analytical purposes into our platform. And this serves two use cases for us. First of all, relatively straightforward, it allows our customers to query their data which they already have in their data lakes via SQL queries, and they can use ODBC, JDBC, and REST for it. And in the near future, we will also extend this to Arrow Flight and they can connect all their existing applications and derive value from their data.
The part we now want to focus on in this talk is how we actually make sure that the IoT data from our operational store, which is MongoDB in our case, is stored in the data lake: what the offloading looks like, how we map the highly dynamic JSON documents into a relational format, how we do partitioning and why we need it, and those kinds of things.
So, first of all, I want to give a very quick overview of our data model, and that's not too different from what you would see in other IoT platforms. We have many different asset types, but the four we are really interested in for analytical purposes are, first of all, alarms, which can be raised by devices, an example being [inaudible 00:09:48], and we have events. We have inventory, where the most important thing is devices, so the things which actually generate or produce the data. And then we have measurements, which usually make up 99% of the customer data.
Here on the left, you see a simplified version of such a measurement. It has an ID, of course. It has a timestamp. Usually, the data has multiple timestamps, but here we only use one for the sake of simplicity. We would usually distinguish between the application timestamp, i.e., the timestamp the device has put in, and the timestamp when the data arrived at or was processed by the IoT platform.
Then, of course, each measurement is linked to a device, which is the source in this case. Then it has a type; other platforms call that a topic, for us it's a type. And this is really where the meat is: we have a JSON array containing all the actual sensor values for that particular measurement. In this case we only have two: temperature, measured in degrees Celsius, where the sample value is 35, and weight, measured in kilograms, 42. We could have many more here, and they could be in different orderings, because as Michael said, with new device types, new firmware versions and so on, customers might change how their devices send data, and so we need to be able to cope with that and still produce a fixed schema in the data lake.
So how do we do that? The data offloading is a multi-step procedure in our case, and it's done on a periodic basis. By default, we run that offloading procedure once an hour, per asset type, and we offload "everything" for that particular asset type. But we quickly learned from customer feedback that customers wanted the capability to filter data, for example for legal reasons like GDPR, or to reduce the data resolution before putting it into the data lake. Our implementation uses Dremio's CREATE TABLE AS functionality, which you might also know from Spark, Hive, Drill, Presto, and so on.
So in the first step we need to determine where the last offload, in the previous hour, stopped. We do that by requesting the latest timestamp from our data lake. So we don't rely on anything we have stored internally; we just check for the latest watermark, so to speak, in our data lake, keep it in memory in our code, and use it as the input for the first real step.
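The watermark lookup described here can be sketched as follows. Note that `run_sql` is a hypothetical helper standing in for a query connection to the lake, and the table and column names are illustrative, not the product's actual schema:

```python
from datetime import datetime, timezone

# Fallback for the very first offload, when the lake table is still empty.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def latest_watermark(run_sql, lake_table):
    """Ask the data lake for the newest processing timestamp already offloaded.

    `run_sql` is a hypothetical callable that executes a query and returns
    rows as tuples; no offload state is kept in the platform itself.
    """
    rows = run_sql(f'SELECT MAX("processing_time") FROM {lake_table}')
    return rows[0][0] or EPOCH
```

Because the watermark is re-derived from the lake on every run, the pipeline stays correct even if the platform's own bookkeeping is lost.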
In that step, we basically tell Dremio to offload everything from MongoDB where the timestamp is greater than the one we have just determined. Dremio then fetches that data and persists it in a staging area in the customer's data lake, a bronze area [inaudible 00:13:38]. Here it's important to mention two things. First of all, we use an index structure in MongoDB on that timestamp column, which in this case is the processing timestamp, not the application timestamp, and that makes the query very efficient on the operational store, so it doesn't have to do any full scan. The second important thing we learned over time is that it's very beneficial in this step to artificially partition the data to make sure that we produce a lot of Parquet files, so that we can parallelize subsequent processing steps in our ETL pipeline.
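A sketch of how such a CTAS statement could be assembled; the table names, the artificial `bucket` column (assuming a numeric id), and the exact DDL are assumptions for illustration, not the product's actual code:

```python
def build_offload_ctas(staging_table, source_table, watermark_iso, buckets=16):
    """Build a CREATE TABLE AS statement for one hourly batch.

    Partitioning by an artificial bucket column is one way to force many
    output files so later ETL steps can run in parallel; the real
    implementation may do this differently.
    """
    return (
        f"CREATE TABLE {staging_table} PARTITION BY (bucket) AS "
        f'SELECT *, MOD("id", {buckets}) AS bucket '
        f"FROM {source_table} "
        f"WHERE \"processing_time\" > TIMESTAMP '{watermark_iso}'"
    )
```

The timestamp predicate lines up with the MongoDB index mentioned above, so the operational store only scans the newest documents.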
Step two then needs to figure out what was actually contained in the last batch we received from MongoDB, because, as said, the structure in this JSON array can be highly dynamic, so we need to determine what's in it. Therefore, we read back the data from our staging area, in parallel to speed things up, and determine what's in there. We then get a list of names and the corresponding types. So here, for example, we have temperature.value, which is a float, temperature.unit, which is a string, and so on.
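The schema-discovery idea can be sketched in a few lines. The fragment/series nesting mirrors the measurement example from earlier in the talk, and the type names are illustrative:

```python
def discover_schema(docs):
    """Union the dotted column names and types seen across one batch.

    Each doc carries sensor fragments like
    {"temperature": {"value": 35.0, "unit": "C"}}; the nesting is modeled
    on the measurement example, not taken from the actual product.
    """
    schema = {}
    for doc in docs:
        for fragment, series in doc.get("data", {}).items():
            for key, value in series.items():
                col = f"{fragment}.{key}"
                kind = "float" if isinstance(value, (int, float)) else "string"
                schema.setdefault(col, kind)
    return schema
```

Because the union runs over the whole batch, a device that starts sending a new fragment mid-hour simply adds a column; older rows get NULLs there.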
We then store that information and process the data once more to bring it into the right order and do some other processing steps. This is a step which is not completely possible using just SQL statements out of the box, but we were able to plug user-defined functions into Dremio, and with those we could achieve what we needed. So we create a sorted and post-processed copy in the data lake and are almost done.
In the last step, we need to read that new data once more and then generate the actual structure that we need, the relational structure, which will be shown on the next slide, and we partition it according to the most relevant query patterns of our customers. I will explain that in a minute.
So the data format, once the hourly ETL pipeline is done, for our waste container example here, would look like this. On the left side again is the sample measurement I showed before, and that translates into a relational table. Here we make sure that we only produce atomic values wherever possible, because that makes the data much more easily digestible in BI tools like Power BI, Tableau, or Apache Superset, and also in machine learning applications. We don't have to do any further unnesting if that's already done during the extraction phase.
Most of the values translate very easily. All top-level JSON attributes simply translate into columns. This data array is flattened, and we produce a column for temperature.unit, one for temperature.value, and so on, which becomes this part here. Then we add some artificial columns like year, month, and day, which are useful for certain applications and, technically speaking, interesting for partition pruning.
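The flattening of one measurement into a fixed-width row, including the artificial year/month/day columns, might look like this. The field names follow the slide's example; this is a sketch under those assumptions, not the product's implementation:

```python
from datetime import datetime

def to_relational_row(doc, sensor_columns):
    """Flatten one measurement document into a fixed set of columns.

    `sensor_columns` is the ordered list produced by schema discovery,
    e.g. ["temperature.unit", "temperature.value", ...]. Fragments missing
    from a document become NULLs so every row has the same shape.
    """
    ts = datetime.fromisoformat(doc["time"].replace("Z", "+00:00"))
    row = {
        "id": doc["id"],
        "time": doc["time"],
        "source": doc["source"],
        "type": doc["type"],
        "year": ts.year,
        "month": ts.month,
        "day": ts.day,
    }
    for col in sensor_columns:
        fragment, key = col.split(".", 1)
        row[col] = doc.get("data", {}).get(fragment, {}).get(key)
    return row
```

Deriving year/month/day from the application timestamp is what later makes the Hive-style partition layout possible.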
You might wonder if it's efficient to store the unit here all the time because it doesn't change very frequently, but that's no problem because here, the good thing is we have got joins by doing this de-normalization and this compresses very well because you apply parquet's run length encoding here. And so if the value doesn't change at all, or only very rarely it compresses down to almost zero.
The next thing we learned over time is how important partitioning actually is. As you probably know, since you typically don't have index structures on data lakes, if you don't have partitioning in place that matches your query patterns, you have to do full table scans all the time, which is expensive in at least two ways. First of all, performance-wise: queries run longer than they need to because of the high amount of I/O. And secondly, it can be financially expensive, because you might be charged for egress traffic from the data lake and so on.
So in that sense, it's really important to partition the data along important query patterns. And for us an very typical query would look like this. Customers might want to see the average temperature per device in a given time span. So in that case here today from 10 to 14 o'clock, and that's very typical. Usually, queries are a little more expensive, but this is the most important part here.
It then comes almost naturally to partition the data by a year, month, and day pattern. You could of course go further down and also partition hourly; this really depends on the amount of data you usually extract. The big advantage is that this type of query can be handled very efficiently by the distributed SQL engine, be it Dremio, Presto, Spark, or whatever, because partitions that don't fit into the time range can be pruned, more or less automatically, depending on how sophisticated the query engine is.
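Partition pruning itself is essentially path arithmetic. A sketch of what the engine does with a `year=.../month=.../day=...` layout (the directory naming is the common Hive-style convention, assumed here):

```python
from datetime import date

def prune_partitions(partition_dirs, start, end):
    """Keep only partition directories whose date falls within [start, end].

    `partition_dirs` uses Hive-style paths like "year=2020/month=7/day=21";
    everything outside the range is skipped without reading a single file.
    """
    kept = []
    for path in partition_dirs:
        parts = dict(kv.split("=") for kv in path.split("/"))
        d = date(int(parts["year"]), int(parts["month"]), int(parts["day"]))
        if start <= d <= end:
            kept.append(path)
    return kept
```

For the 10-to-14-o'clock query above, only today's single day partition survives, so the scan touches a tiny fraction of the table.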
It's important to understand here that in that case, we need to partition via application timestamp because customers usually want to query by application time and don't care too much about the time when the data was actually stored in the IoT platform. One exception here is inventory devices because they are mostly queried by ID, and so it's advisable to not petition then by year, month, day, but by ID.
Another thing we learned during the closed alpha phase we did with some selected customers is how important housekeeping is, and by housekeeping I mean running compaction on the data we have extracted into the data lake. In the alpha version we suffered from the well-known small-file problem, because every hourly ETL run per asset type creates one or more Parquet files. Those can be very small if the customers don't ingest much data per hour, and if you then want to query the data, you have to open many really small files, which comes with relatively high latency.
So we decided to implement compaction. That means we run, once per night for every customer table, a job that collects everything we have extracted over the day, compacts it, i.e., writes it to as few Parquet files as possible, and replaces those incremental folders with one daily folder, keeping the year, month, day partitioning in place.
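The nightly compaction is at heart a bin-packing of the day's small files into as few target-sized outputs as possible. A greedy sketch, where the target size and grouping policy are assumptions rather than the product's exact logic:

```python
def plan_compaction(files, target_bytes):
    """Greedily group (name, size) pairs into output files of ~target size.

    Each inner list is one output Parquet file to be written; the hourly
    inputs it lists are merged and then replaced by the daily folder.
    """
    groups, current, current_size = [], [], 0
    for name, size in files:
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

With an hour's worth of tiny files, this collapses 24 hourly fragments into one or two reads for downstream queries instead of dozens of open/seek round trips.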
Here it's really important to locally sort those files by the application timestamp because then things like page pruning and local pruning and parquet readers are very efficient and even run length encoding gets much better.
So now, very quickly before I go into a short live demo: one other thing customers asked for is the ability to do early joins. By early joins I mean being able to join, let's say, measurements, like our waste container example, with some information about the device which produced that data. The result, when enabled, looks like this: we not only have our measurement data and the source ID, but we have already joined in things like the source's name and maybe the geolocation and that kind of stuff. That avoids expensive joins at runtime in certain applications.
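An early join is simply denormalization at extraction time. A sketch, where the device fields (`name`, `lat`, `lon`) are hypothetical examples of inventory metadata:

```python
def early_join(measurements, devices):
    """Attach device metadata to each measurement row at offload time.

    The lookup happens once during ETL, so BI queries over the resulting
    table need no join against the inventory at runtime. The joined-in
    field names are illustrative.
    """
    by_id = {d["id"]: d for d in devices}
    joined = []
    for m in measurements:
        d = by_id.get(m["source"], {})
        joined.append({**m,
                       "source.name": d.get("name"),
                       "source.lat": d.get("lat"),
                       "source.lon": d.get("lon")})
    return joined
```

As with the unit column earlier, the repeated device name compresses away under run-length encoding, so the denormalization costs little storage.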
Now let me do a very quick live demo. What you see here is the landing page of our IoT platform. Let me now switch to our data hub configuration UI. Here customers have to do one very simple configuration: first of all, they decide what user name they want to have, and then they type in their data lake credentials. In our case, it's pre-configured as an S3 data lake, but it could also be ADLS or others. Then they can configure what data they actually want to offload. Here I have already configured three offloadings, for events, inventory, and some measurements. And when I go to my data lake and refresh, we can see that I have three tables here that match the offloading jobs I have just shown.
Now let me create one for our alarms. What I can do here is choose the table name and that kind of stuff, and then I can click preview. Then I get an idea of what that would look like in a relational structure. Let's say I'm fine with that because I'm running out of time, so I save it, and then I just need to activate it. It would then automatically run every hour, but we are doing a demo here and we're impatient, so we click the offload now button to make it run immediately.
Then I can go to our status page and see that the offloading is currently running. Let me reload that. Okay, it's done, and it offloaded a whopping 33 alarms. So that's a really, really big data use case. Then I can reload the page here, and not very surprisingly, we will now see another table, which contains our alarms.
So to summarize, we try to keep things as simple as possible for our end customers and don't make them deal with compaction and all that kind of stuff themselves; it's all done by our platform. Thank you.