25 minute read · July 28, 2023
Using Flink with Apache Iceberg and Nessie
· Senior Tech Evangelist, Dremio
Flink is a supercharged tool for processing data in real time or in batches. It's open source and has a unified programming model, so you can build some serious data processing pipelines.
But here's where things get interesting. When you bring Apache Iceberg and Project Nessie into the mix, Flink becomes even more awesome. Iceberg is a table format that makes working with large-scale data lakes a breeze. It lets you do cool stuff like evolve schemas, time travel through data history, and manage metadata efficiently.
And what about Nessie? Well, it's like Git for data lakes. It adds versioning and metadata management to Iceberg tables, so you can collaborate on datasets, keep track of changes, and ensure consistency.
With Flink, Iceberg, and Nessie, you can build robust and scalable data pipelines. Flink handles streaming and batch processing, Iceberg keeps your data organized and flexible, and Nessie adds version control and metadata magic.
The Tutorial
This tutorial explains all the steps for using Flink with Iceberg and Nessie, so you can customize them for your use case. To follow this tutorial, you need the following installed:
- Docker
- Java 8+
- Maven
This GitHub repository has everything you need to reference the steps documented in this tutorial.
Creating Our Flink Docker Image
The official Flink image doesn't include the Java JARs for the libraries we plan on using, so we need to add them ourselves. To do this, we looked up the download links on the Maven repository website and wrote a Dockerfile that builds a custom Flink image:
# Start from the official Flink image
FROM flink:1.16.1-scala_2.12

###############################################
## Download Necessary Jars to Flink Class Path
###############################################

## Iceberg Flink Library
RUN curl -L https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.3.1/iceberg-flink-runtime-1.16-1.3.1.jar -o /opt/flink/lib/iceberg-flink-runtime-1.16-1.3.1.jar

## Hive Flink Library
RUN curl -L https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.1/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar -o /opt/flink/lib/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar

## Hadoop Common Classes
RUN curl -L https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/2.8.3/hadoop-common-2.8.3.jar -o /opt/flink/lib/hadoop-common-2.8.3.jar

## Hadoop AWS Classes
RUN curl -L https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -o /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

## AWS Bundled Classes
RUN curl -L https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.20.18/bundle-2.20.18.jar -o /opt/flink/lib/bundle-2.20.18.jar

## Install Nano to edit files
RUN apt update && apt install -y nano

CMD ["./bin/start-cluster.sh"]

## docker build --tag username/imagename .
This script does the following:
- Starts with the official Flink 1.16 image
- Downloads all the necessary jars and copies them to the Flink classpath at /opt/flink/lib
- Installs Nano in case we need to do any file editing on the fly for config files
- Uses the same entry point command as the original Flink image
This image is already available from Docker Hub as alexmerced/iceberg-flink-1.3.1 (the same image referenced in the Docker Compose file below), but if you need to customize it, you can use the Dockerfile above to build your own.
The Docker Compose File
Once our image is built, it’s time to create our environment. Create an empty directory containing empty warehouse and notebooks subdirectories and a single docker-compose.yaml:
###########################################
# Flink - Iceberg - Nessie Setup
###########################################
version: "3"

services:
  # Spark Notebook Server
  spark-iceberg:
    image: alexmerced/spark34notebook
    container_name: spark-iceberg
    networks:
      iceberg-nessie-flink-net:
    depends_on:
      - catalog
      - storage
    volumes:
      - ./warehouse:/home/docker/warehouse
      - ./notebooks:/home/docker/notebooks
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - AWS_DEFAULT_REGION=us-east-1
    ports:
      - 8888:8888
      - 8080:8080
      - 10000:10000
      - 10001:10001

  # Nessie Catalog Server Using In-Memory Store
  catalog:
    image: projectnessie/nessie:0.67.0
    container_name: catalog
    networks:
      iceberg-nessie-flink-net:
    ports:
      - 19120:19120

  # Minio Storage Server
  storage:
    image: minio/minio:RELEASE.2023-07-21T21-12-44Z
    container_name: storage
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=storage
      - MINIO_REGION_NAME=us-east-1
      - MINIO_REGION=us-east-1
    networks:
      iceberg-nessie-flink-net:
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  # Minio Client Container
  mc:
    depends_on:
      - storage
    image: minio/mc:RELEASE.2023-07-21T20-44-27Z
    container_name: mc
    networks:
      iceberg-nessie-flink-net:
        aliases:
          - minio.storage
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - AWS_DEFAULT_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://storage:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc mb minio/iceberg;
      /usr/bin/mc policy set public minio/warehouse;
      /usr/bin/mc policy set public minio/iceberg;
      tail -f /dev/null
      "

  # Flink Job Manager
  flink-jobmanager:
    image: alexmerced/iceberg-flink-1.3.1
    ports:
      - "8081:8081"
    command: jobmanager
    networks:
      iceberg-nessie-flink-net:
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - AWS_DEFAULT_REGION=us-east-1
      - S3_ENDPOINT=http://minio.storage:9000
      - S3_PATH_STYLE_ACCESS=true

  # Flink Task Manager
  flink-taskmanager:
    image: alexmerced/iceberg-flink-1.3.1
    depends_on:
      - flink-jobmanager
    command: taskmanager
    networks:
      iceberg-nessie-flink-net:
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - AWS_DEFAULT_REGION=us-east-1
      - S3_ENDPOINT=http://minio.storage:9000
      - S3_PATH_STYLE_ACCESS=true

networks:
  iceberg-nessie-flink-net:
This docker-compose file does the following:
- Creates a network called iceberg-nessie-flink-net
- Creates a container for running Spark 3.4 notebooks with the alexmerced/spark34notebook image
- Creates a container to host our Nessie catalog
- Creates a MinIO container to act as our storage
- Creates a MinIO client container, which is mainly there to create a few initial public buckets
- Creates two Flink containers to act as the JobManager and TaskManager
What Are the JobManager and TaskManager?
In a Flink cluster, the JobManager is like the head chef who coordinates the operation, assigns tasks, and keeps everything running smoothly. The TaskManagers are the junior chefs who execute the assigned tasks, managing resources and exchanging data. Together they form the cluster, with the JobManager as the central coordinator and the TaskManagers as the workers who perform the computations, much like a restaurant where the head chef runs the kitchen and the cooks prepare the meals.
Status Check
Right now your directory should look like this:
.
├── docker-compose.yaml
├── warehouse
└── notebooks
This environment can be spun up by running the docker-compose up command.
Creating Our Flink Job
We will be using Apache Maven to manage our Flink job. Apache Maven is like a project manager for Java projects. It automates tasks, manages dependencies, and coordinates the building of software. It's similar to having a personal assistant who organizes resources, schedules tasks, and ensures smooth project development.
First, we need to create a blank Maven project:
mvn archetype:generate
It will ask you several questions; you can hit enter to accept the default answers, except for:
- artifactId: this is the project name; let’s make it “flink_job”
- groupId: this uniquely identifies who is building the project, traditionally a reverse domain name; let’s use “com.my_flink_job”
(If you’d rather skip the prompts entirely, a non-interactive equivalent is shown after this list.)
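If you prefer not to answer the prompts interactively, the same project can be generated in one command. This is an optional equivalent; the quickstart archetype named here is Maven's default suggestion in the interactive flow:
mvn archetype:generate \
  -DgroupId=com.my_flink_job \
  -DartifactId=flink_job \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false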
Status Check
Now your directory should look like the following:
.
├── docker-compose.yaml
├── warehouse
├── notebooks
└── flink_job
    ├── pom.xml
    └── src
        ├── main
        │   └── java
        │       └── com
        │           └── my_flink_job
        │               └── App.java
        └── test
            └── java
                └── com
                    └── my_flink_job
                        └── AppTest.java
- pom.xml is where we can define project plugins and dependencies
- App.java will have our App class which is our application entry point
- AppTest.java is for unit testing the App class
Adding Dependencies
Any libraries we import in our Java classes need to be part of our project when we build the final JAR file. Maven knows which libraries to download and install based on the dependencies section of our pom.xml. You could look up each dependency on the Maven repository website, but to save you time, here is what the dependencies section of your pom.xml should look like. Your IDE (IntelliJ, VS Code) may detect these changes and ask to sync your dependencies; say “yes.”
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.16.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>bundle</artifactId>
    <version>2.20.18</version>
  </dependency>
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.16.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.16.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.16.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.16</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>url-connection-client</artifactId>
    <version>2.20.18</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-core</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.16.1</version>
  </dependency>
</dependencies>
The dependencies fall into a few different categories:
- Flink libraries for using the Table API (so we can use SQL instead of imperative Java), connectors for working with files (e.g., flink-connector-files), and other Flink libraries we may use
- Iceberg libraries, including the core Iceberg library
- AWS-related libraries for connecting to AWS; many of these were also added to the Flink containers' classpath earlier
Also, ensure the source version is at least 1.8 in the POM file. This specifies the version of Java your code should compile to; the value of 1.8 is for Java 8. Later versions just use the raw number, like 11 for Java 11 or 17 for Java 17.
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
Note: For this exercise, don't use a Java version over 11; if you do, you may see the following error when running the Flink job.
Caused by: java.lang.UnsupportedClassVersionError: com/my_flink_job/App has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 55.0
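For example, if you're compiling with JDK 11 (the highest version recommended above, and the one the error's "class file versions up to 55.0" corresponds to), the matching compiler properties would simply be:
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>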
Creating a Class for Our Records
Operators in a Flink job represent data transformations, and they are what the web UI visualizes and monitors, which helps with analyzing and optimizing a job. So to make sure our Flink job has operators for the UI to display, we will create a class to map our example data to.
Create an “ExampleData.java” file in the same directory as App.java and include the following:
package com.my_flink_job;

public class ExampleData {
    private Long id;
    private String data;

    public ExampleData() {
    }

    public ExampleData(Long id, String data) {
        this.id = id;
        this.data = data;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}
This is a pretty straightforward class that just has a property with getters and setters for each field in our table. With this, we can now write our job in App.java.
package com.my_flink_job;

// IMPORT DEPENDENCIES
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

import com.my_flink_job.ExampleData;

public class App {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the table environment
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // create the Nessie catalog
        tableEnv.executeSql(
                "CREATE CATALOG iceberg WITH ("
                        + "'type'='iceberg',"
                        + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                        + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                        + "'uri'='http://catalog:19120/api/v1',"
                        + "'authentication.type'='none',"
                        + "'ref'='main',"
                        + "'client.assume-role.region'='us-east-1',"
                        + "'warehouse' = 's3://warehouse',"
                        + "'s3.endpoint'='http://{ip-address}:9000'"
                        + ")");

        // List all catalogs
        TableResult result = tableEnv.executeSql("SHOW CATALOGS");

        // Print the result to standard out
        result.print();

        // Set the current catalog to the new catalog
        tableEnv.useCatalog("iceberg");

        // Create a database in the current catalog
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");

        // create the table
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS db.table1 ("
                        + "id BIGINT COMMENT 'unique id',"
                        + "data STRING"
                        + ")");

        // create a DataStream of Tuple2 (equivalent to Row of 2 fields)
        DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
                Tuple2.of(1L, "foo"),
                Tuple2.of(1L, "bar"),
                Tuple2.of(1L, "baz"));

        // apply a map transformation to convert the Tuple2 to an ExampleData object
        DataStream<ExampleData> mappedStream = dataStream.map(new MapFunction<Tuple2<Long, String>, ExampleData>() {
            @Override
            public ExampleData map(Tuple2<Long, String> value) throws Exception {
                // perform your mapping logic here and return an ExampleData instance
                // for example:
                return new ExampleData(value.f0, value.f1);
            }
        });

        // convert the DataStream to a Table
        Table table = tableEnv.fromDataStream(mappedStream, $("id"), $("data"));

        // register the Table as a temporary view
        tableEnv.createTemporaryView("my_datastream", table);

        // write the DataStream to the table
        tableEnv.executeSql(
                "INSERT INTO db.table1 SELECT * FROM my_datastream");
    }
}
This Java class does the following:
- Creates an execution environment; operators executed by this environment will be displayed in the UI for inspection after the job is done
- Creates a table environment where we can send SQL
- Creates our Iceberg-Nessie catalog (we will need to update the IP address in “s3.endpoint” soon)
- Creates a database if it doesn’t exist
- Creates a table if it doesn’t exist
- Generates some example data
- Maps it to a DataStream using our ExampleData Class
- Creates a table from the DataStream and registers it as a view
- Inserts the data using SQL
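As written, App.java submits the INSERT and returns; the insert runs as its own job on the cluster. If you'd like a variation that waits for the insert to finish and then reads the rows back as a quick sanity check, a minimal sketch (not part of the tutorial's code) would swap the final executeSql call for something like this:
// optional: block until the (bounded) insert job completes
TableResult insertResult = tableEnv.executeSql(
        "INSERT INTO db.table1 SELECT * FROM my_datastream");
insertResult.await();

// read the rows back through the same Nessie catalog and print them to stdout
tableEnv.executeSql("SELECT * FROM db.table1").print();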
Before we build our jar, we need to get the IP address of our Minio server.
Getting the IP Address
First, spin up the environment if you haven’t already:
docker-compose up
You’ll see all the containers start to spin up:
Creating network "flink-iceberg_iceberg-nessie-flink-net" with the default driver
Creating storage                           ... done
Creating flink-iceberg_flink-jobmanager_1  ... done
Creating catalog                           ... done
Creating spark-iceberg                     ...
Creating mc                                ...
Creating flink-iceberg_flink-taskmanager_1 ...
Output from all the containers will start streaming in. Keep an eye out for the section below, as it contains the token you’ll need to access the notebook server on localhost:8888 if you want to create any notebooks.
spark-iceberg | [I 18:15:54.912 NotebookApp] http://62ae3f075416:8888/?token=35b05a816c31c48d447c502a52b446ecbb781a6e95b0ccf2
spark-iceberg | [I 18:15:54.912 NotebookApp]  or http://127.0.0.1:8888/?token=35b05a816c31c48d447c502a52b446ecbb781a6e95b0ccf2
spark-iceberg | [I 18:15:54.912 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
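If that token line has already scrolled past, you can pull it back out of the notebook container's logs at any point (an optional convenience, not part of the original steps):
docker logs spark-iceberg 2>&1 | grep token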
Once all of these containers are running, open up another terminal window and connect to the storage container's shell.
docker exec -it storage /bin/bash
Then run the following command to see the IP information for that container:
ifconfig
Look for the inet value in the output for eth0.
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 172.31.0.3  netmask 255.255.0.0  broadcast 172.31.255.255
        ether 02:42:ac:1f:00:03  txqueuelen 0  (Ethernet)
        RX packets 166  bytes 35643 (34.8 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 56  bytes 8468 (8.2 KiB)
        TX errors 0  dropped 0  overruns 0  carrier 0  collisions 0

lo: flags=73<UP,LOOPBACK,RUNNING>  mtu 65536
        inet 127.0.0.1  netmask 255.0.0.0
        loop  txqueuelen 1000  (Local Loopback)
        RX packets 8  bytes 778 (778.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 8  bytes 778 (778.0 B)
        TX errors 0  dropped 0  overruns 0  carrier 0  collisions 0
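If ifconfig isn't available in your version of the MinIO image, you should be able to get the same address from the host instead; one option, using the storage container name from the compose file, is:
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' storage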
Now we can update the CREATE CATALOG SQL statement in our Java code and put in the IP address for the s3.endpoint property.
// create the Nessie catalog
tableEnv.executeSql(
        "CREATE CATALOG iceberg WITH ("
                + "'type'='iceberg',"
                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                + "'uri'='http://catalog:19120/api/v1',"
                + "'authentication.type'='none',"
                + "'ref'='main',"
                + "'client.assume-role.region'='us-east-1',"
                + "'warehouse' = 's3://warehouse',"
                + "'s3.endpoint'='http://172.31.0.3:9000'"
                + ")");
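Hard-coding the address means rebuilding the JAR whenever the container's IP changes. One optional variation (a sketch, not the tutorial's code) is to accept the endpoint as the job's first program argument, which the Flink web UI's submit form lets you supply, and fall back to the hard-coded value:
// hypothetical tweak: read the MinIO endpoint from the first program argument if provided
String s3Endpoint = args.length > 0 ? args[0] : "http://172.31.0.3:9000";

tableEnv.executeSql(
        "CREATE CATALOG iceberg WITH ("
                + "'type'='iceberg',"
                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                + "'uri'='http://catalog:19120/api/v1',"
                + "'authentication.type'='none',"
                + "'ref'='main',"
                + "'client.assume-role.region'='us-east-1',"
                + "'warehouse' = 's3://warehouse',"
                + "'s3.endpoint'='" + s3Endpoint + "'"
                + ")");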
Now navigate a terminal to the folder where the pom.xml resides and build the package with the command:
mvn package
(Note: If you're not writing your own unit tests, delete the “test” folder before compiling to avoid testing errors.)
Status Check
Here is how your files should look:
.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── my_flink_job
│                   ├── App.java
│                   └── ExampleData.java
└── target
    ├── classes
    │   └── com
    │       └── my_flink_job
    │           ├── App$1.class
    │           ├── App.class
    │           └── ExampleData.class
    ├── flink_job-1.0-SNAPSHOT.jar
    ├── generated-sources
    │   └── annotations
    ├── maven-archiver
    │   └── pom.properties
    └── maven-status
        └── maven-compiler-plugin
            └── compile
                └── default-compile
                    ├── createdFiles.lst
                    └── inputFiles.lst
Running the Job
Head over to localhost:8081 where the Flink web UI is visible.
- Click on the “Submit Job” tab
- Click on “New Job” and upload the flink_job-1.0-SNAPSHOT.jar from your target folder
- Click on the uploaded JAR
- Enter “com.my_flink_job.App” as the entry class (this may differ if your package name is different)
- Hit submit
- Done! (A command-line alternative is sketched after this list.)
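If you prefer the command line over the web UI, you could copy the JAR into the JobManager container and submit it with the Flink CLI instead. This is a sketch; the container name below comes from the docker-compose output earlier and may differ on your machine (check docker ps):
docker cp target/flink_job-1.0-SNAPSHOT.jar flink-iceberg_flink-jobmanager_1:/tmp/
docker exec -it flink-iceberg_flink-jobmanager_1 ./bin/flink run -c com.my_flink_job.App /tmp/flink_job-1.0-SNAPSHOT.jar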
Confirming the Job Succeeded
Head over to localhost:9001 where the Minio web UI is running:
- Log in with username: admin, and password: password
- You should see all the files of our table in the warehouse bucket
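You can also list the same objects from the command line through the mc container that the compose file starts (its entrypoint already configured the minio alias); this check is optional:
docker exec mc /usr/bin/mc ls --recursive minio/warehouse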
We also want to confirm the commit is in our catalog. Open up a terminal and run the following curl command:
curl "http://0.0.0.0:19120/api/v2/trees/main/entries?content=true"
You should see an entry in the output describing changes to an Iceberg table, including a reference to where its metadata.json lives. This is how an engine finds the data in your Iceberg table: it requests this information from Nessie and then reads the metadata.json stored in MinIO.
Conclusion
This tutorial showed you how to work with Flink and Iceberg using a Nessie catalog, so you can adapt the setup for your own needs. You can also access this data via Spark in a notebook on localhost:8888. Nessie enables Git-like features, so you can ingest your streams into a branch where further quality checks are run by other tools before merging into the main branch for production analytics. The possibilities of catalog versioning are quite exciting. Even better, you don’t have to maintain your own Nessie server; you can get Nessie as a service using the Dremio Arctic service.