
Anomaly detection on cloud data with Dremio and Python

Dremio

Introduction

In many datasets, some records do not match the rest of the data, either because of errors or by nature. Such records are useless to ML models and can even degrade them. In other problems, detecting anomalies is the goal itself, for example in hospital health-monitoring systems or in credit fraud detection. Either way, anomaly detection is a large and important field of data science. Its main task is to identify anomalies, whether to exclude them or to act on them. In data science, anomalies are usually defined as records that:

  1. Differ drastically from the rest of the data;
  2. Occur rarely in the dataset.

Based on these two assumptions, several anomaly detection techniques can be used to find such records. In this tutorial, we will use the Activity Recognition dataset, and Dremio will be our tool for data curation. You will learn how to use data stored in AWS S3, curate it in a local Dremio instance without modifying the source files, and build a model in a Jupyter Notebook.

Assumptions

For the scope of this tutorial, we assume that you already have the following installed and set up:

  1. Dremio
  2. Dremio ODBC driver
  3. AWS account
  4. Python 3 (pandas, NumPy, pyodbc, scikit-learn, matplotlib)
  5. Jupyter Notebook

Loading data into S3

We will upload the data to an S3 bucket. If you already have data in S3, you can use it instead. The bucket does not have to be public, but this tutorial uses a public bucket for simplicity.

First, create a new bucket. Leave the default options.

Untick “block all public access”.

Then, open your new bucket.

Upload the dataset files to the bucket.

After the files are uploaded, open the “Permissions” tab, click “Bucket Policy”, and paste the policy below into the editor. Replace “activity-recognition-dremio” with your bucket name, then click “Save”. This policy allows public read access to the bucket. If you don’t want that, keep the bucket private and access it with an AWS access key instead.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": "*",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::activity-recognition-dremio"
        },
        {
            "Sid": "MakeItPublic",
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::activity-recognition-dremio/*"
        }
    ]
}
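If you prefer to script this step instead of using the console, the sketch below builds the same policy for any bucket name. It can also attach the policy with boto3’s `put_bucket_policy`. It assumes boto3 is installed and configured with credentials that are allowed to change the bucket policy. `public_read_policy` and `apply_policy` are hypothetical helper names, not part of any library.

```python
import json


def public_read_policy(bucket: str) -> dict:
    """Build the public-read bucket policy for `bucket`."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # bucket-level actions apply to the bucket ARN itself
                "Effect": "Allow",
                "Principal": "*",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": f"arn:aws:s3:::{bucket}",
            },
            {
                # object-level actions need the "/*" suffix to match every key
                "Sid": "MakeItPublic",
                "Effect": "Allow",
                "Principal": "*",
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
        ],
    }


def apply_policy(bucket: str) -> None:
    """Attach the policy with boto3 (assumes configured AWS credentials)."""
    import boto3  # imported lazily so the rest of the snippet runs without boto3

    boto3.client("s3").put_bucket_policy(
        Bucket=bucket, Policy=json.dumps(public_read_policy(bucket))
    )


print(json.dumps(public_read_policy("activity-recognition-dremio"), indent=4))
```

Calling `apply_policy("activity-recognition-dremio")` has the same effect as pasting the JSON into the console editor. AWS will likely reject it while “Block all public access” is still enabled.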

Now you have data inside the bucket ready to use.

Connecting S3 to Dremio

For the purposes of this tutorial, we connect to a local instance of Dremio. With the Dremio data lake engine, we can query data in S3 and reshape it as needed without changing the source data or requiring additional storage.

Start Dremio and click the “Add Source” button.

Choose “Amazon S3”.

Under “Authentication”, select “No Authentication”. Then add a new external bucket, enter the name of your S3 bucket, and click “Save”. If your S3 bucket is private, use AWS Access Key authentication instead. Dremio will then list all buckets associated with that key.

The data from S3 is now available in Dremio.

Data curation in Dremio

For our particular problem, we need to work with each file separately. Open the “activity-recognition-dremio” folder and click the “Format File” button.

The default settings work for this dataset, so click “Save”.

The first column contains only a sequential number and isn’t needed, so we can drop it.

Then, convert the column types to “Integer” and rename the columns according to the labels in the dataset README: sequential number (dropped), x acceleration, y acceleration, z acceleration, and activity. The Python code later in this tutorial refers to them as X, Y, Z, and activity. When converting a column, we won’t need the source field, so it can be dropped.

Repeat the process for the remaining three columns and save your dataset as “your-dataset-name-1”. It is now ready to use in Python.

Format the remaining 14 files the same way, saving each one with an incremented number at the end (up to “your-dataset-name-15”).

Note that the original data in S3 remains unmodified and can be used for other purposes.

Connecting to Python and performing anomaly detection

Anomaly detection is the process of identifying rare items that differ strongly from the rest of the dataset.

The dataset we are using contains recordings of different activities performed by different people. There are 15 people and 7 types of activities (you can read more about this on the dataset’s webpage). We are going to perform a somewhat modified form of anomaly detection. We want a model that, when trained on the data of one particular person, can distinguish that person from everyone else. Such a model can be helpful in a variety of use cases where you need to identify a person based on their activity.

Our steps are described below:

  1. Create a new Jupyter notebook.
  2. Import the following libraries.
import pandas as pd
import numpy as np
import pyodbc
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
  3. Configure the connection to Dremio with your username and password.
username = 'username'
password = 'password'

# "Dremio Connector" is the ODBC data source name (DSN) used here; change it if yours differs
user_info = f"DSN=Dremio Connector;UID={username};PWD={password}"
connection = pyodbc.connect(user_info, autocommit=True)
  4. Load the data from all 15 datasets into the datas list.
def load_all():
    datas = []
    for i in range(1, 16):
        # replace "Dremio space" and "your-dataset-name-{i}" with your own space and dataset names
        sql_request = f'SELECT "X", "Y", "Z", "activity" FROM "Dremio space"."your-dataset-name-{i}"'
        data = pd.read_sql(sql_request, connection)
        datas.append(data)
    return datas

datas = load_all()
  5. Prepare the dataset. At the moment, all our datasets have the following format (the example is for the first person):

[Image: sample rows of the first person’s dataset]

All 15 datasets are now in the datas list returned by load_all(). To prepare the dataset properly, we first need to understand it. This is a time-series dataset: the device takes measurements at a frequency of 52 Hz, which means 52 measurements per second. A single row is not enough to judge whether it belongs to a particular person. We (as well as our model) need to look at how the measurements change over time.

To prepare the training and testing datasets in the correct form, we will use a modified sliding-window method. The sliding-window method is widely used with time-series data. Its core idea is to include information about one or more previous datapoints in the current datapoint: for timestamp t, we also include the values at timestamps t-1, t-2, t-3, and so on. Let’s think about what this means for our dataset.
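To make the idea concrete, here is a minimal pandas sketch of lag features on a toy series. With `stride=1` it is the classic sliding window (t-1, t-2, ...). A larger `stride` produces spaced-out lags such as t-30, t-60, ..., which is the variant this tutorial ends up using. `add_lags` is an illustrative helper, not code from the original notebook.

```python
import pandas as pd


def add_lags(df, columns, n_lags, stride=1):
    """Return a copy of df with lagged columns: 'X-k' holds the value of X at
    row t - k*stride. Rows without enough history get NaN."""
    out = df.copy()
    for k in range(1, n_lags + 1):
        for col in columns:
            out[f"{col}-{k}"] = out[col].shift(k * stride)
    return out


signal = pd.DataFrame({"X": [10, 11, 12, 13, 14, 15]})
print(add_lags(signal, ["X"], n_lags=2))            # X-1 is t-1, X-2 is t-2
print(add_lags(signal, ["X"], n_lags=2, stride=2))  # X-1 is t-2, X-2 is t-4
```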

As mentioned above, the measurement frequency is 52 Hz. If you add information about, say, the 10 previous datapoints to the current datapoint, you only cover about one fifth of a second (10/52 ≈ 0.19 s). This is not enough to capture the peculiarities of a particular person’s activity, because human movements take significantly longer. A model built on such a dataset would mostly deal with noise in the individual sensor measurements rather than with the patterns of human activity.

To address this problem, we include information about datapoints that are further away from the current datapoint, for example at timestamps t-30, t-60, t-90, and so on. With 20 such points, each datapoint covers 20 × 30 = 600 samples. At 52 Hz, that is roughly the last 10 seconds (about 11.5 s) of human movement. We expect 10 seconds to be enough to distinguish people. Here is the function that splits a single dataframe into train and test sets:

# split a single dataframe: the first of 4 stratified folds becomes the test set (25%)
def split_train_test(df):
    skf = StratifiedKFold(n_splits=4, shuffle=False)
    X = df.drop(['activity'], axis=1)
    y = df['activity']
    train_index, test_index = next(skf.split(X, y))
    return df.iloc[train_index], df.iloc[test_index]

The split_train_test() function splits the dataset so that the test set is 25% of the original dataset (one of four folds). We use the StratifiedKFold cross-validator with shuffle=False because this is a time-series dataset: we need to preserve the sequential patterns, so we can’t shuffle the rows randomly. We also have 7 different activities in the dataset and want all of them in both the train and the test set, which is what stratification ensures.
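To see what `shuffle=False` does in practice, the sketch below applies the same cross-validator to a toy label sequence. In the toy data, each activity occupies one contiguous block, as in the real recordings. Assuming a recent scikit-learn release, the first test fold takes the first quarter of each activity’s rows. The rows therefore keep their original order within each block, but the test set as a whole is not one contiguous slice.

```python
import numpy as np
from sklearn.model_selection import StratifiedKFold

# Toy labels: activity 1 for 8 consecutive rows, then activity 2 for 8 rows
y = np.array([1] * 8 + [2] * 8)
X = np.arange(len(y)).reshape(-1, 1)  # feature values do not affect the split

skf = StratifiedKFold(n_splits=4, shuffle=False)
train_idx, test_idx = next(skf.split(X, y))
print("test rows:", test_idx)               # test rows: [0 1 8 9]
print("test share:", len(test_idx) / len(y))  # test share: 0.25
```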

Next, we define a function that splits each of the 15 datasets into train and test sets.

# split each dataframe in the list into train and test sets
def train_test_split_all(datas):
    splitted_datas = []
    for df in datas:
        train, test = split_train_test(df)
        splitted_datas.append([train, test])
    return splitted_datas

# the last row of each dataframe has a wrong label (activity == 0);
# we assume this datapoint should have activity == 7
for df in datas:
    df.at[len(df) - 1, 'activity'] = 7

As you can see, we also fix a minor labeling error in the dataset. Next, we need to remove activity as an input feature. At inference time, the device will not know whether the person is walking, standing, sitting, or doing something else. People shouldn’t have to tell their gadgets what they are doing; the gadget should be smart enough to recognize its owner during any type of activity.

splitted_datas = train_test_split_all(datas)

# remove the activity as input feature
for i in range(len(splitted_datas)):
    splitted_datas[i][0] = splitted_datas[i][0].drop(['activity'], axis=1)
    splitted_datas[i][1] = splitted_datas[i][1].drop(['activity'], axis=1)

Here is the main dataset preparation function:

def prepare_datasets(shift_frequency=30, n_steps=10):
    for i in range(len(splitted_datas)):
        for part in (0, 1):  # 0 = train set, 1 = test set
            df = splitted_datas[i][part]

            # log-transform each axis (the +0.1 offset avoids log(0))
            for col in ['X', 'Y', 'Z']:
                df[col] = np.log((df[col] + 0.1).astype(float))

            # add lagged copies: column 'X-n' holds X at row t - shift_frequency*n
            for n in range(1, n_steps + 1):
                for col in ['X', 'Y', 'Z']:
                    df['{}-{}'.format(col, n)] = df[col].shift(shift_frequency * n)

            # remove rows with NaN (rows without a full history)
            splitted_datas[i][part] = df.dropna()

The function iterates over each of the 15 datasets. For each one, it applies a log transform to the X, Y, and Z columns and then builds lagged features using the approach described earlier. By default, it takes every 30th datapoint, going back 10 steps. With the default parameters, each datapoint in the resulting dataset therefore covers roughly the last 6 seconds of human activity (300 samples at 52 Hz). Both parameters can be changed. Finally, the function removes rows containing NaNs.
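The time span covered by the lags is `shift_frequency * n_steps / 52` seconds. The small helper below computes it for the configurations mentioned in this tutorial. `lookback_seconds` is a hypothetical name, not part of the original code. The first configuration, (1, 10), is the plain 10-step window discussed earlier.

```python
def lookback_seconds(shift_frequency, n_steps, sample_rate_hz=52):
    """Seconds spanned by the lags t - shift_frequency, ..., t - n_steps*shift_frequency."""
    return shift_frequency * n_steps / sample_rate_hz


for shift, steps in [(1, 10), (30, 10), (30, 20)]:
    print(f"shift={shift}, n_steps={steps}: {lookback_seconds(shift, steps):.2f} s")
# shift=1, n_steps=10: 0.19 s
# shift=30, n_steps=10: 5.77 s
# shift=30, n_steps=20: 11.54 s
```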

Here we prepare the datasets to look back over roughly the last 10 seconds:

prepare_datasets(n_steps=20)

# get the train and test sets of the first person (the one we train on)
train, test = splitted_datas[0][0], splitted_datas[0][1]

Here is what the training dataset should look like after preparation:

[Image: sample rows of the prepared training dataset]

  6. Train a OneClassSVM model.

Here is the code used to train the model:

model = OneClassSVM(nu=0.2, gamma='auto')
model.fit(train)
predictions = model.predict(test)
print("Certainty: {}".format(list(predictions).count(1)/len(predictions)))

Output:

[Image: certainty output for the first person’s test set]

We measure the model’s confidence that the input belongs to the person whose data was used for training as follows. If the model considers a data point an anomaly, it predicts the label “-1”. If it considers the data point “not an anomaly”, it predicts the label “1”. We therefore count the number of “1” labels and divide it by the total number of examples in the test set.

nu is an upper bound on the fraction of training errors (training points treated as outliers) and a lower bound on the fraction of support vectors. It therefore also acts somewhat like a parameter that controls overfitting. In our case, we found that the higher we set nu, the more distinct “our person” becomes from all other people, but training also takes longer.

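We think that nu=0.2 is enough in our case. Because nu=0.2 lets up to about 20% of the training person’s own points be labeled as outliers, we expect genuine data from that person to score about 0.8. We can therefore treat all inputs that score less than 0.8 as belonging to “other persons”. In other words, 0.8 can be used as the classification threshold.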
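As a minimal sketch of this scoring rule, the helpers below compute the certainty (equivalent to `list(predictions).count(1)/len(predictions)`) and apply the 0.8 threshold. The check at the end uses random Gaussian data, not the activity dataset. It shows that with `nu=0.2` roughly 20% of the training person’s own points are labeled -1, which is consistent with the 0.8 threshold. `certainty` and `is_owner` are hypothetical names.

```python
import numpy as np
from sklearn.svm import OneClassSVM


def certainty(predictions):
    """Share of rows labeled +1 (inlier) by a scikit-learn outlier detector."""
    return float(np.mean(np.asarray(predictions) == 1))


def is_owner(predictions, threshold=0.8):
    """Treat a batch as the training person's data if at least `threshold`
    of its rows are inliers (0.8 = 1 - nu for nu = 0.2)."""
    return certainty(predictions) >= threshold


# Synthetic check: how many of the owner's own training points get +1?
rng = np.random.default_rng(0)
owner = rng.normal(size=(500, 3))
demo_svm = OneClassSVM(nu=0.2, gamma='auto').fit(owner)
print("Certainty on training data:", round(certainty(demo_svm.predict(owner)), 3))
```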

We then run the trained model on each person’s test set in a loop. Here are the code and results:

[Image: code and per-person certainty scores for the model trained on person #1]

Only for person #1 (the person whose data we trained the model on) do we get a high level of confidence. This suggests that our model performs well and can distinguish people based on their activity measurements over a period of about 10 seconds. “Our person” scores 0.87, while the closest runner-up (person #14) scores only about 0.49.
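The evaluation loop is shown only as a screenshot. The sketch below is one way to write it, assuming the `splitted_datas` structure built earlier (a list of `[train, test]` pairs). It also makes it easy to repeat the experiment with other people as the owner. `score_all_people` is a hypothetical helper. `make_model` is any zero-argument callable that returns an unfitted scikit-learn outlier detector.

```python
import numpy as np


def score_all_people(make_model, splitted_datas, owner):
    """Fit a fresh model on the owner's training split and return, for every
    person, the share of their test rows that the model labels +1 (inlier)."""
    model = make_model()
    model.fit(splitted_datas[owner][0])
    return [float(np.mean(model.predict(test) == 1)) for _train, test in splitted_datas]


# Hypothetical usage in the notebook (owner index 0 is person #1):
# from sklearn.svm import OneClassSVM
# scores = score_all_people(lambda: OneClassSVM(nu=0.2, gamma='auto'), splitted_datas, owner=0)
# for person, score in enumerate(scores, start=1):
#     print(f"Person #{person}: {score:.2f}")
```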

Let’s check whether our model also works when trained on other people. Results for person #3:

[Image: per-person certainty scores, model trained on person #3]

Results for person #7:

[Image: per-person certainty scores, model trained on person #7]

Results for person #15:

[Image: per-person certainty scores, model trained on person #15]

As you can see, the model correctly identifies persons #3 and #7 but fails for person #15. This means there is room to improve the model.

Let’s try to use an Isolation Forest model.

The main idea of Isolation Forest is to repeatedly select a random feature and then a random split value between the minimum and maximum values of that feature. Anomalies require fewer such partitions to be isolated than normal data, because they are rare and lie further from the bulk of the dataset. As a result, they end up closer to the root of the trees. The main advantage of the Isolation Forest technique is therefore that it isolates anomalies directly instead of profiling “normal” instances. Here is the code for training the Isolation Forest model on the data from person #1:
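The following pure-Python sketch illustrates the isolation principle in one dimension by counting how many random splits it takes to separate a point from the rest of the data. It is a simplified illustration with no subsampling, a single feature, and no score normalization. It is not scikit-learn’s implementation. `isolation_depth` and `mean_depth` are hypothetical helpers.

```python
import random


def isolation_depth(point, data, rng, max_depth=50):
    """Count the random splits needed to isolate `point` from 1-D `data`
    (which must contain `point`). Each split draws a value uniformly between
    the current minimum and maximum and keeps only the side holding `point`."""
    current = list(data)
    depth = 0
    while len(current) > 1 and depth < max_depth:
        lo, hi = min(current), max(current)
        if lo == hi:  # remaining values are identical and cannot be separated
            break
        split = rng.uniform(lo, hi)
        if point < split:
            current = [x for x in current if x < split]
        else:
            current = [x for x in current if x >= split]
        depth += 1
    return depth


def mean_depth(point, data, n_trees=200, seed=0):
    """Average isolation depth over `n_trees` independent random trees."""
    rng = random.Random(seed)
    return sum(isolation_depth(point, data, rng) for _ in range(n_trees)) / n_trees


# 100 "normal" readings between 0 and 99 plus one extreme value
data = [float(x) for x in range(100)] + [1000.0]
print("outlier 1000:", mean_depth(1000.0, data))
print("inlier 50:", mean_depth(50.0, data))
```

The extreme value is usually cut off by the very first split, so its average depth is much smaller than that of a typical point.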

isfor = IsolationForest(n_estimators=150, contamination=0.2, n_jobs=-1)
isfor.fit(train)
predictions = isfor.predict(test)
print("Certainty: {}".format(list(predictions).count(1)/len(predictions)))

Output:

[Images: Isolation Forest output]

The results are similar to those of the OneClassSVM model. To compare the models more rigorously, we need to prepare a separate dataset in a slightly different form. On it, we can measure the accuracy and, possibly, the ROC AUC of both models.

Let’s measure accuracy. Here is the code for preparing the evaluation dataset:

import random

rows = []
for i in range(5000):
    # pick a random person; person 0 is the one the models were trained on
    person_number = random.randint(0, 14)
    target = 1 if person_number == 0 else 0
    # pick a random row from that person's test set
    ds = splitted_datas[person_number][1]
    record_number = random.randint(0, len(ds) - 1)
    row = ds.iloc[[record_number]].copy()
    row['target'] = target
    rows.append(row)

# DataFrame.append was removed in pandas 2.0, so combine the rows with pd.concat
df = pd.concat(rows)

This builds a labeled evaluation dataset. We randomly pick a person and then a datapoint from that person’s test set. If the datapoint belongs to the first person, the target variable is 1; otherwise, it is 0. The dataset has 5000 records. Note that only about 1 in 15 records has target 1, so a model that always predicts 0 would already reach roughly 93% accuracy. Our final dataset should look like this:

[Image: sample rows of the evaluation dataset, including the target column]

Then we split it into X and y_true and get predictions from the OneClassSVM (model) and Isolation Forest (isfor) models trained on the first person’s data:

X = df.drop(columns=['target'])
y_true = df['target']
preds_svm = model.predict(X)
preds_isfor = isfor.predict(X)

The predictions are -1 (anomaly) and 1 (inlier), while y_true contains 0 and 1, so we need to transform the prediction arrays:

[Image: code that converts the -1/1 predictions to 0/1]
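The conversion itself appears only as a screenshot. Here is a minimal sketch, assuming the intended mapping is +1 (inlier, i.e. the first person) to 1 and -1 (outlier, another person) to 0. This matches how the target column was built. `to_binary` is a hypothetical helper.

```python
import numpy as np


def to_binary(predictions):
    """Map scikit-learn outlier labels to the evaluation targets:
    +1 (inlier, the owner) -> 1 and -1 (outlier, another person) -> 0."""
    return np.where(np.asarray(predictions) == 1, 1, 0)


# Hypothetical usage in the notebook:
# preds_svm = to_binary(preds_svm)
# preds_isfor = to_binary(preds_isfor)
```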

Now we can estimate the accuracy of both models:

from sklearn.metrics import accuracy_score
print('OneClassSVM accuracy - {}'.format(accuracy_score(y_true, preds_svm)))
print('IsolationForest accuracy - {}'.format(accuracy_score(y_true, preds_isfor)))

The output:

[Image: accuracy of both models]

This means that for person 1, the OneClassSVM model performs better than the IsolationForest model.
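ROC AUC, also mentioned above, does not depend on a decision threshold and is insensitive to the class imbalance of the evaluation set. Both scikit-learn models expose `decision_function`, which is larger for inliers. It can therefore be passed directly to `roc_auc_score`, with the owner labeled 1. The sketch below demonstrates this on synthetic Gaussian data standing in for the activity features. The data and the `roc_auc_for` helper are assumptions for illustration only.

```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.metrics import roc_auc_score
from sklearn.svm import OneClassSVM


def roc_auc_for(model, X, y_true):
    """ROC AUC with y_true = 1 for the owner and 0 for everyone else.
    decision_function is larger for inliers, so it serves directly as the score."""
    return roc_auc_score(y_true, model.decision_function(X))


# Synthetic stand-in: the "owner" is a Gaussian blob, the "other people"
# are the same blob shifted by 4 in every dimension.
rng = np.random.default_rng(0)
owner_train = rng.normal(0.0, 1.0, size=(500, 3))
X_eval = np.vstack([rng.normal(0.0, 1.0, size=(200, 3)),   # owner
                    rng.normal(4.0, 1.0, size=(200, 3))])  # others
y_eval = np.r_[np.ones(200), np.zeros(200)]

demo_svm = OneClassSVM(nu=0.2, gamma='auto').fit(owner_train)
demo_isfor = IsolationForest(n_estimators=150, contamination=0.2,
                             random_state=0).fit(owner_train)
auc_svm = roc_auc_for(demo_svm, X_eval, y_eval)
auc_isfor = roc_auc_for(demo_isfor, X_eval, y_eval)
print(f"OneClassSVM ROC AUC: {auc_svm:.3f}")
print(f"IsolationForest ROC AUC: {auc_isfor:.3f}")
```

With the notebook variables, the equivalent call would be `roc_auc_score(y_true, model.decision_function(X))`, and likewise for `isfor`. Because it ranks scores instead of thresholding them, it needs no -1/1 conversion.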

Conclusion

In this tutorial, we learned how to upload data files to AWS S3 storage and connect them to Dremio for processing. Finally, we used the data locally in a Jupyter Notebook to solve an anomaly detection problem. The chosen anomaly detection techniques were OneClassSVM and Isolation Forest, and OneClassSVM produced slightly better results.

The trained models can be improved. There are several possible ways to improve the results:

  1. Feature engineering. You can try to devise other, more powerful features. For example, you can use an average of several neighboring datapoints to reduce the risk of modeling noise instead of the real activity patterns.
  2. Tweak hyperparameters of the models.
  3. Perform ensembling by combining OneClassSVM and Isolation Forest models.
  4. Use deep learning to perform anomaly detection. Since this is a time-series dataset, LSTM neural networks could probably perform very well here.
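Here is a minimal sketch of the first idea, assuming a trailing rolling mean. Each reading is averaged with the previous `window - 1` readings, so no future data is used. The helper name `smooth_axes` and the window size are hypothetical choices, not tuned values.

```python
import pandas as pd


def smooth_axes(df, window=5, columns=('X', 'Y', 'Z')):
    """Replace each axis with a trailing rolling mean over `window` readings
    (the current reading and the window - 1 before it) to damp sensor noise.
    Rows without a full window are dropped and the index is reset."""
    out = df.copy()
    for col in columns:
        out[col] = out[col].rolling(window=window).mean()
    return out.dropna(subset=list(columns)).reset_index(drop=True)


raw = pd.DataFrame({"X": [1, 2, 3, 4, 5], "Y": [0, 0, 0, 0, 0],
                    "Z": [2, 2, 2, 2, 2], "activity": [1, 1, 1, 1, 1]})
print(smooth_axes(raw, window=3))
```

In the notebook, it could be applied to each raw dataframe before splitting, for example `datas = [smooth_axes(d) for d in datas]`. The window size then becomes one more hyperparameter to tune.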