May 31, 2023

Deep Dive Into Configuring Your Apache Iceberg Catalog with Apache Spark

Alex Merced · Senior Tech Evangelist, Dremio

Apache Iceberg is a data lakehouse table format that has been taking the data world by storm, with robust support from tools like Dremio, Fivetran, Airbyte, AWS, Snowflake, Tabular, Presto, Apache Flink, Apache Spark, Trino, and many more. Apache Spark is one of the tools most data professionals reach for, and many introductory tutorials for Apache Iceberg use Spark as their entry point.

When reading documentation or following tutorials, a common source of confusion is the set of configurations used to set up the catalog for your Spark session, which can look like the following when using the Spark CLI:

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hadoop \
--conf spark.sql.catalog.iceberg.warehouse=$PWD/warehouse

Or like this in a PySpark script:

import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS
SPARK_MASTER = "local[*]" ## Spark master URL -- replace with your cluster's master


conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .setMaster(SPARK_MASTER)
  		#packages
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
  		#SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
  		#Configuring Catalog
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://my-bucket/path/')
        .set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
  		#AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run a Query
spark.sql("SELECT * FROM hdfs_catalog.table1;").show()

Why Do You Need These Configurations?

When using tools like Dremio, these configurations are handled automatically based on the source you connect to your Dremio Sonar project, but Spark works differently. When you start a Spark session, you need to configure a Spark catalog, which acts as an abstraction over the Apache Iceberg catalog you would connect to from any engine. Possible catalogs include:

  • Hadoop (File System)
  • AWS Glue
  • Project Nessie
  • JDBC (Any JDBC supporting database)
  • Hive
  • DynamoDB
  • REST Catalog

Wait, What Is a Catalog?

An Apache Iceberg catalog is the mechanism tooling uses to identify the Apache Iceberg tables that are available; it provides the engine with the location of the metadata.json file that currently defines each table. You can learn more by reading these articles about Apache Iceberg reads and Apache Iceberg writes.

An Apache Spark catalog is a mechanism in the Spark session that enables Spark to discover the tables available to work with; our Iceberg configurations create a Spark catalog and link it to an existing Iceberg catalog.
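
For example, whatever name you register under spark.sql.catalog.<name> becomes the prefix used to address tables in Spark SQL. A minimal PySpark sketch, assuming a session configured like the examples above with a catalog named “iceberg” (the db.table1 identifier is only a placeholder):

## The name configured as spark.sql.catalog.iceberg becomes the
## top-level namespace used to reference tables in Spark SQL
spark.sql("CREATE TABLE iceberg.db.table1 (id BIGINT, data STRING) USING iceberg")
spark.sql("SELECT * FROM iceberg.db.table1").show()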

The Primary Settings

Here are many of the typical settings you’ll use regardless of the catalog.

--packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0

This setting identifies any packages that need to be downloaded and used during the Spark session. Libraries you’ll often specify in this setting include:

  • org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.xx.x
    (Core Apache Iceberg Library)
  • org.apache.hadoop:hadoop-aws:x.x.x
    (Library for writing to S3 if using Hadoop/HDFS catalog)
  • software.amazon.awssdk:bundle:x.xx.xxx
    (For working with AWS S3)
  • software.amazon.awssdk:url-connection-client:x.xx.xxx
    (For working with AWS S3)
  • org.apache.iceberg:iceberg-gcp:x.x.x
    (For working with Google Cloud)
  • org.projectnessie:nessie-spark-extensions-x.x_x.xx:x.xx.x
    (For using a Project Nessie or Dremio Arctic catalog)
  • org.apache.iceberg:iceberg-dell:x.xx.x
    (For using Dell ECS)
  • com.emc.ecs:object-client-bundle:x.x.x
    (For using Dell ECS)

Note: Make sure to replace each x with the current version number of the library; you can look them up in the Maven repository. A filled-in example follows.
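
For instance, a hedged PySpark sketch using the versions shown elsewhere in this article (Spark 3.3 built with Scala 2.12, Iceberg 1.3.0, hadoop-aws 3.3.4, AWS SDK 2.17.178); check the Maven repository for the releases that match your own Spark build:

import pyspark

conf = (
    pyspark.SparkConf()
        ## Versions are examples only -- match them to your Spark/Scala build
        .set('spark.jars.packages',
             'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,'
             'org.apache.hadoop:hadoop-aws:3.3.4,'
             'software.amazon.awssdk:bundle:2.17.178,'
             'software.amazon.awssdk:url-connection-client:2.17.178')
)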

--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

This specifies any SQL extensions that should be loaded into the Spark session; this is what makes the special SQL syntax for Apache Iceberg or Project Nessie (branching and merging) available, as illustrated in the sketch after the list below.

  • org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    (Apache Iceberg Spark Extensions)
  • org.projectnessie.spark.extensions.NessieSparkSessionExtensions
    (Project Nessie Catalog SQL Extensions)
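
To make this concrete, here is a hedged sketch of the kind of SQL each extension unlocks; the catalog names (iceberg, nessie), table, snapshot ID, and branch name are all placeholders:

## Iceberg extensions: stored procedures and other Iceberg-specific SQL
spark.sql("CALL iceberg.system.rollback_to_snapshot('db.table1', 1234567890)")

## Nessie extensions: branch and tag management at the catalog level
spark.sql("CREATE BRANCH IF NOT EXISTS etl IN nessie FROM main")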

Catalog-Specific Settings

The settings below configure your specific catalog, which can live under a namespace of your choosing. For this Spark session we’ll name the catalog "my_iceberg_catalog" so it is clear where the catalog's name appears in each setting.

--conf spark.sql.catalog.my_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog

This specifies that this specific catalog is using the Apache Iceberg Spark Catalog class.

--conf spark.sql.catalog.my_iceberg_catalog.type=hadoop

This setting sets the type of catalog you are using; possible values include:

  • hadoop (if using an HDFS/file system catalog)
  • hive (if using a Hive catalog)

You can omit this setting if you’re not using one of these two catalogs.

--conf spark.sql.catalog.my_iceberg_catalog.uri=$URI

This is the URI used by catalogs that are reached over a network address. These catalogs include (a Hive sketch follows the list):

  • Nessie (http://example.com:19120/api/v1)
  • Hive (thrift://example.com:9083)
  • REST (http://example.com/catalog/v1)
  • JDBC (jdbc:mysql://example.com:3306/default) [Note: May need to configure database drivers]
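
For example, a Hive Metastore catalog combines the type and uri settings. A minimal PySpark sketch (not a complete session config; the thrift address is a placeholder for your metastore host):

import pyspark

conf = (
    pyspark.SparkConf()
        ## A Hive Metastore-backed Iceberg catalog uses both type and uri
        .set('spark.sql.catalog.my_iceberg_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.my_iceberg_catalog.type', 'hive')
        .set('spark.sql.catalog.my_iceberg_catalog.uri', 'thrift://example.com:9083')
)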

--conf spark.sql.catalog.my_iceberg_catalog.catalog-impl=org.apache.iceberg.nessie.NessieCatalog

This setting chooses the catalog implementation to use when you are not using the default; possible options are:

  • org.apache.iceberg.nessie.NessieCatalog
    (For a Project Nessie/Dremio Arctic Catalog)
  • org.apache.iceberg.rest.RESTCatalog
    (For the REST catalog)
  • org.apache.iceberg.aws.glue.GlueCatalog
    (For Glue Catalog)
  • org.apache.iceberg.jdbc.JdbcCatalog
    (For JDBC Catalog)
  • org.apache.iceberg.aws.dynamodb.DynamoDbCatalog
    (For DynamoDB Catalog)
  • org.apache.iceberg.dell.ecs.EcsCatalog
    (For Dell ECS Catalog)

--conf spark.sql.catalog.my_iceberg_catalog.warehouse=$WAREHOUSE

This specifies where the data files for new tables should be warehoused. It does not affect existing tables, since their location is already recorded in their metadata. The value can be a local file system path or an object storage path such as “s3a://my-bucket/path/”.
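
As a rough illustration, with the warehouse above a newly created table’s files land under a directory per namespace and table (a sketch of the typical layout; the exact structure can vary slightly by catalog):

## With warehouse = s3a://my-bucket/path/, a new table's files typically land at:
##   s3a://my-bucket/path/db/table1/metadata/  <- metadata.json files, manifests
##   s3a://my-bucket/path/db/table1/data/      <- data files (e.g., Parquet)
spark.sql("CREATE TABLE my_iceberg_catalog.db.table1 (id BIGINT) USING iceberg")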

--conf spark.sql.catalog.my_iceberg_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO

This specifies the class used for reading and writing files. It is mainly needed when you are using cloud object storage; if it is not specified, the standard Hadoop file system implementation is used. If you are writing to HDFS or a local file system, you can exclude this property. Possible values include:

  • org.apache.iceberg.aws.s3.S3FileIO
    (If using an S3 compatible store)
  • org.apache.iceberg.gcp.gcs.GCSFileIO
    (If using Google Cloud storage)
  • org.apache.iceberg.dell.ecs.EcsFileIO
    (If using Dell ECS)

Nessie-Specific Settings

Project Nessie is an open source transactional catalog that goes beyond a normal catalog by providing catalog-level snapshots and branching, which enables the following:

  • Isolation of ingestion
  • Catalog-level rollbacks
  • Catalog-level time travel
  • Branching as a way to make zero-copy clones for experimentation
  • Tagging at the catalog level for reproducibility

If you haven’t tried out Project Nessie, it is worth exploring along with the cloud service, Dremio Arctic, which provides cloud-managed Nessie catalogs with extensive lakehouse management features.

For Spark, there are a few more parameters you want to configure.

--conf spark.sql.catalog.my_iceberg_catalog.ref=main \
--conf spark.sql.catalog.my_iceberg_catalog.authentication.type=BEARER \
--conf spark.sql.catalog.my_iceberg_catalog.authentication.token=$TOKEN

  • ref: the branch the catalog should work with by default (see the sketch after this list)
  • authentication.type: the type of authentication, such as BEARER or NONE
  • authentication.token: the auth token, if the auth type is set to BEARER
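
Once the session is running, the ref you set is only the starting point; with the Nessie SQL extensions you can inspect and switch references without restarting Spark. A hedged sketch (the branch name etl is a placeholder):

## ref=main makes "main" the session's default branch; the Nessie SQL
## extensions let you inspect and switch references at runtime
spark.sql("LIST REFERENCES IN my_iceberg_catalog").show()
spark.sql("USE REFERENCE etl IN my_iceberg_catalog")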

Settings for S3 Compatible Stores

--conf spark.sql.catalog.my_iceberg_catalog.s3.endpoint=http://10.x.x.x:9020

If you are using ECS, MinIO, or another S3-compatible service, make sure to specify the endpoint in your settings. For S3 and compatible stores, also define these environment variables before starting up Spark (a MinIO-style sketch follows the exports):

export AWS_ACCESS_KEY_ID=XXXXXX
export AWS_SECRET_ACCESS_KEY=XXXXXX
export AWS_REGION=us-east-1
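
As a hedged sketch for a MinIO-style deployment (the endpoint address is a placeholder), you would typically pair the custom endpoint with path-style access:

import pyspark

conf = (
    pyspark.SparkConf()
        ## Point the catalog's S3FileIO at the custom endpoint and enable path-style access
        .set('spark.sql.catalog.my_iceberg_catalog.s3.endpoint', 'http://10.0.0.1:9000')
        .set('spark.sql.catalog.my_iceberg_catalog.s3.path-style-access', 'true')
)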

JDBC-Specific Settings

For a JDBC-compatible database, you can pass any typical JDBC connection settings under the jdbc. setting category for your catalog, like so (a fuller sketch follows these flags):

--conf spark.sql.catalog.my_iceberg_catalog.jdbc.verifyServerCertificate=true \
--conf spark.sql.catalog.my_iceberg_catalog.jdbc.useSSL=true \
--conf spark.sql.catalog.my_iceberg_catalog.jdbc.user=$DB_USERNAME \
--conf spark.sql.catalog.my_iceberg_catalog.jdbc.password=$DB_PASSWORD
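
Putting the JDBC pieces together, a hedged PySpark sketch might look like the following; the MySQL URL, warehouse path, and the DB_USERNAME/DB_PASSWORD environment variables are placeholders, and the matching JDBC driver jar still needs to be on the classpath:

import os
import pyspark

conf = (
    pyspark.SparkConf()
        .set('spark.sql.catalog.my_iceberg_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.my_iceberg_catalog.catalog-impl', 'org.apache.iceberg.jdbc.JdbcCatalog')
        .set('spark.sql.catalog.my_iceberg_catalog.uri', 'jdbc:mysql://example.com:3306/default')
        .set('spark.sql.catalog.my_iceberg_catalog.warehouse', 's3a://my-bucket/path/')
        ## Any jdbc.* property is passed through to the underlying JDBC connection;
        ## DB_USERNAME and DB_PASSWORD are assumed to be set in the environment
        .set('spark.sql.catalog.my_iceberg_catalog.jdbc.user', os.environ.get("DB_USERNAME"))
        .set('spark.sql.catalog.my_iceberg_catalog.jdbc.password', os.environ.get("DB_PASSWORD"))
)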

Examples of Everything Put Together

Below are some examples of what the settings may look like in the shell and in PySpark for a few common catalogs.

PROJECT NESSIE/DREMIO ARCTIC – SparkSQL

spark-sql --packages "org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,org.projectnessie:nessie-spark-extensions-x.x_x.xx:x.xx.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx" \
--conf spark.sql.extensions="org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions" \
--conf spark.sql.catalog.nessie.uri=$ARCTIC_URI \
--conf spark.sql.catalog.nessie.ref=main \
--conf spark.sql.catalog.nessie.authentication.type=BEARER \
--conf spark.sql.catalog.nessie.authentication.token=$TOKEN \
--conf spark.sql.catalog.nessie.catalog-impl=org.apache.iceberg.nessie.NessieCatalog \
--conf spark.sql.catalog.nessie.warehouse=$WAREHOUSE \
--conf spark.sql.catalog.nessie=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.nessie.io-impl=org.apache.iceberg.aws.s3.S3FileIO

PROJECT NESSIE/DREMIO ARCTIC – PySpark

import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
ARCTIC_URI = os.environ.get("ARCTIC_URI") ## Nessie Server URI
TOKEN = os.environ.get("TOKEN") ## Authentication Token
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS
SPARK_MASTER = "local[*]" ## Spark master URL -- replace with your cluster's master


conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .setMaster(SPARK_MASTER)
  		#packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,org.projectnessie:nessie-spark-extensions-x.x_x.xx:x.xx.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx')
  		#SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
  		#Configuring Catalog
        .set('spark.sql.catalog.arctic', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.arctic.uri', ARCTIC_URI)
        .set('spark.sql.catalog.arctic.ref', 'main')
        .set('spark.sql.catalog.arctic.authentication.type', 'BEARER')
        .set('spark.sql.catalog.arctic.authentication.token', TOKEN)
        .set('spark.sql.catalog.arctic.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.arctic.warehouse', 's3a://my-bucket/path/')
        .set('spark.sql.catalog.arctic.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
  		#AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run a Query
spark.sql("SELECT * FROM arctic.table1;").show()

AWS GLUE – SparkSQL

spark-sql --packages "org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx" \
--conf spark.sql.catalog.glue=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.glue.warehouse=$WAREHOUSE \
--conf spark.sql.catalog.glue.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.glue.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY \
--conf spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_ACCESS_KEY

AWS GLUE – PySpark

import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS
SPARK_MASTER = "local[*]" ## Spark master URL -- replace with your cluster's master


conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .setMaster(SPARK_MASTER)
  		#packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx')
  		#SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
  		#Configuring Catalog
        .set('spark.sql.catalog.glue', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.glue.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog')
        .set('spark.sql.catalog.glue.warehouse', 's3a://my-bucket/path/')
        .set('spark.sql.catalog.glue.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
  		#AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run a Query
spark.sql("SELECT * FROM glue.table1;").show()

HDFS/S3 – SparkSQL

spark-sql --packages "org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,org.apache.hadoop:hadoop-aws:x.x.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx" \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.hdfs_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.hdfs_catalog.type=hadoop \
    --conf spark.sql.catalog.hdfs_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
    --conf spark.sql.catalog.hdfs_catalog.warehouse=s3a://my-bucket/path/

HDFS/S3 – PySpark

import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS
SPARK_MASTER = "local[*]" ## Spark master URL -- replace with your cluster's master


conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .setMaster(SPARK_MASTER)
  		#packages
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
  		#SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
  		#Configuring Catalog
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://my-bucket/path/')
        .set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
  		#AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run a Query
spark.sql("SELECT * FROM hdfs_catalog.table1;").show()
