May 31, 2023
Deep Dive Into Configuring Your Apache Iceberg Catalog with Apache Spark
Apache Iceberg is a data lakehouse table format that has been taking the data world by storm, with robust support from tools like Dremio, Fivetran, Airbyte, AWS, Snowflake, Tabular, Presto, Apache Flink, Apache Spark, Trino, and many more. Apache Spark is one of the tools most data professionals use, and many introductory tutorials for Apache Iceberg use Spark as their entry point.
When reading documentation or following tutorials, a common source of confusion is the set of configurations used to set up the catalog for your Spark session, which can look like the following when using the Spark CLI:
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hadoop \
--conf spark.sql.catalog.iceberg.warehouse=$PWD/warehouse
Or like this in a PySpark script:
import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS

SPARK_MASTER = "local[*]" ## Spark master URL (adjust for your cluster)

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .setMaster(SPARK_MASTER)
        #packages
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        #SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        #Configuring Catalog
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://my-bucket/path/')
        .set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        #AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run a Query
spark.sql("SELECT * FROM hdfs_catalog.table1;").show()
Why Do You Need These Configurations?
When using tools like Dremio, these configurations are handled automatically based on the source you connect to your Dremio Sonar project, but that is not how Spark works. When you start up a Spark session, you need to configure a Spark catalog, which is an abstraction over the Apache Iceberg catalog you would connect to from any engine. Possible catalogs include:
- Hadoop (File System)
- AWS Glue
- Project Nessie
- JDBC (Any JDBC supporting database)
- Hive
- DynamoDB
- REST Catalog
Wait, What Is a Catalog?
An Apache Iceberg catalog is a mechanism that lets tooling identify the available Apache Iceberg tables and provides the engine with the location of the metadata.json file that currently defines each table. You can learn more by reading these articles about Apache Iceberg reads and Apache Iceberg writes.
An Apache Spark catalog is a mechanism in the Spark session that enables Spark to discover the tables available to work with, and our Iceberg configurations create a Spark catalog and link it to an existing Iceberg catalog.
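Once the Spark catalog is linked to an Iceberg catalog, tables are addressed as catalog_name.namespace.table in Spark SQL. Here is a minimal sketch of what that looks like in PySpark, assuming a session already configured with a catalog named my_iceberg_catalog and using a hypothetical db.customers table:

## Assumes a Spark session ("spark") already configured with an Iceberg catalog named "my_iceberg_catalog"
## The namespace "db" and table "customers" are hypothetical examples

# Create a namespace and an Iceberg table inside the configured catalog
spark.sql("CREATE NAMESPACE IF NOT EXISTS my_iceberg_catalog.db")
spark.sql("CREATE TABLE IF NOT EXISTS my_iceberg_catalog.db.customers (id BIGINT, name STRING) USING iceberg")

# Spark resolves the table through the Iceberg catalog's pointer to the current metadata.json
spark.sql("SELECT * FROM my_iceberg_catalog.db.customers").show()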
The Primary Settings
Here are many of the typical settings you’ll use regardless of the catalog.
--packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0
This setting identifies any packages that need to be downloaded and used during the Spark session. Libraries you’ll often specify in this setting include:
- org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.xx.x (Core Apache Iceberg library)
- org.apache.hadoop:hadoop-aws:x.x.x (Library for writing to S3 if using the Hadoop/HDFS catalog)
- software.amazon.awssdk:bundle:x.xx.xxx (For working with AWS S3)
- software.amazon.awssdk:url-connection-client:x.xx.xxx (For working with AWS S3)
- org.apache.iceberg:iceberg-gcp:x.x.x (For working with Google Cloud)
- org.projectnessie:nessie-spark-extensions-x.x_x.xx:x.xx.x (For using a Project Nessie or Dremio Arctic catalog)
- org.apache.iceberg:iceberg-dell:x.xx.x (For using Dell ECS)
- com.emc.ecs:object-client-bundle:x.x.x (For using Dell ECS)
Note: Make sure to replace the x's with the current version number of each library. You can look them up in the Maven repository.
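If the long comma-separated packages string becomes hard to maintain, one option in PySpark is to assemble it from a Python list. This is just a sketch; the coordinates below are the S3/Hadoop set from the list above, with placeholder versions you would replace with real ones from Maven:

## Sketch: building the spark.jars.packages value from a list
## Versions are placeholders (x.x...) - replace them with real versions from Maven Central
import pyspark

packages = [
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:x.x.x",
    "org.apache.hadoop:hadoop-aws:x.x.x",
    "software.amazon.awssdk:bundle:x.xx.xxx",
    "software.amazon.awssdk:url-connection-client:x.xx.xxx",
]

conf = (
    pyspark.SparkConf()
        # spark.jars.packages expects a single comma-separated string
        .set("spark.jars.packages", ",".join(packages))
)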
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
This specifies any extensions to SQL that should be present in the Spark session; this is what makes the special SQL syntax for Apache Iceberg or Project Nessie (branching and merging) available to the existing Spark session.
- org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions (Apache Iceberg Spark Extensions)
- org.projectnessie.spark.extensions.NessieSparkSessionExtensions (Project Nessie Catalog SQL Extensions)
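As a concrete illustration of what the Iceberg extensions unlock, Iceberg's stored procedures are invoked with CALL against your configured catalog, and that syntax is only available once the extension class above is loaded. A minimal sketch, assuming a catalog named my_iceberg_catalog and a hypothetical db.customers table:

## Requires spark.sql.extensions to include IcebergSparkSessionExtensions
## "my_iceberg_catalog" and "db.customers" are assumed names; the snapshot ID is a placeholder

# Inspect the table's snapshot history to find a snapshot ID
spark.sql("SELECT * FROM my_iceberg_catalog.db.customers.history").show()

# Roll the table back to an earlier snapshot using an Iceberg stored procedure
spark.sql("CALL my_iceberg_catalog.system.rollback_to_snapshot('db.customers', 1234567890123456789)")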
Catalog-Specific Settings
The settings below configure your specific catalog, which can live under a name of your choosing. We'll name the catalog for this Spark session "my_iceberg_catalog" to make it clear where the catalog's name appears in each setting.
--conf spark.sql.catalog.my_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog
This specifies that the catalog uses Apache Iceberg's SparkCatalog class.
--conf spark.sql.catalog.my_iceberg_catalog.type=hadoop
This setting specifies the type of catalog you are using, and possible values include:
- hadoop (if using an HDFS/file system catalog)
- hive (if using a Hive catalog)
You can exclude this setting if you’re not using one of the above two catalogs.
--conf spark.sql.catalog.my_iceberg_catalog.uri=$URI
This is the URI used to communicate with the catalog, for catalog types that require one. These catalogs include:
- Nessie (http://example.com:19120/api/v1)
- Hive (thrift://example.com:9083)
- REST (http://example.com/catalog/v1)
- JDBC (jdbc:mysql://example.com:3306/default) [Note: May need to configure database drivers]
--conf spark.sql.catalog.my_iceberg_catalog.catalog-impl=org.apache.iceberg.nessie.NessieCatalog
This setting chooses the catalog implementation to use. If you're not using the default, possible options include:
- org.apache.iceberg.nessie.NessieCatalog (For a Project Nessie/Dremio Arctic catalog)
- org.apache.iceberg.rest.RESTCatalog (For the REST catalog)
- org.apache.iceberg.aws.glue.GlueCatalog (For the Glue catalog)
- org.apache.iceberg.jdbc.JdbcCatalog (For the JDBC catalog)
- org.apache.iceberg.aws.dynamodb.DynamoDbCatalog (For the DynamoDB catalog)
- org.apache.iceberg.dell.ecs.EcsCatalog (For the Dell ECS catalog)
--conf spark.sql.catalog.my_iceberg_catalog.warehouse=$WAREHOUSE
This specifies where the data files for new tables should be warehoused. It will not affect existing tables, as they already have a location recorded in their metadata. The value can be a local file system path or an object storage path such as S3, for example "s3a://my-bucket/path/".
--conf spark.sql.catalog.my_iceberg_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
This specifies what class to use for reading and writing files. It is mainly needed if you're using cloud object storage; when it isn't specified, Iceberg's default Hadoop-based file IO is used. If you are writing to HDFS or a local filesystem, you can exclude this property. Possible values include:
- org.apache.iceberg.aws.s3.S3FileIO (If using an S3-compatible store)
- org.apache.iceberg.gcp.gcs.GCSFileIO (If using Google Cloud Storage)
- org.apache.iceberg.dell.ecs.EcsFileIO (If using Dell ECS)
Nessie-Specific Settings
Project Nessie is an open source transactional catalog that goes beyond a normal catalog by providing catalog-level snapshots and branching, which enables the following:
- Isolation of ingestion
- Catalog-level rollbacks
- Catalog-level time travel
- Branching as a way to make zero-copy clones for experimentation
- Tagging at the catalog level for reproducibility
If you haven’t tried out Project Nessie, here are a few articles on it and the cloud service, Dremio Arctic, which provides cloud-managed Nessie catalogs with extensive lakehouse management features:
- Intro to Data as Code Article/Tutorial
- Intro to Nessie with Spark
- Intro to Nessie with Jupyter Notebook
- Multi-Table Transactions Tutorial
- Data as Code: ML Reproducibility
For Spark, there are a few more parameters you'll want to configure.
--conf spark.sql.catalog.my_iceberg_catalog.ref=main \
--conf spark.sql.catalog.my_iceberg_catalog.authentication.type=BEARER \
--conf spark.sql.catalog.my_iceberg_catalog.authentication.token=$TOKEN
- ref: the branch the catalog should work with by default
- authentication.type: the type of authentication, such as BEARER or NONE
- authentication.token: the authentication token, if the authentication type is set to BEARER
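To see why the ref setting and the Nessie SQL extensions matter, here is a minimal sketch of branch-based work in PySpark. It assumes a Nessie-backed catalog configured under the name nessie and an existing, hypothetical db.orders table; the branch commands come from the Nessie Spark SQL extensions:

## Assumes a catalog named "nessie" using NessieCatalog plus the Nessie SQL extensions
## "db.orders" and the branch name "etl_job" are hypothetical examples

# Create an isolated branch from the catalog's current default reference
spark.sql("CREATE BRANCH etl_job IN nessie")

# Point the session's catalog at the new branch and write to it in isolation
spark.sql("USE REFERENCE etl_job IN nessie")
spark.sql("INSERT INTO nessie.db.orders VALUES (1, 'pending')")

# Once the data is validated, merge the branch back into main
spark.sql("MERGE BRANCH etl_job INTO main IN nessie")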
Settings for S3 Compatible Stores
--conf spark.sql.catalog.my_iceberg_catalog.s3.endpoint=http://10.x.x.x:9020
If using ECS, MinIO, or another S3-compatible service, make sure to specify the endpoint in your settings. For S3 and compatible stores, make sure to define these environment variables prior to starting up Spark.
export AWS_ACCESS_KEY_ID=XXXXXX
export AWS_SECRET_ACCESS_KEY=XXXXXX
export AWS_REGION=us-east-1
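For MinIO specifically, you will usually also want path-style access, since the default virtual-hosted-style addressing assumes AWS-style DNS. A minimal sketch of the relevant S3FileIO properties in PySpark, where the endpoint, bucket path, and catalog name my_iceberg_catalog are placeholder values:

## Sketch of the S3FileIO properties for an S3-compatible store such as MinIO
## The endpoint, bucket path, and catalog name are placeholder/assumed values
import pyspark

conf = (
    pyspark.SparkConf()
        .set("spark.sql.catalog.my_iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        .set("spark.sql.catalog.my_iceberg_catalog.warehouse", "s3a://my-bucket/path/")
        .set("spark.sql.catalog.my_iceberg_catalog.s3.endpoint", "http://10.0.0.10:9000")
        # Use path-style requests (http://endpoint/bucket/key) instead of virtual-hosted style
        .set("spark.sql.catalog.my_iceberg_catalog.s3.path-style-access", "true")
)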
JDBC-Specific Settings
For a JDBC-compatible database, you can pass any typical JDBC connection properties under the jdbc setting category for your catalog, like so:
--conf spark.sql.catalog.my_iceberg_catalog.jdbc.verifyServerCertificate=true \
--conf spark.sql.catalog.my_iceberg_catalog.jdbc.useSSL=true \
--conf spark.sql.catalog.my_iceberg_catalog.jdbc.user=$DB_USERNAME \
--conf spark.sql.catalog.my_iceberg_catalog.jdbc.password=$DB_PASSWORD
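To tie the JDBC settings together, here is a minimal PySpark sketch of a JDBC-backed catalog. It assumes a PostgreSQL database holding the catalog and a local warehouse path; the driver coordinate, connection string, and catalog name jdbc_catalog are placeholders to adjust for your environment (for object storage you would add the S3 settings shown earlier):

## Sketch: Iceberg JDBC catalog in PySpark (assumed names and placeholder values throughout)
import os
import pyspark
from pyspark.sql import SparkSession

DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")

conf = (
    pyspark.SparkConf()
        .setAppName("jdbc_catalog_example")
        ## Iceberg runtime plus a JDBC driver; versions are placeholders to look up on Maven
        .set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:x.x.x,org.postgresql:postgresql:x.x.x")
        .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        ## Configuring the catalog
        .set("spark.sql.catalog.jdbc_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog.jdbc_catalog.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog")
        .set("spark.sql.catalog.jdbc_catalog.uri", "jdbc:postgresql://example.com:5432/iceberg_catalog")
        .set("spark.sql.catalog.jdbc_catalog.warehouse", "file:///tmp/iceberg_warehouse")
        ## jdbc.* properties are passed through to the JDBC connection
        .set("spark.sql.catalog.jdbc_catalog.jdbc.user", DB_USERNAME)
        .set("spark.sql.catalog.jdbc_catalog.jdbc.password", DB_PASSWORD)
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sql("SHOW NAMESPACES IN jdbc_catalog").show()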
Examples of Everything Put Together
Below are some examples of what the settings may look like in the shell and in PySpark. Here are some other articles where you can find examples for reference:
- Configuring PySpark Reference
- Configuring Spark CLI reference
- Configuring Spark/Minio/Dremio Locally
PROJECT NESSIE/DREMIO ARCTIC – SparkSQL
spark-sql --packages "org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,org.projectnessie:nessie-spark-extensions-x.x_x.xx:x.xx.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx" \ --conf spark.sql.extensions="org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions" \ --conf spark.sql.catalog.nessie.uri=$ARCTIC_URI \ --conf spark.sql.catalog.nessie.ref=main \ --conf spark.sql.catalog.nessie.authentication.type=BEARER \ --conf spark.sql.catalog.nessie.authentication.token=$TOKEN \ --conf spark.sql.catalog.nessie.catalog-impl=org.apache.iceberg.nessie.NessieCatalog \ --conf spark.sql.catalog.nessie.warehouse=$WAREHOUSE \ --conf spark.sql.catalog.nessie=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.nessie.io-impl=org.apache.iceberg.aws.s3.S3FileIO
PROJECT NESSIE/DREMIO ARCTIC – PySpark
import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
ARCTIC_URI = os.environ.get("ARCTIC_URI") ## Nessie Server URI
TOKEN = os.environ.get("TOKEN") ## Authentication Token
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS

SPARK_MASTER = "local[*]" ## Spark master URL (adjust for your cluster)

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .setMaster(SPARK_MASTER)
        #packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,org.projectnessie:nessie-spark-extensions-x.x_x.xx:x.xx.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx')
        #SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        #Configuring Catalog
        .set('spark.sql.catalog.arctic', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.arctic.uri', ARCTIC_URI)
        .set('spark.sql.catalog.arctic.ref', 'main')
        .set('spark.sql.catalog.arctic.authentication.type', 'BEARER')
        .set('spark.sql.catalog.arctic.authentication.token', TOKEN)
        .set('spark.sql.catalog.arctic.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.arctic.warehouse', 's3a://my-bucket/path/')
        .set('spark.sql.catalog.arctic.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        #AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run a Query
spark.sql("SELECT * FROM arctic.table1;").show()
AWS GLUE – SparkSQL
spark-sql --packages "org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx" \ --conf spark.sql.catalog.glue=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.glue.warehouse=$WAREHOUSE \ --conf spark.sql.catalog.glue.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \ --conf spark.sql.catalog.glue.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY \ --conf spark.hadoop.fs.s3a.secret.key=$AWS_SECRET_ACCESS_KEY
AWS GLUE – PySpark
import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS

SPARK_MASTER = "local[*]" ## Spark master URL (adjust for your cluster)

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .setMaster(SPARK_MASTER)
        #packages
        .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x,software.amazon.awssdk:bundle:x.xx.xxx,software.amazon.awssdk:url-connection-client:x.xx.xxx')
        #SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        #Configuring Catalog
        .set('spark.sql.catalog.glue', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.glue.catalog-impl', 'org.apache.iceberg.aws.glue.GlueCatalog')
        .set('spark.sql.catalog.glue.warehouse', 's3a://my-bucket/path/')
        .set('spark.sql.catalog.glue.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        #AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run a Query
spark.sql("SELECT * FROM glue.table1;").show()
HDFS/S3 – SparkSQL
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-x.x_x.xx:x.x.x \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.hdfs_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.hdfs_catalog.type=hadoop \
--conf spark.sql.catalog.hdfs_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.hdfs_catalog.warehouse=s3a://my-bucket/path/
HDFS/S3 – PySpark
import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") ## AWS CREDENTIALS
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") ## AWS CREDENTIALS

SPARK_MASTER = "local[*]" ## Spark master URL (adjust for your cluster)

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .setMaster(SPARK_MASTER)
        #packages
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        #SQL Extensions
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        #Configuring Catalog
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://my-bucket/path/')
        .set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        #AWS CREDENTIALS
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
)

## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

## Run a Query
spark.sql("SELECT * FROM hdfs_catalog.table1;").show()