Flink’s Iceberg Sink
This is the core of how the Flink Iceberg sink works: a carefully choreographed “dance” between the Flink runtime and the two main operators. Let's walk through the lifecycle step by step.
The Two Main Actors
- `IcebergStreamWriter`: Its job is to take incoming data records, buffer them, and write them into data files (like Parquet or ORC) on a distributed filesystem. It runs as many parallel instances (subtasks) as the sink's configured parallelism.
- `IcebergFilesCommitter`: Its job is to collect the information about all the data files created by all the `IcebergStreamWriter` instances and commit them to the Iceberg table as a new snapshot. It runs as a single, non-parallel operator (parallelism = 1).
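To make the shape of the pipeline concrete, here is a hedged sketch of how such a writer/committer pair is chained in a Flink job. The actual wiring is done internally by the connector's `FlinkSink` builder; the class `SinkTopologySketch` and its parameter names below are illustrative, not the connector's API:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

/** Illustrative only: many parallel writers feeding a single committer. */
final class SinkTopologySketch {
  static <T, R> void wire(
      DataStream<T> input,
      OneInputStreamOperator<T, R> streamWriter,       // plays IcebergStreamWriter
      OneInputStreamOperator<R, Void> filesCommitter,  // plays IcebergFilesCommitter
      TypeInformation<R> writeResultType,
      TypeInformation<Void> voidType,
      int writeParallelism) {
    SingleOutputStreamOperator<R> writeResults = input
        .transform("IcebergStreamWriter", writeResultType, streamWriter)
        .setParallelism(writeParallelism);   // one writer subtask per parallel slot
    writeResults
        .transform("IcebergFilesCommitter", voidType, filesCommitter)
        .setParallelism(1)                   // a single committer serializes commits
        .setMaxParallelism(1);
  }
}
```

Pinning the committer to parallelism 1 is the design choice that makes commits a single serialized stream: there is never more than one subtask racing to write a new Iceberg snapshot.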
The Lifecycle and Method Calls
Here is the fundamental sequence of events:
Phase 1: Job Initialization
`initializeState(context)` is called.

- On `IcebergStreamWriter`:
  - It gets its runtime context (like its unique subtask ID).
  - It initializes the `TaskWriterFactory`, which knows how to create writers for data files.
  - It creates the very first `TaskWriter` instance. This is the initial buffer for writing Parquet files. (A sketch of this initialization follows the list.)
- On `IcebergFilesCommitter`:
  - It loads the Iceberg `Table` metadata.
  - If it is restoring from a previous Flink checkpoint, it inspects the old snapshots in the Iceberg table to find the last checkpoint ID that was successfully committed. This is crucial to avoid re-committing data after a failure.
  - It initializes its state, which is where it will keep track of files ready to be committed.
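A minimal sketch of the writer-side initialization, with field and factory names borrowed from the connector but simplified; note this is an excerpt of an operator class, and in some connector versions this logic lives in `open()` rather than `initializeState()`:

```java
// Hedged sketch: writer-side setup. taskWriterFactory and writer are fields
// of the operator; TaskWriterFactory/TaskWriter mirror the connector's types.
@Override
public void initializeState(StateInitializationContext context) throws Exception {
  super.initializeState(context);
  int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
  int attemptId = getRuntimeContext().getAttemptNumber();
  // Tell the factory which subtask it serves so generated file names are unique.
  taskWriterFactory.initialize(subTaskId, attemptId);
  // The very first TaskWriter: the initial buffer for data files.
  this.writer = taskWriterFactory.create();
}
```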
Phase 2: Steady State - Processing Data
`processElement(record)` is called.

- On `IcebergStreamWriter`:
  - This is called for every single data record coming into the sink.
  - The implementation is simple: `writer.write(element.getValue())`.
  - The `TaskWriter` instance handles the buffering and spilling of data to the underlying Parquet/ORC file on disk. The file is kept open.
- On `IcebergFilesCommitter`:
  - This method is also called, but its input is not the raw data. It receives the `FlinkWriteResult` that the `IcebergStreamWriter` emits.
  - It takes this result and stores it in an internal map (`writeResultsSinceLastSnapshot`) keyed by the checkpoint ID. It does not commit anything yet; it just collects the file information. (Both sides are sketched below.)
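In code, the two `processElement` implementations look roughly like this. These are hedged excerpt-style sketches: the accessor names `checkpointId()` and `writeResult()` on `FlinkWriteResult` are assumptions based on the connector's shape, not verified signatures:

```java
// Writer side: essentially a one-liner; the TaskWriter does the heavy lifting.
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
  writer.write(element.getValue()); // buffers/spills into the open data file
}

// Committer side (a different operator class): collect file metadata per
// checkpoint; nothing is committed here.
@Override
public void processElement(StreamRecord<FlinkWriteResult> element) {
  FlinkWriteResult value = element.getValue();
  writeResultsSinceLastSnapshot
      .computeIfAbsent(value.checkpointId(), id -> new ArrayList<>())
      .add(value.writeResult());
}
```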
Phase 3: The Checkpoint Dance (The Most Important Part)
This is triggered periodically by Flink’s JobManager.
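For reference, the checkpoint cadence is whatever the job configures; this is standard Flink API:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The JobManager injects a checkpoint barrier at the sources every 60 seconds.
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
```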
`prepareSnapshotPreBarrier(checkpointId)` is called.

- On `IcebergStreamWriter`:
  - This is the signal that a Flink checkpoint is about to happen. The operator must now finalize all work for the current checkpoint window.
  - It calls its internal `flush()` method.
  - Inside `flush()`, `writer.complete()` is called. This releases the buffer: it finalizes the current Parquet file, closes it, and returns a `WriteResult` object containing the file's metadata (path, statistics, etc.). It then emits this `WriteResult` downstream to the `IcebergFilesCommitter` using `output.collect()`.
  - Immediately after the `flush()`, it creates a new `TaskWriter` instance. This is the new, empty buffer that will be used for all the records that arrive in the next checkpoint window. (See the sketch after this list.)
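A hedged sketch of that writer-side hook, simplified from the connector with error handling omitted:

```java
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
  flush(checkpointId);
}

private void flush(long checkpointId) throws IOException {
  // Seal the current data file(s); WriteResult carries their metadata.
  WriteResult result = writer.complete();
  // Emit the metadata downstream; it travels ahead of the checkpoint barrier.
  output.collect(new StreamRecord<>(new FlinkWriteResult(checkpointId, result)));
  // Create a fresh TaskWriter: the empty buffer for the next checkpoint window.
  this.writer = taskWriterFactory.create();
}
```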
`snapshotState(context)` is called.

- On `IcebergStreamWriter`: its state is very simple and is managed by Flink automatically.
- On `IcebergFilesCommitter`:
  - This is a critical step for fault tolerance. The committer takes all the `WriteResult` objects it has received for the current checkpoint.
  - It writes them into a temporary manifest file: a list of all the new data files for this checkpoint.
  - It then saves the bytes of this manifest file into Flink's durable state backend (e.g., RocksDB, with checkpoints persisted to HDFS/S3).
  - Why? If the job fails right after this, upon recovery the committer can restore this state and know exactly which files were ready to be committed, ensuring no data is lost. (A sketch of this method follows the list.)
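Sketched below; `writeManifestBytes` is a hypothetical helper standing in for the connector's manifest-writing utilities, and `dataFilesPerCheckpoint` is assumed to be a map field backing the operator's `ListState`:

```java
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
  super.snapshotState(context);
  long checkpointId = context.getCheckpointId();
  // Fold everything received for this checkpoint into one manifest file and
  // keep only its serialized bytes; the raw WriteResults are no longer needed.
  byte[] manifest = writeManifestBytes(                       // hypothetical helper
      writeResultsSinceLastSnapshot.remove(checkpointId));
  dataFilesPerCheckpoint.put(checkpointId, manifest);
  // Replace the operator state; the state backend persists it durably.
  checkpointsState.update(Collections.singletonList(dataFilesPerCheckpoint));
}
```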
`notifyCheckpointComplete(checkpointId)` is called.

- On `IcebergStreamWriter`: this method is not used; the writer's work is done for this checkpoint.
- On `IcebergFilesCommitter`:
  - This is the final signal from the Flink JobManager. It means the checkpoint has been successfully completed across the entire Flink job, so it is now safe to make an external commit.
  - This is when the committer sends data to Iceberg.
  - It retrieves the manifest files for the completed checkpoint from its state.
  - It creates an Iceberg update operation (`AppendFiles` or `RowDelta`).
  - It adds the new files to the operation and calls `operation.commit()`.
  - This commit creates a new snapshot in the Iceberg table, making the new data visible to all other query engines. It also writes the `checkpointId` into the snapshot summary properties for tracking. (See the sketch after this list.)
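A hedged sketch of the append-only path. `AppendFiles`, `newAppend()`, `appendFile()`, `set()`, and `commit()` are the real Iceberg table API; the surrounding bookkeeping is simplified, `pendingDataFilesUpTo` is a hypothetical helper, and the exact summary property key should be verified against your connector version:

```java
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  super.notifyCheckpointComplete(checkpointId);
  if (checkpointId <= maxCommittedCheckpointId) {
    return; // already committed, e.g. a replayed notification after recovery
  }
  AppendFiles append = table.newAppend();
  for (DataFile file : pendingDataFilesUpTo(checkpointId)) { // hypothetical helper
    append.appendFile(file);
  }
  // Stamp the Flink checkpoint id into the snapshot summary so a restored job
  // can tell which checkpoints were already committed.
  append.set("flink.max-committed-checkpoint-id", String.valueOf(checkpointId));
  append.commit(); // atomically publishes the new Iceberg snapshot
  maxCommittedCheckpointId = checkpointId;
}
```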
Buffer Creation / Release and File Commit
- When does `IcebergStreamWriter` create a new buffer?
  - On initialization (`initializeState`).
  - At the end of every `prepareSnapshotPreBarrier` call, right after flushing the previous one.
- When are those buffers released?
  - During the `prepareSnapshotPreBarrier` call, when `writer.complete()` is invoked. This finalizes the file and “releases” the buffer by making it ready for commit.
- When does `IcebergFilesCommitter` send data files to Iceberg?
  - Only inside the `notifyCheckpointComplete(checkpointId)` method. This ensures that data is only made visible in the Iceberg table after Flink guarantees the state has been durably saved, achieving exactly-once semantics. (The whole cycle is condensed in the sketch below.)
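Put together, one checkpoint cycle of the buffer looks like this in miniature (hypothetical shorthand, not real API calls in this exact shape):

```java
// One checkpoint window, end to end (shorthand):
writer = taskWriterFactory.create();     // initializeState: first buffer
writer.write(row);                       // processElement, once per record
WriteResult result = writer.complete();  // prepareSnapshotPreBarrier: seal + release
writer = taskWriterFactory.create();     // fresh buffer for the next window
// ... later, once the checkpoint has succeeded everywhere:
// notifyCheckpointComplete: the committer appends result's files to the table
```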
The Core Interaction Between `IcebergStreamWriter` and `IcebergFilesCommitter`
That line of code inside `IcebergStreamWriter::flush()`, `output.collect(new StreamRecord<>(new FlinkWriteResult(checkpointId, result)))`, is precisely how a Flink operator sends data to the next operator in the stream graph.
Let’s break it down:
- `output`: an object of type `org.apache.flink.streaming.api.operators.Output` that Flink provides to every operator; it is the gateway to all downstream operators.
- `.collect(...)`: the method you call on the `output` object to emit a single record downstream.
- `new StreamRecord<>(...)`: the standard wrapper Flink uses for all records flowing through a stream. It contains the actual data element and potentially other metadata, such as a timestamp.
- `new FlinkWriteResult(checkpointId, result)`: the payload. The `result` object (of type `WriteResult`) contains the list of data files and delete files that were just written and closed by the `writer.complete()` call. This payload is what the next operator needs to work with.
So, the `IcebergStreamWriter`'s responsibility ends here. It writes data files and then emits the results of that work downstream for another operator to handle the commit phase.
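The same gateway is available to any custom operator. Here is a minimal, self-contained example using only standard Flink operator API:

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Minimal operator showing output.collect(): forward every record downstream. */
public class PassThroughOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {
  @Override
  public void processElement(StreamRecord<T> element) {
    output.collect(element); // emit one record to the next operator in the graph
  }
}
```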
Checkpoint Consistency Across the Distributed Data Flow
The scenario where the Committer's `prepareSnapshotPreBarrier` is called before it has received the Writer's `FlinkWriteResult` for that same checkpoint is guaranteed by the Flink runtime not to happen.
Here’s the critical concept that prevents this: The Checkpoint Barrier.
Think of the data stream not just as records, but as records interleaved with special markers called “checkpoint barriers”. Let's trace the flow for a single checkpoint, call it `checkpointId=N`:
- At the `IcebergStreamWriter`:
  - The Writer is busy processing normal data records via `processElement`.
  - Suddenly, a `CheckpointBarrier(N)` arrives from upstream.
  - The Flink runtime for this operator sees the barrier and stops processing any further data records.
  - It immediately calls `prepareSnapshotPreBarrier(N)` on the Writer.
  - Inside this method, the `output.collect(...)` line discussed above is executed. This sends the `FlinkWriteResult` record into the network buffer, destined for the Committer.
  - The Writer finishes its checkpointing logic (`snapshotState`).
- In the network channel between Writer and Committer:
  - The stream now looks like this: `..., FlinkWriteResult, CheckpointBarrier(N), ...`
  - The `FlinkWriteResult` record is always ahead of the `CheckpointBarrier(N)` that triggered its creation.
- At the `IcebergFilesCommitter`:
  - The Committer's thread is processing its input stream.
  - It first receives and processes the `FlinkWriteResult` record by calling its own `processElement` method (line 428 in `IcebergFilesCommitter.java`). This method adds the result to the `writeResultsSinceLastSnapshot` map.
  - Only after it has finished processing the `FlinkWriteResult` record does it see the `CheckpointBarrier(N)` in its input stream.
  - Upon seeing the barrier, the Flink runtime for the Committer calls `prepareSnapshotPreBarrier(N)` and then `snapshotState(N)`. By this time, it is guaranteed to have already processed all records that came before that barrier.
The Flink runtime guarantees that for any given checkpoint `N`, an operator will have received and processed all data records belonging to that checkpoint window before its own `prepareSnapshotPreBarrier(N)` method is invoked.
So, there is no race condition. The `output.collect()` call in the Writer is part of the data stream, not a side channel. The Committer will process that collected record just like any other data record, and only then will it handle the checkpoint barrier that follows it. This elegant mechanism, called “checkpoint alignment”, is fundamental to how Flink achieves its exactly-once processing guarantees.
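To internalize the ordering contract, here is a small self-contained operator that simply logs the hooks in the order Flink invokes them. Dropping it into any checkpointed job shows, per checkpoint `N`: `processElement` for every pre-barrier record, then `prepareSnapshotPreBarrier(N)`, then `snapshotState(N)`, and only later `notifyCheckpointComplete(N)`:

```java
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Logs Flink's per-checkpoint call order; records pass through untouched. */
public class CallOrderProbe<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  @Override
  public void processElement(StreamRecord<T> element) {
    output.collect(element); // all of these precede the next barrier downstream
  }

  @Override
  public void prepareSnapshotPreBarrier(long checkpointId) {
    LOG.info("prepareSnapshotPreBarrier({})", checkpointId);
  }

  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    LOG.info("snapshotState({})", context.getCheckpointId());
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    LOG.info("notifyCheckpointComplete({})", checkpointId);
  }
}
```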