-
Notifications
You must be signed in to change notification settings - Fork 46
[tests] Add DynamicWriteBatchSizeEstimator integration tests #643
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Prajwal-banakar
wants to merge
3
commits into
apache:main
Choose a base branch
from
Prajwal-banakar:integration-tests
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+320
−0
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,319 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #[cfg(test)] | ||
| mod dynamic_batch_size_test { | ||
| use crate::integration::utils::{create_table, get_shared_cluster, wait_for_table_ready}; | ||
| use fluss::client::{EARLIEST_OFFSET, FlussConnection}; | ||
| use fluss::config::Config; | ||
| use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; | ||
| use fluss::row::{DataGetters, Datum, GenericRow}; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
|
|
||
| fn make_config( | ||
| bootstrap_servers: &str, | ||
| batch_size: i32, | ||
| dynamic_enabled: bool, | ||
| dynamic_min: i32, | ||
| ) -> Config { | ||
| Config { | ||
| bootstrap_servers: bootstrap_servers.to_string(), | ||
| writer_acks: "all".to_string(), | ||
| writer_batch_size: batch_size, | ||
| writer_dynamic_batch_size_enabled: dynamic_enabled, | ||
| writer_dynamic_batch_size_min: dynamic_min, | ||
| ..Config::default() | ||
| } | ||
| } | ||
|
|
||
| fn log_table_descriptor() -> TableDescriptor { | ||
| TableDescriptor::builder() | ||
| .schema( | ||
| Schema::builder() | ||
| .column("id", DataTypes::int()) | ||
| .column("payload", DataTypes::string()) | ||
| .build() | ||
| .expect("Failed to build schema"), | ||
| ) | ||
| .build() | ||
| .expect("Failed to build table descriptor") | ||
| } | ||
|
|
||
| fn make_row(id: i32, payload: &str) -> GenericRow<'static> { | ||
| let mut row = GenericRow::new(2); | ||
| row.set_field(0, Datum::Int32(id)); | ||
| row.set_field(1, Datum::String(payload.to_string().into())); | ||
| row | ||
| } | ||
|
|
||
| async fn read_back_records( | ||
| connection: &FlussConnection, | ||
| table_path: &TablePath, | ||
| expected_count: usize, | ||
| ) -> Vec<(i32, String)> { | ||
| let table = connection | ||
| .get_table(table_path) | ||
| .await | ||
| .expect("Failed to get table for scan"); | ||
| let log_scanner = table | ||
| .new_scan() | ||
| .create_log_scanner() | ||
| .expect("Failed to create log scanner"); | ||
| log_scanner | ||
| .subscribe(0, EARLIEST_OFFSET) | ||
| .await | ||
| .expect("Failed to subscribe"); | ||
|
|
||
| let mut collected: Vec<(i32, String)> = Vec::new(); | ||
| let start = std::time::Instant::now(); | ||
| while collected.len() < expected_count && start.elapsed() < Duration::from_secs(30) { | ||
| let records = log_scanner | ||
| .poll(Duration::from_millis(500)) | ||
| .await | ||
| .expect("Failed to poll"); | ||
| for rec in records { | ||
| let row = rec.row(); | ||
| collected.push(( | ||
| row.get_int(0).unwrap(), | ||
| row.get_string(1).unwrap().to_string(), | ||
| )); | ||
| } | ||
| } | ||
| collected | ||
| } | ||
|
|
||
| /// Writes many tiny rows (well below 50% of batch size per drain) so the estimator | ||
| /// shrinks the target toward min, then reads all records back to verify data integrity. | ||
| #[tokio::test] | ||
| async fn small_rows_shrink_batch_size() { | ||
| let cluster = get_shared_cluster(); | ||
| let config = make_config( | ||
| cluster.plaintext_bootstrap_servers(), | ||
| 256 * 1024, // max = 256 KB | ||
| true, | ||
| 4 * 1024, // min = 4 KB | ||
| ); | ||
| let connection = FlussConnection::new(config) | ||
| .await | ||
| .expect("Failed to connect"); | ||
| let admin = connection.get_admin().expect("Failed to get admin"); | ||
| let table_path = TablePath::new("fluss", "test_dynamic_small_rows"); | ||
| create_table(&admin, &table_path, &log_table_descriptor()).await; | ||
| wait_for_table_ready(&admin, &table_path).await; | ||
|
|
||
| let table = connection | ||
| .get_table(&table_path) | ||
| .await | ||
| .expect("Failed to get table"); | ||
| let writer = table | ||
| .new_append() | ||
| .expect("Failed to create append") | ||
| .create_writer() | ||
| .expect("Failed to create writer"); | ||
|
|
||
| // Write many tiny rows — each well below 50% of the batch size so | ||
| // the estimator shrinks the target toward min after each drain. | ||
| let row_count = 200usize; | ||
| for i in 0..row_count { | ||
| let row = make_row(i as i32, "x"); | ||
| writer.append(&row).expect("Failed to append row"); | ||
| } | ||
| writer.flush().await.expect("Failed to flush"); | ||
|
|
||
| // Read back all records to verify data integrity through shrink cycles. | ||
| let records = read_back_records(&connection, &table_path, row_count).await; | ||
| assert_eq!( | ||
| records.len(), | ||
| row_count, | ||
| "Expected {row_count} records after shrink cycles, got {}", | ||
| records.len() | ||
| ); | ||
| } | ||
|
|
||
| /// Verifies estimator adapts correctly across a shrink-then-grow cycle: | ||
| /// first shrinks with tiny rows, then grows with large rows, and all | ||
| /// records written in both phases are read back correctly. | ||
| #[tokio::test] | ||
| async fn shrink_then_grow_batch_size() { | ||
| let cluster = get_shared_cluster(); | ||
| let max_batch = 256 * 1024i32; // 256 KB | ||
| let config = make_config( | ||
| cluster.plaintext_bootstrap_servers(), | ||
| max_batch, | ||
| true, | ||
| 4 * 1024, // min = 4 KB | ||
| ); | ||
| let connection = FlussConnection::new(config) | ||
| .await | ||
| .expect("Failed to connect"); | ||
| let admin = connection.get_admin().expect("Failed to get admin"); | ||
| let table_path = TablePath::new("fluss", "test_dynamic_shrink_grow"); | ||
| create_table(&admin, &table_path, &log_table_descriptor()).await; | ||
| wait_for_table_ready(&admin, &table_path).await; | ||
|
|
||
| let table = connection | ||
| .get_table(&table_path) | ||
| .await | ||
| .expect("Failed to get table"); | ||
| let writer = table | ||
| .new_append() | ||
| .expect("Failed to create append") | ||
| .create_writer() | ||
| .expect("Failed to create writer"); | ||
|
|
||
| // Phase 1: shrink — many tiny rows drive the target toward min. | ||
| let small_count = 100usize; | ||
| for i in 0..small_count { | ||
| let row = make_row(i as i32, "small"); | ||
| writer.append(&row).expect("Failed to append small row"); | ||
| } | ||
| writer.flush().await.expect("Failed to flush phase 1"); | ||
|
|
||
| // Phase 2: grow — large rows (>80% of current target) drive the | ||
| // estimator back up toward max after it has been shrunk. | ||
| let large_payload = "A".repeat(200 * 1024); // ~200 KB | ||
| let large_count = 5usize; | ||
| for i in 0..large_count { | ||
| let row = make_row((small_count + i) as i32, &large_payload); | ||
| writer.append(&row).expect("Failed to append large row"); | ||
| } | ||
| writer.flush().await.expect("Failed to flush phase 2"); | ||
|
|
||
| // Read back all records from both phases to verify data integrity. | ||
| let total = small_count + large_count; | ||
| let records = read_back_records(&connection, &table_path, total).await; | ||
| assert_eq!( | ||
| records.len(), | ||
| total, | ||
| "Expected {total} records after shrink-then-grow cycle, got {}", | ||
| records.len() | ||
| ); | ||
| } | ||
|
|
||
| /// With dynamic sizing disabled, the writer uses the static writer_batch_size | ||
| /// for every batch. Verifies all records are written and read back correctly. | ||
| #[tokio::test] | ||
| async fn disabled_keeps_static_batch_size() { | ||
| let cluster = get_shared_cluster(); | ||
| let config = make_config( | ||
| cluster.plaintext_bootstrap_servers(), | ||
| 256 * 1024, | ||
| false, // disabled | ||
| 4 * 1024, | ||
| ); | ||
| let connection = FlussConnection::new(config) | ||
| .await | ||
| .expect("Failed to connect"); | ||
| let admin = connection.get_admin().expect("Failed to get admin"); | ||
| let table_path = TablePath::new("fluss", "test_dynamic_disabled"); | ||
| create_table(&admin, &table_path, &log_table_descriptor()).await; | ||
| wait_for_table_ready(&admin, &table_path).await; | ||
|
|
||
| let table = connection | ||
| .get_table(&table_path) | ||
| .await | ||
| .expect("Failed to get table"); | ||
| let writer = table | ||
| .new_append() | ||
| .expect("Failed to create append") | ||
| .create_writer() | ||
| .expect("Failed to create writer"); | ||
|
|
||
| let row_count = 100usize; | ||
| for i in 0..row_count { | ||
| let row = make_row(i as i32, "static"); | ||
| writer.append(&row).expect("Failed to append row"); | ||
| } | ||
| writer.flush().await.expect("Failed to flush"); | ||
|
|
||
| let records = read_back_records(&connection, &table_path, row_count).await; | ||
| assert_eq!( | ||
| records.len(), | ||
| row_count, | ||
| "Expected {row_count} records with static batch size, got {}", | ||
| records.len() | ||
| ); | ||
| } | ||
|
|
||
| /// Multiple concurrent tasks share a single connection (and thus a single | ||
| /// WriterClient and RecordAccumulator with a shared estimator). Verifies that | ||
| /// concurrent appends from different tasks don't corrupt estimator state and | ||
| /// all records land correctly. | ||
| #[tokio::test] | ||
| async fn concurrent_appends_share_estimator_without_corruption() { | ||
| let cluster = get_shared_cluster(); | ||
| let config = make_config( | ||
| cluster.plaintext_bootstrap_servers(), | ||
| 256 * 1024, | ||
| true, | ||
| 4 * 1024, | ||
| ); | ||
| let connection = FlussConnection::new(config) | ||
| .await | ||
| .expect("Failed to connect"); | ||
| let admin = connection.get_admin().expect("Failed to get admin"); | ||
| let table_path = TablePath::new("fluss", "test_dynamic_concurrent"); | ||
| create_table(&admin, &table_path, &log_table_descriptor()).await; | ||
| wait_for_table_ready(&admin, &table_path).await; | ||
|
|
||
| let table = connection | ||
| .get_table(&table_path) | ||
| .await | ||
| .expect("Failed to get table"); | ||
|
|
||
| // All tasks share the same AppendWriter (and thus the same | ||
| // RecordAccumulator / estimator) via Arc. | ||
| let writer = Arc::new( | ||
| table | ||
| .new_append() | ||
| .expect("Failed to create append") | ||
| .create_writer() | ||
| .expect("Failed to create writer"), | ||
| ); | ||
|
|
||
| let tasks_count = 4usize; | ||
| let rows_per_task = 50usize; | ||
| let mut handles = Vec::new(); | ||
|
|
||
| for task_id in 0..tasks_count { | ||
| let writer_clone = Arc::clone(&writer); | ||
| let handle = tokio::spawn(async move { | ||
| for i in 0..rows_per_task { | ||
| let row = make_row((task_id * rows_per_task + i) as i32, "concurrent"); | ||
| writer_clone.append(&row).expect("Failed to append"); | ||
| } | ||
| }); | ||
| handles.push(handle); | ||
| } | ||
|
|
||
| for handle in handles { | ||
| handle.await.expect("Task panicked"); | ||
| } | ||
| writer.flush().await.expect("Failed to flush"); | ||
|
|
||
| let total = tasks_count * rows_per_task; | ||
| let records = read_back_records(&connection, &table_path, total).await; | ||
| assert_eq!( | ||
| records.len(), | ||
| total, | ||
| "Expected {total} records from concurrent appends, got {}", | ||
| records.len() | ||
| ); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,5 +31,6 @@ mod integration { | |
|
|
||
| mod utils; | ||
|
|
||
| mod dynamic_batch_size; | ||
| mod table_remote_scan; | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i doubt this test will do the job (based on the comment and test func name) 🤔 since it only asserts flush succeeds, but don't observe estimated batch size, if a regression happens which disables shrinking, that will still pass i guess
so effectively, it is more like checking if dynamic write craches, but not end-to-end test that checks shrinking batch size towards min IMHO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added read_back_records() helper that polls bucket 0 from EARLIEST_OFFSET and collects all records — every test now asserts the exact record count, so a silent regression would cause data loss and fail the assertion.