Introduction
Modern businesses generate, store, and use huge amounts of data. Often, the data is stored in different data sources. Moreover, many data users are comfortable interacting with data using SQL, while many data sources don't support SQL. For example, you may have data inside a data lake, in a NoSQL database like MongoDB, or even inside a search engine like Elasticsearch.
All of these tools can be perfect for storing data. But when we need to use data from several storages simultaneously, we run into a problem. We need specialists who build a data pipeline and a staging layer for data preparation. It is also desirable to be able to query this staging layer using SQL. So, before we can take the data and work with it directly, we need to do a lot of preparation and use many resources. This is not very convenient.
Dremio can help you with this problem. It is a tool for working with scattered data sources. Dremio can easily join datasets stored in data lakes, data warehouses, traditional on-premise databases, and cloud storage. In addition, it lets you use familiar SQL to work with any of those sources, even if they do not support SQL. Dremio also optimizes your queries and accelerates datasets. So you get more convenience and more speed, and you spend less time. You don't need a team of engineers to start working with the data; you simply use Dremio instead. It has a good web-based UI where you can do many things just by clicking and using drag and drop. If you need to do something advanced or non-standard, you can write complex SQL queries and let Dremio handle them.
In this tutorial, we want to show you how to use Dremio when you need to create a machine learning model for time series prediction. The dataset we are going to use is the Sales Transactions Weekly Dataset. Dremio will help us simplify the basic data preparation before training a machine learning model.
Assumptions
In this tutorial, we assume that you have the following items already installed and set up:
- Dremio
- AWS account
- Microsoft Azure account
- PyODBC
- Dremio ODBC Driver
- Jupyter Notebook environment
- Ubuntu OS (optional)
For the purposes of this tutorial, we will work with a subset of data stored in ADLS Gen2 and another one stored in AWS S3.
Data curation with Dremio
Our data is split into 4 files: first.csv and second.csv are stored in S3, third.csv and fourth.csv are stored on ADLS Gen2.
The first step we need to do is to connect Dremio with our data sources and join the data. Click on the Add new source button in the Sources section.
Then, select the Amazon S3 and ADLS Gen2 options in this window:
To create the sales_s3_source, we enter its name and use an AWS Access Key and AWS Access Secret, which you can generate in the IAM section of the AWS platform:
To establish a connection between ADLS Gen2 and Dremio, specify the storage account name (dremiosalesstorage), the name of the source (sales_adls2_source), and the Shared Access Key, which can be found in the Access Keys section of the storage account on the Azure platform:
The final step is to format the files:
Now it's time to join both datasets. This demonstrates how useful Dremio is: you can work with completely different data sources from the UI or using SQL.
We can join the datasets in two ways. The first is the gradual way: first, we join first.csv and second.csv, then we join third.csv and fourth.csv, and finally, we join the two intermediate parts into one large dataset.
Create the space sales_space to save new datasets inside it. To do this, click the corresponding button in the Spaces section and enter the name of the new space.
Go to the first.csv (inside sales_s3_source). Click on the Join button:
Select the dataset which you want to use to join with the current dataset (in this case, we want to join the first.csv with the second.csv) and click the Next button:
Now select the fields to join on (the join keys). We will use the Product_Code fields. Then click the Apply button:
Save the joined dataset as part1 in the sales_space (using the Save As… button):
Repeat all the described actions for the third.csv and fourth.csv files. Name the joined dataset part2 and save it in the sales_space as well. As a result, you should have both the part1 and part2 datasets in the sales_space:
Now we need to combine these two parts of the whole dataset. Note that the previous joins can be called “vertical” joins: one file contained some of the columns and the other file contained the remaining columns, but the number of rows was the same, and we just wanted to have all the columns in one place. Now the situation is different. The datasets part1 and part2 contain the same columns. What we need to do is combine part1, which has rows 1-400, with part2, which has all the other rows. To do this, we will issue an SQL query.
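Outside Dremio, the two kinds of combination can be illustrated with pandas as a rough analogue: the first joins match rows on Product_Code and put the columns side by side, while the union stacks rows that share the same columns. The toy frames and week column names below are made up for illustration only.

```python
import pandas as pd

# Hypothetical toy data: two files holding different columns for the same products
left = pd.DataFrame({"Product_Code": ["P1", "P2"], "W0": [11, 7]})
right = pd.DataFrame({"Product_Code": ["P1", "P2"], "W1": [12, 6]})

# "Vertical" join: match rows on the key and combine the columns
# (unlike SELECT * in Dremio, merge keeps a single Product_Code column)
part1 = left.merge(right, on="Product_Code", how="inner")

# A second part with the same columns but different products
part2 = pd.DataFrame({"Product_Code": ["P3"], "W0": [3], "W1": [5]})

# Union: stack the rows (concat alone behaves like UNION ALL);
# drop_duplicates mimics the duplicate removal of SQL UNION
united = pd.concat([part1, part2], ignore_index=True).drop_duplicates(ignore_index=True)
print(united)
```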
In Dremio, SQL queries can be written in the SQL Editor. To open it, click the corresponding button. You can see the image with the needed query below. The central element of the query is the UNION statement, which combines the rows returned by the queries on both sides of it into a single result (removing duplicate rows).
As a result, we got a dataset in which all products are present (both rows 1-400 and the rows after 400):
We will save this joined dataset as united_way1 inside the sales_space.
This was the first way to join all the datasets. Dremio allows executing arbitrary SQL queries, so we can issue a query that does all these actions at once, without creating intermediate datasets (like part1 and part2). Let's try to get the same result as above, but with a single SQL query.
Go to the first.csv file, open the SQL Editor, and execute the following query:
SELECT * FROM "first.csv" AS a
INNER JOIN "second.csv" AS b
ON a.Product_Code = b.Product_Code
UNION
SELECT * FROM sales_adls2_source.salesfs."third.csv" AS c
INNER JOIN sales_adls2_source.salesfs."fourth.csv" AS d
ON c.Product_Code = d.Product_Code
ORDER BY CAST(A AS FLOAT) ASC
This query yields the same dataset as the previous approach. Note that we had to specify the full paths to the third.csv and fourth.csv files because the query runs in the context of sales_s3_source, where the ADLS Gen2 files cannot be referenced by their short names.
If you are comfortable with SQL, we recommend the second approach. It is quicker, and Dremio can optimize the whole query to make it more efficient.
Now it's time to make our joined dataset look cleaner and prepare it for building the ML model. We will work with the united_way1 dataset.
First, let's drop the unneeded columns A, A0, and Product_Code0. To do this, click the arrow button next to the column name to open the drop-down menu. Then select the Drop option:
Now we want to rename the Product_Code column to the shorter Product. Renaming can be done from the same drop-down menu:
Another thing we may want to do is change the data types of the columns. The little Abc sign means that the column contains Text data, but all the values here are actually integers. To convert the data type, click the Abc sign and then select the target data type:
After the conversion, the sign next to the column name changes to # (this means that the column has the Integer data type).
Converting the data type of one or several columns is easy with the approach described above. However, we have 52 columns that need to be converted, and when you have many columns to convert, it is more convenient to write an SQL query. You can click the SQL Editor button to see the current SQL query generated by Dremio and understand how the conversion is performed:
That is all we planned to do with the data in Dremio. Using the Save As button, we saved this dataset as sales_curated in the sales_space.
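Instead of converting 52 columns one by one, the query text can be generated. This is a minimal sketch, assuming the weekly columns are named W0 to W51 (as in the original Sales Transactions Weekly dataset) and that the source is sales_space.united_way1 with the renamed Product column; adjust the names to what Dremio's SQL Editor shows for your dataset. The build_cast_query helper is hypothetical, and it uses standard CAST, which may differ from the exact expression the Dremio UI generates.

```python
def build_cast_query(table, key_column, value_columns, target_type="INTEGER"):
    # One CAST expression per column, keeping the original column name as the alias
    casts = [f"CAST({col} AS {target_type}) AS {col}" for col in value_columns]
    select_list = ",\n  ".join([key_column] + casts)
    return f"SELECT\n  {select_list}\nFROM {table}"

# Assumed column names: W0 ... W51
week_columns = [f"W{i}" for i in range(52)]
query = build_cast_query("sales_space.united_way1", "Product", week_columns)
print(query)
```

The printed text can be pasted into the SQL Editor and the result saved as a new dataset.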
Connecting Python to Dremio
Now we will use Python to build a machine learning model. The first thing we need to do is to make the dataset from Dremio accessible in Python.
To work with Dremio datasets from Python, we need the Dremio ODBC driver and the pyodbc Python library. Here is the full code for importing the sales_curated dataset into a Python script:
import pandas as pd
import pyodbc

# Connection parameters: Dremio host and ODBC port, your Dremio credentials,
# and the path to the installed Dremio ODBC driver
host = 'localhost'
port = 31010
uid = 'dremio_username'
pwd = 'dremio_password'
driver = '/opt/dremio-odbc/lib64/libdrillodbc_sb64.so'

# Open a direct connection to Dremio
cnxn = pyodbc.connect(
    "Driver={};ConnectionType=Direct;HOST={};PORT={};AuthenticationType=Plain;UID={};PWD={}".format(
        driver, host, port, uid, pwd),
    autocommit=True)

# Fetch the curated dataset into a pandas DataFrame
sql = "SELECT * FROM sales_space.sales_curated"
df = pd.read_sql(sql, cnxn)
In the first two lines, we import the pandas and pyodbc packages.
The next group of statements consists of the parameters needed to establish a connection. Enter the host, port, your Dremio credentials and path to the ODBC driver.
The pyodbc.connect() method creates a connection, which we store in the cnxn variable.
Then write a simple SQL query (like SELECT) and use the read_sql() function of pandas to fetch the data into the df variable. That's it, your data should be imported:
Building time series prediction model
There are several approaches to time series forecasting. For example, we can select one product and build models for this specific item. Or we can create a model that takes several products into account and uses information about all of them to predict the sales of a given product. Theoretically, the second approach can be more accurate, because sales of one product can influence sales of other products. But in practice, it can be more beneficial to use an individual model for every product. Usually, it is worth researching and trying several approaches before settling on one. Such research is out of the scope of this tutorial, so we will build a model for one specific product.
Another aspect of model selection is which algorithm to use. It is possible to use purely statistical algorithms (e.g., moving average), traditional machine learning algorithms (e.g., random forest), or even neural networks. We will try all of these options.
Let’s import needed libraries:
import pandas as pd
import numpy as np
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from statsmodels.tsa.arima_model import ARIMA
import tensorflow as tf
First, we need to filter out the Product column. The following statement does the work:
df2 = df[[col for col in df if col not in ['Product']]]
We will forecast sales for Product0, so we need to take the pandas Series for this item (the first row of df2). We also want to visualize the time series:
ts = df2.iloc[0]
ts.plot(figsize=(10,5))
Below you can see the visualization of the given time series. It is important to visualize data before creating any predictive models.
The first approach we are going to try is a simple moving average. The moving average is the mean of several consecutive values in a time series. It is called “moving” because it is computed within a window (frame) and this window is moved along the time series to produce moving average values for the entire sequence. A moving average can be used in the data preparation process for smoothing time series or in feature engineering for creating new features. It is a simple algorithm and that’s why it is often used as a baseline for forecasting models.
First, we need to define the functions plot_series() and moving_average_predict(). The plot_series() function will be used several times to plot time series. The moving_average_predict() function takes a time series and a window size as inputs and returns a one-step-ahead prediction for every time step after the first window_size values. This means that to look only at the predictions for the test set, we need to slice the output of this function manually.
def plot_series(time, series, format="-", start=0, end=None):
    # Plot a slice of a series against its time axis
    plt.plot(time[start:end], series[start:end], format)
    plt.xlabel("Time")
    plt.ylabel("Value")
    plt.grid(True)

def moving_average_predict(series, window_size):
    # The prediction for step t is the mean of the window_size values before t,
    # so the first prediction corresponds to index window_size of the series
    values = np.asarray(series, dtype=float)
    predictions = []
    for time in range(len(values) - window_size):
        predictions.append(values[time:time + window_size].mean())
    return np.array(predictions)
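The same one-step-ahead forecast can also be expressed with pandas rolling windows. In this sketch (rolling_forecast is a hypothetical helper name), shift(1) makes the value at step t depend only on the window_size values before t; after dropping the leading NaNs, the numbers match those of moving_average_predict().

```python
import numpy as np
import pandas as pd

def rolling_forecast(series, window_size):
    values = pd.Series(np.asarray(series, dtype=float))
    # Mean of the window ending at t, shifted by one step so it becomes the forecast for t + 1
    return values.rolling(window_size).mean().shift(1).to_numpy()

toy = [2, 4, 6, 8, 10, 12]
forecast = rolling_forecast(toy, 3)
print(forecast)
```

For the toy series, the first three entries are NaN because no full window precedes them, and the remaining forecasts are 4, 6, and 8.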
Now we want to make predictions using the moving average. We have 52 data points in our dataset, so let's take the first 40 entries as the training part and the remaining entries as the test part. We also set the window size to 5, which means that we take the last 5 data points into account when predicting the next value.
split_time = 40
window_size = 5
# Keep only the predictions that correspond to the test part (steps 40 to 51)
moving_avg = moving_average_predict(ts, window_size)[split_time - window_size:]
plt.figure(figsize=(9, 5))
ts.iloc[split_time:].plot()
plot_series(ts.iloc[split_time:].index, moving_avg)
This plot shows the actual (blue line) and the forecasted (orange line) time series for the test part of the dataset. We need to visualize the results of the model to better understand how well it works and to detect its weak spots.
To estimate the quality of the models, we will use sklearn's mean_absolute_error() function throughout. The mean absolute error (MAE) of the moving average approach equals 3.15:
Given that the actual values of this time series range roughly from 0 to 20, an MAE of 3.15 is not very good. If we predicted the constant average value of the time series (around 9.63) for every observation, we would get MAE=2.88 on the test set, which is better than the moving average result. But as a baseline, it is acceptable.
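Both numbers in this comparison can be computed with a few lines of NumPy. This is a sketch with hypothetical helper names; the MAE here is the same quantity that sklearn's mean_absolute_error() returns. We assume the constant baseline uses the mean of the training part, since the text does not say which mean produced 9.63, so your number may differ.

```python
import numpy as np

def moving_average_mae(series, split_time, window_size):
    values = np.asarray(series, dtype=float)
    actual = values[split_time:]
    # Forecast for step t: mean of the window_size values before t
    forecast = np.array([values[t - window_size:t].mean()
                         for t in range(split_time, len(values))])
    return float(np.mean(np.abs(actual - forecast)))

def constant_mean_mae(series, split_time):
    values = np.asarray(series, dtype=float)
    actual = values[split_time:]
    # Predict the training mean for every test step (assumed baseline)
    forecast = np.full_like(actual, values[:split_time].mean())
    return float(np.mean(np.abs(actual - forecast)))

toy = np.arange(10)  # 0, 1, ..., 9
print(moving_average_mae(toy, 6, 3), constant_mean_mae(toy, 6))
```

On the tutorial's data, the calls would be moving_average_mae(ts, 40, 5) and constant_mean_mae(ts, 40).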
Now we will build a more complex model called ARIMA, which stands for AutoRegressive Integrated Moving Average. ARIMA models are often written as ARIMA(p,d,q), where p is the number of lagged observations used in the model (the autoregressive order), d is the degree of differencing, and q is the order of the moving average part (the number of lagged forecast errors included in the model). Differencing, which replaces each value with its difference from the previous value, is a technique for making a time series stationary. A time series is called non-stationary if its summary statistics depend on time; for example, a time series with seasonality or a trend is non-stationary. For time series forecasting, it is better to work with stationary time series.
We will use the statsmodels implementation of ARIMA. The following code does all the work:
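To see what the d in ARIMA(p,d,q) does, here is first-order differencing applied to a series with a linear trend: the differenced series is constant, so its mean no longer depends on time. The helpers difference and undifference are hypothetical names for illustration; pandas' Series.diff() computes the same first differences (with a leading NaN).

```python
import numpy as np

def difference(values, d=1):
    """Apply d rounds of first-order differencing: y'_t = y_t - y_(t-1)."""
    out = np.asarray(values, dtype=float)
    for _ in range(d):
        out = np.diff(out)
    return out

def undifference(first_value, diffs):
    """Invert one round of differencing, given the original first value."""
    return np.concatenate([[first_value], first_value + np.cumsum(diffs)])

trend = np.array([3.0, 5.0, 7.0, 9.0, 11.0])   # linear trend: the mean changes over time
print(difference(trend))                        # constant differences: trend removed
print(undifference(trend[0], difference(trend)))  # recovers the original series

quadratic = np.array([1.0, 4.0, 9.0, 16.0, 25.0])
print(difference(quadratic, d=2))               # a quadratic trend needs d = 2
```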
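# Current ARIMA class; the older statsmodels.tsa.arima_model.ARIMA imported earlier has been
# removed in recent statsmodels releases, and the new fit() has no disp argument.
# Results may differ slightly from the MAE reported below.
from statsmodels.tsa.arima.model import ARIMA

train, test = ts.iloc[:split_time], ts.iloc[split_time:]
history = [float(x) for x in train]
predictions = []
for t in range(len(test)):
    # Refit ARIMA(5,1,0) on all observations seen so far
    model = ARIMA(history, order=(5, 1, 0))
    model_fit = model.fit()
    # Forecast one step ahead
    output = model_fit.forecast()
    prediction = output[0]
    predictions.append(prediction)
    # Reveal the true value so it can be used in the next iteration
    actual = float(test.iloc[t])
    history.append(actual)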
First, we create the train and test sets. Then we create the history list, which is a copy of the train part, and an empty predictions list. Then we loop over the test set. At each step, we create an ARIMA model and fit it to the history list. Then we generate a one-step-ahead prediction and append it to the predictions list. At the end of each iteration, we append the actual value for that time step to the history list, so that the ARIMA model can be trained on this data point in the next iteration. As you can see, we use the ARIMA(5,1,0) configuration.
The mean absolute error of the ARIMA model is around 3.22, which is even slightly worse than the simple moving average (in fact, the quality is roughly the same). You may be able to improve this metric by tuning the ARIMA parameters.
Below you can see a plot of the actual values vs. the values predicted by the ARIMA model for the test set:
Now it's time to try some traditional machine learning models. The first step is to prepare the dataset as a supervised learning task:
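The loop above is a form of walk-forward (rolling-origin) validation, and the same procedure can be written once and reused with any one-step forecaster. This is a model-agnostic sketch; walk_forward and last_value are hypothetical names, and last_value is a naive "repeat the last observation" forecaster used only to make the example easy to check.

```python
import numpy as np

def walk_forward(series, split_time, fit_predict):
    values = [float(v) for v in np.asarray(series)]
    history = values[:split_time]
    predictions = []
    for actual in values[split_time:]:
        # Refit on everything seen so far and forecast the next step
        predictions.append(float(fit_predict(history)))
        # Reveal the true value before moving to the next step
        history.append(actual)
    predictions = np.array(predictions)
    mae = float(np.mean(np.abs(np.array(values[split_time:]) - predictions)))
    return predictions, mae

def last_value(history):
    return history[-1]

preds, mae = walk_forward([1, 2, 4, 7, 11], 2, last_value)
print(preds, mae)
```

To reproduce the ARIMA loop, you could pass something like `lambda h: ARIMA(h, order=(5, 1, 0)).fit().forecast()[0]` as fit_predict, with ARIMA imported from statsmodels.tsa.arima.model.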
df = pd.DataFrame()
df['t'] = ts.values
df['t-1'] = df['t'].shift(1)
df['t-2'] = df['t'].shift(2)
df['t-3'] = df['t'].shift(3)
df['t-4'] = df['t'].shift(4)
df['t-5'] = df['t'].shift(5)
df.dropna(inplace=True)
Here is what we got:
The column t is our target variable, while all the other columns are the input features: we take the values from the last 5 time steps as features. If we treat each observation as an independent example, we can split the dataset into train and test parts randomly (with shuffling), which was not possible earlier for the moving average and ARIMA.
At the same time, this approach makes our dataset even smaller:
Here we can see the lack of data very clearly. A dataset with only 47 examples is insufficient to train a good predictive model. We need both train and test sets, so the training part will have even fewer than 47 data points. And the more lag features we generate, the fewer examples remain for training. In this situation, we need to think about how to solve the problem and where to get more data. If there is no more data for this particular product, we can use transfer learning, for example, by training a model on other products and then continuing training on the given product. But this is out of the scope of our tutorial, so we will just train the model on the data we have.
Below we set the target variable and the input features. We also split the dataset into train and test parts so that the test set has 10 observations.
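The trade-off between the number of lag features and the number of usable examples can be checked directly. The sketch generalizes the shift() code above into a hypothetical make_lag_features helper and counts the rows that remain for a 52-week series.

```python
import numpy as np
import pandas as pd

def make_lag_features(values, n_lags):
    frame = pd.DataFrame({"t": np.asarray(values, dtype=float)})
    # Column t-k holds the value observed k steps before t
    for lag in range(1, n_lags + 1):
        frame[f"t-{lag}"] = frame["t"].shift(lag)
    # The first n_lags rows have incomplete history and are dropped
    return frame.dropna()

for n_lags in (5, 10, 20):
    print(n_lags, len(make_lag_features(np.arange(52), n_lags)))
```

Each additional lag removes one more example, so 5 lags leave 47 rows, 10 lags leave 42, and 20 lags leave 32.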
y = df['t']
X = df[[col for col in df if col != 't']]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=10, random_state=42)
In the next cell, we create an instance of the RandomForestRegressor model with the default parameters and the random_state=42. Then we train this model:
model = RandomForestRegressor(random_state=42)
model.fit(X_train, y_train)
In the next image, you can see that the mean absolute error of the random forest model is around 2.66. This value depends on the random state and can be either lower or higher, but generally, this model is slightly better than the previous ones. It could perhaps be better still with more training data.
Finally, we want to try a neural network with an LSTM architecture. Ideally, neural networks should be trained on a larger dataset, but we will show you the general approach.
In the next cell, we split the dataset by the split_time value:
x_train = ts[:split_time]
time_train = [i for i in range(0, split_time)]
time_valid = [i for i in range(split_time, len(ts))]
x_valid = ts[split_time:]
The code below defines a function that creates the dataset needed for training the neural network and sets the other parameters. The most important one is window_size. We set it to 10 because 5 is too small a window for the neural network. The last line of the code creates the dataset using the defined function.
window_size = 10
batch_size = 16
shuffle_buffer_size = 50

def windowed_dataset(series, window_size, batch_size, shuffle_buffer):
    # Convert to float32 so the LSTM layers receive floating-point input
    dataset = tf.data.Dataset.from_tensor_slices(np.asarray(series, dtype=np.float32))
    # Slide a window of window_size + 1 values over the series, one step at a time
    dataset = dataset.window(window_size + 1, shift=1, drop_remainder=True)
    dataset = dataset.flat_map(lambda window: window.batch(window_size + 1))
    # Shuffle the windows and split each one into (features, label)
    dataset = dataset.shuffle(shuffle_buffer).map(lambda window: (window[:-1], window[-1]))
    dataset = dataset.batch(batch_size).prefetch(1)
    return dataset

dataset = windowed_dataset(x_train, window_size, batch_size, shuffle_buffer_size)
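To check what windowed_dataset() feeds the network, the same windowing (without shuffling and batching) can be reproduced in NumPy. make_windows is a hypothetical helper: each row of X holds window_size consecutive values, and y holds the value that follows them. For 40 training points and window_size = 10, this gives 30 training examples.

```python
import numpy as np

def make_windows(series, window_size):
    values = np.asarray(series, dtype=np.float32)
    # Windows of window_size + 1 values, shifted by one step (like drop_remainder=True)
    windows = np.stack([values[i:i + window_size + 1]
                        for i in range(len(values) - window_size)])
    # Features are the first window_size values, the label is the last one
    return windows[:, :-1], windows[:, -1]

X, y = make_windows(np.arange(40), 10)
print(X.shape, y.shape)
```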
Next, we define the architecture of the neural network. As you can see from the code snippet below, this is a sequential model that consists of an input Lambda layer (which adds a feature axis to each input window), a bidirectional LSTM layer with 16 units, and an output dense layer with a single neuron.
tf.keras.backend.clear_session()
tf.random.set_seed(10)
np.random.seed(10)

model = tf.keras.models.Sequential([
    # Add a feature axis: (batch, window_size) -> (batch, window_size, 1)
    tf.keras.layers.Lambda(lambda x: tf.expand_dims(x, axis=-1),
                           input_shape=[None]),
    # return_sequences is left at False so the model outputs one value per window,
    # matching the single label produced by windowed_dataset()
    tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(16)),
    tf.keras.layers.Dense(1)
])
Now we need to compile the model and trigger its training. To do this, use the following code:
model.compile(loss=tf.keras.losses.Huber(), optimizer='adam', metrics=["mae"])
history = model.fit(dataset, epochs=200)
As you can see, we use the Huber loss, the Adam optimizer, and mean absolute error as the tracked metric. Since the dataset is very small, training is fast, and we can run it for 200 epochs. Here is what we got at the end of training:
The final MAE is around 2.84. This is better than the statistical models but worse than the random forest model. Probably, with more data, the LSTM approach would be the best.
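model.fit() is called without validation data, so the MAE it reports is measured on the training windows. To compare the LSTM with the other models on the same 12 test weeks, one option is to build one input window per test step and compute the MAE of the one-step forecasts. This is a sketch; forecast_windows is a hypothetical helper.

```python
import numpy as np

def forecast_windows(series, split_time, window_size):
    values = np.asarray(series, dtype=np.float32)
    # The window for test step t holds the window_size values just before t
    return np.stack([values[t - window_size:t]
                     for t in range(split_time, len(values))])

windows = forecast_windows(np.arange(52), 40, 10)
print(windows.shape)
```

Assuming the model outputs one value per window, `model.predict(forecast_windows(ts, split_time, window_size)).ravel()` gives the 12 forecasts, which can be passed to mean_absolute_error() together with `ts[split_time:]`.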
Conclusion
In this tutorial, we used Dremio to work simultaneously with two different data sources: AWS S3 and Azure Data Lake Storage Gen2. The aim was to create a time series predictive model. Without Dremio, we would need to build a complex data preparation pipeline. Instead, Dremio provided a simple and fast way to join different parts of the dataset and perform basic data curation using the UI and SQL queries. As a result, we trained four predictive models for forecasting the sales amount for the next week: a simple moving average, an ARIMA model, a random forest model, and an LSTM neural network.
The random forest model provided the best results (MAE around 2.66). The simple moving average produced an MAE of around 3.15, ARIMA 3.22, and the neural network 2.84. Building these models showed that more sophisticated approaches are needed to create a good model, and the forecasting quality could be improved significantly with further work. In particular, we need to address the lack of training data. This can be done, for example, with transfer learning. Another promising solution is to build a model that takes the time series of several products as inputs to predict future sales of a given product.