Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2285,6 +2285,15 @@ public class ConfigOptions {
+ ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
+ " is false.");

/**
* Local directory used by the lake tiering service for temporary IO files. The option is
* declared without a default value, so an unset option yields no directory.
*/
public static final ConfigOption<String> LAKE_TIERING_IO_TMP_DIR =
key("lake.tiering.io.tmpdir")
.stringType()
.noDefaultValue()
.withDescription(
"Local directory that is used by the lake tiering service for temporary IO files. "
+ "If not configured and the tiering service runs in a Flink job, Fluss uses "
+ "Flink's IO temporary directory with a 'fluss' child directory.");

// ------------------------------------------------------------------------
// ConfigOptions for fluss kafka
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,15 @@ default int splitIndex() {
default long tieringRoundTimestamp() {
return UNKNOWN_TIERING_ROUND_TIMESTAMP;
}

/**
* Returns the local directory that the lake writer should use for temporary IO files.
*
* <p>The default implementation returns {@code null}, meaning that no directory is supplied
* and the lake writer should use its own default.
*
* @return the local temporary IO directory, or {@code null} if none is supplied
*/
@Nullable
default String ioTmpDir() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ public TableInfo tableInfo() {
assertThat(context.splitIndex()).isEqualTo(WriterInitContext.UNKNOWN_SPLIT_INDEX);
assertThat(context.tieringRoundTimestamp())
.isEqualTo(WriterInitContext.UNKNOWN_TIERING_ROUND_TIMESTAMP);
assertThat(context.ioTmpDir()).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getClientScannerIoTmpDir;
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getLakeTieringIoTmpDir;

/**
* The flink source implementation for tiering data from Fluss to downstream lake.
Expand Down Expand Up @@ -117,7 +118,11 @@ public SourceReader<TableBucketWriteResult<WriteResult>, TieringSplit> createRea
getClientScannerIoTmpDir(flussConf, sourceReaderContext.getConfiguration()));
Connection connection = ConnectionFactory.createConnection(flussConf);
return new TieringSourceReader<>(
elementsQueue, sourceReaderContext, connection, lakeTieringFactory);
elementsQueue,
sourceReaderContext,
connection,
lakeTieringFactory,
getLakeTieringIoTmpDir(flussConf, sourceReaderContext.getConfiguration()));
}

/** This follows the operator uid hash generation logic of Flink {@link StreamGraphHasherV2}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand All @@ -61,7 +63,23 @@ public TieringSourceReader(
SourceReaderContext context,
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
this(elementsQueue, context, connection, lakeTieringFactory, (String) null);
}

/**
* Creates a reader that polls with {@code DEFAULT_POLL_TIMEOUT} and forwards the given
* temporary IO directory to the delegate constructor.
*
* @param ioTmpDir local directory for temporary IO files, may be {@code null}
*/
public TieringSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
elementsQueue,
SourceReaderContext context,
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
@Nullable String ioTmpDir) {
this(
elementsQueue,
context,
connection,
lakeTieringFactory,
DEFAULT_POLL_TIMEOUT,
ioTmpDir);
}

@VisibleForTesting
Expand All @@ -72,10 +90,27 @@ public TieringSourceReader(
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
Duration pollTimeout) {
this(elementsQueue, context, connection, lakeTieringFactory, pollTimeout, null);
}

@VisibleForTesting
TieringSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
elementsQueue,
SourceReaderContext context,
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
Duration pollTimeout,
@Nullable String ioTmpDir) {
super(
elementsQueue,
createFetcherManager(
elementsQueue, context, connection, lakeTieringFactory, pollTimeout),
elementsQueue,
context,
connection,
lakeTieringFactory,
pollTimeout,
ioTmpDir),
new TableBucketWriteResultEmitter<>(),
context.getConfiguration(),
context);
Expand All @@ -88,7 +123,8 @@ private static <WriteResult> TieringSourceFetcherManager<WriteResult> createFetc
SourceReaderContext context,
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
Duration pollTimeout) {
Duration pollTimeout,
@Nullable String ioTmpDir) {
TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup());
ClassLoader userClassLoader = context.getUserCodeClassLoader().asClassLoader();
return new TieringSourceFetcherManager<>(
Expand All @@ -99,7 +135,8 @@ private static <WriteResult> TieringSourceFetcherManager<WriteResult> createFetc
lakeTieringFactory,
userClassLoader,
pollTimeout,
tieringMetrics),
tieringMetrics,
ioTmpDir),
context.getConfiguration(),
(ignore) -> {});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,20 @@ public class TieringSplitReader<WriteResult>

private final TieringMetrics tieringMetrics;
private final boolean unshadedArrowAvailable;
@Nullable private final String ioTmpDir;

