[spark] Consolidate scan builders into Append/Upsert with SplitPlanner #3559
Open
YannByron wants to merge 3 commits into
Consolidates the four scan builders (FlussLakeAppendScanBuilder / FlussAppendScanBuilder / FlussLakeUpsertScanBuilder / FlussUpsertScanBuilder) into two (FlussAppendScanBuilder, FlussUpsertScanBuilder), routing lake-union vs log-only reads through a single SplitPlanner probed at scan build time.

Highlights:
- Introduce SplitPlanner with AppendPlanner / UpsertPlanner. Each planner probes the readable lake snapshot exactly once at construction and dispatches inside plan() to either the lake-union branch (lake splits unioned with a Fluss log-tail / kv+log-tail) or the log-only branch (pure Fluss read). There is no runtime fallback path: presence/absence of the lake snapshot is decided deterministically at construction.
- Remove FlussLakeAppendBatch, FlussLakeUpsertBatch, FlussLakeBatch. The planner-backed FlussAppendBatch / FlussUpsertBatch dispatch the reader factory based on planner.hasLakeSnapshot, and own the planner's Fluss client connection / admin lifecycle via close().
- Batch scans no longer consume SCAN_START_UP_MODE. Batch semantics are "the full table" regardless of the user-facing startup mode. This keeps append and upsert planners symmetric (KV snapshot has no partial-read semantics) and structurally avoids the empty offset-range case that otherwise trips the reader-side guard. Time-range batch reads should be expressed via predicate pushdown, not startup mode. AppendPlanner uses OffsetsInitializer.full() rather than earliest() so start offsets resolve to concrete numeric values usable by the split-by-max-records logic.
- Merge FlussSupportsPushDownV2Filters and FlussLakeSupportsPushDownV2Filters. Pushdown decisions are schema-level (partition predicate extraction plus ARROW/non-PK data-predicate acceptance); further pushdown to LakeSource happens inside the planner and is transparent to Spark.
- SparkTable is no longer lake-aware. newScanBuilder constructs a plain Append/Upsert ScanBuilder; the planner probes the snapshot internally.
- Tests: SparkPrimaryKeyTableReadTest asserts SCAN_START_UP_MODE is ignored for batch reads; SparkLakeTableReadTestBase helper no longer filters by planner type.

🤖 Generated with [Qoder](https://qoder.com)
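The probe-once dispatch described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `LakeSnapshot`, `Split`, `LakeSplit`, `LogSplit`, `AppendPlannerSketch`, and the `probe` function are hypothetical stand-ins, and the real planner additionally manages a Fluss connection and admin client.

```scala
object SplitPlannerSketch {
  // Hypothetical stand-ins; none of these are the real Fluss types.
  final case class LakeSnapshot(id: Long)
  sealed trait Split
  final case class LakeSplit(snapshotId: Long) extends Split
  final case class LogSplit(bucket: Int) extends Split

  // `probe` models the lookup of the readable lake snapshot
  // (None means "no snapshot"); `buckets` is an assumed bucket count.
  final class AppendPlannerSketch(probe: () => Option[LakeSnapshot], buckets: Int)
      extends AutoCloseable {

    // Evaluated exactly once, at construction time.
    private val snapshot: Option[LakeSnapshot] = probe()

    // The batch can pick its reader factory from this flag.
    val hasLakeSnapshot: Boolean = snapshot.isDefined

    // The branch is fixed by the construction-time probe; there is no
    // fallback decision while planning.
    def plan(): Seq[Split] = snapshot match {
      case Some(snap) => planLakeUnion(snap)
      case None       => planLogOnly()
    }

    // Lake splits unioned with the log-tail splits.
    private def planLakeUnion(snap: LakeSnapshot): Seq[Split] =
      LakeSplit(snap.id) +: planLogOnly()

    // Pure log read: one split per bucket.
    private def planLogOnly(): Seq[Split] =
      (0 until buckets).map(LogSplit(_))

    // A real planner would release its connection and admin client here.
    override def close(): Unit = ()
  }
}
```

Calling `plan()` repeatedly on the same instance never re-probes, so the physical read path is known as soon as the planner exists.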
Rewrap scaladoc lines to match spotless-maven-plugin's line-length rule. No code semantics changed. 🤖 Generated with [Qoder](https://qoder.com)
… scan builder

The scan builder consolidation (e16af58) merged FlussSupportsPushDownV2Filters and FlussLakeSupportsPushDownV2Filters into a single trait but only preserved the ARROW log-table branch. This left PK tables with lake enabled unable to push predicates to the lake source, causing PK filter pushdown tests to report empty pushed predicates.

Add an `else if (isDataLakeEnabled)` branch that probes the lake source via FlussLakeUtils.applyLakeFilters, mirroring the former FlussLakeSupportsPushDownV2Filters behavior, so that accepted predicates are reported back to Spark and passed to the planner for lake-path filtering.

🤖 Generated with [Qoder](https://qoder.com)
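The branch ordering this fix restores can be sketched as below. It is a simplified model under stated assumptions: `Predicate`, `acceptedPredicates`, and the `lakeAccepts` callback are hypothetical, the ARROW branch is reduced to "accept everything", and the real code probes the lake source through `FlussLakeUtils.applyLakeFilters`, whose signature is not shown in this PR text.

```scala
object PushdownSketch {
  // Hypothetical stand-in for a data predicate offered by Spark.
  final case class Predicate(column: String)

  // Returns the predicates that would be reported back to Spark as pushed.
  // `lakeAccepts` models the per-predicate answer of probing the lake source.
  def acceptedPredicates(
      isArrowLogTable: Boolean,
      isDataLakeEnabled: Boolean,
      predicates: Seq[Predicate],
      lakeAccepts: Predicate => Boolean): Seq[Predicate] =
    if (isArrowLogTable) {
      // Branch that survived the consolidation (simplified here).
      predicates
    } else if (isDataLakeEnabled) {
      // Branch added by this commit: ask the lake source what it accepts.
      predicates.filter(lakeAccepts)
    } else {
      // Neither condition holds: nothing is pushed.
      Seq.empty
    }
}
```

Without the middle branch, a lake-enabled PK table falls through to the last case and reports no pushed predicates, which matches the test failure described above.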
@Yohahaha @fresh-borzoni Please take a look if you have a chance, thanks.
Purpose
Linked issue: relates to #3213
The Fluss Spark connector currently ships four scan builders (`FlussAppendScanBuilder` / `FlussLakeAppendScanBuilder` / `FlussUpsertScanBuilder` / `FlussLakeUpsertScanBuilder`), split along two orthogonal axes (append vs upsert, native vs lake). The lake batches additionally carry a runtime fallback path (`isFallback`) that morphs a "lake" batch into a "non-lake" batch when the readable lake snapshot is absent. Several consequences follow:

- `FlussLake*Batch` violates SRP by doing both lake planning and native fallback planning, and dynamically picks the reader factory at runtime. Reasoning about which physical read path a query will take requires reading both classes end-to-end.
- `FlussUpsertScan` has no `pushedPredicate` (upsert never accepts data predicates) but `FlussLakeUpsertScan` does: an asymmetric API surface between siblings.
- Streaming (`toMicroBatchStream`) is shared between the lake and non-lake variants, but batch (`toBatch`) is split: an inconsistent split axis in the class hierarchy.

This PR consolidates the four scan builders into two (`FlussAppendScanBuilder`, `FlussUpsertScanBuilder`) and routes lake-union vs log-only reads through a single `SplitPlanner` that is probed at scan build time. The runtime fallback path is removed: presence/absence of the readable lake snapshot is decided deterministically at planner construction, and `plan()` picks a branch accordingly.

Brief change log
Introduce `SplitPlanner` (`fluss-spark-common/.../read/SplitPlanner.scala`, new)

- `sealed trait SplitPlanner extends AutoCloseable` with marker sub-traits `AppendSplitPlanner` and `UpsertSplitPlanner` (so `FlussAppendBatch` / `FlussUpsertBatch` can constrain the planner type they accept).
- `AbstractSplitPlanner` owns the shared Fluss client `Connection` / `Admin` lifecycle and centralizes the lake-snapshot probe: if `TableConfig.isDataLakeEnabled` is false, no probe; otherwise call `admin.getReadableLakeSnapshot` and treat `LakeTableSnapshotNotExistException` (unwrapped via `ExceptionUtils.stripExecutionException`) as "no snapshot".
- `AppendPlanner` and `UpsertPlanner` probe the readable lake snapshot exactly once at construction, expose `hasLakeSnapshot: Boolean` and `logTailPredicate: Option[FlussPredicate]`, and dispatch inside `plan()` to either `planLakeUnion(snap)` (lake splits unioned with a Fluss log-tail / kv+log-tail) or `planLogOnly()` (pure Fluss read).

Remove lake-specific Batch classes
- `FlussLakeAppendBatch.scala`, `FlussLakeUpsertBatch.scala`, and `FlussLakeBatch.scala` are deleted.
- `FlussAppendBatch` / `FlussUpsertBatch` now accept an `AppendSplitPlanner` / `UpsertSplitPlanner` respectively and dispatch `createReaderFactory` based on `planner.hasLakeSnapshot`. The lake union path routes through the universal `FlussLakePartitionReaderFactory` (which already dispatches by input-partition type). The Batch is `AutoCloseable` and owns the planner's `close()`.

Remove startup-mode gating for batch reads
- Batch scans no longer consume `SCAN_START_UP_MODE`. Batch semantics are always "the full table". Rationale: `mode=latest` with no writes since planning time produces `start == stop == tail`, an empty offset range that trips the reader-side `Invalid offset range` guard.
- `AppendPlanner` uses `OffsetsInitializer.full()` instead of `earliest()` so start offsets resolve to concrete numeric values (needed by the `SCAN_MAX_RECORDS_PER_PARTITION` range-split logic). Per the `OffsetsInitializer.full` javadoc the two are semantically equivalent for a log table, but `.full()` RPC-resolves each bucket to a concrete offset whereas `.earliest()` returns the `LogScanner.EARLIEST_OFFSET` (-2) sentinel.

Merge the two `V2Filters` push-down traits

- `FlussSupportsPushDownV2Filters` and `FlussLakeSupportsPushDownV2Filters` are collapsed into a single `FlussSupportsPushDownV2Filters` trait. Pushdown decisions here are schema-level (partition predicate extraction plus ARROW/non-PK data-predicate acceptance); any further pushdown to `LakeSource` happens inside the planner (in the lake-union branch) and is transparent to Spark. Lake and Fluss are expected to accept the same predicate set, so no lake-only pushdown regresses.

`SparkTable` is no longer lake-aware

- `newScanBuilder` constructs a plain `FlussAppendScanBuilder` / `FlussUpsertScanBuilder`; the planner probes the readable lake snapshot internally. This removes the `admin`-backed lake-snapshot lazy val on `SparkTable`, along with the load-bearing assumption about `Catalog.loadTable` caching that came with it.

Test changes
- `SparkPrimaryKeyTableReadTest`: the "throws when SCAN_START_UP_MODE=latest" case is inverted to assert the batch scan returns the full table regardless of startup mode, cross-referenced with the log-table symmetric case.
- `SparkLakeTableReadTestBase`: the helper that collects input partitions from `FlussScan` no longer discriminates by planner type. After consolidation, both native and lake paths flow through the same `FlussAppendBatch` / `FlussUpsertBatch`.

Terminology alignment
- `SplitPlanner`, `FlussBatch`, and `FlussScanBuilder` scaladocs use "lake-union" (snapshot present) and "log-only" (snapshot absent) consistently. The legacy term "Native" is no longer used to describe branches.

`logTailPredicate` signature cleanup

- `SplitPlanner.logTailPredicate` becomes a `val` on each planner instead of a method taking `Option[FlussPredicate]`. The planner already holds `pushedPredicate` from its constructor, so the previous plumbing (ScanBuilder → Scan → Batch → planner) that threaded the same value back into the planner is removed.

Tests
Corretto 11 with `-Dspotless.check.skip=true`:

- `SparkLogTableReadTest`: 25 / 25 pass. Notably includes:
  - `Spark Read: split partition by config` (validates the range-split logic that requires `.full()` to return concrete numeric offsets).
  - `Spark Read: ignores SCAN_START_UP_MODE (symmetric with upsert)` (asserts full-table batch semantics regardless of startup mode).
- `SparkPrimaryKeyTableReadTest`: 13 / 13 pass. Includes the inverted `SCAN_START_UP_MODE=latest` case, which now asserts a full-table batch result.
- `SparkLakePaimonLogTableReadTest`: the full suite passed on the prior verification pass (partition pushdown / union read / filter pushdown / limit pushdown / falls back when no lake snapshot / non-FULL startup mode skips lake path / partition filter pushdown in fallback).
- `SparkLakeIcebergLogTableReadTest` intermittently fails locally on `Not all buckets synced to lake within PT2M`, a mini-cluster tiering timing issue that reproduces on `main` and is not a regression of this refactor.

API and Format
No public API change. No storage-format change. The only user-visible behavior change is that batch reads no longer honor `SCAN_START_UP_MODE`; this is intentional (batch = full table). Streaming continues to honor the config as before.

Documentation
No new user-facing feature to document. A follow-up docs change will note the batch-full-table convention.
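As background for the batch-full-table convention, the offset reasoning from the change log can be illustrated with a small range-split sketch. This is an assumption-laden model, not the connector's implementation: `splitRange` and `EarliestSentinel` are hypothetical names, and only the sentinel value -2 is taken from the description above.

```scala
object OffsetRangeSketch {
  // Mirrors the value quoted above for LogScanner.EARLIEST_OFFSET.
  val EarliestSentinel: Long = -2L

  // Splits the half-open range [start, stop) into chunks of at most
  // `maxRecords` offsets. This arithmetic only makes sense when `start`
  // is a concrete offset, which is why a sentinel cannot be used here.
  def splitRange(start: Long, stop: Long, maxRecords: Long): Seq[(Long, Long)] = {
    require(start >= 0, s"start offset must be concrete, got $start")
    require(maxRecords > 0, "maxRecords must be positive")
    (start until stop by maxRecords).map { s =>
      (s, math.min(s + maxRecords, stop))
    }
  }
}
```

In this model, `splitRange(0L, 10L, 4L)` yields three chunks covering offsets 0 to 10, a range with `start == stop` yields no chunks at all (the empty-range case that a `latest` start produces when nothing has been written), and passing `EarliestSentinel` as `start` is rejected.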