18 minute read · April 12, 2023

3 Ways to Use Python with Apache Iceberg

Alex Merced · Developer Advocate, Dremio

Apache Iceberg is a data lake table format that is quickly growing its adoption across the data space. If you want to become more familiar with Apache Iceberg, check out this Apache Iceberg 101 article with everything you need to go from zero to hero.

If you are a data engineer, data analyst, or data scientist, then beyond SQL you probably find yourself writing a lot of Python code. This article illustrates three ways you can use Python code to work with Apache Iceberg data:

  1. Using pySpark to interact with the Apache Spark engine
  2. Using pyArrow or pyODBC to connect to engines like Dremio
  3. Using pyIceberg, the native Apache Iceberg Python API

Setting Up Your Environment

We will use a local Docker container running Spark and Jupyter Notebook to try out our examples, so Docker must be installed on your computer. Run the following command, entering the proper values for each environment variable:

docker run -p 8888:8888 --env AWS_REGION=xxxxxx --env AWS_DEFAULT_REGION=us-east-1 --env AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXX --env AWS_SECRET_ACCESS_KEY=xxxxxxx --name spark-notebook alexmerced/spark33-notebook

Replace all the “x”s with your appropriate AWS information to write any tables to your S3. (The reason for the two region variables is that Spark will look for AWS_REGION while pyIceberg will look for AWS_DEFAULT_REGION.)
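Before starting Spark, it can help to confirm that the container actually received these variables. Here is a minimal sketch using only the standard library; the helper name `missing_aws_vars` is hypothetical, and the variable names are the four passed to `docker run` above.

```python
import os

# The four variables passed to `docker run` in the setup command.
REQUIRED_AWS_VARS = (
    "AWS_REGION",
    "AWS_DEFAULT_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def missing_aws_vars(env=None):
    """Return the required AWS variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_aws_vars()
    if missing:
        print("Missing AWS settings:", ", ".join(missing))
    else:
        print("All AWS settings found")
```

Running this in a notebook cell prints any variable that was left out, which is usually easier to read than a failure deep inside a Spark write.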

If you don’t intend to write or read from S3 for these exercises, you can run this command instead and work strictly from the container's file system:

docker run -p 8888:8888 --name spark-notebook alexmerced/spark33-notebook

Either way, look for your notebook server token in the terminal output after running the command, which will look something like this:

[C 18:56:14.806 NotebookApp] 
    
    To access the notebook, open this file in a browser:
        file:///home/docker/.local/share/jupyter/runtime/nbserver-7-open.html
    Or copy and paste one of these URLs:
        http://4a80fcd5fa3f:8888/?token=6c7cb2f9207eaa4e44de5f38d5648cfde4884fe0a73ffeef
     or http://127.0.0.1:8888/?token=6c7cb2f9207eaa4e44de5f38d5648cfde4884fe0a73ffeef

Head over to http://localhost:8888 in your browser and enter the token to access the server (feel free to set a password if you plan on using this multiple times).
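If you capture the container's terminal output in a script, the token can be pulled out of the printed URL with the standard library. This is only a sketch: the helper name `extract_token` is hypothetical, and the URL is the sample one from the output shown above.

```python
from urllib.parse import urlparse, parse_qs


def extract_token(notebook_url):
    """Return the value of the `token` query parameter, or None if absent."""
    query = parse_qs(urlparse(notebook_url).query)
    values = query.get("token")
    return values[0] if values else None


url = "http://127.0.0.1:8888/?token=6c7cb2f9207eaa4e44de5f38d5648cfde4884fe0a73ffeef"
print(extract_token(url))
```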

You can find more information on this setup and see the dockerfile from which the image was created here. This image already has Spark and pySpark installed, so it’s off to the races on your first stop.

Iceberg with pySpark

When using Spark with Iceberg, there are several things you may need to configure when starting your Spark session.

  • You need to establish a catalog under a namespace. We will use “icebergcat” as the namespace of the Iceberg catalog in the Spark session.
  • You need to establish what kind of Iceberg catalog to use. We will write directly to the file system so we will use the Hadoop catalog.
  • If writing to S3, you need to download the appropriate packages and specify a warehouse directory.
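The three points above map onto a handful of Spark settings. As a rough sketch, the settings can be collected in a plain dictionary first; the helper name `iceberg_catalog_conf` and its parameters are hypothetical, while the keys and class names are the ones used in this article.

```python
def iceberg_catalog_conf(catalog_name, warehouse, use_s3=False):
    """Build the Spark settings for a Hadoop-type Iceberg catalog."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    conf = {
        # Enables Iceberg's SQL extensions in the Spark session
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        # Registers the catalog under the chosen name
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        # Hadoop catalog: tables are tracked directly in the file system
        f"{prefix}.type": "hadoop",
        # Directory (local path or s3a:// URI) where tables are written
        f"{prefix}.warehouse": warehouse,
    }
    if use_s3:
        # Only needed when the warehouse lives on S3
        conf[f"{prefix}.io-impl"] = "org.apache.iceberg.aws.s3.S3FileIO"
    return conf


for key, value in iceberg_catalog_conf("icebergcat", "iceberg-warehouse").items():
    print(f"{key} = {value}")
```

Each pair could then be handed to `SparkConf.set(key, value)`. Keeping the catalog name in one place makes it harder to mistype the prefix across several settings.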

Create a new notebook, and assuming you are using AWS and defined all the environment variables when starting up the container, your code should look like this in the new notebook:

import pyspark
from pyspark.sql import SparkSession
import os

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # packages
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0,software.amazon.awssdk:bundle:2.18.31,software.amazon.awssdk:url-connection-client:2.18.31')
        # SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        # Configuring Catalog
        .set('spark.sql.catalog.icebergcat', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.icebergcat.type', 'hadoop')
        .set('spark.sql.catalog.icebergcat.warehouse', 's3a://your-bucket-name/target-directory')
        .set('spark.sql.catalog.icebergcat.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

If you aren’t writing to S3, you can write into the container's file system and pass a directory name as the warehouse, which is where tables will be written.

import pyspark
from pyspark.sql import SparkSession
import os

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # packages
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.1.0,software.amazon.awssdk:bundle:2.18.31,software.amazon.awssdk:url-connection-client:2.18.31')
        # SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        # Configuring Catalog
        .set('spark.sql.catalog.icebergcat', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.icebergcat.type', 'hadoop')
        .set('spark.sql.catalog.icebergcat.warehouse', 'iceberg-warehouse')
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

If you want to see these configurations for other catalogs like Dremio Arctic, Project Nessie, Hive, JDBC, AWS Glue, and DynamoDB you can find pySpark examples of those here.

Either way, let’s run some queries in some additional cells:

  • Create a table
spark.sql("CREATE TABLE icebergcat.our_table (first_name string, last_name string) USING iceberg").show()
  • Insert records
spark.sql("INSERT INTO icebergcat.our_table VALUES ('Alex', 'Merced')").show()
  • Display your records
spark.sql("SELECT * FROM icebergcat.our_table").show()

The bottom line: any valid Iceberg SQL statement can be run through the configured namespace, in this case “icebergcat”. Things you can do include:

  • Run upsert statements
  • Use call procedures to run compaction and other maintenance operations
  • Inspect metadata
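To make those three items concrete, here is a sketch of what such statements might look like for the table created earlier. The statement shapes follow Iceberg's Spark SQL (`MERGE INTO`, the `rewrite_data_files` procedure, and the `snapshots` metadata table), but treat the exact identifiers as assumptions: how a table name resolves depends on your catalog setup.

```python
CATALOG = "icebergcat"
TABLE = f"{CATALOG}.our_table"

statements = {
    # Upsert: update the row if the first name matches, insert it otherwise
    "upsert": f"""
        MERGE INTO {TABLE} t
        USING (SELECT 'Alex' AS first_name, 'Merced' AS last_name) s
        ON t.first_name = s.first_name
        WHEN MATCHED THEN UPDATE SET t.last_name = s.last_name
        WHEN NOT MATCHED THEN INSERT *
    """,
    # Maintenance: compact small data files through a call procedure
    "compaction": f"CALL {CATALOG}.system.rewrite_data_files(table => 'our_table')",
    # Metadata: list the table's snapshots
    "snapshots": f"SELECT * FROM {TABLE}.snapshots",
}

for name, sql in statements.items():
    print(f"-- {name}\n{sql.strip()}\n")
    # In the notebook, with the Spark session started earlier:
    # spark.sql(sql).show()
```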

Iceberg with pyArrow/pyODBC (Dremio and other engines)

For Dremio and other query engines, you can use universal interfaces to send queries from your Python code. If the engine supports Iceberg tables, this is another route to run all your workloads in Python scripts. Two of the interfaces are ODBC, used through pyODBC, and Arrow Flight, used through pyArrow.

With ODBC you have to install drivers for the platform you're connecting to. For example, here are the Dremio ODBC drivers (which include Dremio’s Universal Arrow ODBC Driver and enable the benefits of Apache Arrow when using ODBC).

Below is some sample code for connecting to Dremio over ODBC using pyODBC.

#----------------------------------
# IMPORTS
#----------------------------------

## Import pyodbc
import pyodbc

## import pandas
import pandas as pd

## import environment variables
from os import environ

#----------------------------------
# SETUP
#----------------------------------

token=environ.get("token", "personal token not defined")
connector="Driver={Arrow Flight SQL ODBC};ConnectionType=Direct;HOST=sql.dremio.cloud;PORT=443;AuthenticationType=Plain;" + f"UID=$token;PWD={token};ssl=true;"

#----------------------------------
# CREATE CONNECTION AND CURSOR
#----------------------------------

# establish connection
cnxn = pyodbc.connect(connector, autocommit=True)

# set encoding
cnxn.setdecoding(pyodbc.SQL_CHAR, encoding='utf-8')

# creating a cursor to send messages through the connection
cursor = cnxn.cursor()

#----------------------------------
# RUN QUERY
#----------------------------------

## run a query
rows = cursor.execute("SELECT * FROM \"@[email protected]\".\"nyc-taxi-data\" limit 1000000").fetchall()

##convert into pandas dataframe
df = pd.DataFrame([tuple(t) for t in rows])

print(df)
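A DataFrame built from `fetchall()` alone has numbered columns, because the rows carry no column names. DB-API cursors, pyodbc's included, expose the names through `cursor.description` after a query runs. The sketch below shows the idea with the standard library's `sqlite3` as a stand-in so it runs without a Dremio connection; the table and the helper name `column_names` are made up for illustration.

```python
import sqlite3


def column_names(cursor):
    """Read column names from a DB-API cursor after a query has run."""
    return [column[0] for column in cursor.description]


# Stand-in for the Dremio connection so the sketch runs anywhere.
cnxn = sqlite3.connect(":memory:")
cursor = cnxn.cursor()
cursor.execute("CREATE TABLE names (first_name TEXT, last_name TEXT)")
cursor.execute("INSERT INTO names VALUES ('Alex', 'Merced')")

rows = cursor.execute("SELECT * FROM names").fetchall()
columns = column_names(cursor)

# Pair each value with its column name
records = [dict(zip(columns, row)) for row in rows]
print(records)
cnxn.close()
```

With pandas, the same list of names can be passed along as `pd.DataFrame([tuple(t) for t in rows], columns=columns)`.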

If you want to set up a local Dremio environment with MinIO click here, or click here if you want to see how to start a Dremio Cloud account so you can try it out. You can use the sample dataset that is available to all Dremio accounts.

Also, you’ll notice that the code snippet above reads a token from an environment variable; this is the Dremio personal access token, which you can generate in your Dremio account settings.

Note: The driver's name for the connection string may differ depending on how you configure the driver in your ODBC.ini.
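Since the driver name can vary, one option is to build the connection string from parts so the name is a parameter. This is a sketch only: the function name and its defaults are hypothetical, and the fields simply mirror the connection string used in the pyODBC example above.

```python
def dremio_odbc_connection_string(token, driver="Arrow Flight SQL ODBC",
                                  host="sql.dremio.cloud", port=443):
    """Assemble the ODBC connection string from its individual fields."""
    parts = {
        "Driver": "{" + driver + "}",  # must match the name in your ODBC.ini
        "ConnectionType": "Direct",
        "HOST": host,
        "PORT": str(port),
        "AuthenticationType": "Plain",
        "UID": "$token",  # sent as literal text, as in the example above
        "PWD": token,     # the personal access token itself
        "ssl": "true",
    }
    return ";".join(f"{key}={value}" for key, value in parts.items()) + ";"


print(dremio_odbc_connection_string("my-personal-token", driver="My Driver Name"))
```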

If using Arrow directly, you don't need to install drivers; you just use the Arrow Flight part of the pyArrow library to send requests to Arrow-enabled sources like Dremio.

#----------------------------------
# IMPORTS
#----------------------------------
## Import Pyarrow
from pyarrow import flight
from pyarrow.flight import FlightClient

## import pandas
import pandas as pd

## Get environment variables
from os import environ

token = environ.get('token', 'no personal token defined')

#----------------------------------
# Setup
#----------------------------------

## Headers for Authentication
headers = [
    (b"authorization", f"bearer {token}".encode("utf-8"))
    ]

## Create Client
client = FlightClient(location=("grpc+tls://data.dremio.cloud:443"))

#----------------------------------
# Function Definitions
#----------------------------------

## make_query function
def make_query(query, client, headers):
    

    ## Get Schema Description and build headers
    flight_desc = flight.FlightDescriptor.for_command(query)
    options = flight.FlightCallOptions(headers=headers)
    schema = client.get_schema(flight_desc, options)

    ## Get ticket for query execution, used to get results
    flight_info = client.get_flight_info(flight.FlightDescriptor.for_command(query), options)
    
    ## Get Results 
    results = client.do_get(flight_info.endpoints[0].ticket, options)
    return results

#----------------------------------
# Run Query
#----------------------------------


results = make_query("SELECT * FROM \"@[email protected]\".\"nyc-taxi-data\" limit 1000000", client, headers)

# convert to pandas dataframe
df = results.read_pandas()

print(df)
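One thing to watch in the snippet above: if the environment variable is missing, the placeholder text is sent as the bearer token and the failure only shows up as an authentication error. A small sketch of failing earlier instead, using only the standard library; the helper name `bearer_headers` is hypothetical.

```python
import os


def bearer_headers(env=None, variable="token"):
    """Build Arrow Flight auth headers, failing early if the token is unset."""
    env = os.environ if env is None else env
    token = env.get(variable)
    if not token:
        raise RuntimeError(f"Environment variable {variable!r} is not set")
    return [(b"authorization", f"bearer {token}".encode("utf-8"))]


# Demonstrated with an explicit mapping instead of the real environment
print(bearer_headers({"token": "example-token"}))
```

The returned list has the same shape as the `headers` variable above, so it could be passed to `flight.FlightCallOptions(headers=...)` in the same way.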

Once your connection is established with Arrow or ODBC it’s just a matter of sending SQL statements that work with any of the engine's sources. For Dremio, there are a large number of sources including table formats like Apache Iceberg (Read and Write) and Delta Lake (Read Only).

For Dremio, using SQL you can do the following with Apache Iceberg tables:

Also, recently I published a library called “dremio-simple-query” that can make it even easier to connect to Dremio via Arrow Flight to work with your Iceberg tables and quickly load them into a DuckDB relation. 

from dremio_simple_query.connect import DremioConnection
from os import getenv
from dotenv import load_dotenv
import duckdb

## DuckDB Connection
con = duckdb.connect()

load_dotenv()

## Dremio Personal Token
token = getenv("TOKEN")

## Arrow Endpoint (See Dremio Documentation)
uri = getenv("ARROW_ENDPOINT")

## Create Dremio Arrow Connection
dremio = DremioConnection(token, uri)

## Get Data from Dremio
stream = dremio.toArrow("SELECT * FROM arctic.table1;")

## Turn into Arrow Table
my_table = stream.read_all()

## Query with Duckdb
results = con.execute("SELECT * FROM my_table;").fetchall()

print(results)

You can also have Dremio queries returned to you as a DuckDB relation:

duck_rel = dremio.toDuckDB("SELECT * FROM arctic.table1")

result = duck_rel.query("table1", "SELECT * from table1").fetchall()

print(result)

pyIceberg – The Native Python API

The native Python API for Iceberg is provided through the pyIceberg library. You can load catalogs to create tables, inspect tables, and now query tables using DuckDB. Let’s walk through an example for an AWS Glue catalog.

You want to install pyIceberg with this command:

pip install "pyiceberg[glue,s3fs,duckdb,pyarrow]"

This command installs pyIceberg with some optional dependencies for working with AWS Glue and DuckDB to run local queries on your data.

The next step is to create a ~/.pyiceberg.yaml configuration file in your computer's home directory. This YAML file holds the configuration for the Iceberg catalog you want to work with. In this example, it will be configured to work with Glue. The credentials are passed through the environment variables defined when the Docker container was started during setup.

The contents of your configuration file would be:

catalog:
  glue:
    type: glue

This defines a glue type catalog called “glue.” Right now the Python API supports Glue, Hive, and REST catalog (not Nessie, JDBC, Hadoop, or DynamoDB yet).
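If you would rather create the file from Python than by hand, here is a minimal sketch using `pathlib`. The helper name `write_pyiceberg_config` is hypothetical, and the demo writes into a temporary directory so nothing in your real home directory is touched.

```python
from pathlib import Path
import tempfile

# Same contents as the configuration file shown above
CONFIG_TEXT = """catalog:
  glue:
    type: glue
"""


def write_pyiceberg_config(directory):
    """Write .pyiceberg.yaml into `directory` and return its path."""
    path = Path(directory) / ".pyiceberg.yaml"
    path.write_text(CONFIG_TEXT)
    return path


# Demonstrated against a temporary directory;
# pass Path.home() instead to write the real file.
with tempfile.TemporaryDirectory() as tmp:
    config_path = write_pyiceberg_config(tmp)
    print(config_path.name)
    print(config_path.read_text())
```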

Now you can write a script to query a table like so:

## Creating Configuration File in HOME directory
f = open("../.pyiceberg.yaml", "w")
f.write(
"""catalog:
  glue:
    type: glue
""")
f.close()
  
## Import Pyiceberg

from pyiceberg.catalog import load_catalog

catalog = load_catalog("glue")

## List Glue databases
catalog.list_namespaces()

## List tables in a particular glue database
catalog.list_tables("db2")
  
## Load a table
table = catalog.load_table("db2.names")

## Check the table's schema
print(table.metadata.schemas)
  
## Scan the Table into a DuckDB Table
duckdb_connection = table.scan(selected_fields=("id", "name", "age")).to_duckdb(table_name="names")

## Query and Print Results
print(duckdb_connection.execute("SELECT * FROM names;").fetchall())

To see the results from running this script to scan tables created with Spark and Dremio, here you go:

As a side note, you can also configure the catalog using kwargs like so:

catalog = load_catalog("glue", **{"type": "glue"})

Conclusion

Apache Iceberg has a wide Python footprint that lets you do the work you need to do. Whether you use pySpark, send SQL to engines like Dremio over ODBC or Arrow, or use pyIceberg to run local scans of your tables with DuckDB, Iceberg has a lot to offer data practitioners who love writing Python.
