19 minute read · April 12, 2023
3 Ways to Use Python with Apache Iceberg
Senior Tech Evangelist, Dremio
Apache Iceberg is a data lake table format whose adoption is growing quickly across the data space. If you want to become more familiar with Apache Iceberg, check out this Apache Iceberg 101 article, which has everything you need to go from zero to hero.
If you are a data engineer, data analyst, or data scientist, then beyond SQL you probably find yourself writing a lot of Python code. This article illustrates three ways you can use Python code to work with Apache Iceberg data:
- Using pySpark to interact with the Apache Spark engine
- Using pyArrow or pyODBC to connect to engines like Dremio
- Using pyIceberg, the native Apache Iceberg Python API
Setting Up Your Environment
We will run a local Docker container with Spark and Jupyter Notebook to try out our examples. Having Docker installed on your computer is a requirement for this setup. Run the following command, entering the proper values for each environment variable:
docker run -p 8888:8888 --env AWS_REGION=xxxxxx --env AWS_DEFAULT_REGION=us-east-1 --env AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXX --env AWS_SECRET_ACCESS_KEY=xxxxxxx --name spark-notebook alexmerced/spark33-notebook
Replace all the “x”s with your AWS information so that any tables you create can be written to your S3 bucket. (The reason for the two region variables is that Spark looks for AWS_REGION while pyIceberg looks for AWS_DEFAULT_REGION.)
If you don’t intend to write or read from S3 for these exercises, you can run this command instead and work strictly from the container's file system:
docker run -p 8888:8888 --name spark-notebook alexmerced/spark33-notebook
Either way, look for your notebook server token in the terminal output after running the command, which will look something like this:
[C 18:56:14.806 NotebookApp] To access the notebook, open this file in a browser:
    file:///home/docker/.local/share/jupyter/runtime/nbserver-7-open.html
Or copy and paste one of these URLs:
    http://4a80fcd5fa3f:8888/?token=6c7cb2f9207eaa4e44de5f38d5648cfde4884fe0a73ffeef
 or http://127.0.0.1:8888/?token=6c7cb2f9207eaa4e44de5f38d5648cfde4884fe0a73ffeef
Head over to http://localhost:8888 in your browser and enter the token to access the server (feel free to set a password if you plan on using this multiple times).
You can find more information on this setup and see the Dockerfile from which the image was created here. This image already has Spark and pySpark installed, so you're off to the races on your first stop.
Iceberg with pySpark
When using Spark with Iceberg, there are several things you may need to configure when starting your Spark session.
- You need to establish a catalog under a namespace. We will use “icebergcat” as the namespace of the Iceberg catalog in the Spark session.
- You need to establish what kind of Iceberg catalog to use. We will write directly to the file system so we will use the Hadoop catalog.
- If writing to S3, you need to download the appropriate packages and specify a warehouse directory.
Create a new notebook. Assuming you are using AWS and defined all the environment variables when starting up the container, your code should look like this:
import pyspark
from pyspark.sql import SparkSession
import os

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # packages
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0,software.amazon.awssdk:bundle:2.18.31,software.amazon.awssdk:url-connection-client:2.18.31')
        # SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        # Configuring Catalog
        .set('spark.sql.catalog.icebergcat', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.icebergcat.type', 'hadoop')
        .set('spark.sql.catalog.icebergcat.warehouse', 's3a://your-bucket-name/target-directory')
        .set('spark.sql.catalog.icebergcat.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
If you aren’t writing to S3, you can write to the container's file system and pass a directory name as the warehouse, which is where tables will be written:
import pyspark
from pyspark.sql import SparkSession
import os

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # packages
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0,software.amazon.awssdk:bundle:2.18.31,software.amazon.awssdk:url-connection-client:2.18.31')
        # SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        # Configuring Catalog
        .set('spark.sql.catalog.icebergcat', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.icebergcat.type', 'hadoop')
        .set('spark.sql.catalog.icebergcat.warehouse', 'iceberg-warehouse')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
If you want to see these configurations for other catalogs like Dremio Arctic, Project Nessie, Hive, JDBC, AWS Glue, and DynamoDB you can find pySpark examples of those here.
Either way, let’s run some queries in some additional cells:
- Create a table
spark.sql("CREATE TABLE icebergcat.our_table (first_name string, last_name string) USING iceberg").show()
- Insert records
spark.sql("INSERT INTO icebergcat.our_table VALUES ('Alex', 'Merced')").show()
- Display your records
spark.sql("SELECT * FROM icebergcat.our_table").show()
The bottom line: any valid Iceberg SQL statement can be run through the configured catalog namespace, in this case “icebergcat”. Things you can do include (see the sketch after this list):
- Run upsert statements
- Use call procedures to run compaction and other maintenance operations
- Inspect metadata
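As a rough sketch of what those look like (this builds on the “icebergcat” catalog and “our_table” table created above; the “updates” table used in the merge is a hypothetical second table you would need to create first):

## Upsert with MERGE INTO (assumes a hypothetical "icebergcat.updates" table with the same columns)
spark.sql("""
MERGE INTO icebergcat.our_table t
USING icebergcat.updates u
ON t.first_name = u.first_name
WHEN MATCHED THEN UPDATE SET t.last_name = u.last_name
WHEN NOT MATCHED THEN INSERT *
""").show()

## Call a maintenance procedure, e.g., compaction via rewrite_data_files
spark.sql("CALL icebergcat.system.rewrite_data_files('our_table')").show()

## Inspect metadata tables such as snapshots, history, and files
spark.sql("SELECT * FROM icebergcat.our_table.snapshots").show()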
Iceberg with pyArrow/pyODBC (Dremio and other engines)
For Dremio and other query engines, you can use universal interfaces to send queries from your Python code. If the engine supports Iceberg tables, this is another route to run all your workloads from Python scripts. Two such interfaces are:
- ODBC (Open Database Connectivity): This older standard is more widely supported, but analytical workloads can see much higher latency than they would with Arrow Flight.
- Arrow Flight: Arrow Flight and Arrow Flight SQL are part of the Apache Arrow project. They define a newer standard for database connectivity aimed at analytical workloads; their adoption is growing, and they provide significant gains in speed.
With ODBC you have to install drivers for the platform you're connecting to. For example, here are the Dremio ODBC drivers (which include Dremio’s Universal Arrow ODBC Driver and enable the benefits of Apache Arrow when using ODBC).
Below is some sample code for connecting to Dremio over ODBC using pyODBC.
#----------------------------------
# IMPORTS
#----------------------------------
## Import pyodbc
import pyodbc
## import pandas
import pandas as pd
## import environment variables
from os import environ

#----------------------------------
# SETUP
#----------------------------------
token = environ.get("token", "personal token not defined")

connector = "Driver={Arrow Flight SQL ODBC};ConnectionType=Direct;HOST=sql.dremio.cloud;PORT=443;AuthenticationType=Plain;" + f"UID=$token;PWD={token};ssl=true;"

#----------------------------------
# CREATE CONNECTION AND CURSOR
#----------------------------------
# establish connection
cnxn = pyodbc.connect(connector, autocommit=True)

# set encoding
cnxn.setdecoding(pyodbc.SQL_CHAR, encoding='utf-8')

# creating a cursor to send messages through the connection
cursor = cnxn.cursor()

#----------------------------------
# RUN QUERY
#----------------------------------
## run a query
rows = cursor.execute("SELECT * FROM \"@[email protected]\".\"nyc-taxi-data\" limit 1000000").fetchall()

## convert into pandas dataframe
df = pd.DataFrame([tuple(t) for t in rows])

print(df)
If you want to set up a local Dremio environment with MinIO click here, or click here if you want to see how to start a Dremio Cloud account so you can try it out. You can use the sample dataset that is available to all Dremio accounts.
Also, you’ll notice the code snippet above reads a token from an environment variable; this is a Dremio personal access token, which you can obtain by going to your account settings in Dremio and generating one.
Note: The driver name in the connection string may differ depending on how you configured the driver in your ODBC configuration files (odbc.ini/odbcinst.ini).
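For example, with unixODBC the name comes from the section header under which the driver is registered. A hypothetical odbcinst.ini entry might look like the following (the Driver path is an assumption; point it at wherever you installed the Dremio driver):

# Hypothetical odbcinst.ini entry -- the Driver path below is an assumption
[Arrow Flight SQL ODBC]
Description=Dremio Arrow Flight SQL ODBC driver
Driver=/opt/arrow-flight-sql-odbc-driver/lib64/libarrow-odbc.so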
If using Arrow directly, you don't need to install drivers; you just use the Arrow Flight part of the pyArrow library to send requests to Arrow-enabled sources like Dremio.
#----------------------------------
# IMPORTS
#----------------------------------
## Import pyarrow
from pyarrow import flight
from pyarrow.flight import FlightClient
## import pandas
import pandas as pd
## Get environment variables
from os import environ

token = environ.get('token', 'no personal token defined')

#----------------------------------
# Setup
#----------------------------------
## Headers for Authentication
headers = [
    (b"authorization", f"bearer {token}".encode("utf-8"))
]

## Create Client
client = FlightClient(location=("grpc+tls://data.dremio.cloud:443"))

#----------------------------------
# Function Definitions
#----------------------------------
## make_query function
def make_query(query, client, headers):
    ## Get Schema Description and build headers
    flight_desc = flight.FlightDescriptor.for_command(query)
    options = flight.FlightCallOptions(headers=headers)
    schema = client.get_schema(flight_desc, options)

    ## Get ticket for query execution, used to get results
    flight_info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)

    ## Get Results
    results = client.do_get(flight_info.endpoints[0].ticket, options)
    return results

#----------------------------------
# Run Query
#----------------------------------
results = make_query("SELECT * FROM \"@[email protected]\".\"nyc-taxi-data\" limit 1000000", client, headers)

# convert to pandas dataframe
df = results.read_pandas()

print(df)
Once your connection is established with Arrow or ODBC, it’s just a matter of sending SQL statements that work with any of the engine's sources. Dremio supports a large number of sources, including table formats like Apache Iceberg (read and write) and Delta Lake (read only).
For Dremio, using SQL you can do the following with Apache Iceberg tables (a short sketch follows this list):
- Full DDL (CREATE & CREATE TABLE AS)
- Full DML (Inserts, Updates, Deletes, Merge Into)
- Query the Table Data and Metadata
- Time Travel Queries
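Here is a minimal sketch of sending a few of these statements through the pyODBC cursor created above; the table path, column names, snapshot ID, and timestamp are placeholders, and you should confirm the exact DML and time travel syntax against the Dremio SQL reference:

## Hypothetical Iceberg table path -- replace with a table in one of your Dremio sources
tbl = '"my_source"."my_table"'

## DDL: create a table
cursor.execute(f"CREATE TABLE {tbl} (first_name VARCHAR, last_name VARCHAR)")

## DML: insert and update records
cursor.execute(f"INSERT INTO {tbl} VALUES ('Alex', 'Merced')")
cursor.execute(f"UPDATE {tbl} SET last_name = 'Merced' WHERE first_name = 'Alex'")

## Time travel: query the table at a snapshot or at a point in time
print(cursor.execute(f"SELECT * FROM {tbl} AT SNAPSHOT '1234567890123456789'").fetchall())
print(cursor.execute(f"SELECT * FROM {tbl} AT TIMESTAMP '2023-04-01 00:00:00.000'").fetchall())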
Also, I recently published a library called “dremio-simple-query” that makes it even easier to connect to Dremio via Arrow Flight, work with your Iceberg tables, and quickly load them into a DuckDB relation.
from dremio_simple_query.connect import DremioConnection
from os import getenv
from dotenv import load_dotenv
import duckdb

## DuckDB Connection
con = duckdb.connect()

load_dotenv()

## Dremio Personal Token
token = getenv("TOKEN")

## Arrow Endpoint (See Dremio Documentation)
uri = getenv("ARROW_ENDPOINT")

## Create Dremio Arrow Connection
dremio = DremioConnection(token, uri)

## Get Data from Dremio
stream = dremio.toArrow("SELECT * FROM arctic.table1;")

## Turn into Arrow Table
my_table = stream.read_all()

## Query with DuckDB
results = con.execute("SELECT * FROM my_table;").fetchall()

print(results)
You can also have Dremio queries returned to you as a DuckDB relation:
duck_rel = dremio.toDuckDB("SELECT * FROM arctic.table1")

result = duck_rel.query("table1", "SELECT * from table1").fetchall()

print(result)
pyIceberg – The Native Python API
The native Python API for Iceberg is provided through the pyIceberg library. You can load catalogs to create tables, inspect tables, and now query tables using DuckDB. Let’s walk through an example for an AWS Glue catalog.
Install pyIceberg with this command:
pip install pyiceberg[glue,s3fs,duckdb,pyarrow]
This command installs pyIceberg with some optional dependencies for working with AWS Glue and DuckDB to run local queries on your data.
The next step is to create a ~/.pyiceberg.yaml configuration file in your computer's home directory. This YAML file is used to find the configuration for the Iceberg catalog you want to work with. In this example, it will be configured to work with Glue. The credentials are passed through the environment variables defined when the Docker container was started during setup.
The contents of your configuration file would be:
catalog:
  glue:
    type: glue
This defines a Glue-type catalog called “glue.” Right now the Python API supports Glue, Hive, and REST catalogs (not Nessie, JDBC, Hadoop, or DynamoDB yet).
Now you can write a script to query a table like so:
## Creating Configuration File in HOME directory
f = open("../.pyiceberg.yaml", "w")
f.write(
"""catalog:
  glue:
    type: glue
"""
)
f.close()

## Import pyiceberg
from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue")

## List Glue databases
catalog.list_namespaces()

## List tables in a particular Glue database
catalog.list_tables("db2")

## Load a table
table = catalog.load_table("db2.names")

## Check the table's schema
print(table.metadata.schemas)

## Scan the table into a DuckDB table
duckdb_connection = table.scan(selected_fields=("id", "name", "age")).to_duckdb(table_name="names")

## Query and print results
print(duckdb_connection.execute("SELECT * FROM names;").fetchall())
Running this script against the tables created earlier with Spark and Dremio prints their contents as the query results.
As a side note, you can also configure the catalog using kwargs like so:
catalog = load_catalog("glue", **{"type": "glue"})
Conclusion
Apache Iceberg has a wide Python footprint that lets you do the work you need to do. Whether you use pySpark, send SQL over ODBC or Arrow Flight to engines like Dremio, or use pyIceberg to run local scans of your tables with DuckDB, Iceberg has a lot to offer data practitioners who love writing Python.