25 minute read · July 28, 2023

Using Flink with Apache Iceberg and Nessie

Alex Merced · Senior Tech Evangelist, Dremio

Flink is a supercharged tool for processing data in real time or in batches. It's open source and has a unified programming model for both, so you can build some serious data processing pipelines.

But here's where things get interesting. When you bring Apache Iceberg and Project Nessie into the mix, Flink becomes even more powerful. Iceberg is a table format that makes working with large-scale data lakes a breeze. It lets you do things like evolve schemas, time travel through data history, and manage metadata efficiently.

And what about Nessie? Well, it's like Git for data lakes. It adds versioning and metadata management to Iceberg tables, so you can collaborate on datasets, keep track of changes, and ensure consistency.

With Flink, Iceberg, and Nessie, you can build robust and scalable data pipelines. Flink handles streaming and batch processing, Iceberg keeps your data organized and flexible, and Nessie adds version control and metadata magic.

The Tutorial

This tutorial explains all the steps for using Flink with Iceberg and Nessie, so you can customize them for your use case. To follow this tutorial, you need the following installed:

  • Docker
  • Java 8+
  • Maven

This GitHub repository has everything you need to reference the steps documented in this tutorial.

In the official Flink image, we need to add the Java jars for any libraries we plan on using. To do this, we look up the download links on the Maven repository website and create a Dockerfile for a custom Flink image that looks like this:

# Start from the official Flink image
FROM flink:1.16.1-scala_2.12

###############################################
## Download Necessary Jars to Flink Class Path
###############################################

## Iceberg Flink Library
RUN curl -L https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.3.1/iceberg-flink-runtime-1.16-1.3.1.jar -o /opt/flink/lib/iceberg-flink-runtime-1.16-1.3.1.jar

## Hive Flink Library
RUN curl -L https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.1/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar -o /opt/flink/lib/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar

## Hadoop Common Classes
RUN curl -L https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/2.8.3/hadoop-common-2.8.3.jar -o /opt/flink/lib/hadoop-common-2.8.3.jar

## Flink Shaded Hadoop Classes
RUN curl -L https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -o /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

## AWS Bundled Classes
RUN curl -L https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.20.18/bundle-2.20.18.jar -o /opt/flink/lib/bundle-2.20.18.jar

## Install Nano to edit files
RUN apt update && apt install -y nano

CMD ["./bin/start-cluster.sh"]

## docker build --tag username/imagename .

This script does the following:

  • Starts with the official Flink 1.16 image
  • Downloads all the necessary jars and copies them to the Flink classpath at /opt/flink/lib 
  • Installs Nano in case we need to do any file editing on the fly for config files
  • Uses the same entry point command as the original Flink image

This image is already available on Docker Hub as alexmerced/iceberg-flink-1.3.1 (the name referenced in the Docker Compose file below), but if you need to customize it, you can use the Dockerfile above to build your own.
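If you do customize it, one option (just a sketch, not something the tutorial requires) is to build your image under the same tag the Docker Compose file references, so the rest of the setup needs no changes; Docker Compose will use the local image instead of pulling from Docker Hub:

docker build --tag alexmerced/iceberg-flink-1.3.1 .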

The Docker Compose File

Once our image is built, it’s time to create our environment. Create an empty directory with empty warehouse and notebooks subdirectories and a single docker-compose.yaml:

###########################################
# Flink - Iceberg - Nessie Setup
###########################################

version: "3"

services:
 # Spark Notebook Server
 spark-iceberg:
   image: alexmerced/spark34notebook
   container_name: spark-iceberg
   networks:
     iceberg-nessie-flink-net:
   depends_on:
     - catalog
     - storage
   volumes:
     - ./warehouse:/home/docker/warehouse
     - ./notebooks:/home/docker/notebooks
   environment:
     - AWS_ACCESS_KEY_ID=admin
     - AWS_SECRET_ACCESS_KEY=password
     - AWS_REGION=us-east-1
     - AWS_DEFAULT_REGION=us-east-1
   ports:
     - 8888:8888
     - 8080:8080
     - 10000:10000
     - 10001:10001
 # Nessie Catalog Server Using In-Memory Store
 catalog:
   image: projectnessie/nessie:0.67.0
   container_name: catalog
   networks:
     iceberg-nessie-flink-net:
   ports:
     - 19120:19120
 # Minio Storage Server
 storage:
   image: minio/minio:RELEASE.2023-07-21T21-12-44Z
   container_name: storage
   environment:
     - MINIO_ROOT_USER=admin
     - MINIO_ROOT_PASSWORD=password
     - MINIO_DOMAIN=storage
     - MINIO_REGION_NAME=us-east-1
     - MINIO_REGION=us-east-1
   networks:
     iceberg-nessie-flink-net:
   ports:
     - 9001:9001
     - 9000:9000
   command: ["server", "/data", "--console-address", ":9001"]
 # Minio Client Container
 mc:
   depends_on:
     - storage
   image: minio/mc:RELEASE.2023-07-21T20-44-27Z
   container_name: mc
   networks:
     iceberg-nessie-flink-net:
       aliases:
         - minio.storage
   environment:
     - AWS_ACCESS_KEY_ID=admin
     - AWS_SECRET_ACCESS_KEY=password
     - AWS_REGION=us-east-1
     - AWS_DEFAULT_REGION=us-east-1
   entrypoint: >
     /bin/sh -c "
     until (/usr/bin/mc config host add minio http://storage:9000 admin password) do echo '...waiting...' && sleep 1; done;
     /usr/bin/mc rm -r --force minio/warehouse;
     /usr/bin/mc mb minio/warehouse;
     /usr/bin/mc mb minio/iceberg;
     /usr/bin/mc policy set public minio/warehouse;
     /usr/bin/mc policy set public minio/iceberg;
     tail -f /dev/null
     "
 # Flink Job Manager
 flink-jobmanager:
   image: alexmerced/iceberg-flink-1.3.1
   ports:
     - "8081:8081"
   command: jobmanager
   networks:
     iceberg-nessie-flink-net:
   environment:
     - |
       FLINK_PROPERTIES=
       jobmanager.rpc.address: flink-jobmanager
     - AWS_ACCESS_KEY_ID=admin
     - AWS_SECRET_ACCESS_KEY=password
     - AWS_REGION=us-east-1
     - AWS_DEFAULT_REGION=us-east-1
     - S3_ENDPOINT=http://minio.storage:9000
     - S3_PATH_STYLE_ACCESS=true
 # Flink Task Manager
 flink-taskmanager:
   image: alexmerced/iceberg-flink-1.3.1
   depends_on:
     - flink-jobmanager
   command: taskmanager
   networks:
     iceberg-nessie-flink-net:
   scale: 1
   environment:
     - |
       FLINK_PROPERTIES=
       jobmanager.rpc.address: flink-jobmanager
       taskmanager.numberOfTaskSlots: 2
     - AWS_ACCESS_KEY_ID=admin
     - AWS_SECRET_ACCESS_KEY=password
     - AWS_REGION=us-east-1
     - AWS_DEFAULT_REGION=us-east-1
     - S3_ENDPOINT=http://minio.storage:9000
     - S3_PATH_STYLE_ACCESS=true
networks:
 iceberg-nessie-flink-net:

This docker-compose file does the following:

  • Creates a network called iceberg-nessie-flink-net
  • Creates a container for running Spark 3.4 notebooks with the alexmerced/spark34notebook image
  • Creates a container to host our Nessie catalog
  • Creates a MinIO container to act as our storage
  • Creates a MinIO client container, which is mainly there to create a few initial public buckets
  • Creates two Flink containers to act as the JobManager and TaskManager

What Are the JobManager and TaskManager?

In a Flink cluster, the JobManager is like the head chef who coordinates the operation, assigns tasks, and keeps everything running smoothly. The TaskManagers are the junior chefs who execute the assigned tasks, managing resources and exchanging data. Together they form the cluster, with the JobManager as the central coordinator and the TaskManagers as the workers who perform the actual computations.

Status Check

Right now your directory should look like this:

├── docker-compose.yaml
├── warehouse
└── notebooks

This environment can be spun up by running the docker-compose up command.

We will be using Apache Maven to manage our Flink job. Apache Maven is like a project manager for Java projects. It automates tasks, manages dependencies, and coordinates the building of software. It's similar to having a personal assistant who organizes resources, schedules tasks, and ensures smooth project development.

First, we need to create a blank Maven project:

mvn archetype:generate

It will ask you several questions; you can just hit Enter and accept the default answers, except for:

  • artifactId: this is the project name, let’s make it “flink_job”
  • groupId: uniquely identifies who is building the project, traditionally a reverse domain, let’s put “com.my_flink_job”
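If you'd rather skip the interactive prompts, the same skeleton can be generated in a single command (a sketch using the standard quickstart archetype; adjust the IDs if yours differ):

mvn archetype:generate \
  -DgroupId=com.my_flink_job \
  -DartifactId=flink_job \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false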

Status Check

Now your directory should look like the following:

.
├── docker-compose.yaml
├── warehouse
├── notebooks
└── flink_job
    ├── pom.xml
    └── src
        ├── main
        │   └── java
        │       └── com
        │           └── my_flink_job
        │               └── App.java
        └── test
            └── java
                └── com
                    └── my_flink_job
                        └── AppTest.java
  • pom.xml is where we can define project plugins and dependencies
  • App.java will have our App class which is our application entry point
  • AppTest.java is for unit testing the App class

Adding Dependencies

Any libraries we import in our Java classes need to be part of our project when we build the final jar file. Maven knows which libraries to download and install based on the dependencies section of our pom.xml. We could normally look up the specific dependencies we need on the Maven repository website, but to save you time, here is what the dependencies section of your pom.xml should look like. Your IDE (IntelliJ, VS Code) may detect these changes and ask to sync your dependencies; say “yes.”

<dependencies>
 <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-java</artifactId>
       <version>1.16.1</version>
   </dependency>
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-streaming-java</artifactId>
       <version>1.16.1</version>
       <scope>provided</scope>
   </dependency>
   <dependency>
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>bundle</artifactId>
       <version>2.20.18</version>
   </dependency>
   <dependency>
       <groupId>org.apache.iceberg</groupId>
       <artifactId>iceberg-flink</artifactId>
       <version>1.3.1</version>
   </dependency>
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-api-java-bridge</artifactId>
       <version>1.16.1</version>
       <scope>provided</scope>
   </dependency>
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-api-java</artifactId>
       <version>1.16.1</version>
   </dependency>
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-table-common</artifactId>
       <version>1.16.1</version>
       <scope>provided</scope>
   </dependency>
   <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.16</artifactId>
    <version>1.3.1</version>
   </dependency>
   <dependency>
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>url-connection-client</artifactId>
       <version>2.20.18</version>
       <scope>test</scope>
   </dependency>
   <dependency>
       <groupId>org.apache.iceberg</groupId>
       <artifactId>iceberg-core</artifactId>
       <version>1.3.1</version>
   </dependency>
   <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>2.8.5</version>
   </dependency>
   <dependency>
     <groupId>junit</groupId>
     <artifactId>junit</artifactId>
     <version>4.11</version>
     <scope>test</scope>
   </dependency>
   <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-files</artifactId>
     <version>1.16.1</version>
   </dependency>
</dependencies>

The dependencies fall into a few different categories:

  • Flink libraries for using the Table API (so we can use SQL instead of imperative Java), connectors for working with files, and other Flink libraries we may use
  • Iceberg libraries, including the core Iceberg library
  • AWS-related libraries for connecting to S3 (in our case MinIO); many of these were also added to the Flink containers' classpath earlier

Also, ensure the compiler source and target versions are set to at least 1.8 in the POM file. These specify the Java language level your code is written against and the bytecode version it compiles to; the value 1.8 means Java 8. Later versions just use the raw number, like 11 for Java 11 or 17 for Java 17.

<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

Note: For this exercise, don't use a Java version over 11; you may get the following error when running the Flink Job if using higher versions.

Caused by: java.lang.UnsupportedClassVersionError: com/my_flink_job/App has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 55.0
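You can check which JDK Maven will actually compile with (it may differ from the java on your PATH) before building; the first lines of the output include the Java version Maven is using:

mvn -version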

Creating a Class for Our Records

Operators in a Flink job represent its data transformations, and the web UI visualizes them so you can monitor and analyze the job. To make sure there are operators in our Flink job for the UI to display, we will create a class to map our example data to.

Create an “ExampleData.java” file in the same directory as App.java and include the following:

package com.my_flink_job;

public class ExampleData {
   private Long id;
   private String data;

   public ExampleData() {
   }

   public ExampleData(Long id, String data) {
       this.id = id;
       this.data = data;
   }

   public Long getId() {
       return id;
   }

   public void setId(Long id) {
       this.id = id;
   }

   public String getData() {
       return data;
   }

   public void setData(String data) {
       this.data = data;
   }
}

This is a pretty straightforward class that just has a property with getters and setters for each field in our table. With this, we can now write our job in App.java.

package com.my_flink_job;


// IMPORT DEPENDENCIES
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
import com.my_flink_job.ExampleData;


public class App
{
  public static void main(String[] args) throws Exception {


       // set up the execution environment
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       // set up the table environment
       final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
               env,
               EnvironmentSettings.newInstance().inStreamingMode().build());


       // create the Nessie catalog
       tableEnv.executeSql(
               "CREATE CATALOG iceberg WITH ("
                       + "'type'='iceberg',"
                       + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                       + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                       + "'uri'='http://catalog:19120/api/v1',"
                       + "'authentication.type'='none',"
                       + "'ref'='main',"
                       + "'client.assume-role.region'='us-east-1',"
                       + "'warehouse' = 's3://warehouse',"
                       + "'s3.endpoint'='http://{id-address}:9000'"
                       + ")");


       // List all catalogs
       TableResult result = tableEnv.executeSql("SHOW CATALOGS");


       // Print the result to standard out
       result.print();


       // Set the current catalog to the new catalog
       tableEnv.useCatalog("iceberg");


       // Create a database in the current catalog
       tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");


       // create the table
       tableEnv.executeSql(
               "CREATE TABLE IF NOT EXISTS db.table1 ("
                       + "id BIGINT COMMENT 'unique id',"
                       + "data STRING"
                       + ")");


       // create a DataStream of Tuple2 (equivalent to Row of 2 fields)
       DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
               Tuple2.of(1L, "foo"),
               Tuple2.of(1L, "bar"),
               Tuple2.of(1L, "baz"));


       // apply a map transformation to convert the Tuple2 to an ExampleData object
       DataStream<ExampleData> mappedStream = dataStream.map(new MapFunction<Tuple2<Long, String>, ExampleData>() {
           @Override
           public ExampleData map(Tuple2<Long, String> value) throws Exception {
               // perform your mapping logic here and return a ExampleData instance
               // for example:
               return new ExampleData(value.f0, value.f1);
           }
       });


       // convert the DataStream to a Table
       Table table = tableEnv.fromDataStream(mappedStream, $("id"), $("data"));


       // register the Table as a temporary view
       tableEnv.createTemporaryView("my_datastream", table);


       // write the DataStream to the table
       tableEnv.executeSql(
               "INSERT INTO db.table1 SELECT * FROM my_datastream");
   }
}

This Java class does the following:

  • Creates an execution environment; operators executed by this environment will be displayed in the UI for inspection after the job is done
  • Creates a table environment where we can send SQL
  • Creates our Iceberg-Nessie catalog (we will need to update the IP address in “s3.endpoint” soon)
  • Creates a database if it doesn’t exist
  • Creates a table if it doesn’t exist
  • Generates some example data
  • Maps it to a DataStream using our ExampleData class
  • Creates a table from the DataStream and registers it as a view
  • Inserts the data using SQL

Before we build our jar, we need to get the IP address of our Minio server.

Getting the IP Address

First, let’s spin up our environment if you haven’t already with:

docker-compose up

You’ll see all the containers beginning to be spun up:

Creating network "flink-iceberg_iceberg-nessie-flink-net" with the default driver
Creating storage                          ... done
Creating flink-iceberg_flink-jobmanager_1 ... done
Creating catalog                          ... done
Creating spark-iceberg                    ... 
Creating mc                               ... 
Creating flink-iceberg_flink-taskmanager_1 ... 

Output will stream in from all the containers; keep an eye out for this section, as it contains the token you’ll need to access the notebook server on localhost:8888 if you want to create any notebooks.

spark-iceberg        | [I 18:15:54.912 NotebookApp] http://62ae3f075416:8888/?token=35b05a816c31c48d447c502a52b446ecbb781a6e95b0ccf2
spark-iceberg        | [I 18:15:54.912 NotebookApp]  or http://127.0.0.1:8888/?token=35b05a816c31c48d447c502a52b446ecbb781a6e95b0ccf2
spark-iceberg        | [I 18:15:54.912 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).

Once all of these containers are running, open another terminal window and connect to the storage container's shell:

docker exec -it storage /bin/bash

Then run the following command to see the IP information for that container:

ifconfig

Look for the inet value in the output for eth0.

eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 172.31.0.3  netmask 255.255.0.0  broadcast 172.31.255.255
        ether 02:42:ac:1f:00:03  txqueuelen 0  (Ethernet)
        RX packets 166  bytes 35643 (34.8 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 56  bytes 8468 (8.2 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

lo: flags=73<UP,LOOPBACK,RUNNING>  mtu 65536
        inet 127.0.0.1  netmask 255.0.0.0
        loop  txqueuelen 1000  (Local Loopback)
        RX packets 8  bytes 778 (778.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 8  bytes 778 (778.0 B)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0
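Alternatively, if you'd rather not open a shell inside the container, docker inspect can report the same address from the host (a sketch; it prints the container's IP on the compose network):

docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' storage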

Now we can update the CREATE CATALOG SQL statement in our Java code, putting this IP address in the s3.endpoint property.

       // create the Nessie catalog
       tableEnv.executeSql(
               "CREATE CATALOG iceberg WITH ("
                       + "'type'='iceberg',"
                       + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                       + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                       + "'uri'='http://catalog:19120/api/v1',"
                       + "'authentication.type'='none',"
                       + "'ref'='main',"
                       + "'client.assume-role.region'='us-east-1',"
                       + "'warehouse' = 's3://warehouse',"
                       + "'s3.endpoint'='http://172.31.0.3:9000'"
                       + ")");

Now navigate a terminal to the folder where the pom.xml resides and build the package with the command:

mvn package

(Note: If you're not writing your own unit tests, delete the “test” folder before compiling to avoid testing errors.)
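Alternatively (a sketch, assuming the errors come from test execution rather than compilation), Maven's standard flag for skipping test runs may be enough; it still compiles the test sources:

mvn package -DskipTests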

Status Check

Here is how your files should look:

.
├── pom.xml
├── src
│   └── main
│       └── java
│           └── com
│               └── my_flink_job
│                   ├── App.java
│                   └── ExampleData.java
└── target
    ├── classes
    │   └── com
    │       └── my_flink_job
    │           ├── App$1.class
    │           ├── App.class
    │           └── ExampleData.class
    ├── flink_job-1.0-SNAPSHOT.jar
    ├── generated-sources
    │   └── annotations
    ├── maven-archiver
    │   └── pom.properties
    └── maven-status
        └── maven-compiler-plugin
            └── compile
                └── default-compile
                    ├── createdFiles.lst
                    └── inputFiles.lst

Running the Job

Head over to localhost:8081 where the Flink web UI is visible.

  • Click on the “Submit New Job” tab
  • Click “Add New” and upload the flink_job-1.0-SNAPSHOT.jar from your target folder
  • Click on the uploaded jar in the list
  • Enter “com.my_flink_job.App” as the entry class (this may differ if your package name is different)
  • Hit “Submit”
  • Done!
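If you prefer the command line to the web UI, the jar can also be copied into the JobManager container and submitted with the Flink CLI. Treat this as a sketch: the container name below matches the one shown in the docker-compose output earlier and may differ on your machine.

docker cp target/flink_job-1.0-SNAPSHOT.jar flink-iceberg_flink-jobmanager_1:/opt/flink/flink_job-1.0-SNAPSHOT.jar
docker exec flink-iceberg_flink-jobmanager_1 /opt/flink/bin/flink run -c com.my_flink_job.App /opt/flink/flink_job-1.0-SNAPSHOT.jar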

Confirming the Job Was Done

Head over to localhost:9001 where the Minio web UI is running:

  • Log in with username: admin, and password: password
  • You should see all the files of our table in the warehouse bucket
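If you'd rather check from the terminal, the mc container already has an alias named minio pointing at the storage server (configured in the docker-compose entrypoint), so a recursive listing of the warehouse bucket should show the table's data and metadata files (a sketch):

docker exec mc /usr/bin/mc ls --recursive minio/warehouse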

We also want to confirm the commit is in our catalog. Open up a terminal and run the following curl command:

curl "http://0.0.0.0:19120/api/v2/trees/main/entries?content=true"

You should see an entry for changes to an Iceberg table in the output, including a reference to where its metadata.json lives. This is how an engine knows where to find the data in your Iceberg table: it requests this information from Nessie and then reads the metadata.json stored in MinIO.

Conclusion

This tutorial showed you how to work with Flink and Iceberg using a Nessie catalog, which you can adapt to your own needs. You can also access this data via Spark and a notebook on localhost:8888. Nessie enables Git-like features, so you can ingest your streams into a branch where further quality checks are done via other tools before merging into the main branch for production analytics. The possibilities of catalog versioning are quite exciting. Even better, you don’t have to maintain your own Nessie server, as you can get Nessie as a service using the Dremio Arctic service.
