Skip to content

Conversation

@luoyuxia
Copy link
Contributor

@luoyuxia luoyuxia commented Dec 16, 2025

Purpose

Linked issue: close #1893

Brief change log

Tests

API and Format

Documentation

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements tiering timeout support to prevent long-running table tiering operations from blocking other tables. The feature introduces a maximum duration for tiering a single table, after which it will be force-completed or skipped.

Key Changes

  • Added forceIgnore flag to TieringSplit and its subclasses to mark splits that should be skipped due to timeout
  • Implemented periodic timeout checking in TieringSourceEnumerator with configurable max duration and detection interval
  • Introduced TieringReachMaxDurationEvent to notify readers when a table reaches max tiering duration
  • Updated split handling in TieringSplitReader to force-complete in-progress log splits and skip new splits when timeout occurs

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 18 comments.

Show a summary per file
File Description
TieringSplit.java Added forceIgnore field and methods to mark splits for skipping
TieringSnapshotSplit.java Added constructors supporting forceIgnore parameter
TieringLogSplit.java Added constructors supporting forceIgnore parameter
TieringSplitSerializer.java Updated serialization to include forceIgnore field
TieringSourceEnumerator.java Implemented periodic timeout checking, deadline tracking, and timeout event broadcasting
TieringSplitReader.java Added logic to force-complete log splits and skip splits marked with forceIgnore
TieringSourceReader.java Integrated timeout event handling and custom fetcher manager
TieringSourceFetcherManager.java New class to manage timeout notifications to split readers
TieringReachMaxDurationEvent.java New event class to signal table timeout to readers
TieringSourceOptions.java Added configuration options for max duration and detection interval
TieringSource.java Updated builders to support new timeout configuration parameters
LakeTieringJobBuilder.java Wired up timeout configuration from Fluss config
TieringSplitGenerator.java Removed hardcoded numberOfSplits parameter, changed log levels to DEBUG
TieringSplitSerializerTest.java Added tests for forceIgnore serialization and updated string representations
TieringSourceEnumeratorTest.java Added timeout test, refactored assertions to use containsExactlyInAnyOrderElementsOf, added helper methods
TieringSourceReaderTest.java New test file for testing timeout event handling at reader level
Comments suppressed due to low confidence (2)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java:339

  • The log level was changed from INFO to DEBUG for this message about skipping splits when offset conditions are met. This is appropriate as it reduces log verbosity for expected behavior. However, ensure this doesn't make it harder to diagnose why certain tables aren't being tiered, as this condition might occur frequently during normal operation and could be useful for troubleshooting.
                LOG.debug(
                        "The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}",
                        lastCommittedBucketOffset,
                        latestBucketOffset,
                        tableBucket);
                return Optional.empty();
            }
        }
    }

    private Optional<TieringSplit> generateSplitForLogTableBucket(
            TablePath tablePath,
            TableBucket tableBucket,
            @Nullable String partitionName,
            @Nullable Long lastCommittedBucketOffset,
            long latestBucketOffset) {
        if (latestBucketOffset <= 0) {
            LOG.debug(
                    "The latestBucketOffset {} is equals or less than 0, skip generating split for bucket {}",
                    latestBucketOffset,
                    tableBucket);
            return Optional.empty();
        }

        // the bucket is never been tiered
        if (lastCommittedBucketOffset == null) {
            // the bucket is never been tiered, scan fluss log from the earliest offset
            return Optional.of(
                    new TieringLogSplit(
                            tablePath,
                            tableBucket,
                            partitionName,
                            EARLIEST_OFFSET,
                            latestBucketOffset));
        } else {
            // the bucket has been tiered, scan remain fluss log
            if (lastCommittedBucketOffset < latestBucketOffset) {
                return Optional.of(
                        new TieringLogSplit(
                                tablePath,
                                tableBucket,
                                partitionName,
                                lastCommittedBucketOffset,
                                latestBucketOffset));
            }
        }
        LOG.debug(
                "The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}",
                lastCommittedBucketOffset,
                latestBucketOffset,
                tableBucket);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:168

  • When a table reaches the max tiering duration, the timeout check only forces completion for log splits (lines 159-161 and 292). However, snapshot splits can also be in progress. If a table times out while processing snapshot splits, they won't be force-completed, leading to potential hangs or inconsistent behavior. Consider extending the timeout handling to snapshot splits as well.
        // may read snapshot firstly
        if (currentSnapshotSplitReader != null) {
            // for snapshot split, we don't force to complete it
            // since we rely on the log offset for the snapshot to
            // do next tiering, if force to complete, we can't get the log offset
            CloseableIterator<RecordAndPos> recordIterator = currentSnapshotSplitReader.readBatch();
            if (recordIterator == null) {
                LOG.info("Split {} is finished", currentSnapshotSplit.splitId());
                return finishCurrentSnapshotSplit();
            } else {
                return forSnapshotSplitRecords(
                        currentSnapshotSplit.getTableBucket(), recordIterator);
            }
        } else {
            if (currentLogScanner != null) {
                if (timeoutTables.contains(currentTableId)) {
                    return forceCompleteTieringLogRecords();
                }
                ScanRecords scanRecords = currentLogScanner.poll(pollTimeout);
                // force to complete records
                return forLogRecords(scanRecords);
            } else {
                return emptyTableBucketWriteResultWithSplitIds();
            }
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

for (int reader : readers) {
context.sendEventToSourceReader(
reader, new TieringReachMaxDurationEvent(reachMaxDurationTable));
}
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The deadline is removed from tieringTablesDeadline when a table finishes or fails (lines 240 and 259), but it's never removed when the timeout event is sent at line 323. This means a timed-out table will continue triggering timeout checks and events on every periodic check interval, potentially sending duplicate TieringReachMaxDurationEvent messages to readers. Consider removing the deadline after sending the timeout event.

Suggested change
}
}
// remove the deadline once timeout is handled to avoid duplicate timeout events
tieringTablesDeadline.remove(reachMaxDurationTable);

