Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,163 @@ public void testAddAndRemoveServerTags() throws Exception {
+ "server tag of this server is TEMPORARY_OFFLINE.");
}

@Test
void testCreateLogTableWithLogRemoteCopyDisabled() throws Exception {
Schema logTableSchema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
TablePath tablePath = TablePath.of("test_db", "test_log_table_remote_copy_disabled");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(logTableSchema)
.comment("test log table with remote copy disabled")
.distributedBy(3)
.property(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "false")
.build();

admin.createTable(tablePath, tableDescriptor, false).get();

TableInfo tableInfo = admin.getTableInfo(tablePath).get();
assertThat(tableInfo.toTableDescriptor().getProperties())
.containsEntry(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "false");
assertThat(tableInfo.getTableConfig().isLogRemoteCopyEnabled()).isFalse();
}

@Test
void testCreateLogTableWithoutLogRemoteCopyPropertyUsesDefault() throws Exception {
Schema logTableSchema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
TablePath tablePath = TablePath.of("test_db", "test_log_table_remote_copy_default");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(logTableSchema)
.comment("test log table with default remote copy")
.distributedBy(3)
.build();

admin.createTable(tablePath, tableDescriptor, false).get();

TableInfo tableInfo = admin.getTableInfo(tablePath).get();
assertThat(tableInfo.toTableDescriptor().getProperties())
.doesNotContainKey(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key());
assertThat(tableInfo.getTableConfig().isLogRemoteCopyEnabled()).isTrue();
}

@Test
void testCreatePrimaryKeyTableWithLogRemoteCopyEnabledRejected() {
TablePath tablePath = TablePath.of("test_db", "test_pk_table_remote_copy_true");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DEFAULT_SCHEMA)
.comment("test primary key table with remote copy enabled")
.distributedBy(3, "id")
.property(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "true")
.build();

assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key())
.hasMessageContaining("only supported for LogTable");
}

@Test
void testCreatePrimaryKeyTableWithLogRemoteCopyDisabledRejected() {
TablePath tablePath = TablePath.of("test_db", "test_pk_table_remote_copy_false");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(DEFAULT_SCHEMA)
.comment("test primary key table with remote copy disabled")
.distributedBy(3, "id")
.property(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "false")
.build();

assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key())
.hasMessageContaining("only supported for LogTable");
}

@Test
void testAlterTableSetLogRemoteCopyEnabledRejected() throws Exception {
Schema logTableSchema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
TablePath tablePath = TablePath.of("test_db", "test_alter_set_log_remote_copy");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(logTableSchema)
.comment("test alter set log remote copy")
.distributedBy(3)
.build();
admin.createTable(tablePath, tableDescriptor, false).get();

assertThatThrownBy(
() ->
admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.set(
ConfigOptions
.TABLE_LOG_REMOTE_COPY_ENABLED
.key(),
"false")),
false)
.get())
.cause()
.hasMessageContaining(
"The following options are not supported to alter yet: '"
+ ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key()
+ "'.");
}

@Test
void testAlterTableResetLogRemoteCopyEnabledRejected() throws Exception {
Schema logTableSchema =
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
TablePath tablePath = TablePath.of("test_db", "test_alter_reset_log_remote_copy");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(logTableSchema)
.comment("test alter reset log remote copy")
.distributedBy(3)
.property(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key(), "false")
.build();
admin.createTable(tablePath, tableDescriptor, false).get();

assertThatThrownBy(
() ->
admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.reset(
ConfigOptions
.TABLE_LOG_REMOTE_COPY_ENABLED
.key())),
false)
.get())
.cause()
.hasMessageContaining(
"The following options are not supported to alter yet: '"
+ ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key()
+ "'.");
}

@Test
void testAlterTableTieredLogLocalSegments() throws Exception {
// 1. Create table with default config = 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1637,6 +1637,16 @@ public class ConfigOptions {
"The number of log segments to retain in local for each table when log tiered storage is enabled. "
+ "It must be greater that 0. The default is 2.");

