Compiling SQL to Elasticsearch Painless

Intro

The Elasticsearch Query DSL is a powerful and simple way to express queries in Elasticsearch using JSON. Painless is a simple, secure scripting language for inline and stored scripts. When considered together, it is possible to map most SQL queries to Elasticsearch efficiently and with high performance.

In this tutorial we will look at how core SQL concepts for data structures and operations map to equivalent concepts in Elasticsearch. We will also explore how different SQL operators, including numeric, string, date, Boolean, and set operators, can be automatically mapped to equivalent Elasticsearch expressions through the DSL and Painless scripts.

Dremio enables business analysts and data scientists to utilize Elasticsearch without having to learn the Query DSL. Dremio automatically translates SQL queries into the Query DSL (and Painless scripts), and can augment Elasticsearch’s execution capabilities so that users can run any SQL query, including arbitrary joins, accurate counts (and count distincts) and more. Furthermore, Dremio Reflections can speed up execution of analytical queries by orders of magnitude.

Assumptions

For this tutorial we assume you’re familiar with Elasticsearch and know how to load data into your cluster. We recommend you first read Getting Oriented to Dremio and Working With Your First Dataset, and you might also review Unlocking SQL on Elasticsearch and Unlocking Tableau on Elasticsearch.

Mapping SQL Concepts to Elasticsearch

If you’re coming from the traditional world of relational databases, it can be helpful to think about how the core data model of Elasticsearch is similar to that of SQL, and how it is different. Here are several of the high-level concepts:

Relational     Elasticsearch
-----------    --------------
Schema         Mapping
Database       Index
Table          Type
Row            Document
Column         Field
SQL            DSL + Painless
Aggregation    Aggregation
Projection     Projection
Boolean        Boolean
Primary Key    _id Field
Join           Does not exist
Foreign Key    Does not exist

One of the most important differences is that Elasticsearch does not support joins. Instead of normalizing data, users are encouraged to denormalize their records whenever possible. Perhaps for this reason, there are no foreign keys either. While joins are computationally expensive operations, there are some situations where they are still useful, as we will see in a few of the examples.

One note: at the time of writing, Types are still supported. Elasticsearch has communicated to their community that these are being deprecated in future releases. Presumably the Index will take on the role of the abstraction that is most like a relational table.

Downloading our data

To start, let’s download the raw data from Yelp. Head over to https://www.yelp.com/dataset_challenge/dataset and click on the Download Data button to download the dataset. The downloaded tarball should be about 2GB.

Yelp data source

After you untar the file, you should see six files:

$ du -sh *
126M	business.json
 57M	checkin.json
 23M	photos.json
3.6G	review.json
176M	tip.json
1.5G	user.json

The data inside these files is stored in JSON Lines format, also known as newline-delimited JSON (NDJSON), where each line contains a JSON document. For example, the first line in the business dataset is:

$  head -n 1 yelp_academic_dataset_business.json
{"business_id": "vcNAWiLM4dR7D2nwwJ7nCA", "full_address": "4840 E Indian School Rd\nSte 101\nPhoenix, AZ 85018", "hours": {"Tuesday": {"close": "17:00", "open": "08:00"}, "Friday": {"close": "17:00", "open": "08:00"}, "Monday": {"close": "17:00", "open": "08:00"}, "Wednesday": {"close": "17:00", "open": "08:00"}, "Thursday": {"close": "17:00", "open": "08:00"}}, "open": true, "categories": ["Doctors", "Health & Medical"], "city": "Phoenix", "review_count": 7, "name": "Eric Goldberg, MD", "neighborhoods": [], "longitude": -111.98375799999999, "state": "AZ", "stars": 3.5, "latitude": 33.499313000000001, "attributes": {"By Appointment Only": true}, "type": "business"}

Use jq as an easy way to pretty print this one-line JSON document:

$  head -n 1 yelp_academic_dataset_business.json | jq .
{
  "business_id": "vcNAWiLM4dR7D2nwwJ7nCA",
  "full_address": "4840 E Indian School Rd\nSte 101\nPhoenix, AZ 85018",
  "hours": {
    "Tuesday": {
      "close": "17:00",
      "open": "08:00"
    },
    "Friday": {
      "close": "17:00",
      "open": "08:00"
    },
    "Monday": {
      "close": "17:00",
      "open": "08:00"
    },
    "Wednesday": {
      "close": "17:00",
      "open": "08:00"
    },
    "Thursday": {
      "close": "17:00",
      "open": "08:00"
    }
  },
  "open": true,
  "categories": [
    "Doctors",
    "Health & Medical"
  ],
  "city": "Phoenix",
  "review_count": 7,
  "name": "Eric Goldberg, MD",
  "neighborhoods": [],
  "longitude": -111.983758,
  "state": "AZ",
  "stars": 3.5,
  "latitude": 33.499313,
  "attributes": {
    "By Appointment Only": true
  },
  "type": "business"
}

Load these files into your Elasticsearch cluster using your favorite script or tool. For example, you might want to use the Bulk API. The Bulk API does not accept the downloaded files as-is: although its request body is also newline-delimited JSON, it expects an action line declaring the index and type before each record.

You can use jq to convert the downloaded files into this structure. Here’s an example with the user.json file:

cat user.json | jq -c '{ index: { _index:"yelp", _type: "user" } }, . ' > user_bulk.json

You can also pipe the output of jq into the Bulk API:

cat user.json | jq -c '{ index: { _index:"yelp", _type: "user" } }, . ' | curl -XPOST localhost:9200/_bulk --data-binary @-
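If you prefer a script to jq, the same transformation can be sketched in Python. This is only an illustration of the action-line-plus-document layout: the function name to_bulk_lines is hypothetical, the two sample records are made up, and the index and type names follow the jq example above.

```python
import json

def to_bulk_lines(ndjson_lines, index="yelp", doc_type="user"):
    """Yield Bulk API lines: one action line before each source document."""
    action = json.dumps({"index": {"_index": index, "_type": doc_type}})
    for line in ndjson_lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines in the input
        json.loads(line)  # fail early if a line is not valid JSON
        yield action
        yield line

# Made-up sample records standing in for lines of user.json
sample = ['{"user_id": "u1", "name": "Ann"}', '{"user_id": "u2", "name": "Bob"}']

# The Bulk API body must end with a newline
bulk_body = "\n".join(to_bulk_lines(sample)) + "\n"
print(bulk_body)
```

The resulting string has the same shape as the jq output and could be posted to the _bulk endpoint in the same way.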

Another option is to use esbulk, a bulk loader written in Go. This loader accepts NDJSON as input, so no re-formatting of the downloaded files is required:

./esbulk -index yelp -type user /Users/henrystirman/Downloads/dataset/user.json

For this tutorial we will use the business and review datasets, but feel free to load everything.

Connecting to Elasticsearch

Access the Dremio UI by connecting your browser to http://localhost:9047 (this assumes that you are running Dremio on your laptop with the default port, for the purpose of this tutorial):

Dremio with no connected data sources

In our example, Dremio is not yet connected to any data sources (see Sources area in the bottom left). Click on the New Source button to add the Elasticsearch cluster as a source. In the New Source dialog, click on the Elasticsearch option:

List of data source types

Enter the coordinates of your Elasticsearch cluster. Assuming you are running Elasticsearch on your laptop, you should enter the following parameters:

  • Name = ES5 (or any other name that you choose)
  • Host = localhost
  • Authentication = No Authentication

Connect to Elasticsearch

Click Save to return to Dremio’s main screen.

Notice that we now have one index called yelp, and inside you can see each of the types for this index, including business, review, user, and checkin, as well as any other types you loaded. Dremio uses a hierarchical namespace model, and the canonical path to each type is source.index.type (e.g., ES5.yelp.business). The example queries in this tutorial were written against sources with other names, such as elastic, so substitute the name you chose.

Click on the yelp index:

Contents of the Yelp index

Click on the business dataset to see the data in the Dataset Viewer:

Contents of the business type

Running SQL Queries in Elasticsearch

For the following examples, feel free to use any SQL-based client. Dremio publishes a list of supported tools, but most things should work fine over ODBC/JDBC.

Alternatively, you can simply use the SQL window in Dremio to issue queries and see results. Be aware that clicking the “Preview” button only shows a sample of the results, which is always fast. To see the full results, click “Run” instead, which can be slow depending on the data source but is usually fast when you have Reflections enabled. In this tutorial we’ll assume you’re using Dremio to issue SQL queries.

Basic SQL Push-Downs

From the business dataset viewer window, click the New Query button at the top and enter the following query:

SELECT city, COUNT(*) AS c
FROM elastic.yelp.business
GROUP BY city
ORDER BY c DESC

Be sure to click Run. Depending on the precise data you’ve downloaded from Yelp, the top three should be Las Vegas (13,599), Phoenix (8,410), and Charlotte (4,224).

Results of SQL query on Elasticsearch

Dremio compiled this SQL query, and the optimizer decided to translate most of the plan into an Elasticsearch Query DSL query that could be pushed down into Elasticsearch:

{
  "size" : 0,
  "query" : {
    "match_all" : { }
  },
  "aggregations" : {
    "city" : {
      "terms" : {
        "field" : "city",
        "missing" : "NULL_STRING_TAG",
        "size" : 2147483647
      }
    }
  }
}
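To make the mapping concrete, here is a minimal Python sketch of how a single-column GROUP BY with COUNT(*) could be turned into the request body above. It is an illustration only, not Dremio’s actual planner code, and the function name group_by_count_to_dsl is hypothetical.

```python
import json

MAX_BUCKETS = 2147483647  # Integer.MAX_VALUE, as seen in the generated DSL

def group_by_count_to_dsl(column):
    """Build a terms aggregation equivalent to
    SELECT <column>, COUNT(*) ... GROUP BY <column>."""
    return {
        "size": 0,  # no hits are needed, only aggregation buckets
        "query": {"match_all": {}},
        "aggregations": {
            column: {
                "terms": {
                    "field": column,
                    # bucket for documents that lack the field
                    "missing": "NULL_STRING_TAG",
                    "size": MAX_BUCKETS,
                }
            }
        },
    }

dsl = group_by_count_to_dsl("city")
print(json.dumps(dsl, indent=2))
```

Each bucket returned by a terms aggregation carries a doc_count, which corresponds to COUNT(*) for that city.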

To see the expression that Dremio pushes down into a source, look at the query profile. There’s a separate tutorial on how to find your query profile. Once you’ve found it, you should see something like the following on the right:

Job History

Then click the Profile link to see the query profile, and open the Planning tab:

Query Profile

There’s tons of useful information here, and it’s easy to be overwhelmed. Scroll down to the section called “Final Physical Transformation” where you should see something like this:

Query Profile

This isn’t as easy to read as it could be, and we’ll make it easier in a future release, but you can still find the query here if you want to.

As we can see, this is a simple match_all query with a terms aggregation. Let’s try adding the state column and a few predicates:

SELECT state, city, COUNT(*) AS c
FROM elastic.yelp.business
WHERE 
  state NOT IN ('TX','UT','NM','NJ') AND
  review_count > 100
GROUP BY state, city
ORDER BY c DESC

The resulting DSL is as follows:

{
  "size" : 0,
  "query" : {
    "bool" : {
      "must" : [ {
        "bool" : {
          "must_not" : {
            "match" : {
              "state" : {
                "query" : "TX",
                "type" : "boolean"
              }
            }
          }
        }
      }, {
        "bool" : {
          "must_not" : {
            "match" : {
              "state" : {
                "query" : "UT",
                "type" : "boolean"
              }
            }
          }
        }
      }, {
        "bool" : {
          "must_not" : {
            "match" : {
              "state" : {
                "query" : "NM",
                "type" : "boolean"
              }
            }
          }
        }
      }, {
        "bool" : {
          "must_not" : {
            "match" : {
              "state" : {
                "query" : "NJ",
                "type" : "boolean"
              }
            }
          }
        }
      }, {
        "range" : {
          "review_count" : {
            "from" : 100,
            "to" : null,
            "include_lower" : false,
            "include_upper" : true
          }
        }
      } ]
    }
  },
  "aggregations" : {
    "state" : {
      "terms" : {
        "field" : "state",
        "missing" : "NULL_STRING_TAG",
        "size" : 2147483647
      },
      "aggregations" : {
        "city" : {
          "terms" : {
            "field" : "city",
            "missing" : "NULL_STRING_TAG",
            "size" : 2147483647
          }
        }
      }
    }
  }
}
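The structure of this DSL follows the SQL closely: each value in the NOT IN list becomes its own must_not clause, the comparison becomes an open-ended range, and each GROUP BY column becomes one level of nested terms aggregation. The Python sketch below rebuilds the same body from those three pieces. The helper names (not_in, greater_than, nested_terms) are hypothetical and this is not how Dremio itself is implemented.

```python
import json

def not_in(field, values):
    """One must_not/match clause per excluded value, as in the DSL above."""
    return [
        {"bool": {"must_not": {"match": {field: {"query": v, "type": "boolean"}}}}}
        for v in values
    ]

def greater_than(field, bound):
    """Open-ended range: exclusive lower bound, no upper bound."""
    return {
        "range": {
            field: {
                "from": bound,
                "to": None,  # serialized as JSON null
                "include_lower": False,
                "include_upper": True,
            }
        }
    }

def nested_terms(columns):
    """Nest one terms aggregation per GROUP BY column, outermost first."""
    agg = None
    for column in reversed(columns):
        level = {
            "terms": {
                "field": column,
                "missing": "NULL_STRING_TAG",
                "size": 2147483647,
            }
        }
        if agg is not None:
            level["aggregations"] = agg
        agg = {column: level}
    return agg

filters = not_in("state", ["TX", "UT", "NM", "NJ"]) + [greater_than("review_count", 100)]
dsl = {
    "size": 0,
    "query": {"bool": {"must": filters}},
    "aggregations": nested_terms(["state", "city"]),
}
print(json.dumps(dsl, indent=2))
```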

Now let’s try a query without aggregation that includes projection, sorting, and a few predicates:

SELECT state, city, name, review_count
FROM elastic.yelp.business
WHERE 
  state NOT IN ('TX','UT','NM','NJ') AND
  review_count > 100
ORDER BY review_count DESC, state, city
LIMIT 10

This time the query portion of the DSL is identical, but there is no aggregation, and the LIMIT is performed in Dremio:

{
  "from" : 0,
  "size" : 4000,
  "query" : {
    "bool" : {
      "must" : [ {
        "bool" : {
          "must_not" : {
            "match" : {
              "state" : {
                "query" : "TX",
                "type" : "boolean"
              }
            }
          }
        }
      }, {
        "bool" : {
          "must_not" : {
            "match" : {
              "state" : {
                "query" : "UT",
                "type" : "boolean"
              }
            }
          }
        }
      }, {
        "bool" : {
          "must_not" : {
            "match" : {
              "state" : {
                "query" : "NM",
                "type" : "boolean"
              }
            }
          }
        }
      }, {
        "bool" : {
          "must_not" : {
            "match" : {
              "state" : {
                "query" : "NJ",
                "type" : "boolean"
              }
            }
          }
        }
      }, {
        "range" : {
          "review_count" : {
            "from" : 100,
            "to" : null,
            "include_lower" : false,
            "include_upper" : true
          }
        }
      } ]
    }
  }
}

If you’re curious how the fields are determined when expressing the query, that information appears just above the DSL in the query profile. In this case you can see the following line just before the DSL:

ElasticScan(resource=[yelp/business], columns=[[city, review_count, name, state]], pushdown

If you want to see exactly what queries Dremio issues to Elasticsearch, you can enable the search slow log in Elasticsearch, which writes out all queries whose processing time exceeds a threshold. Be sure to set it back to a reasonable setting when you’re done. :-)
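As a rough sketch of what that might look like, the search slow log in Elasticsearch is controlled by per-index threshold settings. The Python snippet below only builds the settings body. The zero-second threshold, the info level, and the choice of the yelp index are assumptions for illustration, and you would send the body to the index settings endpoint with whichever client you prefer.

```python
import json

def slowlog_settings(threshold):
    """Settings body for the query-phase search slow log.

    A threshold of "0s" should log every query; "-1" disables the log.
    """
    return {"index.search.slowlog.threshold.query.info": threshold}

enable_all = slowlog_settings("0s")  # log every query while investigating
disable = slowlog_settings("-1")     # restore once you are done

print("PUT /yelp/_settings")
print(json.dumps(enable_all, indent=2))
```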

Windowing Functions

Taking the last examples a little further, there are common query patterns that benefit from more recent additions to SQL, such as Windowing. If you’re working with time-series data, for example, Windowing Functions are essential as they allow you to make sense of patterns within data that are very difficult or inefficient to otherwise analyze.

For a simple example, consider the restaurant reviews we were looking at before. It is very simple to assess the count of restaurant reviews per restaurant as shown above. But what if we wanted to understand the total number of reviews as a percentage of all reviews? In essence, calculate the total number of reviews for each restaurant, then compare that to the total number of reviews across all restaurants.

This is pretty easy if you’re willing to do multiple selects from your application, but that doesn’t scale as easily as it could, and coordinating predicates across queries can become fairly complex. With Windowing things are easier once you master the syntax. Here’s one example:

SELECT
  city,
  c,
  (c*1.0/total)*100 AS pct
FROM (
  SELECT
    city,
    c,
    SUM(c) OVER () AS total
  FROM (
    SELECT
      city,
      COUNT(*) AS c
    FROM
      elastic.yelp.business
    GROUP BY
      city
  )
)
ORDER BY
  pct DESC

This is compiled into Elastic DSL as the following:

{
  "size" : 0,
  "query" : {
    "match_all" : { }
  },
  "aggregations" : {
    "city" : {
      "terms" : {
        "field" : "city",
        "missing" : "NULL_STRING_TAG",
        "size" : 2147483647
      }
    }
  }
}

In this case Dremio pushes the basic aggregation calculation for all cities into Elasticsearch, and calculates the percentage of the whole on its own.
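The part that is not pushed down is simple arithmetic over the returned buckets. The Python sketch below mimics it: a grand total over all buckets (the SUM(c) OVER () step) followed by the percentage per city. The bucket counts are made up for illustration, and the function name percent_of_total is hypothetical.

```python
def percent_of_total(buckets):
    """Mimic SUM(c) OVER () followed by (c * 1.0 / total) * 100."""
    total = sum(b["doc_count"] for b in buckets)  # window over all rows
    rows = [
        {
            "city": b["key"],
            "c": b["doc_count"],
            "pct": b["doc_count"] * 1.0 / total * 100,
        }
        for b in buckets
    ]
    # ORDER BY pct DESC
    return sorted(rows, key=lambda r: r["pct"], reverse=True)

# Made-up counts in the shape of terms aggregation buckets
buckets = [
    {"key": "Phoenix", "doc_count": 30},
    {"key": "Las Vegas", "doc_count": 50},
    {"key": "Charlotte", "doc_count": 20},
]

for row in percent_of_total(buckets):
    print(row["city"], row["c"], round(row["pct"], 1))
```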

Here’s a slightly more complex example that compares each business’s number of reviews and average score to the average score for its city and across all cities, for businesses with more than 100 reviews. You can see that calculating the same using multiple queries would be much more challenging in this case.

SELECT
  city,
  name,
  bus_review_count,
  bus_avg_stars,
  city_avg_stars,
  all_avg_stars
FROM (
  SELECT
    city,
    name,
    bus_review_count,
    bus_avg_stars,
    AVG(bus_avg_stars) OVER (PARTITION BY city) AS city_avg_stars,
    AVG(bus_avg_stars) OVER () AS all_avg_stars,
    SUM(bus_review_count) OVER () AS total_reviews
  FROM (
    SELECT
      city,
      name,
      AVG(stars) AS bus_avg_stars,
      SUM(review_count) AS bus_review_count
    FROM
      elastic.yelp.business
    GROUP BY
      city, name
  )
)
WHERE
  bus_review_count > 100
ORDER BY
  bus_avg_stars DESC,
  bus_review_count DESC

The results here are a little surprising - it looks like there are a number of restaurants with literally hundreds of 5 star reviews across Las Vegas, Phoenix, Pittsburgh, and Scottsdale.

Top Rated Restaurants Comparison

How can this be? It turns out there is one record per business, and the review count and stars stored on that record are themselves aggregates. To get an accurate sense of the total number of reviews and average stars per business we need to join to the review type, an entirely different set of records in our Elasticsearch cluster.

While Elasticsearch doesn’t support joins, Dremio makes this pretty simple. We can modify our query slightly to perform a left outer join and change our aggregates to use fields in the reviews data:

SELECT 
  city, 
  name, 
  bus_review_count, 
  bus_avg_stars, 
  city_avg_stars, 
  all_avg_stars
FROM (
  SELECT 
    city, 
    name, 
    bus_review_count, 
    bus_avg_stars, 
    AVG(bus_avg_stars) OVER (PARTITION BY city) AS city_avg_stars, 
    AVG(bus_avg_stars) OVER () AS all_avg_stars, 
    SUM(bus_review_count) OVER () AS total_reviews
  FROM (
    SELECT 
      city, 
      name, 
      AVG(review.stars) AS bus_avg_stars, 
      COUNT(review.review_id) AS bus_review_count
    FROM 
      elastic.yelp.business AS business
      LEFT OUTER JOIN elastic.yelp.review AS review ON business.business_id = review.business_id
    GROUP BY 
      city, name
  )
)
WHERE bus_review_count > 100
ORDER BY bus_avg_stars DESC, bus_review_count DESC

And now our results look a lot more believable:

Top Rated Restaurants Comparison, take 2

Painless Expressions

In some cases, the Elasticsearch Query DSL is not expressive enough for the query, in which case Dremio can also use Painless, the secure scripting language embedded in Elasticsearch. For example, if we want to see whether the number of reviews varies seasonally from month to month, we could issue the following query:

SELECT EXTRACT(MONTH FROM "date") as my_month, COUNT(*) AS c
FROM "elastic-remote".yelp.review
GROUP BY EXTRACT(MONTH FROM "date")
ORDER BY c DESC

Sure enough, there are about 50% more reviews in August than in February:

Review counts by month

Each review has a datetime, so we need to extract the month from this value. Certain SQL expressions are pushed down as Painless scripts when used in a filter or aggregation expression. Extracting parts of time from date and datetime values is one example. This SQL produces the following query for Elasticsearch:

{
  "size" : 0,
  "query" : {
    "match_all" : { }
  },
  "aggregations" : {
    "my_month" : {
      "terms" : {
        "script" : {
          "inline" : "(def) ((doc[\"date\"].empty) ? null : doc[\"date\"].date.monthOfYear)",
          "lang" : "painless"
        },
        "missing" : -9223372036854775808,
        "size" : 2147483647
      }
    }
  }
}
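The script in this request is a null-safe expression: it returns null when the document has no value for the field, and the month otherwise. As a minimal sketch of how such a script could be generated from an EXTRACT call, consider the Python below. The function names and the DATE_PARTS table are hypothetical. Only the MONTH mapping comes from the query above, while the YEAR and DAY entries are assumptions by analogy.

```python
import json

# SQL date part -> property name used in the script.
# Only MONTH appears in the tutorial; YEAR and DAY are assumed by analogy.
DATE_PARTS = {"MONTH": "monthOfYear", "YEAR": "year", "DAY": "dayOfMonth"}

def extract_script(part, field):
    """Null-safe Painless expression for EXTRACT(<part> FROM <field>)."""
    prop = DATE_PARTS[part.upper()]
    template = '(def) ((doc["{0}"].empty) ? null : doc["{0}"].date.{1})'
    return template.format(field, prop)

def extract_terms_agg(alias, part, field):
    """Terms aggregation keyed on the script result instead of a field."""
    return {
        alias: {
            "terms": {
                "script": {
                    "inline": extract_script(part, field),
                    "lang": "painless",
                },
                # Long.MIN_VALUE, which appears to act as the NULL marker
                "missing": -9223372036854775808,
                "size": 2147483647,
            }
        }
    }

agg = extract_terms_agg("my_month", "MONTH", "date")
print(json.dumps(agg, indent=2))
```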

In Elasticsearch 2.x you will see something similar, compiled to Groovy:

{
  "size" : 0,
  "query" : {
    "match_all" : { }
  },
  "aggregations" : {
    "my_month" : {
      "terms" : {
        "script" : {
          "inline" : "(doc[\"date\"].empty) ? null : doc[\"date\"].date.monthOfYear",
          "lang" : "groovy"
        },
        "missing" : -9223372036854775808,
        "size" : 2147483647
      }
    }
  }
}

Other types of expressions that are pushed down as Painless include:

  • Math operators
  • String operators
  • Numeric operators
  • Date operators
  • Case statements
  • Existence Functions
  • Cast Functions

Keep in mind that these need to be used in a filter or aggregation to be pushed down; otherwise Dremio will evaluate these types of expressions in its own SQL engine.

The good news is that all of these options are transparent to you as a user - you send your SQL to Dremio, and it decides the best way to optimize the query.

Conclusion

In this tutorial we showed a number of different SQL expressions and how they are compiled into Elasticsearch queries, including DSL and Painless (and Groovy for 2.x). With Dremio, users can apply existing skills in SQL to unlock the powerful features of Elasticsearch. They can also use their favorite SQL-based tools, such as Tableau, Power BI, and Qlik, with Elasticsearch, opening up the data in these clusters to a new audience without making copies or using any ETL.

Next Steps

As a next step you should take a look at Dremio Reflections as a way to offload these analytical queries from your Elasticsearch cluster, and to speed up analytical queries dramatically.