Copilot uses AI. Check for mistakes.
Comment on lines +301 to +311
for (TieringSplit tieringSplit : pendingSplits) {
if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
// force ignore this tiering split since the tiering for this table is timeout,
// we have to force to set to ignore the tiering split so that the
// tiering source reader can ignore them directly
tieringSplit.forceIgnore();
} else {
// we can break directly, if found any one split's table id is not equal to the
// timeout
// table, the following split must be not equal to the table id
break;
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pendingSplits list is accessed both from the periodic timeout check (line 301) and the assignSplits method (line 350), which can be called from different threads via handleSplitRequest and addSplitsBack. While assignSplits synchronizes on readersAwaitingSplit, the timeout check doesn't use any synchronization, which could lead to a ConcurrentModificationException or inconsistent state when iterating and modifying pendingSplits concurrently.

Suggested change
for (TieringSplit tieringSplit : pendingSplits) {
if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
// force ignore this tiering split since the tiering for this table is timeout,
// we have to force to set to ignore the tiering split so that the
// tiering source reader can ignore them directly
tieringSplit.forceIgnore();
} else {
// we can break directly, if found any one split's table id is not equal to the
// timeout
// table, the following split must be not equal to the table id
break;
// Access to pendingSplits must be synchronized consistently with other paths
// (e.g., assignSplits) to avoid concurrent modification.
synchronized (readersAwaitingSplit) {
for (TieringSplit tieringSplit : pendingSplits) {
if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
// force ignore this tiering split since the tiering for this table is
// timeout, we have to force to set to ignore the tiering split so that the
// tiering source reader can ignore them directly
tieringSplit.forceIgnore();
} else {
// we can break directly, if found any one split's table id is not equal to
// the timeout table, the following split must be not equal to the table id
break;
}

Copilot uses AI. Check for mistakes.

private void assignSplits() {
/* This method may be called from both addSplitsBack and handleSplitRequest, make it thread safe. */
// todo: do we need to add lock?
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TODO comment at line 340 asks "do we need to add lock?" but this question is now more critical given that the timeout checking code (lines 301-312) accesses pendingSplits without synchronization while assignSplits() synchronizes on readersAwaitingSplit. This creates a potential race condition. The TODO should either be resolved by adding proper synchronization or be updated to reflect the new concurrency concerns.

Copilot uses AI. Check for mistakes.
class TieringSourceReaderTest extends FlinkTestBase {

@Test
void testHandleTieringReachMaxDurationEvent1() throws Exception {
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test method name 'testHandleTieringReachMaxDurationEvent1' has a '1' suffix, which typically indicates there should be multiple similar tests or that this is a placeholder name. Consider using a more descriptive name like 'testHandleTieringReachMaxDurationEventWithForceIgnore' or 'testMaxDurationEventWithEmptyAndNonEmptySplits' to better describe what the test verifies.

Suggested change
void testHandleTieringReachMaxDurationEvent1() throws Exception {
void testHandleTieringReachMaxDurationEventWithForceIgnore() throws Exception {

Copilot uses AI. Check for mistakes.
Comment on lines +72 to +73
private static final long UNKNOW_BUCKET_OFFSET = -1;

Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in constant name: 'UNKNOW_BUCKET_OFFSET' should be 'UNKNOWN_BUCKET_OFFSET' to be consistent with UNKNOWN_BUCKET_TIMESTAMP.

Suggested change
private static final long UNKNOW_BUCKET_OFFSET = -1;
// unknown bucket offset for empty split or snapshot split
private static final long UNKNOWN_BUCKET_OFFSET = -1;
/**
* @deprecated Use {@link #UNKNOWN_BUCKET_OFFSET} instead.
*/
@Deprecated
private static final long UNKNOW_BUCKET_OFFSET = UNKNOWN_BUCKET_OFFSET;

Copilot uses AI. Check for mistakes.
currentTableTieredOffsetAndTimestamp.get(bucket);
long logEndOffset =
logOffsetAndTimestamp == null
? UNKNOW_BUCKET_OFFSET
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in variable reference: 'UNKNOW_BUCKET_OFFSET' should be 'UNKNOWN_BUCKET_OFFSET'.

Suggested change
? UNKNOW_BUCKET_OFFSET
? UNKNOWN_BUCKET_OFFSET

Copilot uses AI. Check for mistakes.
Comment on lines 53 to +60
public TieringSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
elementsQueue,
SourceReaderContext context,
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
}
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor now requires an elementsQueue parameter, changing from creating the queue internally to accepting it externally. However, the non-test constructor at line 53 doesn't expose a way to customize the queue, which limits flexibility for testing or customization. While this is addressed by the @VisibleForTesting constructor, consider whether the production code path should also allow queue configuration or if it should remain internal.

Copilot uses AI. Check for mistakes.
Comment on lines 74 to +78
out.writeBoolean(false);
}

// write force ignore
out.writeBoolean(split.isForceIgnore());
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serialization format has changed by adding a new forceIgnore boolean field (line 78), but the version number remains at VERSION_0. This breaks backward compatibility: older deserializers will fail when reading data serialized by the new version, and the new deserializer will fail when reading old serialized data that doesn't have the forceIgnore field. Consider bumping the version to VERSION_1 and handling both old and new formats in the deserialize method for backward compatibility.

Copilot uses AI. Check for mistakes.
tieringSplit.getPartitionName(),
null,
logSplit.getStoppingOffset(),
UNKNOW_BUCKET_OFFSET,
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in variable reference: 'UNKNOW_BUCKET_OFFSET' should be 'UNKNOWN_BUCKET_OFFSET'.

Suggested change
UNKNOW_BUCKET_OFFSET,
UNKNOWN_BUCKET_OFFSET,

Copilot uses AI. Check for mistakes.
* SourceEvent used to notify TieringSourceReader that a table has reached the maximum tiering
* duration and should be force completed.
*/
public class TieringReachMaxDurationEvent implements SourceEvent {
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class TieringReachMaxDurationEvent overrides hashCode but not equals.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Tiering Service supports splitting large splits and committing them separately to the lake.

1 participant