January 18, 2024
Connecting to Dremio Using Apache Arrow Flight in Python
Senior Tech Evangelist, Dremio
The quest for efficient and powerful data management and retrieval solutions is perpetual. Dremio and Apache Arrow Flight, when combined, simplify and speed up the way we interact with large datasets. This blog delves into the synergy of these technologies, particularly through the lens of Python, a language synonymous with data.
Dremio is a data lakehouse platform that allows you to turn your data lake into a data warehouse, unify disparate data sources, and curate and govern a single, easy-to-use access point for your data. Dremio stands out by offering lightning-fast queries and a self-service data platform, making it a favorite among data scientists and analysts. It connects to a variety of data sources, providing a unified view of your entire data ecosystem. But what truly sets Dremio apart is its ability to optimize query performance without replicating data, ensuring that your insights are both rapid and relevant.
Apache Arrow specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations. Arrow is pivotal in the realm of big data for its ability to eliminate the need for costly serialization and deserialization processes. Its role is to act as a universal data layer that enables fast data exchange and efficient data processing across a wide range of systems.
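To make the columnar idea concrete, here is a minimal PyArrow sketch (the column names and values are made up for illustration): each column lives in its own contiguous buffer, which is what lets systems share the data without row-by-row serialization.

```python
import pyarrow as pa

# Build an in-memory Arrow table; each column is stored contiguously
table = pa.table({
    "city": ["NYC", "SEA", "AUS"],
    "temp_f": [41, 52, 67],
})

# Columns can be accessed (and handed to other Arrow-aware systems)
# without converting the data row by row
print(table.column("temp_f"))
print(table.num_rows)  # → 3
```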
Building on the foundation of Apache Arrow, Arrow Flight is an RPC framework for high-speed data transfer between distributed data sources and consumers. It's specifically designed to exploit the full potential of the Arrow format, ensuring that large volumes of data are transferred quickly and efficiently. Arrow Flight is a game-changer for scenarios requiring high data throughput, such as real-time analytics and machine learning data pipelines.
When Apache Arrow Flight is used in conjunction with Dremio, the result is a highly efficient pipeline for data retrieval. Dremio's powerful data processing capabilities, combined with Flight's high-speed data transfer, mean that large datasets can be moved and processed in record time. This integration is particularly valuable for Python developers working in data-intensive fields. Python, with its rich ecosystem of data libraries and tools, becomes even more potent when paired with the speed and efficiency of Dremio and Apache Arrow Flight.
In this article, we'll explore how to harness these technologies using Python, making your data analytics tasks faster and more efficient.
Connecting with Python
One way to connect to Dremio and other Apache Arrow Flight supporting sources is to use the Apache Arrow Flight JDBC driver, which you can learn about here. This requires you to download and configure the driver, but it can be great for faster connections when using tools that require a JDBC connection. There is an Arrow Flight ODBC driver as well.
The other approach, for Python or any other language, is to create a client using the Arrow libraries for that language, or even to implement the client yourself with the gRPC framework. You will find example implementations in a few languages at this repository. In the following section, we'll show how to use the PyArrow library to connect and send queries, and how to use easy-to-use tools like dremio-simple-query, which uses a PyArrow connection under the hood.
Using Just PyArrow
The following illustrates what it looks like to send a single query to an Arrow Flight server using PyArrow (Dremio Cloud is the Arrow Flight destination in this example; an example for Dremio Software appears later in this blog).
from pyarrow import flight
from pyarrow.flight import FlightClient
import os

## Endpoint
location = "grpc+tls://data.dremio.cloud:443"

## Auth Token
token = os.getenv("token")

## Headers for Requests
headers = [
    (b"authorization", f"bearer {token}".encode("utf-8"))
]

## Query
query = "SELECT * FROM table1"

## Create Arrow Flight Client
client = FlightClient(location=(location))

## Create Flight Call Options
options = flight.FlightCallOptions(headers=headers)

## Send Query
flight_info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)

## Get Query Results
results = client.do_get(flight_info.endpoints[0].ticket, options)

## Print Results
print(results.read_all())
Breaking It Down
Importing libraries:
from pyarrow import flight
from pyarrow.flight import FlightClient
import os
- pyarrow: This is the Python library for Apache Arrow, which includes support for Arrow Flight.
- FlightClient: A class from the pyarrow.flight module that allows connecting to and interacting with a Flight server (like Dremio).
- os: A standard Python library used for interacting with the operating system; here it is used specifically to access environment variables.
Setting up the endpoint:
location = "grpc+tls://data.dremio.cloud:443"
This specifies the network address (URL) of the Dremio server using gRPC with TLS encryption. The 443 indicates the standard port for secure HTTP traffic.
Retrieving the authentication token:
token = os.getenv("token")
This retrieves an authentication token stored in an environment variable named "token". This token is required to authenticate with the Dremio Cloud server.
Setting headers for requests:
headers = [
    (b"authorization", f"bearer {token}".encode("utf-8"))
]
Headers are set for the request. In this case, the authorization header is used, which includes the retrieved token in bearer-token format. The .encode("utf-8") ensures the string is sent as bytes, as the Flight headers API requires.
Defining the query:
query = "SELECT * FROM table1"
This is the SQL query to be executed. It selects all records from table1.
Creating an Arrow Flight client:
client = FlightClient(location=(location))
An instance of FlightClient is created with the Dremio endpoint. This client will be used to send the query and receive results.
Creating Flight call options:
options = flight.FlightCallOptions(headers=headers)
This creates an object containing call options for the Flight client. It includes the headers set earlier for authorization.
Sending the query to the server:
flight_info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)
The client sends the query to the server and retrieves information about how to fetch the results. The FlightDescriptor.for_command is used to specify that the command (in this case, the SQL query) should be executed.
Retrieving the query results:
results = client.do_get(flight_info.endpoints[0].ticket, options)
Finally, the client retrieves the actual query results. The flight_info object contains endpoints and tickets that tell the client where and how to fetch the data. The do_get method is used to stream the results back to the client.
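When a result set is large, you don't have to materialize everything at once with read_all(); the stream can be consumed batch by batch. Here is a minimal sketch of that pattern, using a locally built RecordBatchReader to stand in for the server stream (no live Flight endpoint is assumed); note that iterating a real FlightStreamReader yields FlightStreamChunk objects whose .data attribute holds each RecordBatch.

```python
import pyarrow as pa

# Simulate the stream you'd get back from client.do_get() with a local
# reader; a FlightStreamReader is consumed in the same incremental way.
table = pa.table({"id": [1, 2, 3, 4], "value": [10.0, 20.0, 30.0, 40.0]})
reader = pa.RecordBatchReader.from_batches(
    table.schema, table.to_batches(max_chunksize=2)
)

total_rows = 0
for batch in reader:  # each item here is a pyarrow.RecordBatch
    # process each batch as it arrives (aggregate, write to disk, etc.)
    total_rows += batch.num_rows

print("rows received:", total_rows)
```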
If you are using Dremio Software, here are a couple of things to take into consideration:
- If using a local non-SSL version of Dremio, use “grpc://” instead of “grpc+tls”.
- You’ll need to obtain a token first via an API call
- By default, the Apache Arrow Flight endpoint is exposed on port “32010”
Here is what the code looks like with these adjustments.
from pyarrow import flight
from pyarrow.flight import FlightClient
import requests
import os

## Function to Retrieve PAT Token from Dremio
def get_token(uri, payload):
    # Make the POST request
    response = requests.post(uri, json=payload)
    # Check if the request was successful
    if response.status_code == 200:
        # Parse the JSON response and extract the token
        data = response.json()
        return data.get("token", "")
    else:
        print("Failed to get a valid response. Status code:", response.status_code)
        return ""

## Arrow Endpoint
location = "grpc://localhost:32010"

## Username and Password for Dremio Account
username = "username"
password = "password"

## Dremio REST API URL To Login and Get Token
uri = "http://localhost:9047/apiv2/login"

## Payload for Get Token Request
payload = {
    "userName": username,
    "password": password
}

## Auth Token
token = get_token(uri, payload)
print(token)

## Headers for Arrow Requests
headers = [
    (b"authorization", f"bearer {token}".encode("utf-8"))
]

## Query
query = """
SELECT * FROM Samples."samples.dremio.com"."NYC-weather.csv"
"""

## Create Arrow Flight Client
client = FlightClient(location=(location))

## Create Flight Call Options
options = flight.FlightCallOptions(headers=headers)

## Send Query
flight_info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)

## Get Query Results
results = client.do_get(flight_info.endpoints[0].ticket, options)
print(results.read_all())
Making the Code Reusable
Ideally, you shouldn’t have to write all those steps for each query you’d like to make. For this, you can use the source code of the dremio-simple-query library directly (useful if you use Conda, since the library isn’t available in the Conda repository, and it lets you keep only the functions that are relevant to you).
#----------------------------------
# IMPORTS
#----------------------------------

## Import Pyarrow
from pyarrow import flight
from pyarrow.flight import FlightClient
import duckdb
import pyarrow.dataset as ds
import polars as pl
import pandas as pd

class DremioConnection:
    def __init__(self, token, location):
        self.token = token
        self.location = location
        self.headers = [
            (b"authorization", f"bearer {token}".encode("utf-8"))
        ]
        self.client = FlightClient(location=(location))

    def query(self, query, client, headers):
        ## Options for Query
        options = flight.FlightCallOptions(headers=headers)
        ## Get ticket for query execution, used to get results
        flight_info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)
        ## Get Results (return value is a FlightStreamReader)
        results = client.do_get(flight_info.endpoints[0].ticket, options)
        return results

    # Returns a FlightStreamReader
    def toArrow(self, query):
        return self.query(query, self.client, self.headers)

    # Returns a DuckDB Relation
    def toDuckDB(self, querystring):
        streamReader = self.query(querystring, self.client, self.headers)
        table = streamReader.read_all()
        my_ds = ds.dataset(source=[table])
        return duckdb.arrow(my_ds)

    # Returns a Polars Dataframe
    def toPolars(self, querystring):
        streamReader = self.query(querystring, self.client, self.headers)
        table = streamReader.read_all()
        df = pl.from_arrow(table)
        return df

    # Returns a Pandas Dataframe
    def toPandas(self, querystring):
        streamReader = self.query(querystring, self.client, self.headers)
        df = streamReader.read_pandas()
        return df
With this code, you can choose to implement only the toDuckDB, toPolars, or toPandas functions you plan on using. Here is an example of using this class with Dremio Cloud:
# Assuming the DremioConnection class above is already defined

# Initialize the Dremio Connection
# Replace 'your_token' and 'your_location' with actual values
dremio_conn = DremioConnection(token='your_token', location='your_location')

# Define a SQL query to be executed
sql_query = "SELECT * FROM your_table"

# Using toArrow method to get results as Arrow format
arrow_results = dremio_conn.toArrow(sql_query)
print("Arrow Results:")
print(arrow_results)

# Using toDuckDB method to get results as DuckDB Relation
duckdb_relation = dremio_conn.toDuckDB(sql_query)
print("\nDuckDB Relation:")
print(duckdb_relation)

# Using toPolars method to get results as Polars DataFrame
polars_df = dremio_conn.toPolars(sql_query)
print("\nPolars DataFrame:")
print(polars_df)

# Using toPandas method to get results as Pandas DataFrame
pandas_df = dremio_conn.toPandas(sql_query)
print("\nPandas DataFrame:")
print(pandas_df)
This can certainly make using Apache Arrow Flight sources much easier when sending many queries. A couple of things to keep in mind:
- The toArrow function returns a FlightStreamReader object that allows you to choose how to handle the incoming stream of data. You can see its methods here.
- The simplest way to consume the stream is to use its read_all() method to convert it into an Arrow table. To learn about the methods available on an Arrow table, refer to this documentation.
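For instance, once read_all() has produced an Arrow table, a few common next steps look like this (a sketch over made-up weather data; pandas is assumed to be installed):

```python
import pyarrow as pa

# Stand-in for the table returned by results.read_all()
table = pa.table({"station": ["JFK", "LGA"], "temp_f": [41, 39]})

print(table.schema)    # column names and types
print(table.num_rows)  # row count → 2
df = table.to_pandas() # hand off to pandas for analysis
print(df.head())
```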
Conclusion
Dremio's capabilities as a data lakehouse platform, combined with the efficiency of Apache Arrow's in-memory columnar data format and the speed of Arrow Flight's data transfer, create a powerful ecosystem for handling large-scale data with ease and agility.
The examples provided in this article, particularly using the dremio-simple-query source code, underscore the flexibility and ease with which Python developers can interact with these technologies. By enabling seamless connections to Dremio and efficient data retrieval in various formats like Arrow, DuckDB, Polars, and Pandas, Python developers are equipped to tackle data-intensive challenges more effectively than ever before.
Whether through direct PyArrow library usage or leveraging the dremio-simple-query library for simplified querying and data manipulation, the synergy of these tools opens up new possibilities for data analysis and processing. The ability to convert data streams into different formats ensures compatibility with a wide array of data processing and analytics tools, making this approach highly versatile.