Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.fluss.flink.tiering.source.split;
package org.apache.fluss.client.tiering;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.fluss.flink.tiering.source.split;
package org.apache.fluss.client.tiering;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@
* limitations under the License.
*/

package org.apache.fluss.flink.tiering.source.split;
package org.apache.fluss.client.tiering;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;

import org.apache.flink.api.connector.source.SourceSplit;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.Objects;

/** The base table split for tiering service. */
public abstract class TieringSplit implements SourceSplit {
public abstract class TieringSplit implements Serializable {

public static final byte TIERING_SNAPSHOT_SPLIT_FLAG = 1;
public static final byte TIERING_LOG_SPLIT_FLAG = 2;
Expand Down Expand Up @@ -91,6 +90,9 @@ public TieringSplit(
this.tieringRoundTimestamp = tieringRoundTimestamp;
}

/** Returns the unique identifier for this split. */
public abstract String splitId();

/** Checks whether this split is a primary key table split to tier. */
public final boolean isTieringSnapshotSplit() {
return getClass() == TieringSnapshotSplit.class;
Expand Down Expand Up @@ -128,7 +130,7 @@ public TieringLogSplit asTieringLogSplit() {
return (TieringLogSplit) this;
}

protected byte splitKind() {
public byte splitKind() {
if (isTieringSnapshotSplit()) {
return TIERING_SNAPSHOT_SPLIT_FLAG;
} else if (isTieringLogSplit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplitSerializer;
import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorStateSerializer;
Expand Down Expand Up @@ -55,7 +55,9 @@
*/
public class TieringSource<WriteResult>
implements Source<
TableBucketWriteResult<WriteResult>, TieringSplit, TieringSourceEnumeratorState> {
TableBucketWriteResult<WriteResult>,
FlinkTieringSplit,
TieringSourceEnumeratorState> {

public static final String TIERING_SOURCE_TRANSFORMATION_UID =
"$$fluss_tiering_source_operator$$";
Expand All @@ -81,23 +83,23 @@ public Boundedness getBoundedness() {
}

@Override
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> createEnumerator(
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext) {
public SplitEnumerator<FlinkTieringSplit, TieringSourceEnumeratorState> createEnumerator(
SplitEnumeratorContext<FlinkTieringSplit> splitEnumeratorContext) {
return new TieringSourceEnumerator(
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
}

@Override
public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> restoreEnumerator(
SplitEnumeratorContext<TieringSplit> splitEnumeratorContext,
public SplitEnumerator<FlinkTieringSplit, TieringSourceEnumeratorState> restoreEnumerator(
SplitEnumeratorContext<FlinkTieringSplit> splitEnumeratorContext,
TieringSourceEnumeratorState tieringSourceEnumeratorState) {
// stateless operator
return new TieringSourceEnumerator(
flussConf, splitEnumeratorContext, pollTieringTableIntervalMs);
}

@Override
public SimpleVersionedSerializer<TieringSplit> getSplitSerializer() {
public SimpleVersionedSerializer<FlinkTieringSplit> getSplitSerializer() {
return TieringSplitSerializer.INSTANCE;
}

Expand All @@ -108,7 +110,7 @@ public SimpleVersionedSerializer<TieringSplit> getSplitSerializer() {
}

@Override
public SourceReader<TableBucketWriteResult<WriteResult>, TieringSplit> createReader(
public SourceReader<TableBucketWriteResult<WriteResult>, FlinkTieringSplit> createReader(
SourceReaderContext sourceReaderContext) {
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
elementsQueue = new FutureCompletingBlockingQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.fluss.flink.tiering.source;

import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
Expand All @@ -40,14 +40,14 @@
*/
public class TieringSourceFetcherManager<WriteResult>
extends SingleThreadFetcherManagerAdapter<
TableBucketWriteResult<WriteResult>, TieringSplit> {
TableBucketWriteResult<WriteResult>, FlinkTieringSplit> {

private static final Logger LOG = LoggerFactory.getLogger(TieringSourceFetcherManager.class);

public TieringSourceFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
elementsQueue,
Supplier<SplitReader<TableBucketWriteResult<WriteResult>, TieringSplit>>
Supplier<SplitReader<TableBucketWriteResult<WriteResult>, FlinkTieringSplit>>
splitReaderSupplier,
Configuration configuration,
Consumer<Collection<String>> splitFinishedHook) {
Expand All @@ -64,7 +64,7 @@ public void markTableReachTieringMaxDuration(long tableId) {
enqueueMarkTableReachTieringMaxDurationTask(
splitFetcher, tableId));
} else {
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
SplitFetcher<TableBucketWriteResult<WriteResult>, FlinkTieringSplit> splitFetcher =
createSplitFetcher();
LOG.info(
"fetchers is empty, enqueue marking tiering max duration for table {}",
Expand All @@ -75,7 +75,7 @@ public void markTableReachTieringMaxDuration(long tableId) {
}

private void enqueueMarkTableReachTieringMaxDurationTask(
SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
SplitFetcher<TableBucketWriteResult<WriteResult>, FlinkTieringSplit> splitFetcher,
long reachTieringDeadlineTable) {
splitFetcher.enqueueTask(
new SplitFetcherTask() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter;
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit;
import org.apache.fluss.flink.tiering.source.state.TieringSplitState;
import org.apache.fluss.lake.writer.LakeTieringFactory;

Expand All @@ -48,7 +48,7 @@ public final class TieringSourceReader<WriteResult>
extends SingleThreadMultiplexSourceReaderBaseAdapter<
TableBucketWriteResult<WriteResult>,
TableBucketWriteResult<WriteResult>,
TieringSplit,
FlinkTieringSplit,
TieringSplitState> {

private static final Logger LOG = LoggerFactory.getLogger(TieringSourceReader.class);
Expand Down Expand Up @@ -118,13 +118,13 @@ protected void onSplitFinished(Map<String, TieringSplitState> finishedSplitIds)
}

@Override
public List<TieringSplit> snapshotState(long checkpointId) {
public List<FlinkTieringSplit> snapshotState(long checkpointId) {
// we return empty list to make source reader be stateless
return Collections.emptyList();
}

@Override
protected TieringSplitState initializedState(TieringSplit split) {
protected TieringSplitState initializedState(FlinkTieringSplit split) {
if (split.isTieringSnapshotSplit()) {
return new TieringSplitState(split);
} else if (split.isTieringLogSplit()) {
Expand All @@ -135,7 +135,7 @@ protected TieringSplitState initializedState(TieringSplit split) {
}

@Override
protected TieringSplit toSplitType(String splitId, TieringSplitState splitState) {
protected FlinkTieringSplit toSplitType(String splitId, TieringSplitState splitState) {
return splitState.toSourceSplit();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.LogScannerImpl;
import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.client.tiering.TieringLogSplit;
import org.apache.fluss.client.tiering.TieringSnapshotSplit;
import org.apache.fluss.client.tiering.TieringSplit;
import org.apache.fluss.flink.source.reader.BoundedSplitReader;
import org.apache.fluss.flink.source.reader.RecordAndPos;
import org.apache.fluss.flink.tiering.source.metrics.TieringMetrics;
import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit;
import org.apache.fluss.lake.batch.ArrowRecordBatch;
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.lake.writer.LakeWriter;
Expand Down Expand Up @@ -72,7 +73,7 @@

/** The {@link SplitReader} implementation which will read Fluss and write to lake. */
public class TieringSplitReader<WriteResult>
implements SplitReader<TableBucketWriteResult<WriteResult>, TieringSplit> {
implements SplitReader<TableBucketWriteResult<WriteResult>, FlinkTieringSplit> {

private static final Logger LOG = LoggerFactory.getLogger(TieringSplitReader.class);

Expand Down Expand Up @@ -210,14 +211,15 @@ public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
}

@Override
public void handleSplitsChanges(SplitsChange<TieringSplit> splitsChange) {
public void handleSplitsChanges(SplitsChange<FlinkTieringSplit> splitsChange) {
if (!(splitsChange instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChange.getClass()));
}
for (TieringSplit split : splitsChange.splits()) {
for (FlinkTieringSplit flinkSplit : splitsChange.splits()) {
TieringSplit split = flinkSplit.unwrap();
LOG.info("add split {}", split.splitId());
if (split.shouldSkipCurrentRound()) {
// if the split is forced to ignore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.tiering.TieringSplit;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.FlinkTieringSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
import org.apache.fluss.lake.committer.TieringStats;
Expand Down Expand Up @@ -93,16 +94,16 @@
* </ul>
*/
public class TieringSourceEnumerator
implements SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> {
implements SplitEnumerator<FlinkTieringSplit, TieringSourceEnumeratorState> {

private static final Logger LOG = LoggerFactory.getLogger(TieringSourceEnumerator.class);

private final Configuration flussConf;
private final SplitEnumeratorContext<TieringSplit> context;
private final SplitEnumeratorContext<FlinkTieringSplit> context;
private final ScheduledExecutorService timerService;
private final SplitEnumeratorMetricGroup enumeratorMetricGroup;
private final long pollTieringTableIntervalMs;
private final List<TieringSplit> pendingSplits;
private final List<FlinkTieringSplit> pendingSplits;
private final Set<Integer> readersAwaitingSplit;

private final Map<Long, Long> tieringTableEpochs;
Expand All @@ -124,7 +125,7 @@ public class TieringSourceEnumerator

public TieringSourceEnumerator(
Configuration flussConf,
SplitEnumeratorContext<TieringSplit> context,
SplitEnumeratorContext<FlinkTieringSplit> context,
long pollTieringTableIntervalMs) {
this.flussConf = flussConf;
this.context = context;
Expand Down Expand Up @@ -205,7 +206,7 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
}

@Override
public void addSplitsBack(List<TieringSplit> splits, int subtaskId) {
public void addSplitsBack(List<FlinkTieringSplit> splits, int subtaskId) {
readersAwaitingSplit.add(subtaskId);
pendingSplits.addAll(splits);
assignSplits();
Expand Down Expand Up @@ -333,7 +334,7 @@ protected void handleTableTieringReachMaxDuration(
LOG.info("Table {}-{} reached max duration. Force completing.", tablePath, tableId);
tieringReachMaxDurationsTables.add(tableId);

for (TieringSplit tieringSplit : pendingSplits) {
for (FlinkTieringSplit tieringSplit : pendingSplits) {
if (tieringSplit.getTableBucket().getTableId() == tableId) {
// mark this tiering split to skip the current round since the tiering for
// this table has timed out, so the tiering source reader can skip them directly
Expand Down Expand Up @@ -378,7 +379,7 @@ private void assignSplits() {
continue;
}
if (!pendingSplits.isEmpty()) {
TieringSplit tieringSplit = pendingSplits.remove(0);
FlinkTieringSplit tieringSplit = pendingSplits.remove(0);
context.assignSplit(tieringSplit, nextAwaitingReader);
LOG.info("Assigning split {} to readers {}", tieringSplit, nextAwaitingReader);
readersAwaitingSplit.remove(nextAwaitingReader);
Expand Down Expand Up @@ -453,20 +454,20 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
// shuffle tiering split to avoid splits tiering skew
// after introduce tiering max duration
Collections.shuffle(tieringSplits);
tieringSplits = populateTieringRoundMetadata(tieringSplits);
List<FlinkTieringSplit> flinkSplits = populateTieringRoundMetadata(tieringSplits);
LOG.info(
"Generate Tiering {} splits for table {} with cost {}ms.",
tieringSplits.size(),
flinkSplits.size(),
tieringTable.f2,
System.currentTimeMillis() - start);
if (tieringSplits.isEmpty()) {
if (flinkSplits.isEmpty()) {
LOG.info(
"Generate Tiering splits for table {} is empty, no need to tier data.",
tieringTable.f2.getTableName());
finishedTables.put(tieringTable.f0, TieringFinishInfo.from(tieringTable.f1));
} else {
tieringTableEpochs.put(tieringTable.f0, tieringTable.f1);
pendingSplits.addAll(tieringSplits);
pendingSplits.addAll(flinkSplits);

timerService.schedule(
() ->
Expand All @@ -487,18 +488,19 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
}
}

private List<TieringSplit> populateTieringRoundMetadata(List<TieringSplit> tieringSplits) {
private List<FlinkTieringSplit> populateTieringRoundMetadata(List<TieringSplit> tieringSplits) {
int numberOfSplits = tieringSplits.size();
if (numberOfSplits == 0) {
return Collections.emptyList();
}
long tieringRoundTimestamp = System.currentTimeMillis();
List<TieringSplit> splitsWithMetadata = new ArrayList<>(numberOfSplits);
List<FlinkTieringSplit> splitsWithMetadata = new ArrayList<>(numberOfSplits);
for (int splitIndex = 0; splitIndex < numberOfSplits; splitIndex++) {
splitsWithMetadata.add(
tieringSplits
.get(splitIndex)
.copy(numberOfSplits, splitIndex, tieringRoundTimestamp));
new FlinkTieringSplit(
tieringSplits
.get(splitIndex)
.copy(numberOfSplits, splitIndex, tieringRoundTimestamp)));
}
return splitsWithMetadata;
}
Expand Down
Loading
Loading