diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index 68181844ea..4291d000cf 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -83,6 +83,7 @@ import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.tablet.TestTabletServerGateway; +import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.types.DataTypeChecks; @@ -1830,6 +1831,43 @@ public void testBucketLimitForNonPartitionedTable() throws Exception { .hasMessageContaining("exceeds the maximum limit"); } + @Test + public void testDefaultBucketLimitForNonPartitionedTable() throws Exception { + assertThat(ConfigOptions.MAX_BUCKET_NUM.defaultValue()).isEqualTo(20000); + + FlussClusterExtension defaultLimitCluster = + FlussClusterExtension.builder().setNumOfTabletServers(1).build(); + defaultLimitCluster.start(); + + try (Connection connection = + ConnectionFactory.createConnection(defaultLimitCluster.getClientConfig()); + Admin defaultLimitAdmin = connection.getAdmin()) { + String dbName = "test_default_bucket_limit_db"; + TablePath tablePath = TablePath.of(dbName, "test_too_many_buckets_by_default"); + TableDescriptor nonPartitionedTable = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.STRING()) + .column("name", DataTypes.STRING()) + .build()) + .distributedBy(30000, "id") + .build(); + + defaultLimitAdmin.createDatabase(dbName, DatabaseDescriptor.EMPTY, false).get(); + assertThatThrownBy( + () -> + defaultLimitAdmin + .createTable(tablePath, nonPartitionedTable, false) + .get()) + .cause() + .isInstanceOf(TooManyBucketsException.class) + .hasMessageContaining("Bucket count 30000 exceeds the maximum limit 20000."); + } finally { + defaultLimitCluster.close(); + } + } + /** Test that creating a table with system columns throws InvalidTableException. */ @Test public void testSystemsColumns() throws Exception { diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 0301d3d47d..383510eea8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -290,10 +290,12 @@ public class ConfigOptions { public static final ConfigOption MAX_BUCKET_NUM = key("max.bucket.num") .intType() - .defaultValue(128000) + .defaultValue(20000) .withDescription( - "The maximum number of buckets that can be created for a table." - + "The default value is 128000"); + "The maximum number of buckets that can be created for a table. " + + "The default value is 20000. " + + "This default is capped to reduce the risk that the table assignment znode exceeds " + + "ZooKeeper's packet size limit."); /** * The network address and port the server binds to for accepting connections. diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java index 61336ef8fb..4fffd5d06c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperUtils.java @@ -111,7 +111,6 @@ public static ZooKeeperClient startZookeeperClient( if (configPath.isPresent()) { try { zkClientConfig = new ZKClientConfig(configPath.get()); - curatorFrameworkBuilder.zkClientConfig(zkClientConfig); } catch (QuorumPeerConfig.ConfigException e) { LOG.warn("Fail to load zookeeper client config from path {}", configPath.get(), e); throw new RuntimeException( @@ -127,6 +126,7 @@ public static ZooKeeperClient startZookeeperClient( zkClientConfig.setProperty( JUTE_MAXBUFFER, String.valueOf(configuration.getInt(ConfigOptions.ZOOKEEPER_MAX_BUFFER_SIZE))); + curatorFrameworkBuilder.zkClientConfig(zkClientConfig); return new ZooKeeperClient( startZookeeperClient(curatorFrameworkBuilder, fatalErrorHandler), configuration); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index a94cf8e0d2..a7fd77adaa 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -51,7 +51,6 @@ import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.types.DataTypes; import org.apache.fluss.utils.types.Tuple2; - import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -74,6 +73,7 @@ import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED; import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED; import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; +import static org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.common.ZKConfig.JUTE_MAXBUFFER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -742,6 +742,25 @@ void testZookeeperConfigPath() throws Exception { } } + @Test + void testZookeeperMaxBufferSizeWithoutConfigPath() throws Exception { + final Configuration config = new Configuration(); + config.setString( + ConfigOptions.ZOOKEEPER_ADDRESS, + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString()); + config.set(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir); + config.set(ConfigOptions.ZOOKEEPER_MAX_BUFFER_SIZE, 2 * 1024 * 1024); + + try (ZooKeeperClient zookeeperClient = + ZooKeeperUtils.startZookeeperClient(config, NOPErrorHandler.INSTANCE); + CuratorFramework curatorClient = zookeeperClient.getCuratorClient(); + CuratorZookeeperClient curatorZookeeperClient = curatorClient.getZookeeperClient(); + ZooKeeper zooKeeper = curatorZookeeperClient.getZooKeeper()) { + ZKClientConfig clientConfig = zooKeeper.getClientConfig(); + assertThat(clientConfig.getProperty(JUTE_MAXBUFFER)).isEqualTo("2097152"); + } + } + @Test void testGetDatabaseSummary() throws Exception { TablePath tablePath1 = TablePath.of("db", "tb1"); diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 0a4223074d..b84b56e9b4 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -46,7 +46,7 @@ during the Fluss cluster working. | allow.create.log.tables | Boolean | true | Whether to allow creation of log tables. When set to false, attempts to create log tables (tables without primary key) will be rejected. The default value is true. | | allow.create.kv.tables | Boolean | true | Whether to allow creation of kv tables (primary key tables). When set to false, attempts to create kv tables (tables with primary key) will be rejected. The default value is true. | | max.partition.num | Integer | 1000 | Limits the maximum number of partitions that can be created for a partitioned table to avoid creating too many partitions. | -| max.bucket.num | Integer | 128000 | The maximum number of buckets that can be created for a table. The default value is 128000. | +| max.bucket.num | Integer | 20000 | The maximum number of buckets that can be created for a table. The default value is 20000. This default is capped to reduce the risk that the table assignment znode exceeds ZooKeeper's packet size limit. | | acl.notification.expiration-time | Duration | 15min | The duration for which ACL notifications are valid before they expire. This configuration determines the time window during which an ACL notification is considered active. After this duration, the notification will no longer be valid and will be discarded. The default value is 15 minutes. This setting is important to ensure that ACL changes are propagated in a timely manner and do not remain active longer than necessary. | | authorizer.enabled | Boolean | false | Specifies whether to enable the authorization feature. If enabled, access control is enforced based on the authorization rules defined in the configuration. If disabled, all operations and resources are accessible to all users. | | authorizer.type | String | default | Specifies the type of authorizer to be used for access control. This value corresponds to the identifier of the authorization plugin. The default value is `default`, which indicates the built-in authorizer implementation. Custom authorizers can be implemented by providing a matching plugin identifier. |