From 79767b82a3a6099cf2fd9618fd0353ae7b46623d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Wed, 1 Jul 2026 23:22:45 +0800 Subject: [PATCH 1/2] refactor TieringSplit --- .../client/tiering}/TieringLogSplit.java | 2 +- .../client/tiering}/TieringSnapshotSplit.java | 2 +- .../fluss/client/tiering}/TieringSplit.java | 12 +- .../flink/tiering/source/TieringSource.java | 18 ++- .../source/TieringSourceFetcherManager.java | 10 +- .../tiering/source/TieringSourceReader.java | 10 +- .../tiering/source/TieringSplitReader.java | 14 +- .../enumerator/TieringSourceEnumerator.java | 36 +++-- .../source/split/FlinkTieringSplit.java | 152 ++++++++++++++++++ .../source/split/TieringSplitGenerator.java | 3 + .../source/split/TieringSplitSerializer.java | 56 ++++--- .../source/state/TieringSplitState.java | 64 ++++---- .../source/TieringSourceReaderTest.java | 9 +- .../source/TieringSplitReaderTest.java | 40 ++--- .../TieringSourceEnumeratorTest.java | 95 +++++------ .../split/TieringSplitSerializerTest.java | 47 +++--- .../source/state/TieringSplitStateTest.java | 18 ++- 17 files changed, 393 insertions(+), 195 deletions(-) rename {fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split => fluss-client/src/main/java/org/apache/fluss/client/tiering}/TieringLogSplit.java (99%) rename {fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split => fluss-client/src/main/java/org/apache/fluss/client/tiering}/TieringSnapshotSplit.java (99%) rename {fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split => fluss-client/src/main/java/org/apache/fluss/client/tiering}/TieringSplit.java (96%) create mode 100644 fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/FlinkTieringSplit.java diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java b/fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringLogSplit.java similarity index 99% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java rename to fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringLogSplit.java index a1e983b503..194851e2bc 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringLogSplit.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.fluss.flink.tiering.source.split; +package org.apache.fluss.client.tiering; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java b/fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringSnapshotSplit.java similarity index 99% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java rename to fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringSnapshotSplit.java index 8c3bad3645..f4e7901ec7 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringSnapshotSplit.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.fluss.flink.tiering.source.split; +package org.apache.fluss.client.tiering; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java b/fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringSplit.java similarity index 96% rename from fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java rename to fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringSplit.java index b41aef3f70..dae0d3dad9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/tiering/TieringSplit.java @@ -15,19 +15,18 @@ * limitations under the License. */ -package org.apache.fluss.flink.tiering.source.split; +package org.apache.fluss.client.tiering; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; -import org.apache.flink.api.connector.source.SourceSplit; - import javax.annotation.Nullable; +import java.io.Serializable; import java.util.Objects; /** The base table split for tiering service. */ -public abstract class TieringSplit implements SourceSplit { +public abstract class TieringSplit implements Serializable { public static final byte TIERING_SNAPSHOT_SPLIT_FLAG = 1; public static final byte TIERING_LOG_SPLIT_FLAG = 2; @@ -91,6 +90,9 @@ public TieringSplit( this.tieringRoundTimestamp = tieringRoundTimestamp; } + /** Returns the unique identifier for this split. */ + public abstract String splitId(); + /** Checks whether this split is a primary key table split to tier. */ public final boolean isTieringSnapshotSplit() { return getClass() == TieringSnapshotSplit.class; @@ -128,7 +130,7 @@ public TieringLogSplit asTieringLogSplit() { return (TieringLogSplit) this; } - protected byte splitKind() { + public byte splitKind() { if (isTieringSnapshotSplit()) { return TIERING_SNAPSHOT_SPLIT_FLAG; } else if (isTieringLogSplit()) { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java index 649b758704..e7435a7763 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java @@ -21,7 +21,7 @@ import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator; -import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitSerializer; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorStateSerializer; @@ -55,7 +55,9 @@ */ public class TieringSource implements Source< - TableBucketWriteResult, TieringSplit, TieringSourceEnumeratorState> { + TableBucketWriteResult, + FlinkTieringSplit, + TieringSourceEnumeratorState> { public static final String TIERING_SOURCE_TRANSFORMATION_UID = "$$fluss_tiering_source_operator$$"; @@ -81,15 +83,15 @@ public Boundedness getBoundedness() { } @Override - public SplitEnumerator createEnumerator( - SplitEnumeratorContext splitEnumeratorContext) { + public SplitEnumerator createEnumerator( + SplitEnumeratorContext splitEnumeratorContext) { return new TieringSourceEnumerator( flussConf, splitEnumeratorContext, pollTieringTableIntervalMs); } @Override - public SplitEnumerator restoreEnumerator( - SplitEnumeratorContext splitEnumeratorContext, + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext splitEnumeratorContext, TieringSourceEnumeratorState tieringSourceEnumeratorState) { // stateless operator return new TieringSourceEnumerator( @@ -97,7 +99,7 @@ public SplitEnumerator restoreEnumer } @Override - public SimpleVersionedSerializer getSplitSerializer() { + public SimpleVersionedSerializer getSplitSerializer() { return TieringSplitSerializer.INSTANCE; } @@ -108,7 +110,7 @@ public SimpleVersionedSerializer getSplitSerializer() { } @Override - public SourceReader, TieringSplit> createReader( + public SourceReader, FlinkTieringSplit> createReader( SourceReaderContext sourceReaderContext) { FutureCompletingBlockingQueue>> elementsQueue = new FutureCompletingBlockingQueue<>(); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java index ac72aad664..fb19afbe24 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java @@ -19,7 +19,7 @@ package org.apache.fluss.flink.tiering.source; import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter; -import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -40,14 +40,14 @@ */ public class TieringSourceFetcherManager extends SingleThreadFetcherManagerAdapter< - TableBucketWriteResult, TieringSplit> { + TableBucketWriteResult, FlinkTieringSplit> { private static final Logger LOG = LoggerFactory.getLogger(TieringSourceFetcherManager.class); public TieringSourceFetcherManager( FutureCompletingBlockingQueue>> elementsQueue, - Supplier, TieringSplit>> + Supplier, FlinkTieringSplit>> splitReaderSupplier, Configuration configuration, Consumer> splitFinishedHook) { @@ -64,7 +64,7 @@ public void markTableReachTieringMaxDuration(long tableId) { enqueueMarkTableReachTieringMaxDurationTask( splitFetcher, tableId)); } else { - SplitFetcher, TieringSplit> splitFetcher = + SplitFetcher, FlinkTieringSplit> splitFetcher = createSplitFetcher(); LOG.info( "fetchers is empty, enqueue marking tiering max duration for table {}", @@ -75,7 +75,7 @@ public void markTableReachTieringMaxDuration(long tableId) { } private void enqueueMarkTableReachTieringMaxDurationTask( - SplitFetcher, TieringSplit> splitFetcher, + SplitFetcher, FlinkTieringSplit> splitFetcher, long reachTieringDeadlineTable) { splitFetcher.enqueueTask( new SplitFetcherTask() { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index 3e2fa13fd5..83af8a05e4 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -23,7 +23,7 @@ import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; -import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; @@ -48,7 +48,7 @@ public final class TieringSourceReader extends SingleThreadMultiplexSourceReaderBaseAdapter< TableBucketWriteResult, TableBucketWriteResult, - TieringSplit, + FlinkTieringSplit, TieringSplitState> { private static final Logger LOG = LoggerFactory.getLogger(TieringSourceReader.class); @@ -118,13 +118,13 @@ protected void onSplitFinished(Map finishedSplitIds) } @Override - public List snapshotState(long checkpointId) { + public List snapshotState(long checkpointId) { // we return empty list to make source reader be stateless return Collections.emptyList(); } @Override - protected TieringSplitState initializedState(TieringSplit split) { + protected TieringSplitState initializedState(FlinkTieringSplit split) { if (split.isTieringSnapshotSplit()) { return new TieringSplitState(split); } else if (split.isTieringLogSplit()) { @@ -135,7 +135,7 @@ protected TieringSplitState initializedState(TieringSplit split) { } @Override - protected TieringSplit toSplitType(String splitId, TieringSplitState splitState) { + protected FlinkTieringSplit toSplitType(String splitId, TieringSplitState splitState) { return splitState.toSourceSplit(); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 960c4db7e5..6b1aef1c79 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -25,12 +25,13 @@ import org.apache.fluss.client.table.scanner.log.LogScanner; import org.apache.fluss.client.table.scanner.log.LogScannerImpl; import org.apache.fluss.client.table.scanner.log.ScanRecords; +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; +import org.apache.fluss.client.tiering.TieringSplit; import org.apache.fluss.flink.source.reader.BoundedSplitReader; import org.apache.fluss.flink.source.reader.RecordAndPos; import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; -import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.fluss.lake.batch.ArrowRecordBatch; import org.apache.fluss.lake.writer.LakeTieringFactory; import org.apache.fluss.lake.writer.LakeWriter; @@ -72,7 +73,7 @@ /** The {@link SplitReader} implementation which will read Fluss and write to lake. */ public class TieringSplitReader - implements SplitReader, TieringSplit> { + implements SplitReader, FlinkTieringSplit> { private static final Logger LOG = LoggerFactory.getLogger(TieringSplitReader.class); @@ -210,14 +211,15 @@ public RecordsWithSplitIds> fetch() throws I } @Override - public void handleSplitsChanges(SplitsChange splitsChange) { + public void handleSplitsChanges(SplitsChange splitsChange) { if (!(splitsChange instanceof SplitsAddition)) { throw new UnsupportedOperationException( String.format( "The SplitChange type of %s is not supported.", splitsChange.getClass())); } - for (TieringSplit split : splitsChange.splits()) { + for (FlinkTieringSplit flinkSplit : splitsChange.splits()) { + TieringSplit split = flinkSplit.unwrap(); LOG.info("add split {}", split.splitId()); if (split.shouldSkipCurrentRound()) { // if the split is forced to ignore, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 5545e9a50f..82e2954047 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -22,12 +22,13 @@ import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.client.tiering.TieringSplit; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.metrics.FlinkMetricRegistry; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; -import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; import org.apache.fluss.lake.committer.TieringStats; @@ -93,16 +94,16 @@ * */ public class TieringSourceEnumerator - implements SplitEnumerator { + implements SplitEnumerator { private static final Logger LOG = LoggerFactory.getLogger(TieringSourceEnumerator.class); private final Configuration flussConf; - private final SplitEnumeratorContext context; + private final SplitEnumeratorContext context; private final ScheduledExecutorService timerService; private final SplitEnumeratorMetricGroup enumeratorMetricGroup; private final long pollTieringTableIntervalMs; - private final List pendingSplits; + private final List pendingSplits; private final Set readersAwaitingSplit; private final Map tieringTableEpochs; @@ -124,7 +125,7 @@ public class TieringSourceEnumerator public TieringSourceEnumerator( Configuration flussConf, - SplitEnumeratorContext context, + SplitEnumeratorContext context, long pollTieringTableIntervalMs) { this.flussConf = flussConf; this.context = context; @@ -205,7 +206,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List splits, int subtaskId) { + public void addSplitsBack(List splits, int subtaskId) { readersAwaitingSplit.add(subtaskId); pendingSplits.addAll(splits); assignSplits(); @@ -333,7 +334,7 @@ protected void handleTableTieringReachMaxDuration( LOG.info("Table {}-{} reached max duration. Force completing.", tablePath, tableId); tieringReachMaxDurationsTables.add(tableId); - for (TieringSplit tieringSplit : pendingSplits) { + for (FlinkTieringSplit tieringSplit : pendingSplits) { if (tieringSplit.getTableBucket().getTableId() == tableId) { // mark this tiering split to skip the current round since the tiering for // this table has timed out, so the tiering source reader can skip them directly @@ -378,7 +379,7 @@ private void assignSplits() { continue; } if (!pendingSplits.isEmpty()) { - TieringSplit tieringSplit = pendingSplits.remove(0); + FlinkTieringSplit tieringSplit = pendingSplits.remove(0); context.assignSplit(tieringSplit, nextAwaitingReader); LOG.info("Assigning split {} to readers {}", tieringSplit, nextAwaitingReader); readersAwaitingSplit.remove(nextAwaitingReader); @@ -453,20 +454,20 @@ private void generateTieringSplits(Tuple3 tieringTable) // shuffle tiering split to avoid splits tiering skew // after introduce tiering max duration Collections.shuffle(tieringSplits); - tieringSplits = populateTieringRoundMetadata(tieringSplits); + List flinkSplits = populateTieringRoundMetadata(tieringSplits); LOG.info( "Generate Tiering {} splits for table {} with cost {}ms.", - tieringSplits.size(), + flinkSplits.size(), tieringTable.f2, System.currentTimeMillis() - start); - if (tieringSplits.isEmpty()) { + if (flinkSplits.isEmpty()) { LOG.info( "Generate Tiering splits for table {} is empty, no need to tier data.", tieringTable.f2.getTableName()); finishedTables.put(tieringTable.f0, TieringFinishInfo.from(tieringTable.f1)); } else { tieringTableEpochs.put(tieringTable.f0, tieringTable.f1); - pendingSplits.addAll(tieringSplits); + pendingSplits.addAll(flinkSplits); timerService.schedule( () -> @@ -487,18 +488,19 @@ private void generateTieringSplits(Tuple3 tieringTable) } } - private List populateTieringRoundMetadata(List tieringSplits) { + private List populateTieringRoundMetadata(List tieringSplits) { int numberOfSplits = tieringSplits.size(); if (numberOfSplits == 0) { return Collections.emptyList(); } long tieringRoundTimestamp = System.currentTimeMillis(); - List splitsWithMetadata = new ArrayList<>(numberOfSplits); + List splitsWithMetadata = new ArrayList<>(numberOfSplits); for (int splitIndex = 0; splitIndex < numberOfSplits; splitIndex++) { splitsWithMetadata.add( - tieringSplits - .get(splitIndex) - .copy(numberOfSplits, splitIndex, tieringRoundTimestamp)); + new FlinkTieringSplit( + tieringSplits + .get(splitIndex) + .copy(numberOfSplits, splitIndex, tieringRoundTimestamp))); } return splitsWithMetadata; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/FlinkTieringSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/FlinkTieringSplit.java new file mode 100644 index 0000000000..ddf7d63f29 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/FlinkTieringSplit.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.source.split; + +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; +import org.apache.fluss.client.tiering.TieringSplit; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; + +import org.apache.flink.api.connector.source.SourceSplit; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * A Flink-specific wrapper for {@link TieringSplit} that implements Flink's {@link SourceSplit} + * interface. This adapter allows the engine-agnostic {@link TieringSplit} to be used within Flink's + * source framework. + */ +public class FlinkTieringSplit implements SourceSplit { + + private final TieringSplit tieringSplit; + + public FlinkTieringSplit(TieringSplit tieringSplit) { + this.tieringSplit = tieringSplit; + } + + @Override + public String splitId() { + return tieringSplit.splitId(); + } + + /** Returns the underlying engine-agnostic {@link TieringSplit}. */ + public TieringSplit unwrap() { + return tieringSplit; + } + + /** Checks whether this split is a primary key table split to tier. */ + public boolean isTieringSnapshotSplit() { + return tieringSplit.isTieringSnapshotSplit(); + } + + /** Checks whether this split is a log split to tier. */ + public boolean isTieringLogSplit() { + return tieringSplit.isTieringLogSplit(); + } + + /** Casts the underlying split into a {@link TieringSnapshotSplit}. */ + public TieringSnapshotSplit asTieringSnapshotSplit() { + return tieringSplit.asTieringSnapshotSplit(); + } + + /** Casts the underlying split into a {@link TieringLogSplit}. */ + public TieringLogSplit asTieringLogSplit() { + return tieringSplit.asTieringLogSplit(); + } + + /** + * Marks this split to skip reading data in the current round. Once called, the split will not + * be processed and data reading will be skipped. + */ + public void skipCurrentRound() { + tieringSplit.skipCurrentRound(); + } + + /** + * Returns whether this split should skip tiering data in the current round of tiering. + * + * @return true if the split should skip tiering data, false otherwise + */ + public boolean shouldSkipCurrentRound() { + return tieringSplit.shouldSkipCurrentRound(); + } + + public byte splitKind() { + return tieringSplit.splitKind(); + } + + public int getNumberOfSplits() { + return tieringSplit.getNumberOfSplits(); + } + + public int getSplitIndex() { + return tieringSplit.getSplitIndex(); + } + + public boolean isFirstSplit() { + return tieringSplit.isFirstSplit(); + } + + public long getTieringRoundTimestamp() { + return tieringSplit.getTieringRoundTimestamp(); + } + + public TablePath getTablePath() { + return tieringSplit.getTablePath(); + } + + public TableBucket getTableBucket() { + return tieringSplit.getTableBucket(); + } + + @Nullable + public String getPartitionName() { + return tieringSplit.getPartitionName(); + } + + public FlinkTieringSplit copy(int numberOfSplits) { + return new FlinkTieringSplit(tieringSplit.copy(numberOfSplits)); + } + + public FlinkTieringSplit copy(int numberOfSplits, int splitIndex, long tieringRoundTimestamp) { + return new FlinkTieringSplit( + tieringSplit.copy(numberOfSplits, splitIndex, tieringRoundTimestamp)); + } + + @Override + public boolean equals(Object object) { + if (!(object instanceof FlinkTieringSplit)) { + return false; + } + FlinkTieringSplit that = (FlinkTieringSplit) object; + return Objects.equals(tieringSplit, that.tieringSplit); + } + + @Override + public int hashCode() { + return Objects.hash(tieringSplit); + } + + @Override + public String toString() { + return "FlinkTieringSplit{" + tieringSplit + '}'; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java index 1bb94a7a92..9b9a88a17b 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java @@ -22,6 +22,9 @@ import org.apache.fluss.client.initializer.OffsetsInitializer.BucketOffsetsRetriever; import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; +import org.apache.fluss.client.tiering.TieringSplit; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; import org.apache.fluss.metadata.PartitionInfo; import org.apache.fluss.metadata.TableBucket; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java index e336ee4670..f35a78e540 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java @@ -17,6 +17,9 @@ package org.apache.fluss.flink.tiering.source.split; +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; +import org.apache.fluss.client.tiering.TieringSplit; import org.apache.fluss.flink.tiering.source.TieringSource; import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator; import org.apache.fluss.metadata.TableBucket; @@ -29,13 +32,13 @@ import java.io.IOException; /** - * A serializer for the {@link TieringSplit}. + * A serializer for the {@link FlinkTieringSplit}. * *

This serializer is only used to serialize and deserialize splits sent from {@link * TieringSourceEnumerator} to {@link TieringSource} for network transmission. Therefore, it does * not need to consider compatibility. */ -public class TieringSplitSerializer implements SimpleVersionedSerializer { +public class TieringSplitSerializer implements SimpleVersionedSerializer { public static final TieringSplitSerializer INSTANCE = new TieringSplitSerializer(); @@ -55,8 +58,9 @@ public int getVersion() { } @Override - public byte[] serialize(TieringSplit split) throws IOException { + public byte[] serialize(FlinkTieringSplit flinkSplit) throws IOException { final DataOutputSerializer out = SERIALIZER_CACHE.get(); + TieringSplit split = flinkSplit.unwrap(); byte splitKind = split.splitKind(); out.writeByte(splitKind); @@ -111,7 +115,7 @@ public byte[] serialize(TieringSplit split) throws IOException { } @Override - public TieringSplit deserialize(int version, byte[] serialized) throws IOException { + public FlinkTieringSplit deserialize(int version, byte[] serialized) throws IOException { if (version != VERSION_0) { throw new IOException("Unknown version " + version); } @@ -145,36 +149,40 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti int splitIndex = in.readInt(); long tieringRoundTimestamp = in.readLong(); + TieringSplit tieringSplit; if (splitKind == TIERING_SNAPSHOT_SPLIT_FLAG) { // deserialize snapshot id long snapshotId = in.readLong(); // deserialize log offset of snapshot long logOffsetOfSnapshot = in.readLong(); - return new TieringSnapshotSplit( - tablePath, - tableBucket, - partitionName, - snapshotId, - logOffsetOfSnapshot, - numberOfSplits, - skipCurrentRound, - splitIndex, - tieringRoundTimestamp); + tieringSplit = + new TieringSnapshotSplit( + tablePath, + tableBucket, + partitionName, + snapshotId, + logOffsetOfSnapshot, + numberOfSplits, + skipCurrentRound, + splitIndex, + tieringRoundTimestamp); } else { // deserialize starting offset long startingOffset = in.readLong(); // deserialize starting offset long stoppingOffset = in.readLong(); - return new TieringLogSplit( - tablePath, - tableBucket, - partitionName, - startingOffset, - stoppingOffset, - numberOfSplits, - skipCurrentRound, - splitIndex, - tieringRoundTimestamp); + tieringSplit = + new TieringLogSplit( + tablePath, + tableBucket, + partitionName, + startingOffset, + stoppingOffset, + numberOfSplits, + skipCurrentRound, + splitIndex, + tieringRoundTimestamp); } + return new FlinkTieringSplit(tieringSplit); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSplitState.java index 0690da6286..dec70219ed 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSplitState.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/state/TieringSplitState.java @@ -17,12 +17,13 @@ package org.apache.fluss.flink.tiering.source.state; -import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; +import org.apache.fluss.client.tiering.TieringSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; /** - * The state of a {@link TieringSplit}. + * The state of a {@link FlinkTieringSplit}. * *

Note: The tiering service adopts a stateless design and does not store any progress * information in state during checkpoints. All splits are re-requested from the Fluss cluster in @@ -30,37 +31,40 @@ */ public class TieringSplitState { - protected final TieringSplit tieringSplit; + protected final FlinkTieringSplit flinkTieringSplit; - public TieringSplitState(TieringSplit tieringSplit) { - this.tieringSplit = tieringSplit; + public TieringSplitState(FlinkTieringSplit flinkTieringSplit) { + this.flinkTieringSplit = flinkTieringSplit; } - public TieringSplit toSourceSplit() { + public FlinkTieringSplit toSourceSplit() { + TieringSplit tieringSplit = flinkTieringSplit.unwrap(); if (tieringSplit.isTieringSnapshotSplit()) { - final TieringSnapshotSplit split = (TieringSnapshotSplit) this.tieringSplit; - return new TieringSnapshotSplit( - split.getTablePath(), - split.getTableBucket(), - split.getPartitionName(), - split.getSnapshotId(), - split.getLogOffsetOfSnapshot(), - split.getNumberOfSplits(), - split.shouldSkipCurrentRound(), - split.getSplitIndex(), - split.getTieringRoundTimestamp()); + final TieringSnapshotSplit split = tieringSplit.asTieringSnapshotSplit(); + return new FlinkTieringSplit( + new TieringSnapshotSplit( + split.getTablePath(), + split.getTableBucket(), + split.getPartitionName(), + split.getSnapshotId(), + split.getLogOffsetOfSnapshot(), + split.getNumberOfSplits(), + split.shouldSkipCurrentRound(), + split.getSplitIndex(), + split.getTieringRoundTimestamp())); } else { - final TieringLogSplit split = (TieringLogSplit) tieringSplit; - return new TieringLogSplit( - split.getTablePath(), - split.getTableBucket(), - split.getPartitionName(), - split.getStartingOffset(), - split.getStoppingOffset(), - split.getNumberOfSplits(), - split.shouldSkipCurrentRound(), - split.getSplitIndex(), - split.getTieringRoundTimestamp()); + final TieringLogSplit split = tieringSplit.asTieringLogSplit(); + return new FlinkTieringSplit( + new TieringLogSplit( + split.getTablePath(), + split.getTableBucket(), + split.getPartitionName(), + split.getStartingOffset(), + split.getStoppingOffset(), + split.getNumberOfSplits(), + split.shouldSkipCurrentRound(), + split.getSplitIndex(), + split.getTieringRoundTimestamp())); } } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java index 9e9de2c792..369722ab8e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java @@ -20,12 +20,13 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.tiering.TieringLogSplit; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; -import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; @@ -78,7 +79,7 @@ void testHandleTieringReachMaxDurationEvent() throws Exception { TieringLogSplit split = new TieringLogSplit( tablePath, new TableBucket(tableId, 0), null, EARLIEST_OFFSET, 100); - reader.addSplits(Collections.singletonList(split)); + reader.addSplits(Collections.singletonList(new FlinkTieringSplit(split))); // send TieringReachMaxDurationEvent TieringReachMaxDurationEvent event = new TieringReachMaxDurationEvent(tableId); @@ -113,7 +114,7 @@ void testHandleTieringReachMaxDurationEvent() throws Exception { // tiering won't be finished if no tiering reach max duration logic 100L); - reader.addSplits(Collections.singletonList(split)); + reader.addSplits(Collections.singletonList(new FlinkTieringSplit(split))); // wait to run one round of tiering to do some tiering FutureCompletingBlockingQueue< @@ -157,7 +158,7 @@ void testHandleTieringReachMaxDurationEvent() throws Exception { EARLIEST_OFFSET, 100L); split.skipCurrentRound(); - reader.addSplits(Collections.singletonList(split)); + reader.addSplits(Collections.singletonList(new FlinkTieringSplit(split))); // should skip tiering for this split retry( Duration.ofMinutes(1), diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java index 91c65ffc7e..700138f63b 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSplitReaderTest.java @@ -23,14 +23,14 @@ import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.TableWriter; import org.apache.fluss.client.table.writer.UpsertWriter; +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; import org.apache.fluss.client.write.HashBucketAssigner; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics; -import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.lake.writer.LakeWriter; import org.apache.fluss.lake.writer.WriterInitContext; @@ -79,7 +79,7 @@ void testTieringTable() throws Exception { TieringSplitReader tieringSplitReader = createTieringReader(connection)) { // test empty splits - SplitsAddition splitsAddition = + SplitsAddition splitsAddition = new SplitsAddition<>( Arrays.asList( createLogSplit(tablePath, tableId, 0, EARLIEST_OFFSET, 0), @@ -146,14 +146,14 @@ void testTieringTable() throws Exception { Map> secondRows = putRows(tableId, tablePath, 10); Map expectedRowCount = new HashMap<>(); Set expectFinishTieringSplits = new HashSet<>(); - List logSplits = new ArrayList<>(); + List logSplits = new ArrayList<>(); for (int bucket = 0; bucket < 3; bucket++) { TableBucket tableBucket = new TableBucket(tableId, bucket); long startingOffset = firstRows.get(tableBucket).size(); // -U, +U long stoppingOffset = startingOffset + secondRows.get(tableBucket).size() * 2L; expectedRowCount.put(tableBucket, secondRows.get(tableBucket).size() * 2); - TieringLogSplit tieringLogSplit = + FlinkTieringSplit tieringLogSplit = createLogSplit(tablePath, tableId, bucket, startingOffset, stoppingOffset); logSplits.add(tieringLogSplit); expectFinishTieringSplits.add(tieringLogSplit.splitId()); @@ -182,14 +182,14 @@ void testTieringMixTables() throws Exception { FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath1); // first add snapshot split of bucket 0, bucket 1 of table id 0 - SplitsAddition splitsAddition = + SplitsAddition splitsAddition = new SplitsAddition<>( Arrays.asList( createSnapshotSplit(tablePath0, tableId0, 0, 0), createSnapshotSplit(tablePath0, tableId0, 1, 0))); Set table0Splits = splitsAddition.splits().stream() - .map(TieringSplit::splitId) + .map(FlinkTieringSplit::splitId) .collect(Collectors.toSet()); tieringSplitReader.handleSplitsChanges(splitsAddition); @@ -213,7 +213,7 @@ void testTieringMixTables() throws Exception { tieringSplitReader.handleSplitsChanges(splitsAddition); Set table1Splits = splitsAddition.splits().stream() - .map(TieringSplit::splitId) + .map(FlinkTieringSplit::splitId) .collect(Collectors.toSet()); // add bucket2 of table id 0 @@ -228,7 +228,7 @@ void testTieringMixTables() throws Exception { table0Rows.get(new TableBucket(tableId0, 2)).size()))); table0Splits.addAll( splitsAddition.splits().stream() - .map(TieringSplit::splitId) + .map(FlinkTieringSplit::splitId) .collect(Collectors.toSet())); tieringSplitReader.handleSplitsChanges(splitsAddition); @@ -276,7 +276,7 @@ void testTieringMixTables() throws Exception { table2Rows.get(new TableBucket(tableId2, 2)).size()))); Set table2Splits = splitsAddition.splits().stream() - .map(TieringSplit::splitId) + .map(FlinkTieringSplit::splitId) .collect(Collectors.toSet()); tieringSplitReader.handleSplitsChanges(splitsAddition); Map expectedRowCount = @@ -325,7 +325,8 @@ connection, new ThrowOnEmptyCompleteLakeTieringFactory())) { // The custom factory fails if complete() is called on a writer that never received any // record, which captures the regression this test covers. tieringSplitReader.handleSplitsChanges( - new SplitsAddition(Collections.singletonList(tieringLogSplit))); + new SplitsAddition<>( + Collections.singletonList(new FlinkTieringSplit(tieringLogSplit)))); RecordsWithSplitIds> result = tieringSplitReader.fetch(); @@ -368,7 +369,7 @@ void testTieringFirstRowMergeEngineFinishes() throws Exception { } // Build log splits whose stoppingOffset equals the leader's current logEndOffset. - List logSplits = new ArrayList<>(); + List logSplits = new ArrayList<>(); Set splitIds = new HashSet<>(); long totalLogEndOffset = 0L; for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) { @@ -379,7 +380,7 @@ void testTieringFirstRowMergeEngineFinishes() throws Exception { if (stoppingOffset <= 0) { continue; } - TieringLogSplit split = + FlinkTieringSplit split = createLogSplit(tablePath, tableId, bucket, EARLIEST_OFFSET, stoppingOffset); logSplits.add(split); splitIds.add(split.splitId()); @@ -497,20 +498,23 @@ private void verifyTieringRows( } } - private TieringLogSplit createLogSplit( + private FlinkTieringSplit createLogSplit( TablePath tablePath, long tableId, int bucket, long startingOffset, long stoppingOffset) { TableBucket tableBucket = new TableBucket(tableId, bucket); - return new TieringLogSplit(tablePath, tableBucket, null, startingOffset, stoppingOffset, 3); + return new FlinkTieringSplit( + new TieringLogSplit( + tablePath, tableBucket, null, startingOffset, stoppingOffset, 3)); } - private TieringSnapshotSplit createSnapshotSplit( + private FlinkTieringSplit createSnapshotSplit( TablePath tablePath, long tableId, int bucket, long snapshotId) { TableBucket tableBucket = new TableBucket(tableId, bucket); - return new TieringSnapshotSplit(tablePath, tableBucket, null, snapshotId, 10, 3); + return new FlinkTieringSplit( + new TieringSnapshotSplit(tablePath, tableBucket, null, snapshotId, 10, 3)); } private Map> putRows(long tableId, TablePath tablePath, int rows) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index 237e29037f..7fba767646 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -17,6 +17,9 @@ package org.apache.fluss.flink.tiering.source.enumerator; +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; +import org.apache.fluss.client.tiering.TieringSplit; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.NetworkException; @@ -24,9 +27,7 @@ import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; import org.apache.fluss.flink.tiering.source.TieringTestBase; -import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableChange; @@ -83,7 +84,7 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable { int numSubtasks = 4; int expectNumberOfSplits = 3; // test get snapshot split & log split and the assignment - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); @@ -96,7 +97,7 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable { // try to assign splits context.runPeriodicCallable(0); - List actualAssignment = new ArrayList<>(); + List actualAssignment = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); @@ -140,7 +141,7 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable { + bucketOffsetOfSecondWrite.get(tableBucket), expectNumberOfSplits)); } - List actualLogAssignment = new ArrayList<>(); + List actualLogAssignment = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); assertTieringSplitsMatch(actualLogAssignment, expectedLogAssignment); @@ -160,7 +161,7 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { int expectNumberOfSplits = 3; // test get snapshot split assignment - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); @@ -182,7 +183,7 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { bucketOffsetOfInitialWrite.get(tableBucket), expectNumberOfSplits)); } - List actualAssignment = new ArrayList<>(); + List actualAssignment = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); assertTieringSplitsMatch(actualAssignment, expectedSnapshotAssignment); @@ -226,7 +227,7 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { + bucketOffsetOfSecondWrite.get(tableBucket), expectNumberOfSplits)); } - List actualLogAssignment = new ArrayList<>(); + List actualLogAssignment = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); assertTieringSplitsMatch(actualLogAssignment, expectedLogAssignment); @@ -240,7 +241,7 @@ void testLogTableSplits() throws Throwable { int numSubtasks = 4; int expectNumberOfSplits = 3; // test get log split and the assignment - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); @@ -267,7 +268,7 @@ void testLogTableSplits() throws Throwable { bucketOffsetOfFirstWrite.get(bucketId), bucketOffsetOfFirstWrite.size())); } - List actualAssignment = new ArrayList<>(); + List actualAssignment = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); @@ -311,7 +312,7 @@ void testLogTableSplits() throws Throwable { + bucketOffsetOfSecondWrite.get(tableBucket), expectNumberOfSplits)); } - List actualLogAssignment = new ArrayList<>(); + List actualLogAssignment = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); assertTieringSplitsMatch(actualLogAssignment, expectedLogAssignment); @@ -330,7 +331,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable { int numSubtasks = 6; int expectNumberOfSplits = 6; // test get snapshot split assignment - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); @@ -343,8 +344,8 @@ void testPartitionedPrimaryKeyTable() throws Throwable { // try to assign splits context.runPeriodicCallable(0); - List actualSnapshotAssignment = new ArrayList<>(); - for (SplitsAssignment splitsAssignment : + List actualSnapshotAssignment = new ArrayList<>(); + for (SplitsAssignment splitsAssignment : context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualSnapshotAssignment::addAll); } @@ -408,8 +409,8 @@ void testPartitionedPrimaryKeyTable() throws Throwable { expectNumberOfSplits)); } } - List actualLogAssignment = new ArrayList<>(); - for (SplitsAssignment splitsAssignment : + List actualLogAssignment = new ArrayList<>(); + for (SplitsAssignment splitsAssignment : context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualLogAssignment::addAll); } @@ -429,7 +430,7 @@ void testPartitionedLogTableSplits() throws Throwable { int numSubtasks = 6; int expectNumberOfSplits = 6; // test get log split assignment - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); @@ -463,8 +464,8 @@ void testPartitionedLogTableSplits() throws Throwable { bucketOffsetOfFirstWrite.size())); } } - List actualAssignment = new ArrayList<>(); - for (SplitsAssignment splitsAssignment : + List actualAssignment = new ArrayList<>(); + for (SplitsAssignment splitsAssignment : context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualAssignment::addAll); } @@ -530,8 +531,8 @@ void testPartitionedLogTableSplits() throws Throwable { expectNumberOfSplits)); } } - List actualLogAssignment = new ArrayList<>(); - for (SplitsAssignment splitsAssignment : + List actualLogAssignment = new ArrayList<>(); + for (SplitsAssignment splitsAssignment : context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualLogAssignment::addAll); } @@ -548,7 +549,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { Map bucketOffsetOfWrite = appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10); // test get log split and the assignment - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); @@ -570,7 +571,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { bucketOffsetOfWrite.get(bucketId), expectNumberOfSplits)); } - List actualAssignment = new ArrayList<>(); + List actualAssignment = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); @@ -585,7 +586,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId); } waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 500L); - List actualAssignment1 = new ArrayList<>(); + List actualAssignment1 = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment1::addAll)); assertTieringSplitsMatch(actualAssignment1, expectedAssignment); @@ -602,7 +603,7 @@ void testHandleReaderFailOver() throws Throwable { createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR); appendRow(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10); - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(3)) { TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); @@ -677,16 +678,18 @@ private static CommitLakeTableSnapshotRequest genCommitLakeTableSnapshotRequest( * the regular {@code equals}. */ private static void assertTieringSplitsMatch( - List actualSplits, List expectedSplits) { + List actualSplits, List expectedSplits) { assertValidTieringRound(actualSplits); List normalizedActualSplits = actualSplits.stream() .map( - split -> - split.copy( - split.getNumberOfSplits(), - TieringSplit.UNKNOWN_SPLIT_INDEX, - TieringSplit.UNKNOWN_TIERING_ROUND_TIMESTAMP)) + flinkSplit -> { + TieringSplit split = flinkSplit.unwrap(); + return split.copy( + split.getNumberOfSplits(), + TieringSplit.UNKNOWN_SPLIT_INDEX, + TieringSplit.UNKNOWN_TIERING_ROUND_TIMESTAMP); + }) .collect(Collectors.toList()); assertThat(normalizedActualSplits).containsExactlyInAnyOrderElementsOf(expectedSplits); } @@ -696,11 +699,11 @@ private static void assertTieringSplitsMatch( * 0..size-1} with exactly one first split, every split reports the round size as its number of * splits, and all splits share the same positive tiering round timestamp. */ - private static void assertValidTieringRound(List tieringSplits) { + private static void assertValidTieringRound(List tieringSplits) { assertThat(tieringSplits).isNotEmpty(); - assertThat(tieringSplits).filteredOn(TieringSplit::isFirstSplit).hasSize(1); + assertThat(tieringSplits).filteredOn(FlinkTieringSplit::isFirstSplit).hasSize(1); assertThat(tieringSplits) - .extracting(TieringSplit::getSplitIndex) + .extracting(FlinkTieringSplit::getSplitIndex) .containsExactlyInAnyOrderElementsOf( IntStream.range(0, tieringSplits.size()) .boxed() @@ -717,7 +720,7 @@ private static void assertValidTieringRound(List tieringSplits) { } private void registerReaderAndHandleSplitRequests( - FlussMockSplitEnumeratorContext context, + FlussMockSplitEnumeratorContext context, TieringSourceEnumerator enumerator, int numSubtasks, int attemptNumber) { @@ -728,7 +731,7 @@ private void registerReaderAndHandleSplitRequests( } private void registerSingleReaderAndHandleSplitRequests( - FlussMockSplitEnumeratorContext context, + FlussMockSplitEnumeratorContext context, TieringSourceEnumerator enumerator, int subtaskId, int attemptNumber) { @@ -738,7 +741,7 @@ private void registerSingleReaderAndHandleSplitRequests( } private void waitUntilTieringTableSplitAssignmentReady( - FlussMockSplitEnumeratorContext context, + FlussMockSplitEnumeratorContext context, int expectedSplitsNum, long sleepMs) throws Throwable { @@ -753,15 +756,15 @@ private void waitUntilTieringTableSplitAssignmentReady( } private void verifyTieringSplitAssignment( - FlussMockSplitEnumeratorContext context, + FlussMockSplitEnumeratorContext context, int expectedSplitSize, TablePath expectedTablePath) throws Throwable { waitUntilTieringTableSplitAssignmentReady(context, expectedSplitSize, 200); - List> actualAssignment = + List> actualAssignment = context.getSplitsAssignmentSequence(); - List allTieringSplits = + List allTieringSplits = actualAssignment.stream() .flatMap(assignments -> assignments.assignment().values().stream()) .flatMap(List::stream) @@ -772,13 +775,13 @@ private void verifyTieringSplitAssignment( } private TieringSourceEnumerator createTieringSourceEnumerator( - Configuration flussConf, MockSplitEnumeratorContext context) { + Configuration flussConf, MockSplitEnumeratorContext context) { return new TieringSourceEnumerator(flussConf, context, 500); } @Test void testNetworkErrorInHeartbeatTriggersFailover() throws Exception { - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(1)) { TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); FlinkRuntimeException networkError = @@ -796,7 +799,7 @@ void testTableReachMaxTieringDuration() throws Throwable { long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); int numSubtasks = 2; - try (FlussMockSplitEnumeratorContext context = + try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks); TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context)) { @@ -840,7 +843,7 @@ void testTableReachMaxTieringDuration() throws Throwable { // the split should be marked as skipCurrentRound waitUntilTieringTableSplitAssignmentReady(context, 1, 100L); - List assignedSplits = new ArrayList<>(); + List assignedSplits = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(assignedSplits::addAll)); assertThat(assignedSplits).hasSize(1); @@ -886,7 +889,7 @@ void testTableReachMaxTieringDuration() throws Throwable { // Wait for the table to be assigned again waitUntilTieringTableSplitAssignmentReady(context, 2, 500L); - List reassignedSplits = new ArrayList<>(); + List reassignedSplits = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(reassignedSplits::addAll)); assertThat(reassignedSplits).hasSize(2); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java index f9c0913b90..8fd43ec51f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java @@ -17,6 +17,8 @@ package org.apache.fluss.flink.tiering.source.split; +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; @@ -48,9 +50,10 @@ void testTieringSnapshotSplitSerde(Boolean isPartitionedTable) throws Exception TieringSnapshotSplit tieringSplit = new TieringSnapshotSplit(path, bucket, partitionName, 0L, 200L, 10); - byte[] serialized = serializer.serialize(tieringSplit); - TieringSnapshotSplit deserializedSplit = - (TieringSnapshotSplit) serializer.deserialize(serializer.getVersion(), serialized); + byte[] serialized = serializer.serialize(new FlinkTieringSplit(tieringSplit)); + FlinkTieringSplit deserializedFlinkSplit = + serializer.deserialize(serializer.getVersion(), serialized); + TieringSnapshotSplit deserializedSplit = deserializedFlinkSplit.asTieringSnapshotSplit(); assertThat(deserializedSplit).isEqualTo(tieringSplit); } @@ -84,9 +87,10 @@ void testTieringLogSplitSerde(Boolean isPartitionedTable) throws Exception { TieringLogSplit tieringSplit = new TieringLogSplit(path, bucket, partitionName, 100, 200, 40); - byte[] serialized = serializer.serialize(tieringSplit); - TieringLogSplit deserializedSplit = - (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized); + byte[] serialized = serializer.serialize(new FlinkTieringSplit(tieringSplit)); + FlinkTieringSplit deserializedFlinkSplit = + serializer.deserialize(serializer.getVersion(), serialized); + TieringLogSplit deserializedSplit = deserializedFlinkSplit.asTieringLogSplit(); assertThat(deserializedSplit).isEqualTo(tieringSplit); } @@ -114,17 +118,20 @@ void testSkipCurrentRoundSerde() throws Exception { // Test TieringSnapshotSplit with skipCurrentRound set at creation TieringSnapshotSplit snapshotSplitWithSkipCurrentRound = new TieringSnapshotSplit(tablePath, tableBucket, null, 0L, 200L, 10, true); - byte[] serialized = serializer.serialize(snapshotSplitWithSkipCurrentRound); + byte[] serialized = + serializer.serialize(new FlinkTieringSplit(snapshotSplitWithSkipCurrentRound)); TieringSnapshotSplit deserializedSnapshotSplit = - (TieringSnapshotSplit) serializer.deserialize(serializer.getVersion(), serialized); + serializer + .deserialize(serializer.getVersion(), serialized) + .asTieringSnapshotSplit(); assertThat(deserializedSnapshotSplit).isEqualTo(snapshotSplitWithSkipCurrentRound); // Test TieringLogSplit with skipCurrentRound set at creation TieringLogSplit logSplitWithSkipCurrentRound = new TieringLogSplit(tablePath, tableBucket, null, 100, 200, 40, true); - serialized = serializer.serialize(logSplitWithSkipCurrentRound); + serialized = serializer.serialize(new FlinkTieringSplit(logSplitWithSkipCurrentRound)); TieringLogSplit deserializedLogSplit = - (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized); + serializer.deserialize(serializer.getVersion(), serialized).asTieringLogSplit(); assertThat(deserializedLogSplit).isEqualTo(logSplitWithSkipCurrentRound); // Test TieringSnapshotSplit with skipCurrentRound set after creation @@ -134,9 +141,11 @@ void testSkipCurrentRoundSerde() throws Exception { snapshotSplit.skipCurrentRound(); assertThat(snapshotSplit.shouldSkipCurrentRound()).isTrue(); - serialized = serializer.serialize(snapshotSplit); + serialized = serializer.serialize(new FlinkTieringSplit(snapshotSplit)); deserializedSnapshotSplit = - (TieringSnapshotSplit) serializer.deserialize(serializer.getVersion(), serialized); + serializer + .deserialize(serializer.getVersion(), serialized) + .asTieringSnapshotSplit(); assertThat(deserializedSnapshotSplit).isEqualTo(snapshotSplit); // Test TieringLogSplit with skipCurrentRound set after creation @@ -146,9 +155,9 @@ void testSkipCurrentRoundSerde() throws Exception { logSplit.skipCurrentRound(); assertThat(logSplit.shouldSkipCurrentRound()).isTrue(); - serialized = serializer.serialize(logSplit); + serialized = serializer.serialize(new FlinkTieringSplit(logSplit)); deserializedLogSplit = - (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized); + serializer.deserialize(serializer.getVersion(), serialized).asTieringLogSplit(); assertThat(deserializedLogSplit).isEqualTo(logSplit); } @@ -156,18 +165,20 @@ void testSkipCurrentRoundSerde() throws Exception { void testTieringRoundTimestampSerde() throws Exception { TieringSnapshotSplit snapshotSplit = new TieringSnapshotSplit(tablePath, tableBucket, null, 0L, 200L, 10, 0, 1000L); - byte[] serialized = serializer.serialize(snapshotSplit); + byte[] serialized = serializer.serialize(new FlinkTieringSplit(snapshotSplit)); TieringSnapshotSplit deserializedSnapshotSplit = - (TieringSnapshotSplit) serializer.deserialize(serializer.getVersion(), serialized); + serializer + .deserialize(serializer.getVersion(), serialized) + .asTieringSnapshotSplit(); assertThat(deserializedSnapshotSplit.getSplitIndex()).isZero(); assertThat(deserializedSnapshotSplit.isFirstSplit()).isTrue(); assertThat(deserializedSnapshotSplit.getTieringRoundTimestamp()).isEqualTo(1000L); TieringLogSplit logSplit = new TieringLogSplit(tablePath, tableBucket, null, 100, 200, 40, 2, 2000L); - serialized = serializer.serialize(logSplit); + serialized = serializer.serialize(new FlinkTieringSplit(logSplit)); TieringLogSplit deserializedLogSplit = - (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized); + serializer.deserialize(serializer.getVersion(), serialized).asTieringLogSplit(); assertThat(deserializedLogSplit.getSplitIndex()).isEqualTo(2); assertThat(deserializedLogSplit.isFirstSplit()).isFalse(); assertThat(deserializedLogSplit.getTieringRoundTimestamp()).isEqualTo(2000L); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSplitStateTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSplitStateTest.java index ebc08553e6..68b8620692 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSplitStateTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/state/TieringSplitStateTest.java @@ -17,8 +17,9 @@ package org.apache.fluss.flink.tiering.source.state; -import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; -import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; +import org.apache.fluss.client.tiering.TieringLogSplit; +import org.apache.fluss.client.tiering.TieringSnapshotSplit; +import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; @@ -39,9 +40,10 @@ void testTieringSnapshotSplit() { new TieringSnapshotSplit( tablePath, tableBucket, "partition1", 0L, 200L, 10, 0, 1000L); tieringSnapshotSplit.skipCurrentRound(); - TieringSplitState tieringSnapshotSplitState = new TieringSplitState(tieringSnapshotSplit); - TieringSnapshotSplit restoredSplit = - (TieringSnapshotSplit) tieringSnapshotSplitState.toSourceSplit(); + FlinkTieringSplit flinkSplit = new FlinkTieringSplit(tieringSnapshotSplit); + TieringSplitState tieringSnapshotSplitState = new TieringSplitState(flinkSplit); + FlinkTieringSplit restoredFlinkSplit = tieringSnapshotSplitState.toSourceSplit(); + TieringSnapshotSplit restoredSplit = restoredFlinkSplit.asTieringSnapshotSplit(); assertThat(restoredSplit).isEqualTo(tieringSnapshotSplit); assertThat(restoredSplit.shouldSkipCurrentRound()).isTrue(); assertThat(restoredSplit.getSplitIndex()).isZero(); @@ -57,8 +59,10 @@ void testTieringLogSplit() { TieringLogSplit tieringLogSplit = new TieringLogSplit(tablePath, tableBucket, "partition1", 100L, 200L, 20, 1, 2000L); tieringLogSplit.skipCurrentRound(); - TieringSplitState tieringLogSplitState = new TieringSplitState(tieringLogSplit); - TieringLogSplit restoredSplit = (TieringLogSplit) tieringLogSplitState.toSourceSplit(); + FlinkTieringSplit flinkSplit = new FlinkTieringSplit(tieringLogSplit); + TieringSplitState tieringLogSplitState = new TieringSplitState(flinkSplit); + FlinkTieringSplit restoredFlinkSplit = tieringLogSplitState.toSourceSplit(); + TieringLogSplit restoredSplit = restoredFlinkSplit.asTieringLogSplit(); assertThat(restoredSplit).isEqualTo(tieringLogSplit); assertThat(restoredSplit.shouldSkipCurrentRound()).isTrue(); assertThat(restoredSplit.getSplitIndex()).isEqualTo(1); From d77a38e32dd0be4004f4966d77410835c3fd0d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BE=8A=E5=B7=9D?= Date: Thu, 2 Jul 2026 14:11:22 +0800 Subject: [PATCH 2/2] add tests --- .../split/TieringSplitSerializerTest.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java index 8fd43ec51f..2756209a1d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.tiering.TieringLogSplit; import org.apache.fluss.client.tiering.TieringSnapshotSplit; +import org.apache.fluss.client.tiering.TieringSplit; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; @@ -183,4 +184,78 @@ void testTieringRoundTimestampSerde() throws Exception { assertThat(deserializedLogSplit.isFirstSplit()).isFalse(); assertThat(deserializedLogSplit.getTieringRoundTimestamp()).isEqualTo(2000L); } + + @Test + void testFlinkTieringSplitDelegation() { + TieringLogSplit logSplit = new TieringLogSplit(tablePath, tableBucket, null, 10, 200, 5); + FlinkTieringSplit flinkSplit = new FlinkTieringSplit(logSplit); + + assertThat(flinkSplit.splitId()).isNotNull(); + assertThat(flinkSplit.unwrap()).isSameAs(logSplit); + assertThat(flinkSplit.isTieringLogSplit()).isTrue(); + assertThat(flinkSplit.isTieringSnapshotSplit()).isFalse(); + assertThat(flinkSplit.asTieringLogSplit()).isSameAs(logSplit); + assertThat(flinkSplit.getTablePath()).isEqualTo(tablePath); + assertThat(flinkSplit.getTableBucket()).isEqualTo(tableBucket); + assertThat(flinkSplit.getPartitionName()).isNull(); + assertThat(flinkSplit.getNumberOfSplits()).isEqualTo(5); + assertThat(flinkSplit.splitKind()).isEqualTo(TieringSplit.TIERING_LOG_SPLIT_FLAG); + + TieringSnapshotSplit snapshotSplit = + new TieringSnapshotSplit( + tablePath, tableBucket, null, 1L, 100L, 3, false, 0, 5000L); + FlinkTieringSplit flinkSnapshotSplit = new FlinkTieringSplit(snapshotSplit); + assertThat(flinkSnapshotSplit.isTieringSnapshotSplit()).isTrue(); + assertThat(flinkSnapshotSplit.asTieringSnapshotSplit()).isSameAs(snapshotSplit); + assertThat(flinkSnapshotSplit.getSplitIndex()).isZero(); + assertThat(flinkSnapshotSplit.isFirstSplit()).isTrue(); + assertThat(flinkSnapshotSplit.getTieringRoundTimestamp()).isEqualTo(5000L); + } + + @Test + void testFlinkTieringSplitCopyAndSkip() { + TieringLogSplit logSplit = new TieringLogSplit(tablePath, tableBucket, null, 0, 100, 3); + FlinkTieringSplit flinkSplit = new FlinkTieringSplit(logSplit); + + assertThat(flinkSplit.shouldSkipCurrentRound()).isFalse(); + flinkSplit.skipCurrentRound(); + assertThat(flinkSplit.shouldSkipCurrentRound()).isTrue(); + + FlinkTieringSplit copied = flinkSplit.copy(5); + assertThat(copied.getNumberOfSplits()).isEqualTo(5); + assertThat(copied.splitId()).isEqualTo(flinkSplit.splitId()); + + FlinkTieringSplit copiedWithMeta = flinkSplit.copy(10, 2, 9999L); + assertThat(copiedWithMeta.getNumberOfSplits()).isEqualTo(10); + assertThat(copiedWithMeta.getSplitIndex()).isEqualTo(2); + assertThat(copiedWithMeta.isFirstSplit()).isFalse(); + assertThat(copiedWithMeta.getTieringRoundTimestamp()).isEqualTo(9999L); + } + + @Test + void testFlinkTieringSplitEqualsHashCodeToString() { + TieringLogSplit logSplit = new TieringLogSplit(tablePath, tableBucket, null, 0, 100, 3); + FlinkTieringSplit split1 = new FlinkTieringSplit(logSplit); + FlinkTieringSplit split2 = new FlinkTieringSplit(logSplit); + + assertThat(split1).isEqualTo(split2); + assertThat(split1.hashCode()).isEqualTo(split2.hashCode()); + assertThat(split1).isNotEqualTo(null); + assertThat(split1).isNotEqualTo("not a split"); + + String str = split1.toString(); + assertThat(str).startsWith("FlinkTieringSplit{"); + assertThat(str).contains("TieringLogSplit"); + } + + @Test + void testFlinkTieringSplitPartitioned() { + FlinkTieringSplit flinkSplit = + new FlinkTieringSplit( + new TieringLogSplit( + partitionedTablePath, partitionedTableBucket, "p1", 0, 50, 2)); + + assertThat(flinkSplit.getPartitionName()).isEqualTo("p1"); + assertThat(flinkSplit.getTableBucket().getPartitionId()).isEqualTo(100L); + } }