diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 2d97ebab56..2082d0dc8c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -2285,6 +2285,15 @@ public class ConfigOptions { + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT + " is false."); + public static final ConfigOption LAKE_TIERING_IO_TMP_DIR = + key("lake.tiering.io.tmpdir") + .stringType() + .noDefaultValue() + .withDescription( + "Local directory that is used by the lake tiering service for temporary IO files. " + + "If not configured and the tiering service runs in a Flink job, Fluss uses " + + "Flink's IO temporary directory with a 'fluss' child directory."); + // ------------------------------------------------------------------------ // ConfigOptions for fluss kafka // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java index 318f53f54b..7f697accc4 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java @@ -94,4 +94,15 @@ default int splitIndex() { default long tieringRoundTimestamp() { return UNKNOWN_TIERING_ROUND_TIMESTAMP; } + + /** + * Returns the local directory for temporary IO files, or null if the lake writer should use its + * own default. + * + * @return the local temporary IO directory, or null + */ + @Nullable + default String ioTmpDir() { + return null; + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/writer/WriterInitContextTest.java b/fluss-common/src/test/java/org/apache/fluss/lake/writer/WriterInitContextTest.java index bf0198bd8b..ad2db2a308 100644 --- a/fluss-common/src/test/java/org/apache/fluss/lake/writer/WriterInitContextTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/lake/writer/WriterInitContextTest.java @@ -59,5 +59,6 @@ public TableInfo tableInfo() { assertThat(context.splitIndex()).isEqualTo(WriterInitContext.UNKNOWN_SPLIT_INDEX); assertThat(context.tieringRoundTimestamp()) .isEqualTo(WriterInitContext.UNKNOWN_TIERING_ROUND_TIMESTAMP); + assertThat(context.ioTmpDir()).isNull(); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java index 649b758704..3bb158bf4e 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java @@ -47,6 +47,7 @@ import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR; import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getClientScannerIoTmpDir; +import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getLakeTieringIoTmpDir; /** * The flink source implementation for tiering data from Fluss to downstream lake. @@ -117,7 +118,11 @@ public SourceReader, TieringSplit> createRea getClientScannerIoTmpDir(flussConf, sourceReaderContext.getConfiguration())); Connection connection = ConnectionFactory.createConnection(flussConf); return new TieringSourceReader<>( - elementsQueue, sourceReaderContext, connection, lakeTieringFactory); + elementsQueue, + sourceReaderContext, + connection, + lakeTieringFactory, + getLakeTieringIoTmpDir(flussConf, sourceReaderContext.getConfiguration())); } /** This follows the operator uid hash generation logic of flink {@link StreamGraphHasherV2}. */ diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index 3e2fa13fd5..86a2434198 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -35,6 +35,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.time.Duration; import java.util.Collections; import java.util.List; @@ -61,7 +63,23 @@ public TieringSourceReader( SourceReaderContext context, Connection connection, LakeTieringFactory lakeTieringFactory) { - this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT); + this(elementsQueue, context, connection, lakeTieringFactory, (String) null); + } + + public TieringSourceReader( + FutureCompletingBlockingQueue>> + elementsQueue, + SourceReaderContext context, + Connection connection, + LakeTieringFactory lakeTieringFactory, + @Nullable String ioTmpDir) { + this( + elementsQueue, + context, + connection, + lakeTieringFactory, + DEFAULT_POLL_TIMEOUT, + ioTmpDir); } @VisibleForTesting @@ -72,10 +90,27 @@ public TieringSourceReader( Connection connection, LakeTieringFactory lakeTieringFactory, Duration pollTimeout) { + this(elementsQueue, context, connection, lakeTieringFactory, pollTimeout, null); + } + + @VisibleForTesting + TieringSourceReader( + FutureCompletingBlockingQueue>> + elementsQueue, + SourceReaderContext context, + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout, + @Nullable String ioTmpDir) { super( elementsQueue, createFetcherManager( - elementsQueue, context, connection, lakeTieringFactory, pollTimeout), + elementsQueue, + context, + connection, + lakeTieringFactory, + pollTimeout, + ioTmpDir), new TableBucketWriteResultEmitter<>(), context.getConfiguration(), context); @@ -88,7 +123,8 @@ private static TieringSourceFetcherManager createFetc SourceReaderContext context, Connection connection, LakeTieringFactory lakeTieringFactory, - Duration pollTimeout) { + Duration pollTimeout, + @Nullable String ioTmpDir) { TieringMetrics tieringMetrics = new TieringMetrics(context.metricGroup()); ClassLoader userClassLoader = context.getUserCodeClassLoader().asClassLoader(); return new TieringSourceFetcherManager<>( @@ -99,7 +135,8 @@ private static TieringSourceFetcherManager createFetc lakeTieringFactory, userClassLoader, pollTimeout, - tieringMetrics), + tieringMetrics, + ioTmpDir), context.getConfiguration(), (ignore) -> {}); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 960c4db7e5..bad46d8205 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -120,13 +120,20 @@ public class TieringSplitReader private final TieringMetrics tieringMetrics; private final boolean unshadedArrowAvailable; + @Nullable private final String ioTmpDir; public TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory, ClassLoader userClassLoader, TieringMetrics tieringMetrics) { - this(connection, lakeTieringFactory, userClassLoader, DEFAULT_POLL_TIMEOUT, tieringMetrics); + this( + connection, + lakeTieringFactory, + userClassLoader, + DEFAULT_POLL_TIMEOUT, + tieringMetrics, + null); } @VisibleForTesting @@ -136,6 +143,17 @@ protected TieringSplitReader( ClassLoader userClassLoader, Duration pollTimeout, TieringMetrics tieringMetrics) { + this(connection, lakeTieringFactory, userClassLoader, pollTimeout, tieringMetrics, null); + } + + @VisibleForTesting + protected TieringSplitReader( + Connection connection, + LakeTieringFactory lakeTieringFactory, + ClassLoader userClassLoader, + Duration pollTimeout, + TieringMetrics tieringMetrics, + @Nullable String ioTmpDir) { this.lakeTieringFactory = lakeTieringFactory; // owned by TieringSourceReader this.connection = connection; @@ -151,6 +169,7 @@ protected TieringSplitReader( this.pollTimeout = pollTimeout; this.tieringMetrics = tieringMetrics; this.unshadedArrowAvailable = checkUnshadedArrowAvailable(userClassLoader); + this.ioTmpDir = ioTmpDir; } @Override @@ -607,7 +626,8 @@ private LakeWriter getOrCreateLakeWriter( partitionName, currentTable.getTableInfo(), splitIndex, - tieringRoundTimestamp)); + tieringRoundTimestamp, + ioTmpDir)); lakeWriters.put(bucket, lakeWriter); } return lakeWriter; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java index 7dbb6e41b8..a187d8e51c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java @@ -33,6 +33,7 @@ public class TieringWriterInitContext implements WriterInitContext { private final TableInfo tableInfo; private final int splitIndex; private final long tieringRoundTimestamp; + @Nullable private final String ioTmpDir; public TieringWriterInitContext( TablePath tablePath, @@ -45,7 +46,8 @@ public TieringWriterInitContext( partition, tableInfo, UNKNOWN_SPLIT_INDEX, - UNKNOWN_TIERING_ROUND_TIMESTAMP); + UNKNOWN_TIERING_ROUND_TIMESTAMP, + null); } public TieringWriterInitContext( @@ -55,12 +57,24 @@ public TieringWriterInitContext( TableInfo tableInfo, int splitIndex, long tieringRoundTimestamp) { + this(tablePath, tableBucket, partition, tableInfo, splitIndex, tieringRoundTimestamp, null); + } + + public TieringWriterInitContext( + TablePath tablePath, + TableBucket tableBucket, + @Nullable String partition, + TableInfo tableInfo, + int splitIndex, + long tieringRoundTimestamp, + @Nullable String ioTmpDir) { this.tablePath = tablePath; this.tableBucket = tableBucket; this.partition = partition; this.tableInfo = tableInfo; this.splitIndex = splitIndex; this.tieringRoundTimestamp = tieringRoundTimestamp; + this.ioTmpDir = ioTmpDir; } @Override @@ -93,4 +107,10 @@ public int splitIndex() { public long tieringRoundTimestamp() { return tieringRoundTimestamp; } + + @Nullable + @Override + public String ioTmpDir() { + return ioTmpDir; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java index 249e1be089..d0287f7007 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtils.java @@ -43,6 +43,7 @@ import static org.apache.flink.configuration.CoreOptions.TMP_DIRS; import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR; +import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_IO_TMP_DIR; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; @@ -202,14 +203,26 @@ public static long parseTimestamp(String timestampStr, String optionKey, ZoneId public static String getClientScannerIoTmpDir( Configuration flussConf, org.apache.flink.configuration.Configuration flinkConfig) { if (!flussConf.contains(CLIENT_SCANNER_IO_TMP_DIR)) { - if (flinkConfig.contains(TMP_DIRS)) { - // pass flink io tmp dir to fluss client. - return new File(flinkConfig.get(CoreOptions.TMP_DIRS), "/fluss").getAbsolutePath(); - } + return getFlinkIoTmpDir(flinkConfig); } return flussConf.getString(CLIENT_SCANNER_IO_TMP_DIR); } + public static String getFlinkIoTmpDir( + org.apache.flink.configuration.Configuration flinkConfig) { + if (flinkConfig.contains(TMP_DIRS)) { + return new File(flinkConfig.get(CoreOptions.TMP_DIRS), "/fluss").getAbsolutePath(); + } + return System.getProperty("java.io.tmpdir") + "/fluss"; + } + + public static String getLakeTieringIoTmpDir( + Configuration flussConf, org.apache.flink.configuration.Configuration flinkConfig) { + return flussConf + .getOptional(LAKE_TIERING_IO_TMP_DIR) + .orElseGet(() -> getFlinkIoTmpDir(flinkConfig)); + } + /** Fluss startup options. * */ public static class StartupOptions { public ScanStartupMode startupMode; diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContextTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContextTest.java new file mode 100644 index 0000000000..3a44deefa8 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContextTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.source; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TieringWriterInitContext}. */ +class TieringWriterInitContextTest { + + @Test + void testIoTmpDir() { + TieringWriterInitContext defaultContext = + new TieringWriterInitContext(null, null, null, null); + TieringWriterInitContext context = + new TieringWriterInitContext(null, null, null, null, 0, 1L, "/flink_tmp/fluss"); + + assertThat(defaultContext.ioTmpDir()).isNull(); + assertThat(context.ioTmpDir()).isEqualTo("/flink_tmp/fluss"); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java index 0e06dc1019..600bded9f9 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConnectorOptionsUtilTest.java @@ -27,6 +27,7 @@ import static org.apache.flink.configuration.CoreOptions.TMP_DIRS; import static org.apache.fluss.config.ConfigOptions.CLIENT_SCANNER_IO_TMP_DIR; +import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_IO_TMP_DIR; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_SPLIT_ASSIGNMENT_BATCH_SIZE; import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.parseTimestamp; @@ -105,4 +106,34 @@ void testGetClientScannerIoTmpDir() { new Configuration(), flinkConfig)) .isEqualTo("/flink_tmp_dir/fluss"); } + + @Test + void testGetFlinkIoTmpDir() { + org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration().set(TMP_DIRS, "/flink_tmp_dir"); + String property = System.getProperty("java.io.tmpdir"); + + assertThat(FlinkConnectorOptionsUtils.getFlinkIoTmpDir(flinkConfig)) + .isEqualTo("/flink_tmp_dir/fluss"); + assertThat( + FlinkConnectorOptionsUtils.getFlinkIoTmpDir( + new org.apache.flink.configuration.Configuration())) + .isEqualTo(property + "/fluss"); + } + + @Test + void testGetLakeTieringIoTmpDir() { + org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration().set(TMP_DIRS, "/flink_tmp_dir"); + + assertThat( + FlinkConnectorOptionsUtils.getLakeTieringIoTmpDir( + new Configuration().set(LAKE_TIERING_IO_TMP_DIR, "/tiering_tmp"), + flinkConfig)) + .isEqualTo("/tiering_tmp"); + assertThat( + FlinkConnectorOptionsUtils.getLakeTieringIoTmpDir( + new Configuration(), flinkConfig)) + .isEqualTo("/flink_tmp_dir/fluss"); + } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java index 5c2738e233..4e04ed50d5 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -71,7 +71,8 @@ public PaimonLakeWriter( writerInitContext.tableBucket(), writerInitContext.partition(), partitionKeys, - flussRowType); + flussRowType, + writerInitContext.ioTmpDir()); } @Override diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java index 33abfc1b7b..ba7228b637 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java @@ -31,7 +31,6 @@ import javax.annotation.Nullable; import java.util.List; -import java.util.Map; import static org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER; import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toRowKind; @@ -39,9 +38,6 @@ /** A {@link RecordWriter} to write to Paimon's primary-key table. */ public class MergeTreeWriter extends RecordWriter { - // the option key to configure the temporary directory used by fluss tiering - private static final String FLUSS_TIERING_TMP_DIR_KEY = "fluss.tiering.io-tmpdir"; - private final KeyValue keyValue = new KeyValue(); private final RowKeyExtractor rowKeyExtractor; @@ -54,9 +50,19 @@ public MergeTreeWriter( @Nullable String partition, List partitionKeys, RowType flussRowType) { + this(fileStoreTable, tableBucket, partition, partitionKeys, flussRowType, null); + } + + public MergeTreeWriter( + FileStoreTable fileStoreTable, + TableBucket tableBucket, + @Nullable String partition, + List partitionKeys, + RowType flussRowType, + @Nullable String ioTmpDir) { this( fileStoreTable, - createIOManager(fileStoreTable), + createIOManager(ioTmpDir), tableBucket, partition, partitionKeys, @@ -81,15 +87,15 @@ public MergeTreeWriter( this.ioManager = ioManager; } - private static IOManager createIOManager(FileStoreTable fileStoreTable) { - // we allow users to configure the temporary directory used by fluss tiering - // since the default java.io.tmpdir may not be suitable. - // currently, we don't expose the option, as a workaround way, maybe in the future we can - // expose it if it's needed - Map props = fileStoreTable.options(); - String tmpDir = - props.getOrDefault(FLUSS_TIERING_TMP_DIR_KEY, System.getProperty("java.io.tmpdir")); - return IOManager.create(tmpDir); + private static IOManager createIOManager(@Nullable String ioTmpDir) { + return IOManager.create(getIoManagerTmpDir(ioTmpDir)); + } + + static String getIoManagerTmpDir(@Nullable String ioTmpDir) { + if (ioTmpDir != null) { + return ioTmpDir; + } + return System.getProperty("java.io.tmpdir"); } private static TableWriteImpl createTableWrite( diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriterTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriterTest.java new file mode 100644 index 0000000000..a9859f4022 --- /dev/null +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriterTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.tiering.mergetree; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link MergeTreeWriter}. */ +class MergeTreeWriterTest { + + @Test + void testGetIoManagerTmpDir() { + assertThat(MergeTreeWriter.getIoManagerTmpDir("/flink_tmp/fluss")) + .isEqualTo("/flink_tmp/fluss"); + assertThat(MergeTreeWriter.getIoManagerTmpDir(null)) + .isEqualTo(System.getProperty("java.io.tmpdir")); + } +}