Using Apache Iceberg as your data lake table format can enable faster data lake analytics, time travel, partition evolution, ACID transactions, and more. Apache Iceberg is a key piece to achieving an open lakehouse architecture so you can reduce the cost of data warehouses and avoid vendor lock-in.
Recently, I wrote this article on different strategies for migrating Hive tables to Iceberg tables. In this article, I’d like to walk through some hands-on exercises that demonstrate how the Hive-to-Iceberg table conversion works on a small scale, so you can try these techniques out before you apply them at scale in your own data pipelines. These same techniques and commands can be used when migrating your existing Hive tables to Iceberg tables.
We will be:
- Spinning up a Docker environment with Spark and Iceberg configured
- Creating a Hive table and adding some sample records
- Migrating the Hive table in place to an Iceberg table using the add_files procedure
- Migrating the Hive table to a new Iceberg table by restating the data (a shadow migration)
You will need to have Docker installed to follow this example. The first step will start up a container with a Docker image I created called iceberg-starter that will give us an opportunity to get hands-on with Iceberg.
docker run -it --name iceberg-env alexmerced/iceberg-starter
This will put you in a shell in the Docker image. We will then open up Spark using the command below, which configures an Iceberg catalog in our Spark session.
spark-shell --packages org.apache.iceberg:iceberg-spark3-runtime:0.13.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hadoop \
--conf spark.sql.catalog.iceberg.warehouse=$PWD/iceberg-warehouse \
--conf spark.sql.warehouse.dir=$PWD/hive-warehouse
Let’s break down what all these flags are doing.
--packages org.apache.iceberg:iceberg-spark3-runtime:0.13.0
This instructs Spark to download and use the Apache Iceberg Spark runtime package.
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
By default, the Spark catalog uses the V1 Data Source specification, which has a different API than Spark’s V2 Data Source API. In this setting, we change the implementation of the Spark catalog to an Iceberg version of the V2 API to use the features Iceberg needs to interact with the default catalog.
--conf spark.sql.catalog.spark_catalog.type=hive
This sets the underlying implementation of the default catalog’s interface to Hive.
--conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
This creates a new catalog called “iceberg” and sets its implementation to Iceberg’s Spark catalog implementation.
--conf spark.sql.catalog.iceberg.type=hadoop
This sets the implementation of the “iceberg” catalog to a file-based Hadoop implementation.
--conf spark.sql.catalog.iceberg.warehouse=$PWD/iceberg-warehouse
This sets the directory where tables created in the “iceberg” catalog will be written to. Iceberg will write files to “~/iceberg-warehouse” in the Docker container’s file system.
--conf spark.sql.warehouse.dir=$PWD/hive-warehouse
This configuration tells the default Spark catalog, which stores tables in the Hive table format, to track tables in “~/hive-warehouse”.
Now that we are in Spark shell, let’s create some Hive tables to simulate tables you may have in your data lake.
spark.sql("CREATE TABLE people (first_name string, last_name string) USING parquet");
Since we used the USING parquet clause, the data will be stored in Apache Parquet files (data must be in Parquet, ORC, or Avro to do in-place migrations). This will create a Hive table. But since we didn’t refer to the “iceberg” catalog that was configured or use a USING iceberg clause, it will use the default Spark catalog, which uses a Hive implementation and stores the table in ~/hive-warehouse.
Let’s add a few records to the table.
spark.sql("INSERT INTO people VALUES ('Alex', 'Merced'), ('Jason', 'Hughes'), ('Ben', 'Hudson')");
Let’s confirm the data was added:
spark.sql("SELECT * FROM people").show()
If we see our data, then our Hive table is ready to go.
Before we run the migration, let’s see where the data files for this Hive table live. Quit out of Spark shell.
:quit
In our home directory, you’ll notice new “hive-warehouse” and “metastore_db” directories. The “metastore_db” directory is where the embedded Hive metastore’s metadata is stored (in a Derby database by default), while the “hive-warehouse” directory is where the data files are stored. Let’s look at the contents of the table directory.
ls ~/hive-warehouse/people
This should yield something like the following as the output:
_SUCCESS
part-00000-bd527ad5-ff32-4704-af79-329ae4b2d008-c000.snappy.parquet
part-00001-bd527ad5-ff32-4704-af79-329ae4b2d008-c000.snappy.parquet
part-00002-bd527ad5-ff32-4704-af79-329ae4b2d008-c000.snappy.parquet
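The layout above — a _SUCCESS marker plus one Parquet part file per Spark write task — can be sketched with plain files. This is only an illustration of the directory structure; the file names are simplified (real part files carry a unique hash like the one in the listing above).

```python
# Sketch of a Hive-style table directory: a _SUCCESS job-completion marker
# plus one Parquet part file per Spark write task. Names are illustrative.
from pathlib import Path
import tempfile

table_dir = Path(tempfile.mkdtemp()) / "hive-warehouse" / "people"
table_dir.mkdir(parents=True)

(table_dir / "_SUCCESS").touch()              # marker written on job success
for i in range(3):                            # one part file per write task
    (table_dir / f"part-{i:05d}-c000.snappy.parquet").touch()

data_files = sorted(p.name for p in table_dir.glob("part-*.parquet"))
print(data_files)
```

The key point for the migration steps that follow: the table’s data is just these files in a directory, which is what lets Iceberg adopt them without rewriting anything.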
Re-open Spark:
spark-shell --packages org.apache.iceberg:iceberg-spark3-runtime:0.13.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.iceberg.type=hadoop \
--conf spark.sql.catalog.iceberg.warehouse=$PWD/iceberg-warehouse \
--conf spark.sql.warehouse.dir=$PWD/hive-warehouse
With Spark up and running, let’s confirm our Hive metastore is connected. Check the tables we’ve created in the Hive catalog by running the following command.
spark.sql("SHOW TABLES").show()
Now let’s migrate our Hive table to an Iceberg table.
This migration will use an in-place migration strategy. An in-place migration means we leave the existing data files as-is and create only the metadata for the new Iceberg table, reusing the data files of the existing Hive table. This can be a much less expensive operation than rewriting all the data. The existing Hive table must store the data in Parquet, ORC, or Avro for this to work, which is why the USING parquet clause was important earlier. This can be done in two ways:
- The snapshot procedure, which creates a new Iceberg table from the Hive table’s files while leaving the Hive table intact
- The migrate procedure, which replaces the Hive table with an Iceberg table
As a best practice, I recommend you use the snapshot procedure, which allows you to test the results of the migrate procedure. Snapshot keeps the old Hive table intact, so you should use it before running the actual migrate procedure, which removes the old Hive table.
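For reference, both procedures are invoked as Spark SQL CALL statements. The calls below are a sketch — the table names are illustrative — with snapshot taking the source table first, followed by the name of the new table to create:

```scala
// snapshot: creates a new Iceberg table from the Hive table's files,
// leaving the original Hive table intact (safe for testing the results)
spark.sql("CALL spark_catalog.system.snapshot('default.people', 'default.people_snapshot')")

// migrate: replaces the Hive table with an Iceberg table of the same name
// (run only after validating the snapshot results)
spark.sql("CALL spark_catalog.system.migrate('default.people')")
```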
We will be using the add_files procedure for this tutorial. All of the Iceberg procedures can be called through SQL statements using the following syntax:
spark.sql("CALL catalog_name.system.procedure_name(arguments)")
Before we use add_files, we need an existing Iceberg table with a matching schema to migrate our Hive table’s data into. So let’s create an Iceberg table with the following command. We’ll use a CTAS (CREATE TABLE AS SELECT) statement to create an empty table with an identical schema to the original.
spark.sql("CREATE TABLE iceberg.db.people USING iceberg AS (SELECT * FROM people LIMIT 0)")
If this command throws an error, make sure you have successfully run the SHOW TABLES query from earlier.
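The LIMIT 0 trick works in most SQL engines: the CTAS copies the source table’s column list but selects no rows. Here is a minimal sketch of the same pattern using SQLite (standing in for Spark SQL, which would need a running session):

```python
# Sketch: CREATE TABLE ... AS SELECT * ... LIMIT 0 yields an empty table
# with the source table's columns. SQLite stands in for Spark SQL here.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE people (first_name TEXT, last_name TEXT)")
conn.execute("INSERT INTO people VALUES ('Alex', 'Merced'), ('Jason', 'Hughes')")

# LIMIT 0 copies the column list but selects no rows
conn.execute("CREATE TABLE people_empty AS SELECT * FROM people LIMIT 0")

cols = [row[1] for row in conn.execute("PRAGMA table_info(people_empty)")]
rows = conn.execute("SELECT COUNT(*) FROM people_empty").fetchone()[0]
print(cols, rows)
```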
Now that the table has been created, we can run the add_files procedure, telling it to add the data files from the people table to the iceberg.db.people table.
spark.sql("CALL iceberg.system.add_files(table => 'db.people', source_table => 'people')")
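Conceptually, what add_files just did is a metadata-only operation: it wrote new Iceberg metadata that points at the existing Hive data files, without moving or rewriting them. The sketch below illustrates that idea with plain files and a JSON manifest — this is not Iceberg’s actual metadata format, just the shape of the operation:

```python
# Sketch of a metadata-only migration: record pointers to the existing
# data files rather than rewriting them. Illustrative only -- Iceberg's
# real metadata is a tree of manifest and metadata files, not one JSON.
import json
import tempfile
from pathlib import Path

hive_table = Path(tempfile.mkdtemp()) / "hive-warehouse" / "people"
hive_table.mkdir(parents=True)
for i in range(3):
    (hive_table / f"part-{i:05d}.snappy.parquet").write_bytes(b"fake parquet")

iceberg_meta = Path(tempfile.mkdtemp()) / "iceberg-warehouse" / "db" / "people" / "metadata"
iceberg_meta.mkdir(parents=True)

# "add_files": list the Hive table's data files in the new table's metadata
manifest = [str(p) for p in sorted(hive_table.glob("*.parquet"))]
(iceberg_meta / "manifest.json").write_text(json.dumps(manifest))

# The data files never moved; both tables now reference the same bytes
print(all(Path(p).parent == hive_table for p in manifest))
```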
Now let’s query the Iceberg table to confirm it has the files added.
spark.sql("SELECT * FROM iceberg.db.people").show()
We can also run this query to see which files belong to our Iceberg table.
spark.sql("SELECT file_path FROM iceberg.db.people.files").show(20, false)
You’ll see the files are in the “hive-warehouse” directory where Hive tables are stored, confirming the table is using the original data files instead of recreating them. Now both tables exist using the same data files.
New metadata has been written and is being stored in the Iceberg warehouse, which we can see in the following query.
spark.sql("SELECT snapshot_id, manifest_list FROM iceberg.db.people.snapshots").show(20, false)
Let’s once again take our people table and turn it into an Iceberg table, but this time we’ll be restating the data. This is called shadow migration because during the migration process the new Iceberg table acts as a shadow of the original table. Once both tables are in sync, you can flip the switch and move all workloads to the Iceberg table. In this scenario, we are creating new data files in the Iceberg table based on the data in the existing Hive table’s data files.
Turning the Hive table into an Iceberg table using this method is as simple as running a CTAS statement.
spark.sql("CREATE TABLE iceberg.db.people_restated USING iceberg AS (SELECT * FROM people)")
Now that the table exists, let’s confirm it by querying the table.
spark.sql("SELECT * FROM iceberg.db.people_restated").show()
Let’s look at the location of our data files.
spark.sql("SELECT file_path FROM iceberg.db.people_restated.files").show(20, false)
This time the files are located in our Iceberg catalog directory, “iceberg-warehouse”, instead of the Hive directory “hive-warehouse”. This tells us that new data files were written instead of using the old files.
As before, we can inspect the new table’s snapshot metadata:
spark.sql("SELECT snapshot_id, manifest_list FROM iceberg.db.people_restated.snapshots").show(20, false)
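To contrast with the in-place approach, here is a sketch of what the shadow migration did physically: brand-new data files were written under the new table’s own warehouse directory, while the originals were left behind. Plain file copies stand in for Spark rewriting Parquet files:

```python
# Sketch of a shadow (restate) migration: read the old table's data and
# write brand-new files under the new table's warehouse directory.
# shutil.copy stands in for Spark rewriting the Parquet files.
import shutil
import tempfile
from pathlib import Path

hive_table = Path(tempfile.mkdtemp()) / "hive-warehouse" / "people"
iceberg_table = Path(tempfile.mkdtemp()) / "iceberg-warehouse" / "db" / "people_restated" / "data"
hive_table.mkdir(parents=True)
iceberg_table.mkdir(parents=True)

for i in range(3):
    (hive_table / f"part-{i:05d}.snappy.parquet").write_bytes(b"rows")

# Restate: new physical files are created; the originals stay in place
new_files = [shutil.copy(p, iceberg_table / p.name)
             for p in sorted(hive_table.glob("*.parquet"))]

print(len(new_files), all(Path(p).parent == iceberg_table for p in new_files))
```

Because the two tables no longer share files, the shadow table can be validated and evolved independently before you cut workloads over to it.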
Regardless of whether you restate the files, you now have an Iceberg table that, as it evolves, will provide all the benefits of time travel, version rollback, partition evolution, and so forth.
In general, your migration should go through a four-phase process:
Other important migration considerations:
Now that you have hands-on experience migrating Hive tables to Iceberg tables, take these lessons and apply them to your own data. If you use AWS Glue, check out this tutorial on how to create Iceberg tables using Glue.