Apache Flink is a powerful engine for processing data in real time or in batches. It's open source and offers a unified programming model for both modes, so you can build some serious data processing pipelines with a single API.
But here's where things get interesting. When you bring Apache Iceberg and Project Nessie into the mix, Flink becomes even more capable. Iceberg is a table format that makes working with large-scale data lakes much easier: it supports schema evolution, time travel through a table's history, and efficient metadata management.
And what about Nessie? It's like Git for data lakes. It adds versioning and catalog-level metadata management to Iceberg tables, so you can collaborate on datasets, keep track of changes, and ensure consistency.
Together, Flink, Iceberg, and Nessie let you build robust, scalable data pipelines: Flink handles streaming and batch processing, Iceberg keeps your data organized and flexible, and Nessie adds version control on top.
The Tutorial
This tutorial explains all the steps for using Flink with Iceberg and Nessie, so you can customize them for your use case. To follow this tutorial, you need the following installed:
Docker
Java 8+
Maven
This GitHub repository has everything you need to reference the steps documented in this tutorial.
Creating Our Flink Docker Image
The official Flink image doesn't include the Java jars for the libraries we plan on using, so we need to add them. To do this, we looked up the download links on the Maven repository website and created a Dockerfile that builds a custom Flink image:
# Start from the official Flink image
FROM flink:1.16.1-scala_2.12
###############################################
## Download Necessary Jars to Flink Class Path
###############################################
## Iceberg Flink Library
RUN curl -L https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.16/1.3.1/iceberg-flink-runtime-1.16-1.3.1.jar -o /opt/flink/lib/iceberg-flink-runtime-1.16-1.3.1.jar
## Hive Flink Library
RUN curl -L https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.16.1/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar -o /opt/flink/lib/flink-sql-connector-hive-2.3.9_2.12-1.16.1.jar
## Hadoop Common Classes
RUN curl -L https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/2.8.3/hadoop-common-2.8.3.jar -o /opt/flink/lib/hadoop-common-2.8.3.jar
## Flink Shaded Hadoop Classes
RUN curl -L https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -o /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
## AWS Bundled Classes
RUN curl -L https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.20.18/bundle-2.20.18.jar -o /opt/flink/lib/bundle-2.20.18.jar
## Install Nano to edit files
RUN apt update && apt install -y nano
CMD ["./bin/start-cluster.sh"]
## docker build --tag username/imagename .
This script does the following:
Starts with the official Flink 1.16 image
Downloads all the necessary jars and copies them to the Flink classpath at /opt/flink/lib
Installs Nano in case we need to do any file editing on the fly for config files
Uses the same entry point command as the original Flink image
This image is already available from Docker Hub as alexmerced/flink-iceberg-1.3.1, but if you need to customize it, you can use the Dockerfile above to build it.
The Docker Compose File
Once our image is built, it’s time to create our environment. Create an empty directory containing empty warehouse and notebooks subdirectories and a single docker-compose.yaml, which:
Creates a container for running Spark 3.4 notebooks with the alexmerced/spark34notebook image
Creates a container to host our Nessie catalog
Creates a MinIO container to act as our storage
Creates a MinIO client container, which is mainly there to create a few initial public buckets
Creates two Flink containers to act as the JobManager and TaskManager
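The full Compose file lives in the linked repository. A minimal sketch of it is below — the container names and network name match the startup output later in this tutorial, but the MinIO credentials, ports, and bucket commands are assumptions, so treat this as a starting point rather than the exact file:

```yaml
version: "3"

services:
  # Spark 3.4 notebook server (Jupyter on port 8888)
  spark-iceberg:
    image: alexmerced/spark34notebook
    container_name: spark-iceberg
    ports:
      - "8888:8888"
    networks:
      - iceberg-nessie-flink-net

  # Nessie catalog (REST API on port 19120)
  catalog:
    image: projectnessie/nessie
    container_name: catalog
    ports:
      - "19120:19120"
    networks:
      - iceberg-nessie-flink-net

  # MinIO object storage
  storage:
    image: minio/minio
    container_name: storage
    environment:
      - MINIO_ROOT_USER=admin          # assumption: pick your own credentials
      - MINIO_ROOT_PASSWORD=password
    ports:
      - "9000:9000"
      - "9001:9001"
    command: ["server", "/data", "--console-address", ":9001"]
    networks:
      - iceberg-nessie-flink-net

  # MinIO client: creates the initial public bucket, then exits
  mc:
    image: minio/mc
    container_name: mc
    depends_on:
      - storage
    entrypoint: >
      /bin/sh -c "
      mc alias set minio http://storage:9000 admin password &&
      mc mb minio/warehouse &&
      mc anonymous set public minio/warehouse"
    networks:
      - iceberg-nessie-flink-net

  # Flink cluster: one JobManager, one TaskManager
  flink-jobmanager:
    image: alexmerced/flink-iceberg-1.3.1
    ports:
      - "8081:8081"                    # Flink web UI
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
    networks:
      - iceberg-nessie-flink-net

  flink-taskmanager:
    image: alexmerced/flink-iceberg-1.3.1
    depends_on:
      - flink-jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
    networks:
      - iceberg-nessie-flink-net

networks:
  iceberg-nessie-flink-net:
```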
What Are the JobManager and TaskManager?
In a Flink cluster, the JobManager is like the head chef who coordinates the operation, assigns tasks, and ensures smooth functioning. The TaskManagers are the junior chefs who execute the assigned tasks, managing resources and exchanging data. Together they form the cluster, with the JobManager as the central coordinator and the TaskManagers as the workers who perform the actual computations.
This environment can be spun up by running the docker-compose up command.
Creating Our Flink Job
We will be using Apache Maven to manage our Flink job. Apache Maven is like a project manager for Java projects. It automates tasks, manages dependencies, and coordinates the building of software. It's similar to having a personal assistant who organizes resources, schedules tasks, and ensures smooth project development.
First, we need to create a blank Maven project:
mvn archetype:generate
It will ask you several questions; you can hit enter and accept the default answers, except for:
artifactId: this is the project name; let’s make it “flink_job”
groupId: this uniquely identifies who is building the project, traditionally a reverse domain name; let’s put “com.my_flink_job”
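If you'd rather skip the interactive prompts, the same project can be generated in one command. The quickstart archetype named here is an assumption — it's the default the interactive run offers:

```
mvn archetype:generate \
  -DgroupId=com.my_flink_job \
  -DartifactId=flink_job \
  -DarchetypeGroupId=org.apache.maven.archetypes \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
```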
Status Check
Now your directory should look like the following:
.
├── docker-compose.yaml
├── warehouse
├── notebooks
└── flink_job
├── pom.xml
└── src
├── main
│ └── java
│ └── com
│ └── my_flink_job
│ └── App.java
└── test
└── java
└── com
└── my_flink_job
└── AppTest.java
pom.xml is where we can define project plugins and dependencies
App.java will have our App class which is our application entry point
AppTest.java is for unit testing the App class
Adding Dependencies
Any libraries we import in our Java classes need to be part of our project when we build the final jar file. Maven knows which libraries to download and install based on the dependencies section of our pom.xml. We could look up each dependency on the Maven repository website, but to save you time, here is what the dependencies section of your pom.xml should look like. Your IDE (IntelliJ, VS Code) may detect these changes and ask to sync your dependencies; say “yes.”
The dependencies fall into a few different categories:
Flink libraries for using the Table API (so we can use SQL instead of imperative Java), connectors for working with external systems, and other Flink libraries we may use
Iceberg libraries, including the core Iceberg library
AWS-related libraries for connecting to AWS; many of these were also added to the Flink container's classpath earlier
Also, ensure the source version is at least 1.8 in the POM file. This specifies the version of Java your code should compile to; the value of 1.8 is for Java 8. Later versions just use the raw number, like 11 for Java 11 or 17 for Java 17.
Note: For this exercise, don't use a Java version over 11; you may get the following error when running the Flink Job if using higher versions.
Caused by: java.lang.UnsupportedClassVersionError: com/my_flink_job/App has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 55.0
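Since the pom.xml contents aren't reproduced here, below is a sketch of the properties and dependencies sections consistent with the jars baked into our Flink image. The exact artifacts and versions are assumptions — check the linked repository for the authoritative file:

```xml
<properties>
  <!-- compile for Java 11 (don't go higher, per the note above) -->
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
  <!-- Flink streaming + Table API (provided: already on the cluster) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.16.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.1</version>
    <scope>provided</scope>
  </dependency>

  <!-- Iceberg runtime for Flink 1.16 -->
  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.16</artifactId>
    <version>1.3.1</version>
  </dependency>

  <!-- Hadoop and AWS classes, matching the jars in the Docker image -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.3</version>
  </dependency>
  <dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>bundle</artifactId>
    <version>2.20.18</version>
  </dependency>
</dependencies>
```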
Creating a Class for Our Records
Operators in a Flink job represent data transformations, and they are what the Flink web UI visualizes for monitoring, job analysis, and optimization. To make sure our job has operators for the UI to display, we will create a class to map our example data to.
Create an “ExampleData.java” file in the same directory as App.java and include the following:
package com.my_flink_job;

public class ExampleData {
    private Long id;
    private String data;

    public ExampleData() {
    }

    public ExampleData(Long id, String data) {
        this.id = id;
        this.data = data;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}
This is a straightforward class with a property, getter, and setter for each field in our table. With it in place, we can write our job in App.java.
package com.my_flink_job;

// IMPORT DEPENDENCIES
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

public class App {
    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the table environment
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
            env,
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // create the Nessie catalog
        tableEnv.executeSql(
            "CREATE CATALOG iceberg WITH ("
                + "'type'='iceberg',"
                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                + "'uri'='http://catalog:19120/api/v1',"
                + "'authentication.type'='none',"
                + "'ref'='main',"
                + "'client.assume-role.region'='us-east-1',"
                + "'warehouse' = 's3://warehouse',"
                + "'s3.endpoint'='http://{ip-address}:9000'"
                + ")");

        // list all catalogs
        TableResult result = tableEnv.executeSql("SHOW CATALOGS");

        // print the result to standard out
        result.print();

        // set the current catalog to the new catalog
        tableEnv.useCatalog("iceberg");

        // create a database in the current catalog
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");

        // create the table
        tableEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS db.table1 ("
                + "id BIGINT COMMENT 'unique id',"
                + "data STRING"
                + ")");

        // create a DataStream of Tuple2 (equivalent to Row of 2 fields)
        DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
            Tuple2.of(1L, "foo"),
            Tuple2.of(2L, "bar"),
            Tuple2.of(3L, "baz"));

        // apply a map transformation to convert each Tuple2 to an ExampleData object
        DataStream<ExampleData> mappedStream = dataStream.map(
            new MapFunction<Tuple2<Long, String>, ExampleData>() {
                @Override
                public ExampleData map(Tuple2<Long, String> value) throws Exception {
                    return new ExampleData(value.f0, value.f1);
                }
            });

        // convert the DataStream to a Table
        Table table = tableEnv.fromDataStream(mappedStream, $("id"), $("data"));

        // register the Table as a temporary view
        tableEnv.createTemporaryView("my_datastream", table);

        // write the DataStream to the table
        tableEnv.executeSql(
            "INSERT INTO db.table1 SELECT * FROM my_datastream");
    }
}
This Java class does the following:
Creates an execution environment; operators executed by this environment will be displayed in the UI for inspection after the job is done
Creates a table environment where we can send SQL
Creates our Iceberg-Nessie catalog (we will need to update the IP address in “s3.endpoint” soon)
Creates a database if it doesn’t exist
Creates a table if it doesn’t exist
Generates some example data
Maps it to a DataStream using our ExampleData Class
Creates a table from the DataStream and registers it as a view
Inserts the data using SQL
Before we build our jar, we need to get the IP address of our MinIO server.
Getting the IP Address
First, spin up the environment if you haven’t already:
docker-compose up
You’ll see all the containers beginning to be spun up:
Creating network "flink-iceberg_iceberg-nessie-flink-net" with the default driver
Creating storage ... done
Creating flink-iceberg_flink-jobmanager_1 ... done
Creating catalog ... done
Creating spark-iceberg ...
Creating mc ...
Creating flink-iceberg_flink-taskmanager_1 ...
Output will stream in from all the containers. Keep an eye out for this section, as it contains the token you’ll need to access the notebook server on localhost:8888 if you want to create any notebooks.
spark-iceberg | [I 18:15:54.912 NotebookApp] http://62ae3f075416:8888/?token=35b05a816c31c48d447c502a52b446ecbb781a6e95b0ccf2
spark-iceberg | [I 18:15:54.912 NotebookApp] or http://127.0.0.1:8888/?token=35b05a816c31c48d447c502a52b446ecbb781a6e95b0ccf2
spark-iceberg | [I 18:15:54.912 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
Once all of these containers are running, open up another terminal window and connect to the storage container's shell.
docker exec -it storage /bin/bash
Then run the following command to see the IP information for that container:
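The exact command isn't reproduced here. Inside most Linux containers you can read the container's network IP from /etc/hosts, so a sketch of this step — and of the build-and-submit steps that follow it — might look like the commands below. The jar name assumes the archetype's default 1.0-SNAPSHOT version, and the JobManager container name is taken from the startup output above; verify both with `docker ps` and `ls target/`:

```
# inside the storage container: the last /etc/hosts entry maps the
# container's network IP to its hostname
cat /etc/hosts

# back on the host: put that IP into the 's3.endpoint' value in App.java,
# then build the job jar
cd flink_job && mvn clean package

# copy the jar into the JobManager container and submit it to Flink
docker cp target/flink_job-1.0-SNAPSHOT.jar flink-iceberg_flink-jobmanager_1:/opt/flink/job.jar
docker exec flink-iceberg_flink-jobmanager_1 ./bin/flink run /opt/flink/job.jar
```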
After you update the endpoint, rebuild the jar, and submit the job to Flink, you should see an entry regarding changes to an Iceberg table in the output, including a reference to where the metadata.json is. This is how an engine knows where to find the data in your Iceberg table: it requests this information from Nessie and then reads the metadata.json stored in MinIO.
Conclusion
This tutorial showed you how to work with Flink and Iceberg using a Nessie catalog, so you can adapt the configuration for your needs. You can also access this data via Spark and a notebook on localhost:8888. Nessie enables Git-like features, so you can ingest your streams into a branch where further quality checks are done via other tools before merging into the main branch for production analytics. The possibilities of catalog versioning are quite exciting. Even better, you don’t have to maintain your own Nessie server, as you can get a Nessie server as a service using the Dremio Arctic service.