From 133269617737ecbf2331cc1186b3a6b4d7eb391f Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Sat, 27 Jun 2026 07:23:37 +0000 Subject: [PATCH 1/3] [tests] Add DynamicWriteBatchSizeEstimator integration tests --- .../tests/integration/dynamic_batch_size.rs | 241 ++++++++++++++++++ crates/fluss/tests/test_fluss.rs | 1 + 2 files changed, 242 insertions(+) create mode 100644 crates/fluss/tests/integration/dynamic_batch_size.rs diff --git a/crates/fluss/tests/integration/dynamic_batch_size.rs b/crates/fluss/tests/integration/dynamic_batch_size.rs new file mode 100644 index 00000000..fe26d311 --- /dev/null +++ b/crates/fluss/tests/integration/dynamic_batch_size.rs @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[cfg(test)] +mod dynamic_batch_size_test { + use crate::integration::utils::{create_table, get_shared_cluster, wait_for_table_ready}; + use fluss::client::FlussConnection; + use fluss::config::Config; + use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; + use fluss::row::{Datum, GenericRow}; + + fn make_config( + bootstrap_servers: &str, + batch_size: i32, + dynamic_enabled: bool, + dynamic_min: i32, + ) -> Config { + Config { + bootstrap_servers: bootstrap_servers.to_string(), + writer_acks: "all".to_string(), + writer_batch_size: batch_size, + writer_dynamic_batch_size_enabled: dynamic_enabled, + writer_dynamic_batch_size_min: dynamic_min, + ..Config::default() + } + } + + fn log_table_descriptor() -> TableDescriptor { + TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("payload", DataTypes::string()) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table descriptor") + } + + fn make_row(id: i32, payload: &str) -> GenericRow<'static> { + let mut row = GenericRow::new(2); + row.set_field(0, Datum::Int32(id)); + row.set_field(1, Datum::String(payload.to_string().into())); + row + } + + /// Many small rows should cause the estimator to shrink the batch size toward min. + /// Verifies that all writes succeed end-to-end with dynamic sizing enabled. + #[tokio::test] + async fn small_rows_shrink_batch_size() { + let cluster = get_shared_cluster(); + let config = make_config( + cluster.plaintext_bootstrap_servers(), + 256 * 1024, // max = 256 KB + true, + 4 * 1024, // min = 4 KB + ); + let connection = FlussConnection::new(config) + .await + .expect("Failed to connect"); + + let admin = connection.get_admin().expect("Failed to get admin"); + let table_path = TablePath::new("fluss", "test_dynamic_small_rows"); + create_table(&admin, &table_path, &log_table_descriptor()).await; + wait_for_table_ready(&admin, &table_path).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + let writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + // Write many tiny rows — each well below 50% of the batch size, so + // the estimator should shrink the target toward min after each drain. + for i in 0..200 { + let row = make_row(i, "x"); + writer.append(&row).expect("Failed to append row"); + } + writer.flush().await.expect("Failed to flush"); + } + + /// Rows close to the batch size should cause the estimator to grow toward max. + /// Verifies that all writes succeed end-to-end with dynamic sizing enabled. + #[tokio::test] + async fn large_rows_grow_batch_size() { + let cluster = get_shared_cluster(); + let max_batch = 256 * 1024i32; // 256 KB + let config = make_config( + cluster.plaintext_bootstrap_servers(), + max_batch, + true, + 4 * 1024, // min = 4 KB + ); + let connection = FlussConnection::new(config) + .await + .expect("Failed to connect"); + + let admin = connection.get_admin().expect("Failed to get admin"); + let table_path = TablePath::new("fluss", "test_dynamic_large_rows"); + create_table(&admin, &table_path, &log_table_descriptor()).await; + wait_for_table_ready(&admin, &table_path).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + let writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + // Write rows with a payload that fills well above 80% of the batch. + // Repeat to give the estimator multiple drain cycles to grow. + let large_payload = "A".repeat(220 * 1024); // ~220 KB per row + for i in 0..5 { + let row = make_row(i, &large_payload); + writer.append(&row).expect("Failed to append row"); + } + writer.flush().await.expect("Failed to flush"); + } + + /// With dynamic sizing disabled, the writer should use the static writer_batch_size + /// regardless of how large or small the rows are. Verifies writes succeed. + #[tokio::test] + async fn disabled_keeps_static_batch_size() { + let cluster = get_shared_cluster(); + let config = make_config( + cluster.plaintext_bootstrap_servers(), + 256 * 1024, + false, // disabled + 4 * 1024, + ); + let connection = FlussConnection::new(config) + .await + .expect("Failed to connect"); + + let admin = connection.get_admin().expect("Failed to get admin"); + let table_path = TablePath::new("fluss", "test_dynamic_disabled"); + create_table(&admin, &table_path, &log_table_descriptor()).await; + wait_for_table_ready(&admin, &table_path).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + let writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + for i in 0..100 { + let row = make_row(i, "static"); + writer.append(&row).expect("Failed to append row"); + } + writer.flush().await.expect("Failed to flush"); + } + + /// Multiple concurrent writers to the same table should not corrupt the estimator. + /// Each writer uses its own connection; all writes must succeed. + #[tokio::test] + async fn concurrent_writers_dont_corrupt_state() { + let cluster = get_shared_cluster(); + + // Create the table using a shared admin connection. + let setup_config = make_config( + cluster.plaintext_bootstrap_servers(), + 256 * 1024, + true, + 4 * 1024, + ); + let setup_conn = FlussConnection::new(setup_config) + .await + .expect("Failed to connect for setup"); + let admin = setup_conn.get_admin().expect("Failed to get admin"); + let table_path = TablePath::new("fluss", "test_dynamic_concurrent"); + create_table(&admin, &table_path, &log_table_descriptor()).await; + wait_for_table_ready(&admin, &table_path).await; + + let bootstrap = cluster.plaintext_bootstrap_servers().to_string(); + let table_path_clone = table_path.clone(); + + // Spawn 4 concurrent writer tasks, each with its own connection. + let mut handles = Vec::new(); + for writer_id in 0..4usize { + let servers = bootstrap.clone(); + let path = table_path_clone.clone(); + let handle = tokio::spawn(async move { + let config = Config { + bootstrap_servers: servers, + writer_acks: "all".to_string(), + writer_batch_size: 256 * 1024, + writer_dynamic_batch_size_enabled: true, + writer_dynamic_batch_size_min: 4 * 1024, + ..Config::default() + }; + let conn = FlussConnection::new(config) + .await + .expect("Failed to connect"); + let table = conn.get_table(&path).await.expect("Failed to get table"); + let writer = table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"); + + for i in 0..50 { + let row = make_row((writer_id * 50 + i) as i32, "concurrent"); + writer.append(&row).expect("Failed to append"); + } + writer.flush().await.expect("Failed to flush"); + }); + handles.push(handle); + } + + for handle in handles { + handle.await.expect("Writer task panicked"); + } + } +} diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index eb1e4070..21f1e478 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -32,4 +32,5 @@ mod integration { mod utils; mod table_remote_scan; + mod dynamic_batch_size; } From 2dc0b1133351e0df755d9e02257672c8e622492a Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Sat, 27 Jun 2026 07:33:07 +0000 Subject: [PATCH 2/3] improved format issues --- crates/fluss/tests/test_fluss.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 21f1e478..63c4f785 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -31,6 +31,6 @@ mod integration { mod utils; - mod table_remote_scan; mod dynamic_batch_size; + mod table_remote_scan; } From f5525cae3e90dabb59a3720d09eae66cb3e05533 Mon Sep 17 00:00:00 2001 From: Prajwal Banakar Date: Fri, 3 Jul 2026 04:05:59 +0000 Subject: [PATCH 3/3] Address review: add read-back assertions, shrink-then-grow cycle, fix concurrent test to share connection --- .../tests/integration/dynamic_batch_size.rs | 206 ++++++++++++------ 1 file changed, 142 insertions(+), 64 deletions(-) diff --git a/crates/fluss/tests/integration/dynamic_batch_size.rs b/crates/fluss/tests/integration/dynamic_batch_size.rs index fe26d311..840c47ad 100644 --- a/crates/fluss/tests/integration/dynamic_batch_size.rs +++ b/crates/fluss/tests/integration/dynamic_batch_size.rs @@ -19,10 +19,12 @@ #[cfg(test)] mod dynamic_batch_size_test { use crate::integration::utils::{create_table, get_shared_cluster, wait_for_table_ready}; - use fluss::client::FlussConnection; + use fluss::client::{EARLIEST_OFFSET, FlussConnection}; use fluss::config::Config; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; - use fluss::row::{Datum, GenericRow}; + use fluss::row::{DataGetters, Datum, GenericRow}; + use std::sync::Arc; + use std::time::Duration; fn make_config( bootstrap_servers: &str, @@ -60,8 +62,44 @@ mod dynamic_batch_size_test { row } - /// Many small rows should cause the estimator to shrink the batch size toward min. - /// Verifies that all writes succeed end-to-end with dynamic sizing enabled. + async fn read_back_records( + connection: &FlussConnection, + table_path: &TablePath, + expected_count: usize, + ) -> Vec<(i32, String)> { + let table = connection + .get_table(table_path) + .await + .expect("Failed to get table for scan"); + let log_scanner = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner"); + log_scanner + .subscribe(0, EARLIEST_OFFSET) + .await + .expect("Failed to subscribe"); + + let mut collected: Vec<(i32, String)> = Vec::new(); + let start = std::time::Instant::now(); + while collected.len() < expected_count && start.elapsed() < Duration::from_secs(30) { + let records = log_scanner + .poll(Duration::from_millis(500)) + .await + .expect("Failed to poll"); + for rec in records { + let row = rec.row(); + collected.push(( + row.get_int(0).unwrap(), + row.get_string(1).unwrap().to_string(), + )); + } + } + collected + } + + /// Writes many tiny rows (well below 50% of batch size per drain) so the estimator + /// shrinks the target toward min, then reads all records back to verify data integrity. #[tokio::test] async fn small_rows_shrink_batch_size() { let cluster = get_shared_cluster(); @@ -74,7 +112,6 @@ mod dynamic_batch_size_test { let connection = FlussConnection::new(config) .await .expect("Failed to connect"); - let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_dynamic_small_rows"); create_table(&admin, &table_path, &log_table_descriptor()).await; @@ -90,19 +127,30 @@ mod dynamic_batch_size_test { .create_writer() .expect("Failed to create writer"); - // Write many tiny rows — each well below 50% of the batch size, so - // the estimator should shrink the target toward min after each drain. - for i in 0..200 { - let row = make_row(i, "x"); + // Write many tiny rows — each well below 50% of the batch size so + // the estimator shrinks the target toward min after each drain. + let row_count = 200usize; + for i in 0..row_count { + let row = make_row(i as i32, "x"); writer.append(&row).expect("Failed to append row"); } writer.flush().await.expect("Failed to flush"); + + // Read back all records to verify data integrity through shrink cycles. + let records = read_back_records(&connection, &table_path, row_count).await; + assert_eq!( + records.len(), + row_count, + "Expected {row_count} records after shrink cycles, got {}", + records.len() + ); } - /// Rows close to the batch size should cause the estimator to grow toward max. - /// Verifies that all writes succeed end-to-end with dynamic sizing enabled. + /// Verifies estimator adapts correctly across a shrink-then-grow cycle: + /// first shrinks with tiny rows, then grows with large rows, and all + /// records written in both phases are read back correctly. #[tokio::test] - async fn large_rows_grow_batch_size() { + async fn shrink_then_grow_batch_size() { let cluster = get_shared_cluster(); let max_batch = 256 * 1024i32; // 256 KB let config = make_config( @@ -114,9 +162,8 @@ mod dynamic_batch_size_test { let connection = FlussConnection::new(config) .await .expect("Failed to connect"); - let admin = connection.get_admin().expect("Failed to get admin"); - let table_path = TablePath::new("fluss", "test_dynamic_large_rows"); + let table_path = TablePath::new("fluss", "test_dynamic_shrink_grow"); create_table(&admin, &table_path, &log_table_descriptor()).await; wait_for_table_ready(&admin, &table_path).await; @@ -130,18 +177,37 @@ mod dynamic_batch_size_test { .create_writer() .expect("Failed to create writer"); - // Write rows with a payload that fills well above 80% of the batch. - // Repeat to give the estimator multiple drain cycles to grow. - let large_payload = "A".repeat(220 * 1024); // ~220 KB per row - for i in 0..5 { - let row = make_row(i, &large_payload); - writer.append(&row).expect("Failed to append row"); + // Phase 1: shrink — many tiny rows drive the target toward min. + let small_count = 100usize; + for i in 0..small_count { + let row = make_row(i as i32, "small"); + writer.append(&row).expect("Failed to append small row"); } - writer.flush().await.expect("Failed to flush"); + writer.flush().await.expect("Failed to flush phase 1"); + + // Phase 2: grow — large rows (>80% of current target) drive the + // estimator back up toward max after it has been shrunk. + let large_payload = "A".repeat(200 * 1024); // ~200 KB + let large_count = 5usize; + for i in 0..large_count { + let row = make_row((small_count + i) as i32, &large_payload); + writer.append(&row).expect("Failed to append large row"); + } + writer.flush().await.expect("Failed to flush phase 2"); + + // Read back all records from both phases to verify data integrity. + let total = small_count + large_count; + let records = read_back_records(&connection, &table_path, total).await; + assert_eq!( + records.len(), + total, + "Expected {total} records after shrink-then-grow cycle, got {}", + records.len() + ); } - /// With dynamic sizing disabled, the writer should use the static writer_batch_size - /// regardless of how large or small the rows are. Verifies writes succeed. + /// With dynamic sizing disabled, the writer uses the static writer_batch_size + /// for every batch. Verifies all records are written and read back correctly. #[tokio::test] async fn disabled_keeps_static_batch_size() { let cluster = get_shared_cluster(); @@ -154,7 +220,6 @@ mod dynamic_batch_size_test { let connection = FlussConnection::new(config) .await .expect("Failed to connect"); - let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_dynamic_disabled"); create_table(&admin, &table_path, &log_table_descriptor()).await; @@ -170,72 +235,85 @@ mod dynamic_batch_size_test { .create_writer() .expect("Failed to create writer"); - for i in 0..100 { - let row = make_row(i, "static"); + let row_count = 100usize; + for i in 0..row_count { + let row = make_row(i as i32, "static"); writer.append(&row).expect("Failed to append row"); } writer.flush().await.expect("Failed to flush"); + + let records = read_back_records(&connection, &table_path, row_count).await; + assert_eq!( + records.len(), + row_count, + "Expected {row_count} records with static batch size, got {}", + records.len() + ); } - /// Multiple concurrent writers to the same table should not corrupt the estimator. - /// Each writer uses its own connection; all writes must succeed. + /// Multiple concurrent tasks share a single connection (and thus a single + /// WriterClient and RecordAccumulator with a shared estimator). Verifies that + /// concurrent appends from different tasks don't corrupt estimator state and + /// all records land correctly. #[tokio::test] - async fn concurrent_writers_dont_corrupt_state() { + async fn concurrent_appends_share_estimator_without_corruption() { let cluster = get_shared_cluster(); - - // Create the table using a shared admin connection. - let setup_config = make_config( + let config = make_config( cluster.plaintext_bootstrap_servers(), 256 * 1024, true, 4 * 1024, ); - let setup_conn = FlussConnection::new(setup_config) + let connection = FlussConnection::new(config) .await - .expect("Failed to connect for setup"); - let admin = setup_conn.get_admin().expect("Failed to get admin"); + .expect("Failed to connect"); + let admin = connection.get_admin().expect("Failed to get admin"); let table_path = TablePath::new("fluss", "test_dynamic_concurrent"); create_table(&admin, &table_path, &log_table_descriptor()).await; wait_for_table_ready(&admin, &table_path).await; - let bootstrap = cluster.plaintext_bootstrap_servers().to_string(); - let table_path_clone = table_path.clone(); + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + // All tasks share the same AppendWriter (and thus the same + // RecordAccumulator / estimator) via Arc. + let writer = Arc::new( + table + .new_append() + .expect("Failed to create append") + .create_writer() + .expect("Failed to create writer"), + ); - // Spawn 4 concurrent writer tasks, each with its own connection. + let tasks_count = 4usize; + let rows_per_task = 50usize; let mut handles = Vec::new(); - for writer_id in 0..4usize { - let servers = bootstrap.clone(); - let path = table_path_clone.clone(); + + for task_id in 0..tasks_count { + let writer_clone = Arc::clone(&writer); let handle = tokio::spawn(async move { - let config = Config { - bootstrap_servers: servers, - writer_acks: "all".to_string(), - writer_batch_size: 256 * 1024, - writer_dynamic_batch_size_enabled: true, - writer_dynamic_batch_size_min: 4 * 1024, - ..Config::default() - }; - let conn = FlussConnection::new(config) - .await - .expect("Failed to connect"); - let table = conn.get_table(&path).await.expect("Failed to get table"); - let writer = table - .new_append() - .expect("Failed to create append") - .create_writer() - .expect("Failed to create writer"); - - for i in 0..50 { - let row = make_row((writer_id * 50 + i) as i32, "concurrent"); - writer.append(&row).expect("Failed to append"); + for i in 0..rows_per_task { + let row = make_row((task_id * rows_per_task + i) as i32, "concurrent"); + writer_clone.append(&row).expect("Failed to append"); } - writer.flush().await.expect("Failed to flush"); }); handles.push(handle); } for handle in handles { - handle.await.expect("Writer task panicked"); + handle.await.expect("Task panicked"); } + writer.flush().await.expect("Failed to flush"); + + let total = tasks_count * rows_per_task; + let records = read_back_records(&connection, &table_path, total).await; + assert_eq!( + records.len(), + total, + "Expected {total} records from concurrent appends, got {}", + records.len() + ); } }