From e8dfbab749bbae82d1393c827bb493a15283369c Mon Sep 17 00:00:00 2001 From: Yang Guo Date: Thu, 2 Jul 2026 16:55:28 +0800 Subject: [PATCH] feat: support table-level control for LogTable remote log copy --- .../fluss/client/admin/FlussAdminITCase.java | 157 ++++++++++++++++++ .../apache/fluss/config/ConfigOptions.java | 10 ++ .../org/apache/fluss/config/TableConfig.java | 5 + .../flink/catalog/FlinkCatalogITCase.java | 15 ++ .../server/log/remote/RemoteLogManager.java | 2 +- .../apache/fluss/server/replica/Replica.java | 4 + .../utils/TableDescriptorValidation.java | 11 ++ .../log/remote/RemoteLogManagerTest.java | 116 +++++++++++++ 8 files changed, 319 insertions(+), 1 deletion(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 68181844ea..22878e2c1e 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -1964,6 +1964,163 @@ public void testAddAndRemoveServerTags() throws Exception { + "server tag of this server is TEMPORARY_OFFLINE."); } + @Test + void testCreateLogTableWithLogRemoteCopyDisabled() throws Exception { + Schema logTableSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("age", DataTypes.INT()) + .build(); + TablePath tablePath = TablePath.of("test_db", "test_log_table_remote_copy_disabled"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(logTableSchema) + .comment("test log table with remote copy disabled") + .distributedBy(3) + .property(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "false") + .build(); + + admin.createTable(tablePath, tableDescriptor, false).get(); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.toTableDescriptor().getProperties()) + .containsEntry(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "false"); + assertThat(tableInfo.getTableConfig().isLogRemoteCopyEnabled()).isFalse(); + } + + @Test + void testCreateLogTableWithoutLogRemoteCopyPropertyUsesDefault() throws Exception { + Schema logTableSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("age", DataTypes.INT()) + .build(); + TablePath tablePath = TablePath.of("test_db", "test_log_table_remote_copy_default"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(logTableSchema) + .comment("test log table with default remote copy") + .distributedBy(3) + .build(); + + admin.createTable(tablePath, tableDescriptor, false).get(); + + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); + assertThat(tableInfo.toTableDescriptor().getProperties()) + .doesNotContainKey(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key()); + assertThat(tableInfo.getTableConfig().isLogRemoteCopyEnabled()).isTrue(); + } + + @Test + void testCreatePrimaryKeyTableWithLogRemoteCopyEnabledRejected() { + TablePath tablePath = TablePath.of("test_db", "test_pk_table_remote_copy_true"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("test primary key table with remote copy enabled") + .distributedBy(3, "id") + .property(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "true") + .build(); + + assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key()) + .hasMessageContaining("only supported for LogTable"); + } + + @Test + void testCreatePrimaryKeyTableWithLogRemoteCopyDisabledRejected() { + TablePath tablePath = TablePath.of("test_db", "test_pk_table_remote_copy_false"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("test primary key table with remote copy disabled") + .distributedBy(3, "id") + .property(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "false") + .build(); + + assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get()) + .cause() + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key()) + .hasMessageContaining("only supported for LogTable"); + } + + @Test + void testAlterTableSetLogRemoteCopyEnabledRejected() throws Exception { + Schema logTableSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("age", DataTypes.INT()) + .build(); + TablePath tablePath = TablePath.of("test_db", "test_alter_set_log_remote_copy"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(logTableSchema) + .comment("test alter set log remote copy") + .distributedBy(3) + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions + .TABLE_LOG_REMOTE_COPY_ENABLED + .key(), + "false")), + false) + .get()) + .cause() + .hasMessageContaining( + "The following options are not supported to alter yet: '" + + ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key() + + "'."); + } + + @Test + void testAlterTableResetLogRemoteCopyEnabledRejected() throws Exception { + Schema logTableSchema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("age", DataTypes.INT()) + .build(); + TablePath tablePath = TablePath.of("test_db", "test_alter_reset_log_remote_copy"); + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(logTableSchema) + .comment("test alter reset log remote copy") + .distributedBy(3) + .property(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "false") + .build(); + admin.createTable(tablePath, tableDescriptor, false).get(); + + assertThatThrownBy( + () -> + admin.alterTable( + tablePath, + Collections.singletonList( + TableChange.reset( + ConfigOptions + .TABLE_LOG_REMOTE_COPY_ENABLED + .key())), + false) + .get()) + .cause() + .hasMessageContaining( + "The following options are not supported to alter yet: '" + + ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key() + + "'."); + } + @Test void testAlterTableTieredLogLocalSegments() throws Exception { // 1. Create table with default config = 2 diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 0301d3d47d..4925ed5a30 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -1637,6 +1637,16 @@ public class ConfigOptions { "The number of log segments to retain in local for each table when log tiered storage is enabled. " + "It must be greater that 0. The default is 2."); + public static final ConfigOption TABLE_LOG_REMOTE_COPY_ENABLED = + key("table.log.remote-copy.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether remote log segment upload is enabled for the LogTable. " + + "If false, the LogTable does not upload log segments to remote storage. " + + "Cluster remote storage configuration is still required. " + + "This option is only supported for LogTable and is unsupported for primary key tables."); + public static final ConfigOption TABLE_DATALAKE_ENABLED = key("table.datalake.enabled") .booleanType() diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index fbf8c77264..638f8d051c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -95,6 +95,11 @@ public int getTieredLogLocalSegments() { return config.get(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS); } + /** Whether remote log segment upload is enabled for the log table. */ + public boolean isLogRemoteCopyEnabled() { + return config.get(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED); + } + /** Whether the data lake is enabled. */ public boolean isDataLakeEnabled() { return config.get(ConfigOptions.TABLE_DATALAKE_ENABLED); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index 54942d7d89..679e5078f9 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -348,6 +348,21 @@ void testCreateNoPkTable() throws Exception { assertOptionsEqual(table.getOptions(), expectedOptions); } + @Test + void testShowCreateTableRetainsExplicitLogRemoteCopyDisabled() { + tEnv.executeSql( + "create table log_table_remote_copy_disabled(a int, b int) with (" + + "'table.log.remote-copy.enabled' = 'false')"); + + CloseableIterator showCreateResult = + tEnv.executeSql("show create table log_table_remote_copy_disabled").collect(); + List showCreateRows = CollectionUtil.iteratorToList(showCreateResult); + + assertThat(showCreateRows).hasSize(1); + String showCreateStatement = (String) showCreateRows.get(0).getField(0); + assertThat(showCreateStatement).contains("'table.log.remote-copy.enabled' = 'false'"); + } + @Test void testPartitionedTable() throws Exception { ObjectPath objectPath = new ObjectPath(DEFAULT_DB, "test_partitioned_table"); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java index 143ae251a7..b68ed41e82 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java @@ -145,7 +145,7 @@ public FsPath remoteLogDir() { /** Register the replica to the remote log manager. */ public void registerReplica(Replica replica) throws Exception { - if (remoteDisabled()) { + if (remoteDisabled() || !replica.isLogRemoteCopyEnabled()) { return; } TableBucket tableBucket = replica.getTableBucket(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index e11dd69c14..2b2f823369 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -360,6 +360,10 @@ public boolean isDataLakeEnabled() { return tableConfig.isDataLakeEnabled(); } + public boolean isLogRemoteCopyEnabled() { + return tableConfig.isLogRemoteCopyEnabled(); + } + public long getLocalLogStartOffset() { return logTablet.localLogStartOffset(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index 949afd8c67..f808222834 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -122,6 +122,7 @@ public static void validateTableDescriptor( checkArrowCompression(tableConf); checkMergeEngine(tableConf, hasPrimaryKey, schema); checkDeleteBehavior(tableConf, hasPrimaryKey); + checkLogRemoteCopy(tableConf, hasPrimaryKey); checkTieredLog(tableConf); checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema.getRowType()); checkSystemColumns(schema.getRowType()); @@ -417,6 +418,16 @@ private static void validateAggregationFunctionParameters(Schema schema) { } } + private static void checkLogRemoteCopy(Configuration tableConf, boolean hasPrimaryKey) { + if (hasPrimaryKey + && tableConf.containsKey(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key())) { + throw new InvalidConfigException( + String.format( + "'%s' is only supported for LogTable and unsupported for primary key tables.", + ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key())); + } + } + private static void checkTieredLog(Configuration tableConf) { if (tableConf.get(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS) <= 0) { throw new InvalidConfigException( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java index 2944f1c663..2ed6e86093 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.log.remote; import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.metadata.TableBucket; @@ -431,6 +432,64 @@ void testCleanupLocalSegments(boolean partitionTable) throws Exception { assertThat(logTablet.getSegments()).hasSize(2); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testLogRemoteCopyDisabledTableDoesNotUploadRemoteSegments(boolean partitionedTable) + throws Exception { + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH, + DATA1_SCHEMA, + 200L, + Collections.emptyList(), + Collections.singletonMap("table.log.remote-copy.enabled", "false")); + TableBucket tb = makeTableBucket(tableId, partitionedTable); + + makeLogTableAsLeader(tb, partitionedTable); + Replica replica = replicaManager.getReplicaOrException(tb); + LogTablet logTablet = replica.getLogTablet(); + addMultiSegmentsToLogTablet(logTablet, 5); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + assertThat(remoteLogManager.getTaskWithFuture(tb)).isNull(); + assertThatThrownBy(() -> remoteLogManager.remoteLogTablet(tb)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("RemoteLogTablet can't be found for table-bucket " + tb); + FsPath remoteLogTabletDir = + remoteLogTabletDir( + remoteLogDir(conf), + partitionedTable + ? DATA1_PHYSICAL_TABLE_PATH_PA_2024 + : DATA1_PHYSICAL_TABLE_PATH, + tb); + assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isFalse(); + assertThat(logTablet.getSegments()).hasSize(5); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testLogRemoteCopyDefaultEnabledTableUploadsRemoteSegments(boolean partitionedTable) + throws Exception { + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH, + DATA1_SCHEMA, + 201L, + Collections.emptyList(), + Collections.emptyMap()); + TableBucket tb = makeTableBucket(tableId, partitionedTable); + + makeLogTableAsLeader(tb, partitionedTable); + LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet(); + addMultiSegmentsToLogTablet(logTablet, 5); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + assertThat(remoteLogManager.getTaskWithFuture(tb)).isNotNull(); + assertThat(remoteLogManager.relevantRemoteLogSegments(tb, 0L)).hasSize(4); + assertThat(listRemoteLogFiles(tb)).hasSize(4); + assertThat(logTablet.getSegments()).hasSize(2); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testConfigureTieredLogLocalSegments(boolean partitionedTable) throws Exception { @@ -785,3 +844,60 @@ void testRemoteIndexCacheFollowsReplicaDirectory() throws Exception { .noneMatch(name -> name.contains(segment1.remoteLogSegmentId().toString())); } } + +class RemoteLogManagerClusterDisabledTest extends RemoteLogTestBase { + + @Override + public Configuration getServerConf() { + Configuration conf = super.getServerConf(); + conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO); + return conf; + } + + @BeforeEach + public void setup() throws Exception { + super.setup(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testClusterDisabledPreventsUploadsWhenTableRemoteCopyExplicitlyEnabled( + boolean partitionedTable) throws Exception { + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH, + DATA1_SCHEMA, + 202L, + Collections.emptyList(), + Collections.singletonMap("table.log.remote-copy.enabled", "true")); + TableBucket tb = makeTableBucket(tableId, partitionedTable); + + makeLogTableAsLeader(tb, partitionedTable); + Replica replica = replicaManager.getReplicaOrException(tb); + LogTablet logTablet = replica.getLogTablet(); + addMultiSegmentsToLogTablet(logTablet, 5); + remoteLogTaskScheduler.triggerPeriodicScheduledTasks(); + + assertThat(remoteLogManager.getTaskWithFuture(tb)).isNull(); + assertThatThrownBy(() -> remoteLogManager.remoteLogTablet(tb)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("RemoteLogTablet can't be found for table-bucket " + tb); + FsPath remoteLogTabletDir = + remoteLogTabletDir( + remoteLogDir(conf), + partitionedTable + ? DATA1_PHYSICAL_TABLE_PATH_PA_2024 + : DATA1_PHYSICAL_TABLE_PATH, + tb); + assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isFalse(); + assertThat(logTablet.getSegments()).hasSize(5); + } + + private TableBucket makeTableBucket(long tableId, boolean partitionedTable) { + if (partitionedTable) { + return new TableBucket(tableId, 0L, 0); + } else { + return new TableBucket(tableId, 0); + } + } +}