
Accelerating TensorFlow Data With Dremio

Introduction

A lot of data is amassed in different forms today, and even more is accumulated in the wild. Dremio is a great solution for those who need to bring together data of different types and from different sources.

Dremio is a versatile distributed data platform (but it also can work on a single node), allowing for rich data manipulation and discovery, all through a simple and concise interface.

Dremio has numerous use cases, including BI analytics, visualizations, and even bringing data to Machine Learning algorithms.

Today, we are going to look at how a recommender system can be built with TensorFlow and how Dremio facilitates that.

Installation

There are many Dremio deployment models, but describing them all is out of scope for today. You may refer to this doc for more information.

We are going to work with the standalone deployment mode, as it is the best option for exploration and learning. The standalone installation process is quite straightforward. Installation on RPM-based systems is described here.
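
For reference, on an RPM-based system the standalone installation boils down to installing the package and starting the service. This is only a rough sketch with a placeholder package name; follow the linked doc for the exact, current steps:

sudo yum localinstall dremio-community-&lt;version&gt;.rpm
sudo service dremio start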

Data preparation in Dremio

First off, after we have either logged in with an existing user or created a new one, we will connect to our data sources. Our MovieLens dataset for building the recommendation system is stored in different databases. Part of the information is in relational form in PostgreSQL (the ratings, tags, and links tables), and some of it is in MongoDB (movies with titles and a genres breakdown).

Split data like this can cause problems when bringing it into one place: you might need to extract data from one data source and move it to another, or push all of the data you need from both sources into a third one.

With Dremio, however, the data stays where it was before. You simply connect to it, make your changes in a virtual dataset (a transparent contract describing how the data is transformed), save it in Dremio, and then consume it from Dremio through a regular SQL interface.

So, let’s begin. First, adding PostgreSQL source:

Add Postgres as a source

For this, just click add new source, fill out the information, making sure to point to the correct host and port of the database, and then click save.

After this, the source will be available under the sources section of the navigation bar:

Postgres now available as a source

We will do the same for the MongoDB connection:

Add MongoDB as a source

And after adding the Mongo source, here is how the sources will look:

MongoDB now available as a source

Each of these data sources has its peculiarities. PostgreSQL sources have schemas listed in them, and under each schema there are tables, which represent physical datasets.

Postgres schemas in Dremio

In Mongo sources, on the other hand, the upper level is the MongoDB database name, and inside it are the collections, which also represent physical datasets.

MongoDB collections now available

We begin our work in Dremio by choosing one of the physical datasets, marked with a purple table icon, from one of the data sources. We will choose the ratings table from the PostgreSQL source, which we named movies_base.

Selecting the ratings dataset

A physical dataset is always the starting point for making changes and linking data together; the result is then saved as a virtual dataset, marked with a green table icon.

After opening the ratings dataset, a close look tells us that it can be used in our recommender system in its entirety.

Previewing the ratings dataset

All of the fields (user_id, movie_id, rating, and timestamp) will be useful in the algorithm and, moreover, they are sufficient for success. However, we want some additional textual information to accompany the data, i.e. the movie title and the genres of the movie. This is stored in MongoDB, and we can access it and link the two datasets together (based on movie_id). To join two datasets, we open one of them, click the “Join” button, and choose the other dataset that we want to join to this one. Then all that is left to do is to choose the field on which the data will join.

Selecting a column to perform a join
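
For reference, the join we just built through the UI is conceptually equivalent to a SQL query along the following lines. This is only a sketch: the source paths (movies_base, mongo_movies) are placeholders that depend on how you named your sources, and it is not the exact SQL Dremio generates:

SELECT r.*, m.title, m.genres
FROM movies_base.ratings r
JOIN mongo_movies.movies m ON r.movie_id = m.movie_id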

This will give us a virtual dataset (you can tell from the color of the icon next to the dataset’s name, which is temporarily New Query) that is shown instantly.

Building the virtual dataset

It is already what we wanted, but we would like to drop the duplicate movie_id field and the _id field, which is an internal Mongo field. We simply click the arrow by the field name and choose the drop option to get rid of the unwanted field.

Finally, we have prepared our virtual dataset. We save it as “ratings_with_titles” in any location (by default there is only home, but we created a Movies space when we were on the main page of Dremio), and now we can proceed with connecting to our data from an application.

Connecting with ODBC Driver

We can access our data from any application by means of the ODBC driver. But first, it needs to be installed. You can download the driver from the Dremio downloads page. It is also important to install a Python ODBC handler, as we are going to be working in Python:

pip install pyodbc

Then, we can access data from Dremio via the ODBC driver through the pyodbc handler:

import pyodbc
import pandas

host = "localhost"
port = 31010
uid = "marsel.zaripov"
pwd = "qwerty123"
driver = "Dremio ODBC Driver 64-bit"

conn_str = "Driver={};ConnectionType=Direct;HOST={};PORT={};AuthenticationType=Plain;UID={};PWD={}".format(driver, host, port, uid, pwd)

cnxn = pyodbc.connect(conn_str, autocommit=True)

query = "SELECT * FROM Movies.ratings_with_titles LIMIT 100"

df = pandas.read_sql(query, cnxn)

This will read the first 100 rows from our virtual dataset into a pandas dataframe. We can check out the records using the dataframe’s .head(n) method to see the first n records:
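
For example, a quick sanity check on the dataframe (plain pandas, nothing Dremio-specific) might look like this:

# preview the first 5 records
print(df.head(5))

# number of rows and columns, and the column types
print(df.shape)
print(df.dtypes)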

Previewing our data from pandas

Now, the data is imported and ready to be used for some work.

Basics of TensorFlow

TensorFlow is a library for building neural network computations. Its main principle lies in computational nodes and the tensors that are passed between these nodes. To begin working with TensorFlow, it is good to have a basic understanding of its concepts.

First of all, let’s look at tensors, as they are the most essential unit of data for TensorFlow (the concept even shows up in its name).

At the most basic level, a tensor is a very general (more general than a vector, but analogous to it) mathematical object. It is described as an array of simpler components, associated by a common coordinate system. The most important notion for a tensor is its rank, the number of dimensions of the array. For example, a simple tensor may be represented by such an array:

[[1, 2, 3], [4, 5, 6]]

This is a tensor of rank 2, as the array has two dimensions: you need two indices (row and column) to address a single component.

Another example of a tensor is a simple vector: [7, 8, 9]. It is a tensor of rank 1, since its array representation has only one dimension and a single index addresses a component.

Likewise, we can give another example of a tensor: 7. A simple scalar is also a tensor, but of rank 0, as there are zero dimensions and no indices are needed.
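
To make the rank concept concrete, here is a minimal sketch in TensorFlow itself; tf.rank returns the number of dimensions of a tensor:

import tensorflow as tf

scalar = tf.constant(7)                       # rank 0
vector = tf.constant([7, 8, 9])               # rank 1
matrix = tf.constant([[1, 2, 3], [4, 5, 6]])  # rank 2

with tf.Session() as session:
    print(session.run(tf.rank(scalar)))  # 0
    print(session.run(tf.rank(vector)))  # 1
    print(session.run(tf.rank(matrix)))  # 2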

Next, we need to understand the computational graph model in TensorFlow. The way you construct an application with TensorFlow is by creating nodes and then connecting them together. The important note here is that when you create a computational node, it is not executed right away; instead it is prepared for execution and then executed when explicitly requested (the lazy evaluation concept). As the TensorFlow guide puts it, TensorFlow programs consist of two main parts: declaration of the computational graph and execution of this graph.

The simplest thing that we can build in TensorFlow may be a small graph of three nodes like this:

import tensorflow as tf

# building the computational graph
node0 = tf.constant(2.0)
node1 = tf.constant(3.0)
node2 = tf.add(node0, node1)

# executing the graph
with tf.Session() as session:
    print(session.run(node2))

This results in the simple output: 5.0. Nothing too fancy or exciting, but it shows how a TensorFlow graph is constructed and executed, illustrating the main principle.

Of course, it is not very useful to add simple scalars together, especially constants, so let’s look at something closer to real work.

More often, instead of using constants (or even variables, for that matter) in TensorFlow, you will end up declaring placeholders. A placeholder is like a promise to provide an input of a certain type later, much like a parameter of a function in typed programming languages. In this manner, we can rewrite our previous example like this:

a = tf.placeholder(tf.float32)
b = tf.placeholder(tf.float32)
node2 = tf.add(a, b)

with tf.Session() as session:
    print(session.run(node2, {a: 2.0, b: 3.0}))
    print(session.run(node2, {a: .75, b: .356}))

# output
# 5.0
# 1.106

Now, a and b work as parameters to our node2, which adds the two together. Additionally, say we want to divide the sum by some value, but not make it a parameter (i.e. a placeholder); in that case we may declare it as a TensorFlow variable:

a = tf.placeholder(tf.float32)
b = tf.placeholder(tf.float32)
c = tf.Variable([.634], dtype=tf.float32)

node2 = tf.add(a, b)
node3 = tf.div(node2, c)

with tf.Session() as session:
    # IMPORTANT NOTE: variables must be initialized before running the graph, like so:
    init = tf.global_variables_initializer()
    session.run(init)

    print(session.run(node3, {a: 2.0, b: 3.0}))

# output
# [ 7.88643551]

Using variables like this is typical for enabling training in a Machine Learning algorithm: the loss is minimized by gradually changing the values of the variables. The values are also not changed manually, but with the use of training optimizers, either provided by TensorFlow or written by hand.

Here is an example of a full trainable model, with explanation in the comments. The program is a toy model with no real meaning, but it shows the ability to adjust the variables automatically to reduce loss.

import tensorflow as tf

# First, parameters of the model are defined and initialized:
# here the two variables are given their type and initial values
W = tf.Variable([.3], dtype=tf.float32)
b = tf.Variable([-.3], dtype=tf.float32)

# Next, model input and output; placeholders are used,
# you may think of them as a promise to put something there later

# input
x = tf.placeholder(tf.float32)

# model itself
linear_model = W * x + b

# output
y = tf.placeholder(tf.float32)

# loss
# computed by summing the squares of deltas between the modeled and desired output
loss = tf.reduce_sum(tf.square(linear_model - y))

# optimizer, using a provided one from the train API of TensorFlow
optimizer = tf.train.GradientDescentOptimizer(0.01)

# actual use of the optimizer, to minimize loss
train = optimizer.minimize(loss)

# our training data
x_train = [1, 2, 3, 4]
y_train = [0, -1, -2, -3]

# up next, building our training loop
# prepare the session for running
sess = tf.Session()
init = tf.global_variables_initializer()

# set initial values of our variables
sess.run(init)

# training loop, adjusting W and b with the optimizer
for i in range(1000):
    sess.run(train, {x: x_train, y: y_train})

# let's look at the current state of the W and b variables, as well as loss:
curr_W, curr_b, curr_loss = sess.run([W, b, loss], {x: x_train, y: y_train})
print("W: %s b: %s loss: %s" % (curr_W, curr_b, curr_loss))

That is it. This program adjusts the values of the variables automatically to bring the error to its minimum, and thus it is a self-training computational graph. Next, we will build something similar, but more complex, on our MovieLens data from Dremio.

Building SVD model for Movielens in TensorFlow

Before we begin, we need to import everything we might need. Here is how our imports look:

import time
from collections import deque
import numpy as np
import tensorflow as tf
import pandas as pd
import pyodbc
from six import next

A bit of explanation: we need time for simple measurement of elapsed time; deque as an efficient container for computation errors; and the next function from the six library to advance our training and testing batch iterators in a way that works on both Python 2 and 3. The reason for the other imports should be obvious.

Also, we will place some constant declarations at the top, right after the imports:

# some constants

# seeding the random generator for predictable outcomes
np.random.seed(13575)

# batching parameters
BATCH_SIZE = 1000
EPOCH_MAX = 100

# some metadata
USER_NUM = 6040
ITEM_NUM = 3952
DIM = 15

# computational device. if you have a beefy graphics card,
# you can change it to "/gpu:0"
DEVICE = "/cpu:0"

# finally, a function for clipping ratings to the valid range; this will help us later
def clip(x):
    return np.clip(x, 1.0, 5.0)

First of all, we need to divide our data into training and testing datasets. As you remember, we queried data from Dremio through the ODBC wrapper. Now, we are going to get rid of the limit on the number of records we query and wrap this logic into a function:

def load_data():
  # reading data from Dremio
  host = "localhost"
  port = 31010
  uid = "marsel.zaripov"
  pwd = "qwerty123"
  driver = "Dremio ODBC Driver 64-bit"
  conn_str = "Driver={};ConnectionType=Direct;HOST={};PORT={};AuthenticationType=Plain;UID={};PWD={}".format(driver, host, port, uid, pwd)

  # querying all records from the virtual dataset
  cnxn = pyodbc.connect(conn_str, autocommit=True)
  query = "SELECT * FROM Movies.ratings_with_titles"
  df = pd.read_sql(query, cnxn)
  return df

# As we have read all of the data into the df variable,
# we will split it 0.1 to 0.9, where 0.1 of the data will be for testing and 0.9 for training.

def prepare_data(df):
  rows = len(df)
  df = df.iloc[np.random.permutation(rows)].reset_index(drop=True)
  split_index = int(rows * 0.9)
  df_train = df[0:split_index]
  df_test = df[split_index:].reset_index(drop=True)
  return df_train, df_test

Note: we transform our data inside the function so as not to clog up the global scope with unneeded variables.

Continuing with data preparation, we will define generator container classes for our training and testing data. The first one generates batches of a certain size randomly:

class ShuffleIterator(object):
  """
  Randomly generate batches
  """
  def __init__(self, inputs, batch_size=10):
    self.inputs = inputs
    self.batch_size = batch_size
    self.num_cols = len(self.inputs)
    self.len = len(self.inputs[0])
    self.inputs = np.transpose(np.vstack([np.array(self.inputs[i]) for i in range(self.num_cols)]))

  def __len__(self):
    return self.len

  def __iter__(self):
    return self

  def __next__(self):
    return self.next()

  def next(self):
    ids = np.random.randint(0, self.len, (self.batch_size,))
    out = self.inputs[ids, :]
    return [out[:, i] for i in range(self.num_cols)]

The key method here is next, which is the Python iterator interface (an iterator is an object that defines this method); it produces a random batch of data of the given size.
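
To see what the iterator yields, here is a toy run on made-up data (the numbers are arbitrary and only for illustration):

# three parallel columns of toy data
users = [1, 2, 3, 4, 5]
items = [10, 20, 30, 40, 50]
rates = [3.0, 4.0, 5.0, 2.0, 1.0]

it = ShuffleIterator([users, items, rates], batch_size=2)

# each call to next() returns one random batch of size 2,
# as a list of per-column arrays
batch_users, batch_items, batch_rates = next(it)
print(batch_users, batch_items, batch_rates)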

Next comes the generator for the testing data; it produces the data sequentially within one epoch, group by group.

class OneEpochIterator(ShuffleIterator):
  """
  Sequentially generate one-epoch batches, typically for test data
  """
  def __init__(self, inputs, batch_size=10):
    super(OneEpochIterator, self).__init__(inputs, batch_size=batch_size)
    if batch_size > 0:
      self.idx_group = np.array_split(np.arange(self.len), int(np.ceil(self.len / batch_size)))
    else:
      self.idx_group = [np.arange(self.len)]
    self.group_id = 0

  def next(self):
    if self.group_id >= len(self.idx_group):
      self.group_id = 0
      raise StopIteration
    out = self.inputs[self.idx_group[self.group_id], :]
    self.group_id += 1
    return [out[:, i] for i in range(self.num_cols)]

Again, the main logic of this generator is in the next method.
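
In contrast to ShuffleIterator, this iterator can be exhausted, so it fits naturally into a for loop that walks through the test set exactly once. A sketch with the same toy columns as above:

it = OneEpochIterator([users, items, rates], batch_size=2)

# the loop stops when the epoch is over (StopIteration is raised),
# after which the iterator resets itself for the next epoch
for batch_users, batch_items, batch_rates in it:
    print(batch_users, batch_items, batch_rates)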

We will load the data from Dremio using the load_data() function. Then we will divide the data into training and testing sets with the prepare_data() function. After that, we will pass the training data to the generator like this:

ShuffleIterator([train_data["userId"], train_data["movieId"], train_data["rating"]], batch_size=BATCH_SIZE)

# And testing data like this:

OneEpochIterator([test_data["userId"], test_data["movieId"], test_data["rating"]], batch_size=-1)

Up next, we will be preparing our optimizer.

The first thing is to define the inference (how the rating is predicted); in our case of a recommendation system, it can be found with the following formula:

y_pred[u, i] = global_bias + bias_user[u] + bias_item[i] + <embedding_user[u], embedding_item[i]>

# where an item is a movie. Thus, the target of the optimizer is to minimize:

sum{u, i} |y_pred[u, i] - y_true[u, i]|^2 + lambda * (|embedding_user[u]|^2 + |embedding_item[i]|^2)

Both of these expressions can be represented in a TensorFlow graph with the use of TensorFlow’s operations (like the tf.add and tf.div we used before).

Each optimizer uses a certain optimization algorithm. In the example of the previous section, the provided optimizer used classical stochastic gradient descent. For this optimizer, the Adam algorithm is used (it is not an acronym; the name is derived from adaptive moment estimation). Adam tends to perform better than plain stochastic gradient descent, as the latter applies a single learning rate to all parameters, while Adam maintains an adaptive per-parameter learning rate.
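
From the API point of view the two optimizers are interchangeable. Assuming a loss tensor like the one from the earlier linear model, swapping the algorithm is a one-line change:

# a single global learning rate for every parameter
sgd_train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

# adaptive per-parameter learning rates (what we use below)
adam_train_op = tf.train.AdamOptimizer(0.001).minimize(loss)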

Coming back to the computation of inference, here is the realization of the inference formula above, expressed in the TensorFlow DSL:

def inference_svd(user_batch, item_batch, user_num, item_num, dim=5, device="/cpu:0"):
  with tf.device("/cpu:0"):
    # here we extract the biases and embeddings for further computation
    bias_global = tf.get_variable("bias_global", shape=[])
    w_bias_user = tf.get_variable("embd_bias_user", shape=[user_num])
    w_bias_item = tf.get_variable("embd_bias_item", shape=[item_num])
    bias_user = tf.nn.embedding_lookup(w_bias_user, user_batch, name="bias_user")
    bias_item = tf.nn.embedding_lookup(w_bias_item, item_batch, name="bias_item")
    w_user = tf.get_variable("embd_user", shape=[user_num, dim],
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
    w_item = tf.get_variable("embd_item", shape=[item_num, dim],
                             initializer=tf.truncated_normal_initializer(stddev=0.02))
    embd_user = tf.nn.embedding_lookup(w_user, user_batch, name="embedding_user")
    embd_item = tf.nn.embedding_lookup(w_item, item_batch, name="embedding_item")

  with tf.device(device):
    # building up the computation
    # of the inference formula from above
    infer = tf.reduce_sum(tf.multiply(embd_user, embd_item), 1)
    infer = tf.add(infer, bias_global)
    infer = tf.add(infer, bias_user)
    infer = tf.add(infer, bias_item, name="svd_inference")
    regularizer = tf.add(tf.nn.l2_loss(embd_user), tf.nn.l2_loss(embd_item), name="svd_regularizer")

  return infer, regularizer

You may also notice that the computational operations are assigned to a device with the use of a context manager. By the way, using a literal rather than the device parameter in the first with statement is not a typo, but an intentional decision to make sure that the bias and embedding extraction logic is always computed on the CPU (even if the device passed in is a GPU).
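
In isolation, the pattern looks roughly like this (a sketch; "/gpu:0" assumes a GPU is actually present):

# the embedding table and the lookups always live on the CPU...
with tf.device("/cpu:0"):
    table = tf.get_variable("table", shape=[100, 8])
    looked_up = tf.nn.embedding_lookup(table, [1, 2, 3])

# ...while the arithmetic that consumes them runs on the requested device
with tf.device("/gpu:0"):
    summed = tf.reduce_sum(looked_up, 1)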

Having computed the inference, we can compute the loss and the minimizing optimization. Once again, we will factor this out into a separate function.

def optimization(infer, regularizer, rate_batch, learning_rate=0.001, reg=0.1, device="/cpu:0"):

  global_step = tf.train.get_global_step()

  with tf.device(device):

    cost_l2 = tf.nn.l2_loss(tf.subtract(infer, rate_batch))
    penalty = tf.constant(reg, dtype=tf.float32, shape=[], name="l2")
    cost = tf.add(cost_l2, tf.multiply(regularizer, penalty))
    train_op = tf.train.AdamOptimizer(learning_rate).minimize(cost, global_step=global_step)

  return cost, train_op

The loss here is computed with the use of TensorFlow’s neural network module and its L2 loss function (passing it the delta between the inference and the actual training outcome). Then, after the loss is computed, we pass it to the AdamOptimizer for minimizing.
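
As a quick sanity check of what tf.nn.l2_loss computes: it is sum(t ** 2) / 2 over the elements of its argument, so for a delta vector of [1.0, 2.0, 3.0]:

delta = tf.constant([1.0, 2.0, 3.0])

with tf.Session() as session:
    print(session.run(tf.nn.l2_loss(delta)))  # (1 + 4 + 9) / 2 = 7.0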

Now that we have prepared our data in the form of batch generators for both the training and testing sets, implemented the inference formula, and prepared the loss computation and the optimizer for minimization, we are ready to construct our SVD model. We will put the process of constructing and running it into a separate function taking two parameters, the training data and the testing data, and call it run_svd().

The first thing we will do is compute the number of samples per batch and create our batch generators (as shown above) from the data.

def run_svd(train, test):

  samples_per_batch = len(train) // BATCH_SIZE

  iter_train = ShuffleIterator([train["userId"],
                                train["movieId"],
                                train["rating"]],
                               batch_size=BATCH_SIZE)

  iter_test = OneEpochIterator([test["userId"],
                                test["movieId"],
                                test["rating"]],
                               batch_size=-1)

  # Next, we prepare the inputs.
  user_batch = tf.placeholder(tf.int32, shape=[None], name="userId")
  item_batch = tf.placeholder(tf.int32, shape=[None], name="movieId")
  rate_batch = tf.placeholder(tf.float32, shape=[None])

  # Next, we compute the inference with the function we prepared,
  # set the global step for the optimizer, and create the optimizer.
  infer, regularizer = inference_svd(user_batch, item_batch, user_num=USER_NUM, item_num=ITEM_NUM, dim=DIM, device=DEVICE)
  global_step = tf.contrib.framework.get_or_create_global_step()
  cost, train_op = optimization(infer, regularizer, rate_batch, learning_rate=0.001, reg=0.05, device=DEVICE)

  # Then, we initialize the variables:
  init_op = tf.global_variables_initializer()

  # At this point we are ready to start the session, so we open it in a context manager:
  with tf.Session() as sess:

    # init variables; until you run this through the session they are not initialized
    sess.run(init_op)

    # this is the header for our future prints from the training loop
    print("{} {} {} {}".format("epoch", "train_error", "val_error", "elapsed_time"))

    # container for computational errors
    errors = deque(maxlen=samples_per_batch)

    # start time for measuring elapsed time
    start = time.time()

    # Now, we enter the training loop and proceed with the rest of the
    # operations (much like in the simple example from the previous section).
    for i in range(EPOCH_MAX * samples_per_batch):
      users, items, rates = next(iter_train)
      _, pred_batch = sess.run([train_op, infer], feed_dict={user_batch: users, item_batch: items, rate_batch: rates})
      pred_batch = clip(pred_batch)
      errors.append(np.power(pred_batch - rates, 2))

      # once per epoch, report the training and validation errors
      if i % samples_per_batch == 0:
        train_err = np.sqrt(np.mean(errors))
        test_err2 = np.array([])

        for users, items, rates in iter_test:
          pred_batch = sess.run(infer, feed_dict={user_batch: users, item_batch: items})
          pred_batch = clip(pred_batch)
          test_err2 = np.append(test_err2, np.power(pred_batch - rates, 2))

        end = time.time()
        test_err = np.sqrt(np.mean(test_err2))
        print("{:3d} {:f} {:f} {:f}(s)".format(i // samples_per_batch, train_err, test_err, end - start))
        start = end

# Put the calls at the bottom:

if __name__ == '__main__':
  df = load_data()
  df_train, df_test = prepare_data(df)
  run_svd(df_train, df_test)
  print("Finished!")

Save the file as movielens_svd.py and run it with python:

python movielens_svd.py

This will produce the following output:

Output of TensorFlow model

This shows us how the training progresses and how the minimization of the error takes effect as training goes on.

Now, using the constructed inference, we can also produce a prediction by running a concrete instance of data through it:

# run in the same session as above

userIds = [61, 59, 28, 506]

movieIds = [1240, 1953, 306, 2264]

pred_batch = sess.run(infer, feed_dict={user_batch: userIds, item_batch: movieIds})
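
Just as in the training loop, the raw inference values are not guaranteed to fall into the valid rating range, so it is worth passing them through the clip() helper defined earlier before displaying them:

# clip predictions into the [1.0, 5.0] rating range and pair them with the inputs
pred_batch = clip(pred_batch)
for u, m, p in zip(userIds, movieIds, pred_batch):
    print("user %d, movie %d -> predicted rating %.2f" % (u, m, p))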

Conclusion

In today’s article we constructed a neural network based on SVD matrix factorization with the use of TensorFlow, a general computational graph library. And even though our example seemed complex, it followed the principles shown in the introductory TensorFlow example.

Dremio played an important role in the process. As we showed in another tutorial, we constructed a recommender for movie ratings with the use of two different libraries: scikit-learn and Surprise. Today, we were able to use the same data without having to relocate it once again for our special purposes or create additional dependencies.

Having such flexibility with data when trying out multiple approaches to Machine Learning, including this special case known as neural networks, is extremely useful. Evaluating models is an important part of the process, and if a lot of work has to be done to make the data ready for each model, that adds considerably to cost and time. With Dremio, multiple models can be evaluated easily, because access to the data is provided as a self-service experience that also makes getting the data very, very fast.