New technologies, communication systems, and information processing algorithms place ever-higher demands on data rates, availability, and performance. Accordingly, processing the resulting data (messages) calls for technologies capable of handling this load. One of these technologies is RabbitMQ – which is used to build service-oriented architecture (SOA) services and distributed, resource-intensive operations.
However, it is worth noting that the messages in the queue can come from different types of data sources and therefore have different formats. As a result, processing and analyzing the data becomes complicated. To resolve this issue, various services are used that can combine different data types into a single database.
This article will focus on a data processing and analysis system that uses RabbitMQ as the queuing service, Python as a flexible environment, AWS S3 as fast and secure data storage, and Dremio for combining all data messages from the queue into a single database.
In this tutorial we will execute the following steps:
Generate data of different types that will flow from various information systems, devices, etc.
Create a data producer and a data consumer with Python tools to handle message exchange with the RabbitMQ service.
Transmit and store the processed messages using the RabbitMQ and AWS S3 services.
Now we will describe each step of the presented algorithm in more detail, together with the relevant tools.
Data sources
To keep data processing simple, clear, and structured, we will use random number generators to create the different data types for the required dataset. We are going to use the scipy.stats library and the probability density functions (PDF) of the corresponding random variables.
The datasets will consist of the following data types:
time and date data – as a time series over a given time interval,
integer data – as data samples with Binomial and Poisson probability distributions,
floating-point data – as data samples with Normal and Uniform probability distributions,
string data – as text data received from a text generator based on deep learning models.
The time and date data will be generated with built-in Pandas functions as a time series covering one day. Also, note that we define a number (the value variable) that determines the resulting dataset size.
import pandas as pd
# data range
value = 1000
# time periods (day * hours * minutes)
periods = 1 * 24 * 60
# time-series data
time_data = pd.date_range('2019-11-01', periods=periods, freq='T')
time_data = time_data[:value]
In this case, the data are presented in Timestamp format, but real data of this type usually come as string variables. Therefore, when forming the Pandas DataFrame, we will present these data in string format for further processing in Dremio.
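For example, a minimal sketch of this conversion, reusing the time_data index from the snippet above (the variable name time_data_str is our own), could look like this:
# convert the generated Timestamp values to plain strings
time_data_str = time_data.strftime('%Y-%m-%d %H:%M:%S')
print(time_data_str[:3])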
First, let’s present floating-point values as samples and then plot the densities for these distributions.
import scipy.stats as sts

# Normal data
# distribution parameters
mu = 2.0
sigma = 0.5
# generating normal values
norm_rv = sts.norm(loc=mu, scale=sigma)
normal_data = norm_rv.rvs(size=value)
# Uniform data
# distribution parameters
left_limit = 1
right_limit = 4
# generating uniform values
uniform_rv = sts.uniform(left_limit, right_limit-left_limit)
uniform_data = uniform_rv.rvs(value)
print(normal_data)
Next, we’ll examine the resulting data samples based on the probability density plots obtained from the generated data. To keep the presentation simple and compact, we’ll show the plotting code only for the normal data, since for the other data it is identical.
import numpy as np
import matplotlib.pyplot as plt

# PDF by samples of Normal data
samples = np.linspace(0, 4, value)
pdf_normal = norm_rv.pdf(samples)
plt.plot(samples, pdf_normal)
plt.title('Normal distribution')
plt.ylabel('probability')
plt.xlabel('samples')
plt.grid()
plt.show()
The data acquired from the normal distribution typically describe values obtained by measuring most physical quantities, while uniform data can characterize a simple linear system or dependencies limited by two parameters.
Similarly, we will generate samples of integer data from Binomial and Poisson probability distributions.
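The distribution parameters are not fixed by the text, so the values below are illustrative assumptions; a minimal sketch of this generation step could look like the following.
import scipy.stats as sts

# Binomial data (n_trials and p_success are assumed parameters)
n_trials = 10
p_success = 0.5
binomial_rv = sts.binom(n_trials, p_success)
binomial_data = binomial_rv.rvs(size=value)

# Poisson data (lambda_rate is an assumed parameter)
lambda_rate = 3
poisson_rv = sts.poisson(lambda_rate)
poisson_data = poisson_rv.rvs(size=value)

print(binomial_data[:10], poisson_data[:10])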
The plots describing these data samples are similar to the previous ones, except for using the cumulative distribution function.
So we have two columns of integer data for our resulting dataset. These data samples can describe sequential data systems.
Next, we will generate text data using the TensorFlow and textgenrnn libraries. We will use the default deep learning model from the textgenrnn library, which includes several LSTM and RNN layers.
The text generation process starts with keywords that we define in advance – all other model parameters are left at their defaults.
import numpy as np
from textgenrnn import textgenrnn

# deep learning model
textgen = textgenrnn()

def generate_text(text, sentences):
    '''generate a number of sentences by key-text'''
    generated_texts = textgen.generate(n=sentences,
                                       prefix=text,
                                       return_as_list=True)
    return generated_texts

# key words
key_words = ['features', 'parameters', 'values', 'keys', 'numbers']

# generating process
text_list = []
for word in key_words:
    text_by_word = generate_text(word, round(value/len(key_words)))
    text_list.append(text_by_word)

# flatten to a single list and shuffle
text_list = np.array(text_list).flatten()
np.random.shuffle(text_list)
print(text_list[:5])
As a result, we have obtained a list of texts, each beginning with one of the keywords, which we can later process with Dremio. We will present the resulting dataset as a Pandas DataFrame.
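A minimal sketch of assembling this DataFrame from the variables generated above might look like this; the column names, as well as the time_data_str, binomial_data, and poisson_data variables, come from our earlier sketches and are assumptions rather than part of the original dataset.
# assemble the resulting dataset; column names are illustrative
df = pd.DataFrame({
    'date_time': time_data_str,       # string-formatted timestamps
    'normal_data': normal_data,       # floats, Normal distribution
    'uniform_data': uniform_data,     # floats, Uniform distribution
    'binomial_data': binomial_data,   # integers, Binomial distribution
    'poisson_data': poisson_data,     # integers, Poisson distribution
    'text_data': text_list[:value]    # generated sentences
})
df.head()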
Now, we’ll form this data into packages for transfer through the RabbitMQ queuing service, presenting each row as a JSON package.
import json

# row to JSON
json_data = df.apply(lambda row: json.loads(row.to_json()), axis=1)
# to the list of JSON packages
send_data = json_data.to_numpy()
# example of a package
send_data[0]
Now we have all the data needed for processing with the RabbitMQ service.
Data processing with RabbitMQ
RabbitMQ provides a queuing service to which different applications, users, or other system components can connect in order to send or receive data messages.
The RabbitMQ service includes the following components:
Data producer – procedures for forming data messages and sending them to the message queue in the RabbitMQ service
Message handler (exchange) – component for message processing and queue formation. Several types can be assigned, such as Direct, Topic, Fanout, and Headers, that define the rules for routing messages to the queue (see the sketch after this list)
Queue – stores the messages received from the data producer
Data consumer – procedures for receiving data from the RabbitMQ service
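For illustration, the sketch below shows how one of these routing types could be declared with the pika library; the exchange name data_exchange is an assumption and is not used elsewhere in this tutorial.
import pika

# declare an exchange with an explicit routing type (direct, topic, fanout, headers)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='data_exchange', exchange_type='direct')
connection.close()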
The general structure of the data queuing system with RabbitMQ service is shown below.
The algorithm of the presented system includes the following steps:
The data producer gets data messages
The data producer sends data messages to the RabbitMQ service
Data messages reach the message handler and are then routed to the queue according to the message processing rule
A data message remains in the queue until the data consumer processes it
The data consumer processes the messages
Usually, the RabbitMQ service is deployed on a high-performance server, but in our case we will use a local server with the Direct processing rule for the message handler. To do this, we need a pre-installed RabbitMQ service, which will also be used by the data producer and data consumer applications.
We are going to use the pika library to create the applications that process data with the RabbitMQ service.
Data producer
The data producer algorithm includes the following steps:
create a connection to the RabbitMQ service,
initialize a connection to a specific RabbitMQ channel,
send messages to the queue using the default setting for the message handler,
close the connection to the RabbitMQ service after the messages have been sent.
Using this algorithm, the data producer application can be implemented with the following Python script.
import pika

# create connection
pika_connector = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(pika_connector)
# initialize channel connection
channel = connection.channel()
# define queue
channel.queue_declare(queue='channel')
# send messages to the queue (each JSON package is serialized to a string)
for ind, message in enumerate(send_data):
    channel.basic_publish(exchange='',
                          routing_key='channel',
                          body=str(message))
# close connection
connection.close()
print(f'{ind+1} messages were sent to the queue')
As a result, the data messages are uploaded to the RabbitMQ service, where they will be stored until the data consumer requests them from the queue.
Data consumer
Data producer and consumer algorithms consist of similar steps, so let’s consider just those that are different:
The data consumer is set to standby mode, waiting for data messages
A callback function is used to retrieve the data messages from the queue
The script for a data consumer is shown below.
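A minimal sketch of such a consumer with pika is given below; the queue name 'channel' matches the producer, while the stop condition and the receive_data list (used by the next snippet) are assumptions.
import pika

receive_data = []

def callback(ch, method, properties, body):
    '''collect the body of each received message'''
    receive_data.append(body)
    # stop consuming once all expected messages have arrived
    if len(receive_data) == value:
        ch.stop_consuming()

# connect and start consuming from the same queue as the producer
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='channel')
channel.basic_consume(queue='channel', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
connection.close()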
Next, we will perform additional processing of the received messages (JSON conversion) and load them into a Pandas DataFrame for further analysis.
# data processing
processed_messages = []

def after_procession(message):
    '''process a received message'''
    # convert bytes to string
    message_string = str(message, 'utf-8')
    # convert to dict
    message_dic = json.loads(message_string.replace("'", "\""))
    processed_messages.append(message_dic)

# convert every received message to JSON format
for message in receive_data:
    after_procession(message)

# load to DataFrame
df = pd.DataFrame.from_records(processed_messages)
df.to_csv(path_or_buf='data_proccesed.csv', index=False)
df.head(5)
Now, the received data is ready for storage in AWS S3 and further analysis with the Dremio platform.
AWS S3
We are going to use this service as storage for the data messages received from the RabbitMQ service. The main data processes in AWS S3 are presented in the following diagram.
Accordingly, the AWS S3 data processing algorithm consists of three steps:
Creating an AWS S3 bucket or connecting to an existing one
Uploading data messages to AWS S3 bucket
Configuring access and credential settings
These steps can be performed with the AWS GUI as well as with the AWS SDK. The first way has been described in previous tutorials, so we will focus on the second one, using the AWS SDK for Python.
The script for creating an AWS S3 Bucket with SDK Python is as follows.
import boto3
from botocore.exceptions import ClientError

def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in the specified region"""
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError:
        return False
    return True

create_bucket('rabbitmq-data', region='eu-north-1')
This script allows you to specify the AWS region, which is important for optimal data performance. The results of its execution are shown in the AWS management console.
We’ll also implement the upload of the data messages with a Python script (AWS SDK for Python).
def upload_file(file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket"""
    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = file_name
    # Upload the file
    s3_client = boto3.client('s3')
    try:
        s3_client.upload_file(file_name, bucket, object_name)
    except ClientError:
        return False
    return True

# data upload
upload_file('data_proccesed.csv', 'rabbitmq-data', 'rabbitmq-messages')
As a result, we’ll upload data-messages to the specified AWS S3 bucket.
Next, we’ll configure the access settings for the AWS S3 bucket. In our case, we will select public access for simplicity; in practice, you should configure private access instead, otherwise your data will be available to everyone.
Also, we need to define the credential options in the Identity and Access Management (IAM) console. The important credential parameters for accessing the uploaded data are the Access key ID and the Secret access key.
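As a minimal sketch, these credentials can be passed to boto3 explicitly; the placeholder strings are obviously assumptions, and in practice the keys are usually kept in ~/.aws/credentials or environment variables rather than in code.
import boto3

# create an S3 client with explicit IAM credentials (placeholders to be replaced)
s3_client = boto3.client(
    's3',
    aws_access_key_id='YOUR_ACCESS_KEY_ID',
    aws_secret_access_key='YOUR_SECRET_ACCESS_KEY'
)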
At this stage, we have loaded data-messages and defined the access to them. Next, we will preprocess and analyze them using the Dremio platform.
Data curation with Dremio
To connect a data source to Dremio, we need to select the type of data source and specify the credential parameters.
This stage also requires entering the AWS S3 credential parameters obtained earlier. Next, it is necessary to determine the type of the connected data and save the result as a Dremio virtual dataset.
It is important to select the data format, the delimiter (for a CSV file), and the column name extraction option. We will define the data format for each data column and show the transformation of the datetime and float columns.
At this stage, it is important to identify null values and delete records that do not match the expected format or are duplicated. After all the transformations, the data will be presented as follows.
Next, let’s extract from the text column the keywords from which we generated the sentences. For this purpose, we need to define a pattern using a regular expression.
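For reference, a pattern along the following lines matches the leading keyword; this is a sketch of the equivalent check in Python, and the exact expression entered in Dremio’s extract dialog may differ.
import re

# extract the leading keyword used to seed the generated sentence
pattern = r'^(features|parameters|values|keys|numbers)'
match = re.match(pattern, 'features of the presented system')
print(match.group(1) if match else None)  # -> 'features'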
As a result, we will get a text column with keywords.
At this point the dataset can be saved and the resulting data can be analyzed with any BI or data science tool.