March 1, 2023

10:10 am - 10:40 am PST

Self Serve Reporting at Scale via Change Data Capture and Delta

Business transactions captured in relational databases are critical to understanding the state of business operations. MiQ needed the relational data for analytical purposes and reporting. To avoid disruptions to operational databases, they replicated the data via Change Data Capture (CDC) and Spark Streaming into Delta tables.

In this session, learn how Self-Serve Reporting, MiQ’s proprietary product, translates data from traditional relational database systems via CDC and replicates it in a big data system using Spark Delta tables to generate reports for end customers, and how this system can be used to achieve high-performance ad hoc and synchronous reporting.

Topics Covered

Lakehouse Architecture
Real-world implementation


Transcript

Note: This transcript was created using speech recognition software. It may contain errors.

Rohan Raj:

I lead the data processing and data engineering teams at MiQ. I’ve been here for around seven years, and before MiQ I was with Samsung and Jiri. The last seven years here have been about building and managing different data management microservices and big data platforms for MiQ. So yeah, that’s about me. And today I have with me my colleague Rati Ranjan, who has been the man behind actually implementing this solution. So I’ll let him quickly introduce himself too.

Rati Ranjan:

Yeah. Hello, good people. I’m Rati. I am a senior software engineer at MiQ. I have close to four years of experience spanning frontend, backend, and data engineering alike. Pretty excited to be here. Over to you, Rohan.

Rohan Raj:

Thanks, Rati. So moving on. Here’s the agenda for today. I’ll just pause for 10 or 15 seconds so that you can go over the agenda items. Cool. So moving on.

So who are we talking about? MiQ is a leading programmatic partner for marketers and agencies in the ad tech world, and AIQ is the main MiQ technology set, which provides modular, API-based capabilities to enable teams and individuals to rapidly build data solutions on top of them. To introduce it a bit more, there are three major groups of capabilities that we have: Connect, Discover, and Action. On the Connect side, it is a set of capabilities for onboarding datasets from a wide variety of sources, efficient storage of data as well as metadata, and also concerns of cataloging and governance of data. Discover is analytics at scale and refers to the set of big data technologies in the hands of our analysts, data scientists, and data engineers to derive insights and trends from the datasets. And then there is Action.

Finally, once we have the outputs from the earlier phases, they can be used programmatically to affect our campaign strategies, and the key insights generated can also be used manually by executives to devise their business strategies. So that’s about MiQ. Moving to the next slide. If you look at the right pane, you will see a polling section; there you can go and choose an option. I’ll read the poll question for you. The question is: on average, what is the number of advertisements a person is exposed to per day?

And the options are 500, 5,000, and 10,000. We already see some answers coming in. Yep, pretty well distributed. Go on, folks, a few more answers. Okay. 500 looks like the majority choice. So, yeah, the correct answer, and folks, let me know if you’re able to see the polling results. Yeah. I do see a few more votes coming in, so go ahead. Okay. The correct answer is somewhere between 5,000 and 10,000 impressions or ads that one person sees. It might be surprising, but this is the average number of ads we see per day. We might encounter these ads on billboards while commuting to work, as ads on mobile devices, TVs, websites, et cetera, and it adds up to about 5,000 per person.

Multiply that by around a billion people and devices out there, and you have 5,000 billion ads in a day. So you get a glimpse of the scale at which this ad tech data operates and touches our lives daily. Today we are going to talk about a reporting solution that operates on a subset of this huge dataset. To give you a better understanding of the problem statement, here is a very, very simplistic view of the use case we are talking about. On the left side there is a campaign management interface on which our traders, analysts, and account managers manage settings for the campaigns they are running; they do configurations like budgets for campaigns and audience segments, and they also do some programmatic deployments like programmable bids, et cetera.

And this campaign metadata and these configuration parameters need to be joined with the near-real-time log-level data that we are receiving. Once these campaigns are running, we get log-level data like impressions, clicks, conversions, et cetera, and these need to be joined with the other metadata information coming from the campaign management side. And on the right side there is a reporting application in the hands of our clients, advertisers, and agencies, as well as our internal users, which is a self-serve reporting platform to analyze their campaign performance, their return on advertising spend, incrementality, and so on, all by themselves. So as you see, we have this ecosystem, but in between, the reporting application is querying the transactional OLTP database directly. And that’s the issue we started facing: the typical OLTP reporting-related issues. To give you a few numbers.

And this is the main part: the blue box that you see here is the important part, which we will be concentrating on in the next few slides. So to give you some insight into the numbers: the kind of data we already talked about, and the numbers we had, were 10-plus databases, 650-plus tables, a billion-plus records in a few of our most relevant tables needed for reporting purposes, and an influx of around seven-plus million new records daily. Because of that scale we were running into typical OLTP issues related to performance, like slowness of the database for heavy joins given the number of tables, and scalability, because we were limited by the number of campaigns which users could select for reporting purposes.

And since it was a self-service solution in the hands of our clients and advertisers, the number of campaigns a big advertiser or agency can have can be very, very large, and we wanted to serve those use cases seamlessly without getting bottlenecked by the existing solution. One more highlight before we go into the solution part: we did not directly jump into completely revamping the solution. We did try some short-term, tactical patches to see if that would help, like scaling our MySQL nodes, scaling the concurrency there, increasing parallel queries, data partitioning, all of that. But even after applying these patches, we observed only incremental improvements and did not see anything significant. That’s when we decided to completely revamp the solution and take a fresh approach while maintaining business continuity, because it was an important production application in the hands of our clients; we wanted to maintain that business continuity and do a graceful migration.

So on the requirements part: when you revamp a system, you have a wish list of items that you want from the new system, and here was our wish list. We wanted a system with high-throughput, real-time processing capabilities for our events; an efficient data storage layer suitable for our reporting purposes; and an efficient query engine for the dashboard that can serve within the defined SLA. To understand it from the user’s or product manager’s perspective on change propagation: any change that a trader or analyst makes in the campaign settings should reflect in the reporting data with a lag of not more than 15 minutes. So any change you make in the campaign management application on the left side should reflect in the reporting system within 15 minutes.

And on the reporting side, for a better user experience when loading different dashboards, the worst-case SLA for loading any kind of complex dashboard should be not more than eight seconds. On the future scale projections we had from our product team for the next one or two years, it was around a two-times increase in data volume, in terms of the number of campaigns and the data for each campaign, and a four-times increase in the number of concurrent users. So, with all of these wish-list items and requirements in mind, we went ahead and made some design considerations and tech choices. On the change propagation side, we needed a real-time change data capture system for change propagation.

And our choice of technology there was AWS DMS, which helped us do the CDC, with Kinesis serving as a middle layer from which the processing engine can consume the data. On the real-time processing bit, because we need to do a lot of real-time stream joins and normalize the data in real time, we needed a system that can process millions of events per batch, and for that we went ahead with Spark Streaming, an always-running Scala app on Databricks, and it was able to serve us well. Next we needed a high-throughput, high-concurrency query engine that can serve the dashboard queries, which use large datasets and complex joins, within the eight-second SLA. For that we went twofold: efficient storage in the form of Delta, and a serverless SQL query engine on top. And we needed a system we could be confident about for the future scaling needs of the next two to five years.

And all the choices we have taken here can be horizontally scaled. So with all these choices, the new system looks something like this. Remember the blue box; that box is replaced with the bottom set of technologies that you see here, and that’s how the new system looks. Previously there was only the MySQL database. Now the changes from the MySQL database are captured by DMS, streamed to Kinesis, consumed by the Spark application, and put into Delta tables, and these Delta tables are served to the dashboard via Databricks serverless SQL. So now that I have given you a fair idea of the problem statement and the requirements for the new system, I will hand over to Rati to talk about the specifics of a few implementation details, the challenges we faced, and the results. Yeah, over to you.
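To make the pipeline shape concrete, here is a minimal sketch of what the Kinesis-to-Delta streaming leg could look like in a Spark Scala job, assuming the Databricks Kinesis connector; the stream name, schema fields, checkpoint path, and table names are illustrative assumptions, not the actual MiQ implementation.

```scala
// Minimal sketch of the DMS -> Kinesis -> Spark -> Delta leg described above, assuming
// the Databricks Kinesis connector. Stream name, region, schema fields, checkpoint path,
// and table name are illustrative assumptions, not MiQ's actual configuration.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object CdcIngestSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cdc-ingest-sketch").getOrCreate()

    // Assumed shape of a DMS change record: the latest row image plus operation metadata.
    val eventSchema = new StructType()
      .add("data", MapType(StringType, StringType))
      .add("metadata", new StructType()
        .add("operation", StringType)      // insert / update / delete
        .add("schema-name", StringType)
        .add("table-name", StringType)
        .add("timestamp", StringType))

    val raw = spark.readStream
      .format("kinesis")                   // Databricks Kinesis source
      .option("streamName", "cdc-events")  // hypothetical stream name
      .option("region", "us-east-1")
      .option("initialPosition", "latest")
      .load()

    // Kinesis delivers the payload as binary; parse it into the structured event.
    val events = raw
      .select(from_json(col("data").cast("string"), eventSchema).as("event"))
      .select("event.*")

    // Land parsed change events in a bronze Delta table for the normalization job.
    events.writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "/chk/cdc_bronze")
      .toTable("bronze.cdc_events")
      .awaitTermination()
  }
}
```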

Rati Ranjan:

Thank you so much, Rohan. So now that you have a broad understanding of the solution, let’s deep dive into it,

Rohan Raj:

Hopefully.

Rati Ranjan:

Cool. So we have used CDC and the ETLT pattern. CDC stands for change data capture, as has been mentioned by Rohan multiple times now, and the ETLT pattern is extract, transform (a light transform), load, and then transform again. Here Kinesis becomes a staging area of sorts: a Spark job written in Scala reads data from Kinesis, transforms and normalizes it, and then dumps the data into the Delta Lake. The entire pipeline, starting from the MySQL database to the Delta Lake, can be broadly broken into two phases. Let’s take a look at them one by one, starting with phase one.

The first phase, the bronze layer, goes from MySQL to Kinesis; this part of the pipeline is built on AWS DMS (Database Migration Service). We had to tweak the parameters to fine-tune the migration, but the rest, as you would know, is managed by AWS to a large extent. So the data, in the form of CDC events traveling from the MySQL database, reaches Kinesis by the end of this phase. Let’s take a look at a sample CDC event now to understand what kind of information we receive from the transactions happening on the MySQL side. This is a sample event coming from Kinesis. Looking at the sample, we can see that we have the latest image of the data; the kind of operation performed on the data, that is, whether it is an insert, a delete, or an update; and the schema and the table the event belongs to.
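Based on the fields just described (latest row image, operation, schema, table), a minimal Scala model of such an event might look like the following; the field names echo the typical DMS JSON envelope and are assumptions rather than the exact payload shown on the slide.

```scala
// Sketch of how the change record described above can be modeled in the Scala job.
// Field names follow the typical DMS JSON envelope (data + metadata) and should be
// treated as assumptions rather than MiQ's exact payload.
case class CdcMetadata(
  operation: String,      // "insert", "update", or "delete"
  schemaName: String,     // source schema the row belongs to
  tableName: String,      // source table the row belongs to
  timestamp: String       // commit time of the change
)

case class CdcEvent(
  data: Map[String, String],  // latest image of the row after the change
  metadata: CdcMetadata
)
```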

And this information forms the basis of our control flow for the second phase of the pipeline. So that brings us to the end of the first phase. Now, jumping into the second phase: that is where we put all our expertise in, and where all the major transformation happens. There are different components, which we will talk about in detail in the subsequent slides. I’ll just take a pause for two or three seconds here so that you can take it in. These are basically the different components of the Spark Scala application we have written, which is running as a streaming job. Moving on to the control flow: we won’t be going into the detail of every flow here; hopefully we’ll be able to share these resources with you so that you can take a look at them in detail.

However, at a high level, this represents the decisions we take once we receive the CDC event data, the same CDC event data you folks just saw a sample of on the previous slide. These decisions are based on the kind of operation that was performed, which table the operation was performed on, and so on. Now, this may seem pretty straightforward, because there are a lot of open-source solutions available today where you can apply CDC changes to your OLAP system. However, this wasn’t that easy: a lot of major challenges came from the data source itself, which we will take a look at right after the routing sketch below.
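To make that routing concrete, here is a hedged sketch of how a batch of parsed change events might be dispatched by operation type; the handler behavior and the event shape are illustrative assumptions, not the actual MiQ control flow.

```scala
// Illustrative routing of parsed CDC events by operation and source table; the event
// shape and the handler behavior are assumptions for this sketch only.
object CdcRouterSketch {
  final case class Cdc(op: String, table: String, row: Map[String, String])

  def route(events: Seq[Cdc]): Unit = events.foreach { e =>
    e.op match {
      case "insert" => println(s"append new row image to ${e.table}")
      case "update" => println(s"merge latest image into ${e.table} by key")
      case "delete" => println(s"remove or tombstone the row in ${e.table}")
      case other    => println(s"ignoring unsupported operation '$other' on ${e.table}")
    }
  }
}
```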

So the data was coming from a legacy system, and one of the biggest constraints we had was that we would not be touching the existing system at all; a new solution had to be built in addition to the existing system. To name a few of the challenges with this system: first, data integrity issues. The data source, the MySQL database, although a relational database, didn’t have data integrity constraints like referential integrity applied to it. This opened up a window for every single dimension being eventually consistent, which has its own technical challenges, as you would know. Another was data redundancy. The application on top of the OLTP DB does deletes and inserts instead of doing an update, increasing the actual number of operations multiple-fold. This legacy system also does data acquisition multiple times a day from outside the organization with no non-overlapping window, which meant it would be acquiring the same data over and over again, resulting in a lot of redundant updates.

The third one was multiple one-to-many mappings between fact tables and eventually consistent dimension tables. I’ll go with the assumption that we are aware of fact and dimension tables. Also, to give an idea, a prerequisite of the solution was that we transform the data into a snowflake schema. So resolving this eventual consistency issue was a big challenge, and resolving the volatility of the summary data became a big challenge due to those eventual consistency issues. By volatility I mean the summary data, the report data you are generating: ideally it should be incremental, as in, as you get more and more data you keep adding on top of it. However, we were required to mutate that data, and not just once; as and when data comes, you go and update it. So even if you don’t have all the data available, your summary should still be presented, with placeholders.

That was the requirement from product. And as I mentioned, the data was similar to a snowflake schema, so flowing the changes of a leaf dimension into the summary data required a full-fledged join and merge, which, as you would know, takes a hit on performance. So how did we solve these problems? Let’s take a look at them one by one. The first thing was the batch supplier: we added an algorithm which reduces the total number of transaction events to the effective minimum. We just went through the redundant data issue, the redundant transactions happening on our data source providing us with a lot of CRUD operations. What this algorithm essentially does is collapse all these transactions to their effective minimum. For example, if you have, let’s say, a hundred updates to the same record, that is equivalent to processing just the last update event; both will take you to the same state.

Similarly, all the permutations of create, update, and delete happening on the same record can be reduced to an effective minimum, and this batch supplier algorithm achieves that. To expand a little more: we essentially have three write operations, create, update, and delete. That gives a total of 12 permutations of these operations (single operations plus ordered pairs), out of which seven are valid. When I say seven are valid, the rest are not valid because, for example, you can’t really do an update after doing a delete, right? So out of all these 12 permutations, only seven are valid. Out of those seven, four need the last two operations and three need just the last operation. So even if you are doing create, update, and delete multiple times on the same record, you essentially need to process just the last one or two events (a sketch of this reduction follows below).
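As a rough illustration of this collapsing step, the sketch below keeps, per record key, only the most recent one or two change events in a micro-batch, ordered by sequence number. The column names and the simplification of always keeping at most two events are assumptions; the real algorithm distinguishes the seven valid permutations.

```scala
// Simplified version of the batch-collapsing idea: per record key, keep at most the
// last two change events in the batch, since earlier events cannot change the final
// state. Column names (table_name, pk, seq_no) are illustrative assumptions.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object BatchCollapseSketch {
  def collapseBatch(events: DataFrame): DataFrame = {
    val latestFirst = Window
      .partitionBy(col("table_name"), col("pk"))  // one record identity per source table
      .orderBy(col("seq_no").desc)                // most recent change first

    events
      .withColumn("rn", row_number().over(latestFirst))
      .where(col("rn") <= 2)                      // the effective minimum is one or two events
      .drop("rn")
  }
}
```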

Collapsing events this way effectively reduced the number of transactional events we were processing, reducing the overall performance and cost impact to a large extent. The other big challenge we had was eventual consistency. As described earlier, the majority of the dimension tables were eventually consistent, which meant we couldn’t have gone with non-volatile normalized data; that is, we couldn’t have waited for all the values to arrive and only then prepared the summarized data. For normalized data, we would otherwise have to wait for the dimensions to be present before we perform the join, right? However, our requirement from product, as I mentioned before, was that the data should reflect as and when it is available at the source. This essentially meant that we merge the dimensions into the normalized data as they come, and that solution was bound to take a hit on performance.

As you would know, merge operations are quite expensive. Fortunately, we had time as one of the sub-dimensions of these eventually consistent tables, and time was also one of our partitioning columns. So what we did was prepare normalized data for the recently received dimensions using broadcast joins, and then merge that data with the existing normalized data with low shuffle merge enabled. Low shuffle merge is a property you can enable with the Delta file format. The entire processing was within our SLA; however, it still remains within the scope of future improvement for us.
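A hedged sketch of what that step can look like with the Delta Lake Scala API is below; the table names, join keys, and the low shuffle merge flag shown are assumptions based on the description above, not the exact MiQ job.

```scala
// Sketch: normalize recently received dimension rows with a broadcast join, then
// MERGE the result into the existing normalized Delta table. Table and column names
// are illustrative; the low shuffle merge flag is the Databricks setting as we
// understand it and may differ by runtime version.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object NormalizedUpsertSketch {
  def upsertNormalized(spark: SparkSession, facts: DataFrame, recentDims: DataFrame): Unit = {
    // Reduce shuffle during MERGE (assumed Databricks config; hedged).
    spark.conf.set("spark.databricks.delta.merge.enableLowShuffle", "true")

    // The recently changed dimension slice is small, so broadcast it to avoid a shuffle join.
    val normalized = facts.join(broadcast(recentDims), Seq("dimension_id"))

    DeltaTable.forName(spark, "reporting.normalized_events")   // hypothetical target table
      .as("t")
      .merge(normalized.as("s"),
        "t.record_id = s.record_id AND t.event_date = s.event_date") // time partition in the condition
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()
  }
}
```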

This wraps up the processing side of things that we wanted to talk about. Let’s talk a bit about data warehousing now; let me just take a check on the timeline, and we’re good. We have chosen the Delta storage framework for our data lake. Delta was part of our existing ecosystem, and we are also using the Databricks runtime for the Spark processing job; the inline processing application we just talked about in detail is also using the Databricks runtime environment. As you would know, Delta gels pretty well with the Databricks runtime environment and with Spark. However, that was not the only reason: the performance we required and a few of the requirements we had, like schema evolution, schema enforcement, and rollbacks, are all available with Delta. So although we had other options, we didn’t find them substantially better to justify moving out of the existing ecosystem.

So this completes our entire solution. Let’s talk a bit about optimization and some of the techniques we have used. Here are some of the common optimization techniques we applied, like low shuffle merges and Z-order indexing. Nothing state-of-the-art here, but we did have to apply post-processing. The one thing I would want to emphasize is that although the performance is quite good when you first build the solution, as you would expect, over a period of time, because this is incremental data, your performance is going to degrade. So you have to have some sort of strategy for post-processing optimization that you can run at a regular interval; the Z-ordering and the pruning part fall under that. We regularly ensure that the indexing happens on the data so that not just the storage is optimized, but also the querying on this data.
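On Databricks and Delta, this kind of scheduled post-processing is typically an OPTIMIZE with ZORDER BY; the sketch below uses hypothetical table and column names and is an assumption about how such a maintenance job could be wired up, not the team’s actual script.

```scala
// Periodic maintenance sketch: compact small files and Z-order the table by the columns
// most frequently used in dashboard filters. Table and column names are hypothetical.
import org.apache.spark.sql.SparkSession

object ReportingTableMaintenanceSketch {
  def optimizeReportingTable(spark: SparkSession): Unit = {
    spark.sql(
      """OPTIMIZE reporting.normalized_events
        |ZORDER BY (campaign_id, event_date)""".stripMargin)
  }
}
```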

As you would have noticed, the entire solution moves data across multiple hops, which means there are multiple points of failure as well, right? Data starts from MySQL, goes to DMS, then to Kinesis, then to the inline processing job, and finally gets dumped into the Delta Lake. There is a possibility of things going wrong at any point in time. So how do we tackle that? The solution we have is checkpointing. We have checkpointing in multiple places; to name a few, DMS has its own checkpoints available, and the inline processing job we have written also has checkpointing. So if something goes wrong at any point in time, all we need to do is retry, and the processing will be picked up from the last checkpointed stage. We did end up adding duplicate handlers for the retried batch at a small performance cost, because our checkpointing happens at every single batch; so let’s say we have started processing a particular batch and something goes wrong in between, then when you retry that particular batch there will be duplicates.
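A minimal sketch of that pattern in Spark Structured Streaming is below: the checkpoint location lets a retried run resume from the last committed batch, and a foreachBatch handler deduplicates and upserts so that a replayed batch is applied idempotently. Paths, table names, and key columns are assumptions.

```scala
// Sketch of checkpointing plus duplicate handling for a retried micro-batch.
// Checkpoint path, table name, and key columns are illustrative assumptions.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

object ResilientSinkSketch {
  def start(spark: SparkSession, parsedEvents: DataFrame): Unit = {
    // A replayed batch can contain rows that were already applied before the failure,
    // so the handler merges (upserts) instead of blindly appending.
    val upsertBatch: (DataFrame, Long) => Unit = (batch, _) => {
      val deduped = batch.dropDuplicates("record_id", "seq_no")
      DeltaTable.forName(spark, "reporting.normalized_events")
        .as("t")
        .merge(deduped.as("s"), "t.record_id = s.record_id")
        .whenMatched().updateAll()
        .whenNotMatched().insertAll()
        .execute()
    }

    parsedEvents.writeStream
      .option("checkpointLocation", "/chk/normalized_events") // retry resumes from the last committed batch
      .foreachBatch(upsertBatch)
      .start()
  }
}
```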

The reason duplicate handlers carry a performance impact is that even where you might otherwise do plain appends for certain use cases, you still have to go ahead and do a merge or an upsert, because you may have duplicates there. Apart from this, there were no significant issues as far as achieving resilience is concerned. And again, because this is only going to happen for one batch, the batch which you have retried, it wasn’t a concern for us to have this implemented. Finally, our performance comparison with the old solution.

The red bars represent the old solution’s numbers and the blue ones the new solution’s. With the old solution, we were able to serve a concurrency of 16 users on an average data volume of 140,000 records in 70 seconds; we are now able to achieve the same within seven seconds, with concurrency increased up to 70 users and data volumes up to 2.4 million records. This result met what was required under the changed conditions projected by our product team for the next two years. And this is sort of expected as well, because we had moved from a ROLAP solution to a non-traditional MOLAP solution. You can still call it MOLAP, although we were not building any specific cubes; we are dumping data in a format on top of which you can generate as many cubes as you want in a fairly small amount of time. So this result is something that was expected as well. And that’s all that we have to share with you.