public TieringSplitReader(
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
ClassLoader userClassLoader,
TieringMetrics tieringMetrics) {
this(connection, lakeTieringFactory, userClassLoader, DEFAULT_POLL_TIMEOUT, tieringMetrics);
this(
connection,
lakeTieringFactory,
userClassLoader,
DEFAULT_POLL_TIMEOUT,
tieringMetrics,
null);
}

@VisibleForTesting
Expand All @@ -136,6 +143,17 @@ protected TieringSplitReader(
ClassLoader userClassLoader,
Duration pollTimeout,
TieringMetrics tieringMetrics) {
this(connection, lakeTieringFactory, userClassLoader, pollTimeout, tieringMetrics, null);
}

@VisibleForTesting
protected TieringSplitReader(
Connection connection,
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
ClassLoader userClassLoader,
Duration pollTimeout,
TieringMetrics tieringMetrics,
@Nullable String ioTmpDir) {
this.lakeTieringFactory = lakeTieringFactory;
// owned by TieringSourceReader
this.connection = connection;
Expand All @@ -151,6 +169,7 @@ protected TieringSplitReader(
this.pollTimeout = pollTimeout;
this.tieringMetrics = tieringMetrics;
this.unshadedArrowAvailable = checkUnshadedArrowAvailable(userClassLoader);
this.ioTmpDir = ioTmpDir;
}

@Override
Expand Down Expand Up @@ -607,7 +626,8 @@ private LakeWriter<WriteResult> getOrCreateLakeWriter(
partitionName,
currentTable.getTableInfo(),
splitIndex,
tieringRoundTimestamp));
tieringRoundTimestamp,
ioTmpDir));
lakeWriters.put(bucket, lakeWriter);
}
return lakeWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class TieringWriterInitContext implements WriterInitContext {
private final TableInfo tableInfo;
private final int splitIndex;
private final long tieringRoundTimestamp;
@Nullable private final String ioTmpDir;

public TieringWriterInitContext(
TablePath tablePath,
Expand All @@ -45,7 +46,8 @@ public TieringWriterInitContext(
partition,
tableInfo,
UNKNOWN_SPLIT_INDEX,
UNKNOWN_TIERING_ROUND_TIMESTAMP);
UNKNOWN_TIERING_ROUND_TIMESTAMP,
null);
}

