Fugue SQL is an open source SQL interface for Python compute frameworks such as Pandas, Spark, Dask and Blazing SQL.
As Dremio has made data engineering easier, FugueSQL makes data analysis with Python easier for SQL lovers.
With Fugue SQL, users can utilize SQL as a grammar for end-to-end data workflows. In this demo we’ll go over how to get started in leveraging distributed computing to process big data with Fugue SQL and various backends.
Speaker 1: Hello everyone. My name is Hon. Today I’m going to talk about Fugue SQL an abstract interface for distributed computing. All packages I will talk about today are open-sourced. For fugue core and the fugue SQL, please install the fugue package. [00:00:30] For GPU and the blazing support, please install fugue blazing. So what is the motivation of creating the fugue project? In the past, we create our own computing logic to run locally using simple Python and SQL. But in order to scale out, we have to use certain distributed computing engine. In order to [00:01:00] use a certain engine, we normally have to adopt their API and interfaces, making our core logic, convoluted and hard to maintain. So instead of learning and adapting to certain frameworks from the user side, what about creating an abstraction layer that can adapt to both users code and the different computing engines?
That is the motivation [00:01:30] of fugue. It is to height the complexity and inconsistency of different computing frameworks and provide a unified approach to do distributed computing and the machine learning. And in the meantime, fugue keeps being non-invasive and minimal to users logic. We want to bring scientists and data engineers back to their core tasks. [Inaudible 00:01:59] this design, [00:02:00] since the core logic is written in simple ways. They’re already very easy to unit test. Plus fugue can let you use local execution engine and the mock data frames to run your data pipelines and without Spark or Dask. Your entire data pipeline becomes testable and the more importantly you can use this approach to quickly iterate on your local box, and you can do enough tests [00:02:30] to before pushing to the expensive computing cluster. Last but not the least, both your core logic and the fugue level code is platform agnostic. So you can easily switch between Spark, Dask, blazing SQL without code change.
Here is an overview of the fugue projects. There are different computing frameworks, Spark, Dask, Pandas, and on GPU [00:03:00] rapids and the blazing SQL. Fugue is an obstruction layer, unifying the computation on top of fugue we already have fugue SQL validation and the tune. Today’s topic is Fugue SQL. Now let’s talk about fugue SQL. If you have ever used the SQL in your Python workflow, you will see this common pattern. You’ll write a block of python codes that invokes the SQL to do something. [00:03:30] Then you get the data frame and the repeat. The code will be very convoluted by switching between Python and SQL. This seems inevitable because Python is the dominant language and it’s SQL is not designed to support multiple tasks.
Fugue SQL solves this problem by adding extra intuitive syntax, to distend a SQL. So you can easily describe your answer and workflow using SQL [00:04:00] as the dominant language. In addition, you can write simple Python functions as extensions that can be directly invoked by fugue SQL. Commonly you will see your code becomes significantly shorter was much better readability. I also want to mention that fugue SQL can use Dask SQL, blazing SQL, and Spark SQL as the backend. And it can also run without any of them. That is [00:04:30] why a fugue SQL is also very unit testable. Now let’s take a look at an example. We want to compute the median of column D of the pandas data frame, and then get the difference of each D and the median. It’s quite simple to express it in pandas, but not easy in SQL. So we write central function.
In this compute example, we [00:05:00] use fugue on Dask and Dask SQL. We have two data frames as input. We want to join them, get the product of B and C as column D. We use SQL here, and then we want to do two things. Why is to compute the diff from median for each group? Another is to take the smallest end rows from each [00:05:30] group sorted by BMC. On the left-hand side, it is how we do that using Dask and Dask SQL. And we have to write another first end to do the second thing. Now, let’s see how to express the same logic in fugue SQL.
We only need a single block of SQL codes with define and assign the join the data frame in a way like Python. [00:06:00] And to compute median diff, we can also directly invoke the median diff function, and we don’t have to care about Meta, which is a specific thing to Dask. And to get the first end rows of a group, we don’t even need to implement anything. We have the take statement to handle that.
Actually take can be significantly [00:06:30] faster than Python approach in certain execution engines, such as spark. Please pay attention to engine. Compute two is no longer only for dask. It can run on QDF and blazing. It can run on spark and they can run without any of them. That is how you do unit test. You also have a lot of flexibility on the input. The F1 and the F2 can be [00:07:00] pandas data frame, Dask data frames or QDF. And engine can be a combination of Dask engine plus blazing SQL engine. So your SQL will execute as blazing SQL and the rest will be [inaudible 00:07:15] used Dask to run distributively. You will see more detailed examples in the following demo, where we will use fugue on spark.
Speaker 2: [00:07:30] Welcome to fugue SQL demo. In this demo, we’ll go into explore US flight dataset that is extensively discussed on the internet. I will show you how to use fugue and spark to quickly iterate on big data problems. Firstly, I will paste a piece of code to initialize the notebook environment. [00:08:00] The setup function will create shortcut use spark. It also enables the fugue SQL magic and the syntax highlighting. Please pay attention to the config post [inaudible 00:08:11] session. We enabled fugue to use pandas UDF. Whenever is eligible in fugue using or not using pandas UDF. Your code will not need any change, it’s just a switch. And for this case, enabling [00:08:30] pandas UDF makes certain steps three to four times faster.
Also notice you don’t need to write this piece of code every time you use fugue, there can be a centralized place to customize your fugue experience. All data of this demo has been saved on Google cloud. The first a fugue SQL is to allow CSV files from Google cloud. These [00:09:00] CSV file don’t have header. So we define the schema using columns, then we yield them so we can consume them in the following cells. We also bring them for debugging purpose. Notice the execution [00:09:30] of this cell has nothing to do with spark is using default execution engine, which only uses native Python and the Pandas.
Now let’s take a look at the underlying data frame of the yielded airports. It is just a pandas data frame. [00:10:00] Now let’s explore the main data set, the flights information. The flights dump parquet is a folder of parquet files. Using spark will be much faster. To use spark, we just to specify spark for FSQL. It has 2.5 million rows. Now [00:10:30] we need to do some transformation on this big data set, but how can we verify everything is working fine before applying it on the entire dataset? We can use the fugue SQL sample syntax data and yield the small frame to iterate on. In test, we only have 267 rows.
If [00:11:00] you want to iterate quickly or do unit tests using small data set is one frame. But avoiding spark is also a great idea. Although practically at this point, if you were not using fugue, you already see Spark dependencies everywhere in your code. By passing spark [00:11:30] would not be an option anymore. But so far was fugue, you don’t see spark dependencies anywhere. Let’s paste the transformation function. Again, this function is still all related to spark of fugue. It is just a native Python and Pandas function. Adding a few more columns [00:12:00] from the existing columns. The command line will be useful to fugue.
It tells fugue the output schema is the input schema, which is the wild card plus three columns. But since it’s just a command line, it doesn’t build any dependencies on fugue. We all know how to unit test the such a simple function on pandas. And here we just verify the results [00:12:30] by the sample test that different. Okay, this code only works for Pandas data frames. But with fugue this function can work for all supported data frames such as spark and Dask. Let’s see how to use this function and fugue SQL. Transform can directly [00:13:00] apply simple Python functions and to generalize the data frames. The usage has many variations, for example, you can pre-partition the data frame. Then the function will be applied to each partition separately. For more details, you can visit the fugue repository.
Now we want to do one more thing. The output data frame has too many columns. [00:13:30] We want to select just a few and the rename them. This is a typical case that SQL select can do an elegant job. So let’s just do it. This [00:14:00] still runs on pandas not spark. In fugue, SQL is no longer a privilege when using a certain computing framework. Your SQL statement can run on every computing engine Fugue supports including native pandas engine. So this piece of code has a map function and the SQL statement, you could make a more complicated. It just runs without a spark.
[00:14:30] Now let’s bring it to spark and apply on the entire dataset. We just to replace the test the data frame was the load statement and the specified engine spark FSQL. We also yield the results for later use. While it is running let’s continue [00:15:00] our demo. Visualization is very important for data exploration. So how can we enable visualization in [inaudible 00:15:12] fugue SQL? Again, you all I need to write a very simple function that is independent from fugue. Plot and plot bar [00:15:30] will be used as another type of extension in fugue. It is called output [inaudible 00:15:35]. It has to run on the driver.
Here is how to use it. This is a simple piece of code showing several things. First, it is multi task. We aggregated and uploaded data points based on date of year and hour [00:16:00] of a day and hour of a week respectively. Second, it is a more balanced, the case where you see both the select statement as well as the fugue specific syntax output. Third, and again this code is runs on all computing frameworks Fugue supports. Now let’s see a more SQL intensive case. [00:16:30] We joined the flights, airlines and the airports together as info. Then we filtered the data frame to contain only US domestic flights, and we yield both data frames and print both.
[00:17:00] Now let’s do more analysis. In this example, pay attention to that we can use select statements one after another, this avoids gigantic embedded SQL statements actually fugue also supports embedded SQL. If you prefer, [00:17:30] you can still write in that way. Also see how we use simple assignments, syntax to define the intermediate data frames and how we can use them in the following step. Another thing was to mention is anonymity. We can give names to data frames only when necessary for Chaining operations. [00:18:00] We can keep them anonymous. And we also omitted a lot of from clauses.
There is no ambiguity issue and of course you can be explicit on each step in fugue. You can assign variable names and keep from closest if you prefer. Now, imagine how to implement the same logic in [00:18:30] spark. You may realize how much boilerplate code to write to achieve the same thing. Fugue revolutionized SQL the logic is much simpler, cleaner, and more intuitive. Now let’s put everything together. [00:19:00] We just the copy pasted the code together and they removed that yield statements because they are in a single cell now. Yield is no longer necessary. So you can see fugue SQL is a first class programming language to describe complex data pipelines. It’s multi task. It’s grateful SQL heavy pipelines where Python can help at a few steps.
Well, it is running. [00:19:30] Let’s talk about productionization. Notebook magic is a cool idea. It makes development experience much better. However for production, we need a modularized testable code. Let’s see how we can modularize the previous logic. [00:20:00] See, we separated the pipeline to three parts. The first is to get the reference data. The second is to present and clean up the flight data. And the third one is to do data analysis. And then we use yield to output that data frames from the first and second module. Notice in the third one, [00:20:30] we have this [inaudible 00:20:37] . This is how we parameterize the modules. Now let’s see how we can chain them together.
We use FSQL utility function to execute the fugue SQL strings. [00:21:00] For the first inquiry we write to without parameter, meaning that it will run using our native execution engine. For the second and the third, we run them on spark. So if you modularized in this way, we can run each module is a different execution engine. For example, we can run the first with blazing SQL, GPU and run the second [00:21:30] using Dask and run the third using spark. But it’s not a common use case. A more practical use case is that you can run different steps was different. Spark clusters was different size and different settings.
In the end, I want to talk about testing [00:22:00] for this particular case. It was not set up for the best testability , but we still can use it as an example. Look at this EDA query, it use the top data frame. We can use this as an example. So here is how we test was [inaudible 00:22:23]. [00:22:30] Instead of getting outputs from the first two modules, we directly provide mock pandas data frames into the third module. And then we write using native execution engine and assert on the yielded top data frame. Everything will happen with our spark. It is just a normal test on pandas. If you want to test the Unlocal spark, you [00:23:00] only need to specify it in the run. All other things were remained the same. You just need to make sure your testing environment can get a spark session.
Speaker 1: Again, all packages I talk about today are open-sourced. [00:23:30] Thank you.
Speaker 3: All right Hon, can you hear me okay? Okay. Awesome. Thank you. Thank you for that great session and we do have a few folks we’d like to open it up to Q&A. Now we do have a few folks waiting in the queue, so I’m going to let our first one in [00:24:00] for a live question. Did that was a Beam? Did that not work? Beam, I apologize. I clicked to let you in. Okay. I’m going to try our second one and Beam if you can come back, that would be fantastic. Let me try this one. [00:24:30] Okay, for some reason, this is not working the way it usually does. Beam could you put your question in the Q&A portion?
Speaker 1: You can put your questions in the chat box or in the Q&A. Let’s that fine.
Speaker 3: Okay. I’m clicking to let you in and for some reason it’s not automatically giving you [00:25:00] that. Let me try Caroline here. Okay, for those folks who had questions, I apologize. I do not know what is going on with the video, but if you could put your question in the Q&A section, we can absolutely answer that right away. And we have four minutes until this session ends. So we have time for a few questions.
Speaker 1: If you guys still know how to put it in the Q&A section, just [00:25:30] put it in chat box. that is the quicker way and we can also chat in slack after this session. I will be available.
Speaker 3: That’s correct. I put in the chat for everyone to see that there is to join the subsurface workspace on slack. And then you just search for Hon’s name and you can put your question in there and have a live conversation with him there.
Speaker 1: Okay. I see one question. The learning curve for fugue [00:26:00] extension on the SQL slide. So as you can see from the slides, although they are called a fugue extension, but they are just a native Python functions. It does not require any fugue interface or anything special. They’re just native Python functions. The thing you need to learn is probably just how to use those functions as fugue extensions inside [00:26:30] SQL. There are just a few syntax, I think the extension type should be under 10 and the syntax is very limited. So if you read through the fugue documents and you should be able to learn them very quickly and also try to use them only one, you really need them. Don’t learn them systematically. It’s not necessary. Fugue is [00:27:00] just to let you try to eliminate all the unnecessary learning curve for you to achieve your own things. So don’t learn too much just to use it. When you have a question, just to find the answer in a document that is more efficient.
Speaker 3: Okay. Thank you, Hon. We do have another person waiting in the lobby. Let’s see if I’m going to try one more time to let this person in. [00:27:30] And for some reason again that did not work, I apologize. If you could put your question into the chat or the Q&A section, we will get to that right away. And we have about two more minutes until the session ends. Anyone else has any other questions? [00:28:00] Okay. There are no other questions or if the folks who were waiting you can always go to the slack channel or we still have a couple of minutes to put one in here.
But I also wanted to mention that there is a super short Slido session survey on the right-hand side of your screen. If you have an opportunity to fill that out before you leave, we would greatly appreciate it. The next session [00:28:30] is coming up in about five minutes. The expo hall is also open and we encourage you to check out the booth to get demos on the latest tech and win some awesome giveaways as well. So thank you to everyone. Enjoy the rest of the conference. I don’t see in there any other questions coming in here, but again, thank you so much. Thank you, Hon. Hope you guys have a great day.
Speaker 1: Thank you.
Speaker 3: Take care. Bye-bye.
Speaker 1: Bye
Han Wang is the Tech Lead of Lyft’s Machine Learning Platform, focusing on distributed computing solutions. Before joining Lyft, he worked at Microsoft, Hudson River Trading, Amazon and Quantlab. Han is the founder of the Fugue project, aimed at democratizing distributed computing and machine learning.