public static final ConfigOption<Boolean> TABLE_LOG_REMOTE_COPY_ENABLED =
key("table.log.remote-copy.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Whether remote log segment upload is enabled for the LogTable. "
+ "If false, the LogTable does not upload log segments to remote storage. "
+ "Cluster remote storage configuration is still required. "
+ "This option is only supported for LogTable and is unsupported for primary key tables.");

public static final ConfigOption<Boolean> TABLE_DATALAKE_ENABLED =
key("table.datalake.enabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public int getTieredLogLocalSegments() {
return config.get(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS);
}

/** Whether remote log segment upload is enabled for the log table. */
public boolean isLogRemoteCopyEnabled() {
return config.get(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED);
}

/** Whether the data lake is enabled. */
public boolean isDataLakeEnabled() {
return config.get(ConfigOptions.TABLE_DATALAKE_ENABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,21 @@ void testCreateNoPkTable() throws Exception {
assertOptionsEqual(table.getOptions(), expectedOptions);
}

@Test
void testShowCreateTableRetainsExplicitLogRemoteCopyDisabled() {
tEnv.executeSql(
"create table log_table_remote_copy_disabled(a int, b int) with ("
+ "'table.log.remote-copy.enabled' = 'false')");

CloseableIterator<Row> showCreateResult =
tEnv.executeSql("show create table log_table_remote_copy_disabled").collect();
List<Row> showCreateRows = CollectionUtil.iteratorToList(showCreateResult);

assertThat(showCreateRows).hasSize(1);
String showCreateStatement = (String) showCreateRows.get(0).getField(0);
assertThat(showCreateStatement).contains("'table.log.remote-copy.enabled' = 'false'");
}

@Test
void testPartitionedTable() throws Exception {
ObjectPath objectPath = new ObjectPath(DEFAULT_DB, "test_partitioned_table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public FsPath remoteLogDir() {

/** Register the replica to the remote log manager. */
public void registerReplica(Replica replica) throws Exception {
if (remoteDisabled()) {
if (remoteDisabled() || !replica.isLogRemoteCopyEnabled()) {
return;
}
TableBucket tableBucket = replica.getTableBucket();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ public boolean isDataLakeEnabled() {
return tableConfig.isDataLakeEnabled();
}

public boolean isLogRemoteCopyEnabled() {
return tableConfig.isLogRemoteCopyEnabled();
}

public long getLocalLogStartOffset() {
return logTablet.localLogStartOffset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public static void validateTableDescriptor(
checkArrowCompression(tableConf);
checkMergeEngine(tableConf, hasPrimaryKey, schema);
checkDeleteBehavior(tableConf, hasPrimaryKey);
checkLogRemoteCopy(tableConf, hasPrimaryKey);
checkTieredLog(tableConf);
checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema.getRowType());
checkSystemColumns(schema.getRowType());
Expand Down Expand Up @@ -417,6 +418,16 @@ private static void validateAggregationFunctionParameters(Schema schema) {
}
}

private static void checkLogRemoteCopy(Configuration tableConf, boolean hasPrimaryKey) {
if (hasPrimaryKey
&& tableConf.containsKey(ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key())) {
throw new InvalidConfigException(
String.format(
"'%s' is only supported for LogTable and unsupported for primary key tables.",
ConfigOptions.TABLE_LOG_REMOTE_COPY_ENABLED.key()));
}
}

private static void checkTieredLog(Configuration tableConf) {
if (tableConf.get(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS) <= 0) {
throw new InvalidConfigException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.server.log.remote;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.NotLeaderOrFollowerException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
Expand Down Expand Up @@ -431,6 +432,64 @@ void testCleanupLocalSegments(boolean partitionTable) throws Exception {
assertThat(logTablet.getSegments()).hasSize(2);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testLogRemoteCopyDisabledTableDoesNotUploadRemoteSegments(boolean partitionedTable)
throws Exception {
long tableId =
registerTableInZkClient(
DATA1_TABLE_PATH,
DATA1_SCHEMA,
200L,
Collections.emptyList(),
Collections.singletonMap("table.log.remote-copy.enabled", "false"));
TableBucket tb = makeTableBucket(tableId, partitionedTable);

makeLogTableAsLeader(tb, partitionedTable);
Replica replica = replicaManager.getReplicaOrException(tb);
LogTablet logTablet = replica.getLogTablet();
addMultiSegmentsToLogTablet(logTablet, 5);
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();

assertThat(remoteLogManager.getTaskWithFuture(tb)).isNull();
assertThatThrownBy(() -> remoteLogManager.remoteLogTablet(tb))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("RemoteLogTablet can't be found for table-bucket " + tb);
FsPath remoteLogTabletDir =
remoteLogTabletDir(
remoteLogDir(conf),
partitionedTable
? DATA1_PHYSICAL_TABLE_PATH_PA_2024
: DATA1_PHYSICAL_TABLE_PATH,
tb);
assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isFalse();
assertThat(logTablet.getSegments()).hasSize(5);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testLogRemoteCopyDefaultEnabledTableUploadsRemoteSegments(boolean partitionedTable)
throws Exception {
long tableId =
registerTableInZkClient(
DATA1_TABLE_PATH,
DATA1_SCHEMA,
201L,
Collections.emptyList(),
Collections.emptyMap());
TableBucket tb = makeTableBucket(tableId, partitionedTable);

makeLogTableAsLeader(tb, partitionedTable);
LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet();
addMultiSegmentsToLogTablet(logTablet, 5);
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();

assertThat(remoteLogManager.getTaskWithFuture(tb)).isNotNull();
assertThat(remoteLogManager.relevantRemoteLogSegments(tb, 0L)).hasSize(4);
assertThat(listRemoteLogFiles(tb)).hasSize(4);
assertThat(logTablet.getSegments()).hasSize(2);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testConfigureTieredLogLocalSegments(boolean partitionedTable) throws Exception {
Expand Down Expand Up @@ -785,3 +844,60 @@ void testRemoteIndexCacheFollowsReplicaDirectory() throws Exception {
.noneMatch(name -> name.contains(segment1.remoteLogSegmentId().toString()));
}
}

class RemoteLogManagerClusterDisabledTest extends RemoteLogTestBase {

@Override
public Configuration getServerConf() {
Configuration conf = super.getServerConf();
conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, Duration.ZERO);
return conf;
}

@BeforeEach
public void setup() throws Exception {
super.setup();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testClusterDisabledPreventsUploadsWhenTableRemoteCopyExplicitlyEnabled(
boolean partitionedTable) throws Exception {
long tableId =
registerTableInZkClient(
DATA1_TABLE_PATH,
DATA1_SCHEMA,
202L,
Collections.emptyList(),
Collections.singletonMap("table.log.remote-copy.enabled", "true"));
TableBucket tb = makeTableBucket(tableId, partitionedTable);

makeLogTableAsLeader(tb, partitionedTable);
Replica replica = replicaManager.getReplicaOrException(tb);
LogTablet logTablet = replica.getLogTablet();
addMultiSegmentsToLogTablet(logTablet, 5);
remoteLogTaskScheduler.triggerPeriodicScheduledTasks();

assertThat(remoteLogManager.getTaskWithFuture(tb)).isNull();
assertThatThrownBy(() -> remoteLogManager.remoteLogTablet(tb))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("RemoteLogTablet can't be found for table-bucket " + tb);
FsPath remoteLogTabletDir =
remoteLogTabletDir(
remoteLogDir(conf),
partitionedTable
? DATA1_PHYSICAL_TABLE_PATH_PA_2024
: DATA1_PHYSICAL_TABLE_PATH,
tb);
assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isFalse();
assertThat(logTablet.getSegments()).hasSize(5);
}

private TableBucket makeTableBucket(long tableId, boolean partitionedTable) {
if (partitionedTable) {
return new TableBucket(tableId, 0L, 0);
} else {
return new TableBucket(tableId, 0);
}
}
}
Loading