public TieringWriterInitContext(
Expand All @@ -55,12 +57,24 @@ public TieringWriterInitContext(
TableInfo tableInfo,
int splitIndex,
long tieringRoundTimestamp) {
this(tablePath, tableBucket, partition, tableInfo, splitIndex, tieringRoundTimestamp, null);
}

/**
* Creates a context in which every attribute, including the temporary IO directory, is given
* explicitly. The values are stored as-is and handed back by the corresponding accessors.
*
* @param partition the partition, may be {@code null}
* @param ioTmpDir local directory for temporary IO files, may be {@code null}
*/
public TieringWriterInitContext(
TablePath tablePath,
TableBucket tableBucket,
@Nullable String partition,
TableInfo tableInfo,
int splitIndex,
long tieringRoundTimestamp,
@Nullable String ioTmpDir) {
this.tablePath = tablePath;
this.tableBucket = tableBucket;
this.partition = partition;
this.tableInfo = tableInfo;
this.splitIndex = splitIndex;
this.tieringRoundTimestamp = tieringRoundTimestamp;
this.ioTmpDir = ioTmpDir;
}

@Override
Expand Down Expand Up @@ -93,4 +107,10 @@ public int splitIndex() {
public long tieringRoundTimestamp() {
return tieringRoundTimestamp;
}

/** Returns the temporary IO directory given at construction time, may be {@code null}. */
@Nullable
@Override
public String ioTmpDir() {
return ioTmpDir;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import static org.apache.flink.configuration.CoreOptions.TMP_DIRS;
import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_IO_TMP_DIR;
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE;
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
Expand Down Expand Up @@ -202,14 +203,26 @@ public static long parseTimestamp(String timestampStr, String optionKey, ZoneId
public static String getClientScannerIoTmpDir(
        Configuration flussConf, org.apache.flink.configuration.Configuration flinkConfig) {
    // An explicitly configured Fluss scanner directory always takes precedence.
    if (flussConf.contains(CLIENT_SCANNER_IO_TMP_DIR)) {
        return flussConf.getString(CLIENT_SCANNER_IO_TMP_DIR);
    }
    // Otherwise derive the directory from Flink. getFlinkIoTmpDir handles both the case where
    // Flink's temporary directories are configured and the case where they are not, so the
    // lookup is not repeated inline here.
    return getFlinkIoTmpDir(flinkConfig);
}

/**
 * Returns the {@code fluss} child directory of Flink's IO temporary directory.
 *
 * <p>Flink's temporary-directory option may list several directories, separated by {@code ','}
 * or the platform path separator. In that case the first non-empty entry is used rather than
 * treating the whole list as a single path. If the option is not set, or lists no directory at
 * all, the JVM's {@code java.io.tmpdir} is used instead.
 *
 * @param flinkConfig the Flink configuration to read the temporary directories from
 * @return the path of the {@code fluss} temporary directory
 */
public static String getFlinkIoTmpDir(
        org.apache.flink.configuration.Configuration flinkConfig) {
    if (flinkConfig.contains(TMP_DIRS)) {
        // Split on the separators Flink itself accepts for this option.
        String[] tmpDirs =
                flinkConfig.get(CoreOptions.TMP_DIRS).split(",|" + File.pathSeparator);
        for (String tmpDir : tmpDirs) {
            if (!tmpDir.isEmpty()) {
                return new File(tmpDir, "/fluss").getAbsolutePath();
            }
        }
    }
    return System.getProperty("java.io.tmpdir") + "/fluss";
}

/**
 * Resolves the local directory that lake tiering uses for temporary IO files.
 *
 * @param flussConf the Fluss configuration that may set {@code LAKE_TIERING_IO_TMP_DIR}
 * @param flinkConfig the Flink configuration consulted when the Fluss option is absent
 * @return the configured directory, or else the result of {@link #getFlinkIoTmpDir}
 */
public static String getLakeTieringIoTmpDir(
        Configuration flussConf, org.apache.flink.configuration.Configuration flinkConfig) {
    String configuredDir = flussConf.getOptional(LAKE_TIERING_IO_TMP_DIR).orElse(null);
    if (configuredDir != null) {
        return configuredDir;
    }
    // Nothing set on the Fluss side: fall back to the directory derived from Flink.
    return getFlinkIoTmpDir(flinkConfig);
}

/** Fluss startup options. */
public static class StartupOptions {
public ScanStartupMode startupMode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.tiering.source;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link TieringWriterInitContext}. */
class TieringWriterInitContextTest {

    @Test
    void testIoTmpDir() {
        // The four-argument constructor carries no temporary IO directory.
        TieringWriterInitContext withoutTmpDir =
                new TieringWriterInitContext(null, null, null, null);
        assertThat(withoutTmpDir.ioTmpDir()).isNull();

        // The full constructor hands the given directory back unchanged.
        String expectedDir = "/flink_tmp/fluss";
        TieringWriterInitContext withTmpDir =
                new TieringWriterInitContext(null, null, null, null, 0, 1L, expectedDir);
        assertThat(withTmpDir.ioTmpDir()).isEqualTo(expectedDir);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static org.apache.flink.configuration.CoreOptions.TMP_DIRS;
import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR;
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_IO_TMP_DIR;
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE;
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP;
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp;
Expand Down Expand Up @@ -105,4 +106,34 @@ void testGetClientScannerIoTmpDir() {
new Configuration(), flinkConfig))
.isEqualTo("/flink_tmp_dir/fluss");
}

@Test
void testGetFlinkIoTmpDir() {
    // Flink's temporary directory is set: expect its 'fluss' child directory.
    org.apache.flink.configuration.Configuration withTmpDirs =
            new org.apache.flink.configuration.Configuration().set(TMP_DIRS, "/flink_tmp_dir");
    assertThat(FlinkConnectorOptionsUtils.getFlinkIoTmpDir(withTmpDirs))
            .isEqualTo("/flink_tmp_dir/fluss");

    // Nothing is set: expect the 'fluss' directory below the JVM temporary directory.
    org.apache.flink.configuration.Configuration withoutTmpDirs =
            new org.apache.flink.configuration.Configuration();
    String expectedFallback = System.getProperty("java.io.tmpdir") + "/fluss";
    assertThat(FlinkConnectorOptionsUtils.getFlinkIoTmpDir(withoutTmpDirs))
            .isEqualTo(expectedFallback);
}

@Test
void testGetLakeTieringIoTmpDir() {
    org.apache.flink.configuration.Configuration flinkConfig =
            new org.apache.flink.configuration.Configuration().set(TMP_DIRS, "/flink_tmp_dir");

    // An explicit Fluss setting wins over the Flink temporary directory.
    Configuration explicitFlussConf =
            new Configuration().set(LAKE_TIERING_IO_TMP_DIR, "/tiering_tmp");
    String explicitDir =
            FlinkConnectorOptionsUtils.getLakeTieringIoTmpDir(explicitFlussConf, flinkConfig);
    assertThat(explicitDir).isEqualTo("/tiering_tmp");

    // Without a Fluss setting the Flink temporary directory is used.
    String fallbackDir =
            FlinkConnectorOptionsUtils.getLakeTieringIoTmpDir(new Configuration(), flinkConfig);
    assertThat(fallbackDir).isEqualTo("/flink_tmp_dir/fluss");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public PaimonLakeWriter(
writerInitContext.tableBucket(),
writerInitContext.partition(),
partitionKeys,
flussRowType);
flussRowType,
writerInitContext.ioTmpDir());
}

@Override
Expand Down
Loading
Loading