Connect Flink to Iceberg REST
Introduction
Apache Gravitino exposes an Iceberg REST catalog endpoint that any Iceberg-compatible engine can connect to directly — without installing a Gravitino-specific connector plugin. This page describes how to configure Apache Flink to use Gravitino's Iceberg REST (IRC) endpoint.
This integration uses the standard Apache Iceberg REST catalog specification. Gravitino enforces its full access-control model on all IRC requests.
Prerequisites
- Apache Gravitino running with the Iceberg REST service enabled. See Iceberg REST catalog service for setup instructions.
- The Gravitino IRC endpoint is accessible from your Flink environment. The default port is
9001. - The following JAR files on your Flink classpath:
iceberg-flink-runtime-1.18-1.7.1.jar(oriceberg-flink-runtime-1.19-1.7.1.jarfor Flink 1.19)iceberg-aws-bundle-1.7.1.jarflink-shaded-hadoop-2-uber.jar
This page uses Flink 1.18 with Iceberg 1.7.1.
Unlike Spark and Trino, Flink requires S3 connection properties to be specified in the catalog definition itself rather than in a separate configuration file.
Configuration
Flink uses a cluster configuration file (flink-conf.yaml) for general settings. The Iceberg
catalog is registered per-session using a CREATE CATALOG SQL statement.
flink-conf.yaml
Set batch execution mode and result display in $FLINK_HOME/conf/flink-conf.yaml:
execution.runtime-mode: batch
sql-client.execution.result-mode: tableau
tableau mode prints query results inline in the terminal. Without it, Flink SQL Client opens
results in a full-screen pager.
Start the Flink SQL Client
$FLINK_HOME/bin/sql-client.sh
Register the Catalog
At the Flink SQL Client prompt, run the following CREATE CATALOG statement. Replace
<gravitino-host> with your Gravitino server address and supply your S3 credentials.
No Authentication
CREATE CATALOG gravitino_irc WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://<gravitino-host>:9001/iceberg',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
's3.region' = 'us-east-1',
's3.access-key-id' = '<access-key>',
's3.secret-access-key' = '<secret-key>'
);
Basic Authentication
CREATE CATALOG gravitino_irc WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://<gravitino-host>:9001/iceberg',
'rest.auth.type' = 'basic',
'rest.auth.basic.username' = '<username>',
'rest.auth.basic.password' = '<password>',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
's3.region' = 'us-east-1',
's3.access-key-id' = '<access-key>',
's3.secret-access-key' = '<secret-key>'
);
OAuth2 Authentication
CREATE CATALOG gravitino_irc WITH (
'type' = 'iceberg',
'catalog-type' = 'rest',
'uri' = 'http://<gravitino-host>:9001/iceberg',
'rest.auth.type' = 'oauth2',
'rest.auth.oauth2.token' = '<your-token>',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
's3.region' = 'us-east-1',
's3.access-key-id' = '<access-key>',
's3.secret-access-key' = '<secret-key>'
);
See How to authenticate for Gravitino authentication configuration options.
The catalog registration persists for the duration of the SQL Client session. You must re-run
CREATE CATALOG each time you start a new session.
For local development with MinIO, add the following S3 properties to the catalog definition:
's3.endpoint' = 'http://<minio-host>:9000',
's3.path-style-access' = 'true',
See gravitino-irc-quickstart for a complete local development environment using MinIO.
Examples
Use the Catalog
USE CATALOG gravitino_irc;
List Databases
SHOW DATABASES;
List Tables
SHOW TABLES IN <namespace>;
Query a Table
SELECT * FROM <namespace>.<table>;
Create a Table
CREATE TABLE gravitino_irc.<namespace>.new_table (
id INT,
name STRING,
created_at TIMESTAMP
);
Insert Data
INSERT INTO gravitino_irc.<namespace>.new_table VALUES (1, 'example', CURRENT_TIMESTAMP);
Gravitino Connector vs. Iceberg REST
| Feature | Gravitino Engine Connector | Iceberg REST |
|---|---|---|
| Engine plugin required | Yes | No |
| Gravitino access control | Yes | Yes |
| Supported engines | Trino, Spark, Flink, Daft | Any Iceberg-compatible engine |
| Credential vending | Varies | Yes (S3, GCS, OSS, ADLS) |