[tests] Add DynamicWriteBatchSizeEstimator integration tests#643
[tests] Add DynamicWriteBatchSizeEstimator integration tests#643Prajwal-banakar wants to merge 3 commits into
Conversation
|
Hi @charlesdong1991 @fresh-borzoni could you please help review this!? The failing Elixir check appears to be unrelated to this PR's changes, PTAL. |
8d254d1 to
2dc0b11
Compare
charlesdong1991
left a comment
There was a problem hiding this comment.
Thanks for the PR! Left some comments, PTAL
| let row = make_row(i, "x"); | ||
| writer.append(&row).expect("Failed to append row"); | ||
| } | ||
| writer.flush().await.expect("Failed to flush"); |
There was a problem hiding this comment.
i doubt this test will do the job (based on the comment and test func name) 🤔 since it only asserts flush succeeds, but don't observe estimated batch size, if a regression happens which disables shrinking, that will still pass i guess
so effectively, it is more like checking if dynamic write craches, but not end-to-end test that checks shrinking batch size towards min IMHO
There was a problem hiding this comment.
Added read_back_records() helper that polls bucket 0 from EARLIEST_OFFSET and collects all records — every test now asserts the exact record count, so a silent regression would cause data loss and fail the assertion.
| let row = make_row(i, &large_payload); | ||
| writer.append(&row).expect("Failed to append row"); | ||
| } | ||
| writer.flush().await.expect("Failed to flush"); |
There was a problem hiding this comment.
similar here, i don't think in this case (starting with above 80% of max) will observe growth, i think we should rework it to shrink-then-grow with a read back assertion for an actual end-to-end test
There was a problem hiding this comment.
Renamed large_rows_grow_batch_size to shrink_then_grow_batch_size — Phase 1 drives the estimator down with 100 tiny rows, Phase 2 sends rows filling ~200 KB (well above 80% of the shrunk target) to trigger growth, both phases are read back.
| /// Multiple concurrent writers to the same table should not corrupt the estimator. | ||
| /// Each writer uses its own connection; all writes must succeed. | ||
| #[tokio::test] | ||
| async fn concurrent_writers_dont_corrupt_state() { |
There was a problem hiding this comment.
as mentioned in comment and in spawning below, these will be 4 independent connections/estimators, so there will be no shared state to corrupt, so i wonder if this func name is intended or not? Or what actually do we intend to test?
There was a problem hiding this comment.
Fixed concurrent_writers_dont_corrupt_state to concurrent_appends_share_estimator_without_corruption — all 4 tasks now share a single Arc from one connection, so they share the same RecordAccumulator and estimator, and concurrent access to shared state is actually exercised.
… concurrent test to share connection
|
Hi @charlesdong1991 addressed all three comments, PTAL another look when you have some time, thanks! |
fresh-borzoni
left a comment
There was a problem hiding this comment.
@Prajwal-banakar Nice work, but I think Charles's first point still stands thougha as asserting the record count only proves no data was lost, it doesn't observe the batch size.
If shrinking regressed, all rows would still round-trip and the test would pass. Simplest fix: expose the estimator's current size via a pub(crate) accessor and assert it moved toward min/max.
Purpose
Linked issue: close #539
Adds integration tests for
DynamicWriteBatchSizeEstimatorto verify end-to-end write correctness through a real cluster for all four scenarios described in the issue.Brief change log
crates/fluss/tests/integration/dynamic_batch_size.rswith 4 integration tests covering:writer_batch_sizeunchangedcrates/fluss/tests/test_fluss.rsTests
small_rows_shrink_batch_size— writes 200 tiny rows with dynamic sizing enabled; verifies all writes succeedlarge_rows_grow_batch_size— writes rows filling >80% of batch capacity; verifies all writes succeeddisabled_keeps_static_batch_size— writes withwriter_dynamic_batch_size_enabled = false; verifies all writes succeedconcurrent_writers_dont_corrupt_state— spawns 4 concurrent writer tasks each with its own connection; verifies all writes succeed without errorsAPI and Format
No changes to API or storage format.
Documentation
No new feature introduced. This PR only adds integration tests for an existing feature.