Visualizing Amazon SQS and S3 using Python and Dremio

Dremio

Introduction

Nowadays, relevant analysis of data is an important stage of business and technical research and development. Often the data arrives as a stream of messages placed in queues. This is typical for data loggers and recorders, IoT devices, live-tracking systems, communication and navigation systems, etc. The messages are then sent to a database for further processing and analysis. The simplest and most obvious way to analyze the data is to present it and its patterns visually, in the form of graphs, diagrams, etc.

One of the main problems in designing such systems is that the data is represented differently at key stages of the pipeline. For this reason, many developers use platforms that can aggregate data from different databases and other sources and that provide visualization, research, and analysis tools.

In this article, we will demonstrate a similar data analysis system based on the popular Metro Interstate Traffic Volume dataset.

We will use the following Amazon services:

  • Simple Queue Service (SQS) – for forming a data queue
  • Simple Storage Service (S3) – for storing the data from the queue.

Then, we will implement data analysis and visualization in Python. The system that will aggregate all of these elements is the user-friendly Dremio platform that provides a full set of tools to handle and analyze all kinds of data.

The general structure of our system is shown in the following scheme:

image alt text

The overall workflow consists of the following steps:

  • Download and research Metro Traffic Volume data
  • Generate data for messaging to the AWS SQS system
  • Configure AWS SQS for message handling
  • Receive the messages and store them to AWS S3
  • Connect the AWS S3 storage and the Python environment to Dremio
  • Data visualization and analysis.

Assumptions

The following items should be installed and configured:

Environments:

Python packages:

  • Pandas, NumPy, Datetime, Matplotlib, Plotly, Seaborn
  • PyODBC and the AWS SDK for Python (Boto3).

Data in Amazon services

The first step is to configure the Amazon services: SQS to process and send messages, and S3 as the data storage. For the general configuration of these services, you need to do the following:

  • Register at Amazon (if you don’t have an account)
  • Get the AWS user’s credentials.

image alt text

As a result, you will get an access key ID and a secret access key. That’s all you need to access AWS services programmatically.
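
Hard-coding keys in a notebook makes them easy to leak. As a minimal sketch, the keys can instead be read from the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables, which Boto3 also checks on its own. The helper name `load_aws_credentials` is hypothetical and is not part of any library.

```python
import os


def load_aws_credentials(env=None):
    """Return keyword arguments for boto3.client() built from environment variables.

    `load_aws_credentials` is a hypothetical helper used for illustration only.
    """
    env = os.environ if env is None else env
    try:
        return {
            'aws_access_key_id': env['AWS_ACCESS_KEY_ID'],
            'aws_secret_access_key': env['AWS_SECRET_ACCESS_KEY'],
        }
    except KeyError as missing:
        # Fail early with a readable message instead of a bare KeyError
        raise RuntimeError(f'Missing environment variable: {missing}') from None
```

The returned dictionary can be unpacked into the client constructor, for example `boto3.client('sqs', region_name='eu-north-1', **load_aws_credentials())`.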

AWS SQS

Amazon Simple Queue Service (SQS) is a message queue service for sending, storing, and receiving information messages with high performance and reliability.

Using the AWS console, we launch the SQS service and create a New queue.

image alt text

The parameters for Queues are as follows:

  • Queue name – short name
  • Queue type – Standard or FIFO queue
  • Custom queue settings.

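The difference between Standard and FIFO queues lies in message ordering and delivery: a FIFO queue preserves the order in which messages are sent and processes each message exactly once, while a Standard queue offers best-effort ordering and at-least-once delivery. Since the data order is not important for statistical and graphical analysis, we select the Standard queue type and keep the default queue settings. As a result, we get a pre-configured working queue.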
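
The same choice can also be made programmatically. The sketch below assumes that a Boto3 SQS client is passed in and relies on its `create_queue` call; the wrapper function and its `fifo` flag are illustrative names, and the console workflow remains the one used in this article.

```python
def create_queue(sqs_client, name, fifo=False):
    """Create a Standard or FIFO queue and return its URL.

    sqs_client is expected to be a Boto3 SQS client, e.g. boto3.client('sqs').
    """
    kwargs = {'QueueName': name}
    if fifo:
        # FIFO queue names must end with the '.fifo' suffix
        if not name.endswith('.fifo'):
            kwargs['QueueName'] = name + '.fifo'
        kwargs['Attributes'] = {'FifoQueue': 'true'}
    response = sqs_client.create_queue(**kwargs)
    return response['QueueUrl']
```

Passing the client as an argument keeps the function easy to try out with a stand-in object before pointing it at a real AWS account.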

image alt text

The important parameters for accessing the created queue are its URL and ARN. They are worth saving.

AWS S3

Amazon Simple Storage Service (Amazon S3) is a security-oriented storage service known for its high performance, scalability, and availability. We will use this service to store the messages received from the SQS queue.

Configuration of this service is quite simple and involves the following steps:

  • Creation of the bucket
  • Setting up access
  • Defining the path to the bucket.

By following these steps, you will store info messages in S3 data storage.

image alt text

The process of uploading information to S3 storage using the AWS web interface has been described in detail in the previous articles. In this article, we will perform the same process with the AWS SDK for Python.

Data overview

The data represents Metro Interstate Traffic Volume. It also includes hourly weather features and holiday information.

We’ll load this data into the Jupyter Notebook and use Pandas to process it. Then, we will pass it to the Producer (the message sender).

import os
import pandas as pd

# Path to data
csv_file = os.path.join(os.path.abspath(os.curdir),
                        'Metro_Interstate_Traffic_Volume.csv')
# Load CSV to DataFrame
df_csv = pd.read_csv(csv_file)
# Data shape
print('shape df:', df_csv.shape)
# Columns in Data
print(df_csv.columns)
# Unique elements in each column
for column in df_csv:
    print(f'\n {column}: {df_csv[column].unique()}')

Therefore, the dataset shape is:

image alt text

The data features are represented by the following columns:

image alt text

The unique elements of each column are shown in the following format:

image alt text

In total, we have categorical, numerical, and time data that will be useful for creating visualizations.

All the messages for the producer and the consumer must be in JSON format. For AWS SQS, a message will look like the following:

image alt text
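
As a rough illustration of that format, the sketch below serializes one row to a JSON string and parses it back. It lists only the columns referenced in this article, and the values are made up for illustration rather than taken from the dataset.

```python
import json

# One dataset row as a dictionary; the values are illustrative only
row = {
    'holiday': 'None',
    'temp': 288.28,
    'rain_1h': 0.0,
    'snow_1h': 0.0,
    'clouds_all': 40,
    'weather_main': 'Clouds',
    'date_time': '2012-10-02 09:00:00',
    'traffic_volume': 5545,
}

# Serialize to the JSON string that becomes the SQS message body
message_body = json.dumps(row)
print(message_body)

# The consumer reverses this step with json.loads
restored = json.loads(message_body)
```

Because the body is plain text, any consumer that can parse JSON can read the message, regardless of the language it is written in.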

Producer and Consumer for SQS

The Producer and the Consumer are components that perform the following operations: the Producer sends messages, and the Consumer receives them and stores them in a database. In our case, we’ll use the AWS SDK to emulate their operations.

The code to emulate sending messages to the SQS queue is as follows:

import boto3
# AWS credentials (placeholders for the saved keys)
aws_access_key_id = "access_key_ID"
aws_secret_access_key = "secret_access_key"
# Initialize SQS client with the credentials and the queue region
sqs = boto3.client('sqs',
                   region_name='eu-north-1',
                   aws_access_key_id=aws_access_key_id,
                   aws_secret_access_key=aws_secret_access_key)
# URL to SQS
queue_url = 'https://sqs.eu-north-1.amazonaws.com/xxxxx/simple-request'
# Sending messages to SQS in JSON-format
for ind, row in df_csv.iterrows():
    response = sqs.send_message(
        QueueUrl=queue_url,
        DelaySeconds=3,
        MessageBody=row.to_json())

Note that for proper operation, you need to paste the previously saved access key ID, secret access key, and SQS queue URL. We can see the result of the producer code execution in the AWS SQS console.

image alt text

We can also manage messages using the console functionality: view, delete, or send them. But we’ll perform all these operations in the Consumer with the AWS SDK.

The Consumer has two parts, the receiver and the sender. The receiver gets the messages from the SQS queue and deletes them, and the sender forwards the messages to S3 storage. The receiver part is shown below:

import json
import boto3

def get_messages_from_queue(queue_url):
    '''Receive all messages from the SQS queue and delete them.

    queue_url: URL to SQS'''
    sqs_client = boto3.client('sqs')
    messages = []
    while True:
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            AttributeNames=['All'],
            MaxNumberOfMessages=10)
        # A response without the 'Messages' key means nothing was received
        received = response.get('Messages', [])
        if not received:
            break
        messages.extend(json.loads(msg['Body']) for msg in received)
        # Delete the received batch so it is not delivered again
        entries = [{'Id': msg['MessageId'],
                    'ReceiptHandle': msg['ReceiptHandle']}
                   for msg in received]
        sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
    return messages

# Receiving and deleting messages from SQS
sqs_messages = get_messages_from_queue(queue_url)
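
The received messages have to be written to a local file before they can be uploaded. The article does not show this step, so the following is only a minimal sketch, assuming each message is a flat dictionary with the same keys. The helper name `save_messages_to_csv` is hypothetical.

```python
import csv


def save_messages_to_csv(messages, file_name):
    """Write a list of message dictionaries to a CSV file with a header row."""
    if not messages:
        raise ValueError('No messages to save')
    # Use the keys of the first message as the column names
    fieldnames = list(messages[0].keys())
    with open(file_name, 'w', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(messages)
    return file_name
```

With the receiver output, the call would be `save_messages_to_csv(sqs_messages, 'data_for_S3.csv')`, which produces the file name used in the upload step.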

The sender part of the Consumer is in the following code:

import boto3

def create_bucket(bucket_name, region):
    """Create an S3 bucket in the given region"""
    s3_client = boto3.client('s3', region_name=region)
    location = {'LocationConstraint': region}
    s3_client.create_bucket(Bucket=bucket_name,
                            CreateBucketConfiguration=location)

def upload_file(file_name, bucket, object_name):
    """Upload a file to an S3 bucket
    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name"""
    s3_client = boto3.client('s3')
    # upload_file returns None and raises an exception on failure
    s3_client.upload_file(file_name, bucket, object_name)

# Creating S3 bucket
create_bucket('metro-traff', region='eu-north-1')
# Upload the file
upload_file('data_for_S3.csv', 'metro-traff', 'metro-dataset')

We can find the result of this code execution in AWS S3 console:

image alt text

Finally, we get the Metro Interstate Traffic Volume data uploaded to AWS S3 storage. An important parameter to save is the Object URL, which we will use to access this data.

Note that if you need to configure the access data settings, you can do this by setting the parameters in the Permission tab.

Connecting Dremio to Amazon services and data curation

The main steps for connecting Dremio to AWS S3 storage have been described in detail in the previous article. So in this article, we will focus only on the main steps of the connection process and give more attention to the data curation steps.

First, add a new data source:

  • Select the type of data source - Amazon S3.
  • Determine the name of the data source – Metro-traffic in our case.
  • Insert credentials for Amazon S3: AWS Access Key and AWS Access Secret.
  • Select the encryption mode of the connection (if needed).
  • Click the Save button.

After we complete these steps, we get a new data source in Dremio.

image alt text

In general, the procedures and steps in data curation focus on the following:

1. Defining the data format in the data source:

  • Open the file that is connected from S3 storage
  • Define the data format as text
  • Define column delimiter (data fields) as a comma
  • Extract the data field name from the first row
  • Click the Save button.

image alt text

2. Transforming a data source in Dremio space to the virtual data set:

  • Click the Save As… button
  • Give a name for a new virtual dataset
  • Select the needed space
  • Click the Save button.

image alt text

3. Configuring data columns in the proper format:

  • Select the column with the numeric (time) data values
  • Select the Convert Data Type option
  • Select the required data format
  • Put the needed option for null-values
  • Select Drop Source Field to delete the old column
  • Press the Save button
  • Repeat the procedure for each column where you need to change the format.
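
The same kind of conversion can also be written into the SQL query itself. The sketch below only builds the query text and assumes that standard `CAST` syntax and the type names `DOUBLE`, `INTEGER`, and `TIMESTAMP` are accepted by your Dremio version; the function name `build_cast_query` is hypothetical, and the UI procedure described above is the one used in this article.

```python
def build_cast_query(dataset_path, column_types):
    """Build a SELECT statement that casts each column to the given SQL type.

    dataset_path: sequence of path parts, e.g. ('metro-data', 'metro-traffic')
    column_types: mapping of column name to SQL type name
    """
    source = '.'.join(f'"{part}"' for part in dataset_path)
    columns = ',\n       '.join(
        f'CAST("{name}" AS {sql_type}) AS "{name}"'
        for name, sql_type in column_types.items())
    return f'SELECT {columns}\nFROM {source}'


sql = build_cast_query(
    ('metro-data', 'metro-traffic'),
    {'temp': 'DOUBLE', 'traffic_volume': 'INTEGER', 'date_time': 'TIMESTAMP'})
print(sql)
```

Doing the conversion in Dremio rather than in the notebook means every client that reads the virtual dataset sees the same column types.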

image alt text

4. Completing data curation operations:

image alt text

  • Save all changes by clicking the Save button
  • Copy path to the virtual dataset in Dremio space.

Connecting Python to Dremio

We will use the pyodbc library and a Pandas DataFrame to connect the Jupyter Notebook to the Dremio spaces and data.

The connection procedure is quite simple and includes the following operations:

  • Defining Dremio’s credentials
  • Generating a query with credentials data
  • Initializing the connection with the Dremio spaces by the ODBC connector
  • Requesting an address to data in the Dremio instance
  • Loading data.

import pandas as pd
import pyodbc

# Dremio's credentials
user_id = 'dremio_user'
user_password = 'dremio_pass'
# Forming the connection string (adjacent literals avoid stray whitespace)
credential_query = (f'DSN=Dremio Connector;'
                    f'UID={user_id};PWD={user_password}')
# Set up the ODBC connection to the Dremio instance
dremio_connection = pyodbc.connect(credential_query, autocommit=True)
# SQL query for the virtual dataset in the Dremio space
sql = 'SELECT * FROM "metro-data"."metro-traffic"'
# Executing the query over the ODBC connection
df = pd.read_sql(sql, dremio_connection)
df.head(5)

As a result, we receive the data as a DataFrame that is ready for visualization.

image alt text

Data visualization

Data visualization is an important step that helps to understand the structure, patterns, characteristics, and data attributes. To visualize the Metro Interstate Traffic Volume data, we’ll use Seaborn, Matplotlib, and Plotly packages.

Firstly, let’s analyze the general patterns and dependencies of traffic and temperature values by time data.

For this, we need to convert the date_time column into the datetime format.

import datetime

def to_date_time(element):
    '''Convert str to datetime format'''
    return datetime.datetime.strptime(element, '%Y-%m-%d %H:%M:%S')

df['date_time'] = df['date_time'].apply(to_date_time)

To plot this graph, we will use the stackplot function.

import matplotlib.pyplot as plt
import numpy as np
# variables
temp = df.temp.values
traff = df.traffic_volume.values
time = df.date_time.values
# to array
temp_traff = np.array([(temp / temp.max()) * 100,
                       (traff / traff.max()) * 100])
# plot function
fig, ax = plt.subplots(figsize=(7, 5))
ax.stackplot(time,
             temp_traff,
             labels=['temperature', 'traffic'])
ax.set_title('Traffic and Temperature by Time')
ax.legend(loc='upper left')
ax.set_ylabel('Values in %')
ax.set_xlabel('Time')
ax.set_xlim(xmin=time[0], xmax=time[-1])
fig.tight_layout()

As a result, we obtain a chart showing that the temperature and traffic values have seasonal winter and summer variations.

image alt text

As we can see, the two curves have approximately the same shape, so let’s verify this by comparing temperature and traffic in a scatterplot, with the traffic volume split into categories.

import seaborn as sns

# split to category
def traffic_category(value):
    if value > 4000:
        return 'High'
    if value < 1000:
        return 'Low'
    return 'Average'

df['traffic_categorial'] = df['traffic_volume'].apply(traffic_category)
# plot, skipping rows with temp <= 50
sns.scatterplot(y='traffic_volume',
                x='temp',
                hue='traffic_categorial',
                data=df.loc[df.temp > 50])
plt.title('Traffic and Temperature')
plt.show()

image alt text

As expected, these values have approximately the same shape.

Next, we will examine the statistical characteristics of the weather data and traffic values with a histogram and a density plot.

# define weather (rain and snow) data for the hours with snowfall
weather = df.loc[df['snow_1h'] > 0.001, ['snow_1h', 'rain_1h']]
# plot histogram with density curves
fig, ax = plt.subplots()
weather.plot.kde(ax=ax, legend=False)
weather.plot.hist(density=True, ax=ax)
ax.set_ylabel('Probability')
ax.set_title('Histogram: Rain vs Snow')
# density plot
sns.jointplot(x="clouds_all",
              y="traffic_volume",
              data=df,
              kind="kde",
              space=0,
              color="r")
plt.suptitle('Density plot for Traffic and Clouds')
plt.show()

image alt text

After analyzing the graphs, we can assume that the rain and snow data have an exponential distribution due to their seasonality. We also see that clouds have an impact on traffic.

The dataset contains categorical data on holidays and weather. Therefore, we will count their quantity and build pie diagrams.

# Holidays data
# count elements
count_unique = df['holiday'].value_counts()
# keep only the rare values, which leaves out the rows without a holiday
count_unique_limited = count_unique[count_unique < 10]
fig, ax = plt.subplots()
ax.pie(count_unique_limited,
       labels=count_unique_limited.index,
       autopct='%1.1f%%',
       startangle=90)
# draw a white circle in the centre to get a donut chart
centre_circle = plt.Circle((0, 0), 0.70, fc='white')
ax.add_artist(centre_circle)
# Equal aspect ratio ensures that pie is drawn as a circle
ax.axis('equal')
ax.set_title('Holidays', weight='bold', fontsize=16)
plt.tight_layout()
plt.show()

image alt text

The code for the second diagram is not included because it is identical to the one above. Also, note that the Holiday pie diagram is built without null data. Next, we’ll analyze the traffic intensity on holidays with a countplot.

# Traffic intensity
sns.countplot(y='holiday',
              hue='traffic_categorial',
              data=df[df['holiday'] != 'None'])
plt.legend(loc='upper right')
plt.title('Traffic intensity by Holidays')
plt.show()

image alt text

Analyzing the received graphic, we see that there is no high-intensity traffic on holidays.

Next, we will use pyplot and seaborn to plot and analyze the effect of weather and time on traffic intensity.

# traffic intensity on weather data
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
# group data and convert the counts to percentages within each weather type
(df.groupby(['weather_main', 'traffic_categorial']).size()
   .groupby(level=0).transform(lambda x: 100 * x / x.sum())
   .unstack().plot(kind='bar', stacked=True))
# plot
plt.gca().yaxis.set_major_formatter(ticker.PercentFormatter())
plt.title('Traffic intensity by weather')
plt.xticks(rotation=45)
plt.show()

image alt text
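
To make the arithmetic behind the stacked bars explicit, here is a plain-Python sketch of what the groupby chain computes: the count of each (weather, category) pair divided by the number of rows for that weather value. The input pairs are illustrative and are not taken from the dataset.

```python
from collections import Counter

# (weather_main, traffic_categorial) pairs; illustrative values only
rows = [
    ('Clear', 'High'), ('Clear', 'Low'), ('Clear', 'High'), ('Clear', 'Average'),
    ('Rain', 'Low'), ('Rain', 'Average'),
]


def percentage_shares(pairs):
    """Return {weather: {category: percent}}; each weather's shares sum to 100."""
    pair_counts = Counter(pairs)                      # size of every (weather, category) group
    weather_totals = Counter(w for w, _ in pairs)     # number of rows per weather value
    shares = {}
    for (weather, category), count in pair_counts.items():
        shares.setdefault(weather, {})[category] = 100 * count / weather_totals[weather]
    return shares


shares = percentage_shares(rows)
print(shares['Clear'])  # {'High': 50.0, 'Low': 25.0, 'Average': 25.0}
```

Normalizing within each weather type is what makes the bars comparable even though some weather conditions occur far more often than others.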

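# traffic intensity on time data
# year and month columns used for grouping
df['date_time_year'] = df['date_time'].apply(lambda x: x.year)
df['date_time_month'] = df['date_time'].apply(lambda x: x.month_name())
# group data: rows are years, columns are months
dff_heatmaps = (df.groupby(['date_time_year', 'date_time_month'])
                  .size()
                  .unstack('date_time_month'))
# plot
sns.heatmap(dff_heatmaps, annot=False, linewidths=.5)
plt.title('Heatmap Traffic intensity by Years and Months')
plt.show()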

image alt text

The first diagram shows that for our data, there is no strong correlation between traffic intensity and weather. The heatmap demonstrates that traffic intensity was higher in 2017-2018.

Below, we will use the seaborn statistical analysis tools to analyze the traffic intensity distribution by time: hours, days, and months. For this purpose, with the datetime library, we will extract the time, day of the week, month, and year from the date_time column into separate columns of the Pandas DataFrame.

import datetime

# extract date and time parts
def time_category(value):
    if value > datetime.time(18, 0):
        return 'Evening'
    if value < datetime.time(11, 0):
        return 'Day'
    # note: times between 11:00 and 18:00 are also labelled 'Evening'
    return 'Evening'

# create new columns
df['date_time_year'] = df['date_time'].apply(lambda x: x.year)
df['date_time_time'] = df['date_time'].apply(lambda x: x.time())
df['date_time_month'] = df['date_time'].apply(lambda x: x.month_name())
df['date_time_day'] = df['date_time'].apply(lambda x: x.day_name())
df['date_time_category'] = df['date_time_time'].apply(time_category)

# violin plot (catplot replaces the removed factorplot function)
sns.catplot(x="date_time_year", y="traffic_volume", hue="date_time_category",
            data=df, palette="colorblind", kind='violin', legend=True)
plt.title('Traffic intensity by Year, Day Time')
plt.show()

# boxplot on a new figure
plt.figure()
sns.boxplot(x='date_time_month', y='traffic_volume', data=df)
plt.xticks(rotation=45)
plt.title('Traffic volume by Month')
plt.show()

image alt text

After analyzing the data with the violin plot, we can conclude that the traffic has more intensity in the evenings and this pattern remains for all years. At the same time, the average traffic value for each of the months is approximately the same.

Next, we explore the impact of days of the week on the traffic volume. We’ll use catplot and boxplot tools for this purpose.

# catplot
sns.catplot(x="date_time_day",
            y="traffic_volume",
            hue="traffic_categorial",
            col="date_time_category",
            data=df)

# boxplot
# select the last two years
dff_years = df[df['date_time_year'].isin([2017, 2018])]
# draw on a new figure so the catplot above is not overwritten
plt.figure()
sns.boxplot(x="date_time_day",
            y="traffic_volume",
            hue="date_time_year",
            data=dff_years)
plt.title('Traffic volume by Week day and Years')
plt.show()

image alt text

These diagrams show that the traffic volume and intensity decrease on weekends. This pattern holds for both daytime and evening, and for both 2017 and 2018.

Conclusion

This article focused on data visualization with Dremio, Python, and the Amazon SQS and S3 services. We created a producer and a consumer for the AWS SQS service and set up sending and receiving messages through an SQS queue. Then, we configured the AWS S3 service and connected it to Dremio. After data curation in Dremio, we connected it to Python and performed data visualization in a Jupyter notebook using popular open-source libraries.