29 minute read · August 20, 2019
Visualizing Amazon SQS and S3 using Python and Dremio
· Dremio Team
Introduction
Nowadays, meaningful analysis of diverse data is an important stage of business and technical research and development. The data often arrives as a stream of serialized info messages (queues). This is typical for data loggers and recorders, IoT products, live-tracking systems, communication and navigation systems, etc. The incoming information is then sent to a database for further processing and analysis. The simplest and most obvious way to do this is to present the data and its patterns visually, in the form of graphs, charts, etc.
One of the main challenges in designing such systems is that the data takes different forms at key stages of the pipeline. For this reason, many developers use platforms that can aggregate data from different databases and other sources and combine it with visualization, research, and analysis tools.
In this article, we will demonstrate a similar data analysis system based on the popular Metro Interstate Traffic Volume dataset.
We will use Amazon services to implement the following data operations:
- Simple Queue Service (SQS) – for forming a data queue
- Simple Cloud Storage Service (S3) – for storing the data from the queue.
Then, we will implement data analysis and visualization in Python. The system that will aggregate all of these elements is the user-friendly Dremio platform that provides a full set of tools to handle and analyze all kinds of data.
The general structure of our research can be represented by the following structural scheme:
The algorithm behind this scheme consists of the following steps:
- Download and research Metro Traffic Volume data
- Generate data for messaging to the AWS SQS system
- Configure AWS SQS for message handling
- Receive the messages and store them to AWS S3
- Connect the AWS S3 storage and the Python environment to Dremio
- Data visualization and analysis.
Assumptions
The following items should be installed and configured:
Environments:
- Dremio and Dremio ODBC Driver
- Amazon account
- Jupyter Notebook (or another Python IDE).
Python packages:
- Pandas, NumPy, Datetime, Matplotlib, Plotly, Seaborn
- PyODBC and the AWS SDK for Python (Boto3).
Data in Amazon services
The first step is to configure the Amazon services: SQS to process and send info messages and S3 as the data storage. For the general configuration of these services, you need to do the following:
- Get registered at Amazon (if you don’t have an account)
- Get AWS user credentials.
As a result, you will get an access key ID and a secret access key. That’s all you need to access AWS services.
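As a quick reference, here is a minimal sketch of how these credentials can be passed to the AWS SDK for Python (Boto3); the key values below are placeholders, and Boto3 can also read them from the ~/.aws/credentials file or environment variables.

import boto3

# Pass the saved credentials to a Boto3 session (the key values are placeholders)
session = boto3.Session(
    aws_access_key_id='YOUR_ACCESS_KEY_ID',
    aws_secret_access_key='YOUR_SECRET_ACCESS_KEY',
    region_name='eu-north-1')

# Clients for the two services used in this article can then be created from the session
sqs = session.client('sqs')
s3 = session.client('s3')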
AWS SQS
Amazon Simple Queue Service (SQS) is a message queue service for sending, storing, and receiving information messages with high performance and reliability.
Using the AWS console, we launch the SQS service and create a New queue.
The parameters for Queues are as follows:
- Queue name – short name
- Queue type – Standard or FIFO queue
- Custom queue settings.
The difference between Standard and FIFO queues lies in the ordering and delivery guarantees for messages. Since message order is not important for statistical and graphical analysis, we select the Standard queue type and the default queue settings. As a result, we get a pre-configured working queue.
The important parameters for accessing the created queue are its URL and ARN. They are worth storing.
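If you prefer scripting this step over using the console, the same queue can be created and its URL and ARN retrieved with Boto3. A sketch, assuming the queue name simple-request used later in the producer code:

import boto3

# Create a Standard queue with default settings
sqs = boto3.client('sqs')
response = sqs.create_queue(QueueName='simple-request')
queue_url = response['QueueUrl']

# The ARN is exposed as a queue attribute
attributes = sqs.get_queue_attributes(QueueUrl=queue_url,
                                      AttributeNames=['QueueArn'])
queue_arn = attributes['Attributes']['QueueArn']
print(queue_url, queue_arn)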
AWS S3
Amazon Simple Storage Service (Amazon S3) is a security-oriented storage service known for its high performance, scalability, and availability. We will use this service to store the info messages received from the SQS queue.
Configuring this service is quite simple and involves the following steps:
- Creation of the bucket
- Setting up access
- Defining the path to the bucket.
By following these steps, you will store info messages in S3 data storage.
The process of uploading information to S3 storage using the AWS web interface has been described in detail in previous articles. In this article, we will perform the same process with the AWS SDK for Python.
Data overview
The data represents the Metro Interstate Traffic Volume. It also includes hourly weather features and holiday information.
We’ll load this data into a Jupyter Notebook and use Pandas to process it. Then, we will send it to the Producer (the message sender).
import pandas as pd
import os

# Path to data
csv_file = os.path.join(os.path.abspath(os.curdir),
                        'Metro_Interstate_Traffic_Volume.csv')
# Load CSV to DataFrame
df_csv = pd.read_csv(csv_file)
# Data shape
print('shape df:', df_csv.shape)
# Columns in Data
print(df_csv.columns)
# Unique elements in columns
for column in df_csv:
    print(f'\n {column}: {df_csv[str(column)].unique()}')
Therefore, the dataset shape is:
The data features are represented by the following columns:
The unique column elements are shown in the following format:
In total, we have categorical, numerical, and time data that will be useful for creating visualizations.
All the messages for the producer and consumer must be presented in JSON format. For AWS SQS, a message body will look like the following:
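As a rough illustration, each message body is simply one DataFrame row serialized with row.to_json(); the values in the commented output below are only an example.

# Each message body is one DataFrame row serialized to JSON
print(df_csv.iloc[0].to_json())
# Example output (illustrative values):
# {"holiday":"None","temp":288.28,"rain_1h":0.0,"snow_1h":0.0,
#  "clouds_all":40,"weather_main":"Clouds",
#  "weather_description":"scattered clouds",
#  "date_time":"2012-10-02 09:00:00","traffic_volume":5545}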
Producer and Consumer for SQS
The Producer and Consumer are the components that, respectively, send messages and receive and store them in a database. In our case, we’ll use the AWS SDK to emulate their operations.
Therefore, the code to emulate sending info messages to the SQS queue is as follows:
import boto3

# AWS credentials
aws_access_key_id = "YOUR_ACCESS_KEY_ID"
aws_secret_access_key = "YOUR_SECRET_ACCESS_KEY"

# Initialize SQS client with the credentials
sqs = boto3.client('sqs',
                   aws_access_key_id=aws_access_key_id,
                   aws_secret_access_key=aws_secret_access_key,
                   region_name='eu-north-1')

# URL to SQS
queue_url = 'https://sqs.eu-north-1.amazonaws.com/xxxxx/simple-request'

# Sending messages to SQS in JSON format
for ind, row in df_csv.iterrows():
    response = sqs.send_message(
        QueueUrl=queue_url,
        DelaySeconds=3,
        MessageBody=row.to_json())
Note that for proper operation, you need to paste the previously saved access key ID, secret access key, and the URL of the SQS queue. We can see the result of the producer code execution in the AWS SQS console.
We can also manage messages using the console functionality: view, delete, or send messages. But we’ll perform all these operations with the Consumer via the AWS SDK.
The Consumer has two parts, the receiver and the sender. The receiver gets and deletes messages from the SQS queue, and the sender forwards the messages to S3 storage. The receiver part is shown below:
import json

def get_messages_from_queue(queue_url):
    '''queue_url: URL to SQS'''
    sqs_client = boto3.client('sqs')
    messages = []
    while True:
        # Receive up to 10 messages per request
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            AttributeNames=['All'],
            MaxNumberOfMessages=10)
        try:
            messages_body = list(map(lambda x: json.loads(x['Body']),
                                     response['Messages']))
            messages.extend(messages_body)
        except KeyError:
            # No 'Messages' key means the queue is empty
            break
        # Delete the received messages from the queue
        entries = [{'Id': msg['MessageId'],
                    'ReceiptHandle': msg['ReceiptHandle']}
                   for msg in response['Messages']]
        response = sqs_client.delete_message_batch(QueueUrl=queue_url,
                                                   Entries=entries)
    return messages

# Receiving and deleting messages from SQS
sqs_messages = get_messages_from_queue(queue_url)
The sender part of the Consumer is in the following code:
def create_bucket(bucket_name, region):
    """Create an S3 bucket"""
    s3_client = boto3.client('s3', region_name=region)
    location = {'LocationConstraint': region}
    s3_client.create_bucket(Bucket=bucket_name,
                            CreateBucketConfiguration=location)

def upload_file(file_name, bucket, object_name):
    """Upload a file to an S3 bucket
    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name"""
    # Upload the file
    s3_client = boto3.client('s3')
    s3_client.upload_file(file_name, bucket, object_name)

# Save the received messages to a CSV file (assumed intermediate step)
pd.DataFrame(sqs_messages).to_csv('data_for_S3.csv', index=False)

# Creating S3 bucket
create_bucket('metro-traff', region='eu-north-1')
# Upload the file
upload_file('data_for_S3.csv', 'metro-traff', 'metro-dataset')
We can find the result of this code execution in AWS S3 console:
Finally, we have the Metro Interstate Traffic Volume data uploaded to AWS S3 storage. The important parameter to save is the Object URL, which we will use to access this data.
Note that if you need to adjust the data access settings, you can do this in the Permissions tab.
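Both of these can also be handled from the SDK. The snippet below is only a sketch: it generates a pre-signed link for accessing the uploaded object and shows how an object ACL could be changed from code instead of the console (bucket and object names are taken from the upload step above).

import boto3

s3_client = boto3.client('s3')

# Generate a pre-signed URL to access the uploaded object (valid for 1 hour)
object_url = s3_client.generate_presigned_url(
    'get_object',
    Params={'Bucket': 'metro-traff', 'Key': 'metro-dataset'},
    ExpiresIn=3600)
print(object_url)

# Example of changing the object permissions from code
s3_client.put_object_acl(Bucket='metro-traff', Key='metro-dataset',
                         ACL='private')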
Connecting Dremio to Amazon services and data curation
The main steps for connecting Dremio to AWS S3 storage have been described in detail in the previous article. So in this article, we will focus only on the main steps of the connection process and give more attention to the data curation steps.
First, add a new data source:
- Select the type of data source - Amazon S3.
- Determine the name of the data source – Metro-traffic in our case.
- Insert credentials for Amazon S3: AWS Access Key and AWS Access Secret.
- Select the encryption mode of the connection (if needed).
- Click the Save button.
After we complete these steps, we get a new data source in Dremio.
In general, the procedures and steps in data curation focus on the following:
1. Defining the data format in the data source:
- Open the file that is connected from S3 storage
- Define the data format as text
- Define column delimiter (data fields) as a comma
- Extract the data field name from the first row
- Click the Save button.
2. Transforming a data source in Dremio space to the virtual data set:
- Click the Save As… button
- Give a name for a new virtual dataset
- Select the needed space
- Click the Save button.
3. Configuring data columns in the proper format:
- Select the column with the numeric (time) data values
- Select the Convert Data Type option
- Select the required data format
- Set the required option for null values
- Select Drop Source Field to delete the old column
- Press the Save button
- Repeat the procedure for each column where you need to change the format.
4. Completing data curation operations:
- Save all changes by clicking the Save button
- Copy path to the virtual dataset in Dremio space.
Connecting Python to Dremio
We will use the pyodbc library and a Pandas DataFrame to connect the Jupyter Notebook to Dremio spaces and data.
The connection algorithm is quite simple and includes the following operations:
- Defining Dremio’s credentials
- Generating a query with credentials data
- Initializing the connection to the Dremio spaces via the ODBC connector
- Querying the data by its path in the Dremio instance
- Loading data.
import pandas as pd
import pyodbc

# Dremio's credentials
user_id = 'dremio_user'
user_password = 'dremio_pass'

# Forming the credential query
credential_query = f'DSN=Dremio Connector;UID={user_id};PWD={user_password}'

# Setup the ODBC connection to the Dremio instance
dremio_connection = pyodbc.connect(credential_query, autocommit=True)

# SQL query for Dremio's data
sql = r'SELECT * FROM "metro-data"."metro-traffic"'

# Executing the query over the ODBC connection
df = pd.read_sql(sql, dremio_connection)
df.head(5)
As a result, we receive DataFrame table data ready for visualization.
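Before plotting, it may be worth a quick check that the column types defined during data curation came through the ODBC connection as expected; a minimal sketch:

# Quick sanity check of the loaded data
print('shape:', df.shape)
print(df.dtypes)
print(df.isnull().sum())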
Data visualization
Data visualization is an important step that helps to understand the structure, patterns, characteristics, and data attributes. To visualize the Metro Interstate Traffic Volume data, we’ll use Seaborn, Matplotlib, and Plotly packages.
First, let’s analyze the general patterns and dependencies of traffic and temperature values over time.
For this, we need to transform the date_time column into the datetime format.
import datetime

def to_data_time(element):
    '''convert str to datetime format'''
    return datetime.datetime.strptime(element, '%Y-%m-%d %H:%M:%S')

df['date_time'] = df['date_time'].apply(to_data_time)

To plot this graphic, we will use the stackplot function.

import matplotlib.pyplot as plt
import numpy as np

# variables
temp = df.temp.values
traff = df.traffic_volume.values
time = df.date_time.values

# normalize both series to percentages of their maximum
temp_traff = np.array([(temp / temp.max()) * 100,
                       (traff / traff.max()) * 100])

# plot function
fig, ax = plt.subplots(figsize=(7, 5))
ax.stackplot(time, temp_traff, labels=['temperature', 'traffic'])
ax.set_title('Traffic and Temperature by Time')
ax.legend(loc='upper left')
ax.set_ylabel('Values in %')
ax.set_xlabel('Time')
ax.set_xlim(xmin=time[0], xmax=time[-1])
fig.tight_layout()
As a result, we obtain a chart showing that the temperature and traffic values have seasonal winter and summer variations.
As we can see, these graphical dependencies have approximately the same shape; let’s verify this by comparing them with a scatterplot and categorical traffic data.
import seaborn as sns

# split traffic volume into categories
def traffic_category(value):
    return 'High' if value > 4000 else 'Low' if value < 1000 else 'Average'

df['traffic_categorial'] = df['traffic_volume'].apply(traffic_category)

# plot (temperatures are in Kelvin, so temp > 50 drops faulty near-zero values)
sns.scatterplot(y='traffic_volume', x='temp', hue='traffic_categorial',
                data=df.loc[df.temp > 50])
plt.title('Traffic and Temperature')
plt.show()
As expected, these values have approximately the same shape.
Next, we will examine the statistical characteristics of the weather data and traffic values with a histogram and a density plot.
# define weather (rain and snow) data
weather = df[['snow_1h', 'rain_1h']].loc[df['snow_1h'] > 0.001]

# plot histogram with density estimate
fig, ax = plt.subplots()
weather.plot.kde(ax=ax, legend=False)
weather.plot.hist(density=True, ax=ax)
ax.set_ylabel('Probability')
ax.set_title('Histogram: Rain vs Snow')

# density plot for traffic and clouds
sns.jointplot(x="clouds_all", y="traffic_volume", data=df,
              kind="kde", space=0, color="r")
plt.title('Density plot for Traffic and Clouds')
plt.show()
After analyzing the graphs, we can assume that the rain and snow data have an exponential distribution due to their seasonality. We also see that clouds have an impact on traffic.
The dataset contains categorical data on holidays and weather. We will count their occurrences and build pie charts.
# Holidays data
# count elements (drop the dominant 'None' values by their high count)
count_unique = df['holiday'].value_counts()
count_unique_limited = count_unique[count_unique < 10]

# plot pie (donut) chart
fig, ax = plt.subplots()
ax.pie(count_unique_limited, labels=count_unique_limited.index,
       autopct='%1.1f%%', startangle=90)

# draw circle
centre_circle = plt.Circle((0, 0), 0.70, fc='white')
fig.gca().add_artist(centre_circle)

# Equal aspect ratio ensures that pie is drawn as a circle
ax.axis('equal')
plt.tight_layout()
plt.title('Holidays', weight='bold', fontsize=16)
plt.show()
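The code for the second diagram is not shown, because it is identical to the one above with the weather_main column swapped in (and without the count filter, since this column has no dominant null value). A sketch of what it might look like:

# Weather data pie chart (same approach as the Holidays chart)
count_weather = df['weather_main'].value_counts()

fig, ax = plt.subplots()
ax.pie(count_weather, labels=count_weather.index,
       autopct='%1.1f%%', startangle=90)

# draw circle
centre_circle = plt.Circle((0, 0), 0.70, fc='white')
fig.gca().add_artist(centre_circle)
ax.axis('equal')
plt.tight_layout()
plt.title('Weather', weight='bold', fontsize=16)
plt.show()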
Note that the Holiday pie chart is built without the null (‘None’) values. Next, we’ll analyze the traffic intensity on holidays with a countplot.
# Traffic intensity on holidays
sns.countplot(y='holiday', hue='traffic_categorial',
              data=df[df['holiday'] != 'None'])
plt.legend(loc='upper right')
plt.title('Traffic intensity by Holidays')
plt.show()
Analyzing the resulting chart, we see that there is no high-intensity traffic on holidays.
Next, we will use pyplot and seaborn to build charts and analyze the effect of weather and time data on traffic intensity.
# traffic intensity by weather data
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

# group data and convert the counts to percentages per weather type
(df.groupby(['weather_main', 'traffic_categorial']).size()
   .groupby(level=0).apply(lambda x: 100 * x / x.sum())
   .unstack().plot(kind='bar', stacked=True))

# plot
plt.gca().yaxis.set_major_formatter(ticker.PercentFormatter())
plt.title('Traffic intensity by weather')
plt.xticks(rotation=45)
plt.show()
# traffic intensity by time data
# group data (date_time_month and date_time_year are extracted from date_time in a later code block)
df_heatmap = (df.groupby(['date_time_month', 'date_time_year'])
                .size().reset_index()
                .pivot(columns='date_time_month', index='date_time_year'))

# plot
sns.heatmap(df_heatmap, annot=False, linewidths=.5)
plt.title('Heatmap Traffic intensity by Years and Months')
plt.show()
The first diagram shows that, for our data, there is no strong correlation between traffic intensity and weather. The heatmap demonstrates that traffic intensity was higher in 2017-2018.
Below, we will use the seaborn statistical analysis tools to analyze the traffic intensity distribution over time: hours, days, and months. For this purpose, using the datetime library, we will extract the time, day of the week, month, and year from the date_time column into separate columns of the Pandas DataFrame.
# extract date and time features
def time_category(value):
    '''label times after 18:00 as Evening, the rest as Day'''
    return 'Evening' if value > datetime.time(18, 0) else 'Day'

# create new columns
df['date_time_year'] = df['date_time'].apply(lambda x: x.year)
df['date_time_time'] = df['date_time'].apply(lambda x: x.time())
df['date_time_month'] = df['date_time'].apply(lambda x: x.month_name())
df['date_time_day'] = df['date_time'].apply(lambda x: x.day_name())
df['date_time_category'] = df['date_time_time'].apply(time_category)

# violin plot
sns.factorplot(x="date_time_year", y="traffic_volume",
               hue="date_time_category", data=df,
               palette="colorblind", kind='violin', legend=True)
plt.title('Traffic intensity by Year, Day Time')

# boxplot
sns.boxplot(x='date_time_month', y='traffic_volume', data=df)
plt.xticks(rotation=45)
plt.title('Traffic volume by Month')
plt.show()
Analyzing the violin plot, we can conclude that traffic is more intense in the evenings, and this pattern holds for all years. At the same time, the average traffic volume for each month is approximately the same.
Next, we explore the impact of days of the week on the traffic volume. We’ll use catplot and boxplot tools for this purpose.
# catplot
sns.catplot(x="date_time_day", y="traffic_volume",
            hue="traffic_categorial", col="date_time_category",
            data=df)

# boxplot
# define the last years
df_years = df[(df.date_time_year == 2017) | (df.date_time_year == 2018)]
sns.boxplot(x="date_time_day", y="traffic_volume",
            hue="date_time_year", data=df_years)
plt.title('Traffic volume by Week day and Years')
plt.show()
These diagrams show that traffic volume and intensity decrease on weekends. This pattern holds for both daytime and evening, and in general for 2017-2018.
Conclusion
This article focused on data visualization with Dremio, Python, and the Amazon SQS and S3 services. We created a producer and a consumer for the AWS SQS service and set up sending and receiving messages through an SQS queue. Then, we configured the AWS S3 service and connected it to Dremio. After data curation in Dremio, we connected Python and performed data visualization in a Jupyter Notebook using popular open-source libraries.