Skip to content

[spark] Consolidate scan builders into Append/Upsert with SplitPlanner#3559

Open
YannByron wants to merge 3 commits into
apache:mainfrom
YannByron:feat/spark-scan-consolidation-pr1
Open

[spark] Consolidate scan builders into Append/Upsert with SplitPlanner#3559
YannByron wants to merge 3 commits into
apache:mainfrom
YannByron:feat/spark-scan-consolidation-pr1

Conversation

@YannByron

Copy link
Copy Markdown
Contributor

Purpose

Linked issue: relates to #3213

The Fluss Spark connector currently ships four scan builders — FlussAppendScanBuilder / FlussLakeAppendScanBuilder / FlussUpsertScanBuilder / FlussLakeUpsertScanBuilder — split along two orthogonal axes (append vs upsert, native vs lake). The lake batches additionally carry a runtime fallback path (isFallback) that morphs a "lake" batch into a "non-lake" batch when the readable lake snapshot is absent. Two consequences follow:

  1. Each FlussLake*Batch violates SRP by doing both lake planning and native fallback planning, and dynamically picks the reader factory at runtime. Reasoning about which physical read path a query will take requires reading both classes end-to-end.
  2. FlussUpsertScan has no pushedPredicate (upsert never accepts data predicates) but FlussLakeUpsertScan does — an asymmetric API surface between siblings.
  3. Streaming (toMicroBatchStream) is shared between the lake and non-lake variants, but batch (toBatch) is split — an inconsistent split axis in the class hierarchy.

This PR consolidates the four scan builders into two (FlussAppendScanBuilder, FlussUpsertScanBuilder) and routes lake-union vs log-only reads through a single SplitPlanner that is probed at scan build time. The runtime fallback path is removed: presence/absence of the readable lake snapshot is decided deterministically at planner construction, and plan() picks a branch accordingly.

Brief change log

Introduce SplitPlanner (fluss-spark-common/.../read/SplitPlanner.scala, new)

  • sealed trait SplitPlanner extends AutoCloseable with marker sub-traits AppendSplitPlanner and UpsertSplitPlanner (so FlussAppendBatch / FlussUpsertBatch can constrain the planner type they accept).
  • AbstractSplitPlanner owns the shared Fluss client Connection / Admin lifecycle and centralizes the lake-snapshot probe: if TableConfig.isDataLakeEnabled is false, no probe; otherwise admin.getReadableLakeSnapshot and treat LakeTableSnapshotNotExistException (unwrapped via ExceptionUtils.stripExecutionException) as "no snapshot".
  • Two concrete planners — AppendPlanner and UpsertPlanner — probe the readable lake snapshot exactly once at construction, expose hasLakeSnapshot: Boolean and logTailPredicate: Option[FlussPredicate], and dispatch inside plan() to either planLakeUnion(snap) (lake splits unioned with a Fluss log-tail / kv+log-tail) or planLogOnly() (pure Fluss read).

Remove lake-specific Batch classes

  • FlussLakeAppendBatch.scala, FlussLakeUpsertBatch.scala, and FlussLakeBatch.scala are deleted. FlussAppendBatch / FlussUpsertBatch now accept an AppendSplitPlanner / UpsertSplitPlanner respectively and dispatch createReaderFactory based on planner.hasLakeSnapshot. The lake union path routes through the universal FlussLakePartitionReaderFactory (which already dispatches by input-partition type). The Batch is AutoCloseable and owns the planner's close().

Remove startup-mode gating for batch reads

  • Batch scans no longer consume SCAN_START_UP_MODE. Batch semantics are always "the full table". Rationale:
    1. Letting a streaming-oriented config gate batch planning creates asymmetric semantics on append vs upsert tables (KV snapshot has no partial-read semantics).
    2. mode=latest with no writes since planning time produces start == stop == tail, an empty offset range that trips the reader-side Invalid offset range guard.
    3. Time-range batch reads should be expressed via predicate pushdown on the timestamp column, not startup mode.
  • AppendPlanner uses OffsetsInitializer.full() instead of earliest() so start offsets resolve to concrete numeric values (needed by the SCAN_MAX_RECORDS_PER_PARTITION range-split logic). Per the OffsetsInitializer.full javadoc the two are semantically equivalent for a log table, but .full() RPC-resolves each bucket to a concrete offset whereas .earliest() returns the LogScanner.EARLIEST_OFFSET (-2) sentinel.

Merge the two V2Filters push-down traits

  • FlussSupportsPushDownV2Filters and FlussLakeSupportsPushDownV2Filters are collapsed into a single FlussSupportsPushDownV2Filters trait. Pushdown decisions here are schema-level (partition predicate extraction plus ARROW/non-PK data-predicate acceptance); any further pushdown to LakeSource happens inside the planner (in the lake-union branch) and is transparent to Spark. Lake and Fluss are expected to accept the same predicate set, so no lake-only pushdown regresses.

SparkTable is no longer lake-aware

  • newScanBuilder constructs a plain FlussAppendScanBuilder / FlussUpsertScanBuilder; the planner probes the readable lake snapshot internally. Removes the admin-backed lake-snapshot lazy val on SparkTable and the associated Catalog.loadTable caching load-bearing assumption.

Test changes

  • SparkPrimaryKeyTableReadTest: the "throws when SCAN_START_UP_MODE=latest" case is inverted to assert the batch scan returns the full table regardless of startup mode, cross-referenced with the log-table symmetric case.
  • SparkLakeTableReadTestBase: the helper that collects input partitions from FlussScan no longer discriminates by planner type — after consolidation, both native and lake paths flow through the same FlussAppendBatch / FlussUpsertBatch.

Terminology alignment

  • Internal branch naming across SplitPlanner, FlussBatch, and FlussScanBuilder scaladocs uses "lake-union" (snapshot present) and "log-only" (snapshot absent) consistently. The former legacy term "Native" is not used to describe branches.

logTailPredicate signature cleanup

  • SplitPlanner.logTailPredicate becomes a val on each planner instead of a method taking Option[FlussPredicate]. The planner already holds pushedPredicate from its constructor, so the previous plumbing (ScanBuilder → Scan → Batch → planner) that threaded the same value back into the planner is removed.

Tests

  • Unit / IT suites executed locally on Corretto 11 with -Dspotless.check.skip=true:
    • SparkLogTableReadTest — 25 / 25 pass. Notably includes:
      • Spark Read: split partition by config (validates the range-split logic that requires .full() to return concrete numeric offsets).
      • Spark Read: ignores SCAN_START_UP_MODE (symmetric with upsert) (asserts full-table batch semantics regardless of startup mode).
    • SparkPrimaryKeyTableReadTest — 13 / 13 pass. Includes the inverted SCAN_START_UP_MODE=latest case now asserting a full-table batch result.
    • SparkLakePaimonLogTableReadTest — full suite pass on the prior verification pass (partition pushdown / union read / filter pushdown / limit pushdown / falls back when no lake snapshot / non-FULL startup mode skips lake path / partition filter pushdown in fallback).
  • Iceberg lake IT (SparkLakeIcebergLogTableReadTest) locally intermittently fails on Not all buckets synced to lake within PT2M — a mini-cluster tiering timing issue that reproduces on main and is not a regression of this refactor.

API and Format

No public API change. No storage-format change. The only user-visible behavior change is that batch reads no longer honor SCAN_START_UP_MODE; this is intentional (batch = full table). Streaming continues to honor the config as before.

Documentation

No new user-facing feature to document. A follow-up docs change will note the batch-full-table convention.

  • Yes (Qoder)

YannByron added 3 commits July 2, 2026 12:13
Consolidates the four scan builders (FlussLakeAppendScanBuilder /
FlussAppendScanBuilder / FlussLakeUpsertScanBuilder / FlussUpsertScanBuilder)
into two (FlussAppendScanBuilder, FlussUpsertScanBuilder), routing lake-union
vs log-only reads through a single SplitPlanner probed at scan build time.

Highlights:

- Introduce SplitPlanner with AppendPlanner / UpsertPlanner. Each planner
  probes the readable lake snapshot exactly once at construction and
  dispatches inside plan() to either the lake-union branch (lake splits
  unioned with a Fluss log-tail / kv+log-tail) or the log-only branch
  (pure Fluss read). No runtime fallback path — presence/absence of the
  lake snapshot is decided deterministically at construction.

- Remove FlussLakeAppendBatch, FlussLakeUpsertBatch, FlussLakeBatch. The
  planner-backed FlussAppendBatch / FlussUpsertBatch dispatch the reader
  factory based on planner.hasLakeSnapshot, and own the planner's
  Fluss client connection / admin lifecycle via close().

- Batch scans no longer consume SCAN_START_UP_MODE. Batch semantics are
  "the full table" regardless of the user-facing startup mode. This keeps
  append and upsert planners symmetric (KV snapshot has no partial-read
  semantics) and structurally avoids the empty offset-range case that
  otherwise trips the reader-side guard. Time-range batch reads should
  be expressed via predicate pushdown, not startup mode. AppendPlanner
  uses OffsetsInitializer.full() rather than earliest() so start offsets
  resolve to concrete numeric values usable by the split-by-max-records
  logic.

- Merge FlussSupportsPushDownV2Filters and FlussLakeSupportsPushDownV2Filters.
  Pushdown decisions are schema-level (partition predicate extraction plus
  ARROW/non-PK data-predicate acceptance); further pushdown to LakeSource
  happens inside the planner and is transparent to Spark.

- SparkTable is no longer lake-aware. newScanBuilder constructs a plain
  Append/Upsert ScanBuilder; the planner probes the snapshot internally.

- Tests: SparkPrimaryKeyTableReadTest asserts SCAN_START_UP_MODE is ignored
  for batch reads; SparkLakeTableReadTestBase helper no longer filters by
  planner type.

🤖 Generated with [Qoder](https://qoder.com)
Rewrap scaladoc lines to match spotless-maven-plugin's line-length rule.
No code semantics changed.

🤖 Generated with [Qoder](https://qoder.com)
… scan builder

The scan builder consolidation (e16af58) merged FlussSupportsPushDownV2Filters
and FlussLakeSupportsPushDownV2Filters into a single trait but only preserved
the ARROW log-table branch. This left PK tables with lake enabled unable to push
predicates to the lake source, causing pk filter pushdown tests to report empty
pushed predicates.

Add an `else if (isDataLakeEnabled)` branch that probes the lake source via
FlussLakeUtils.applyLakeFilters — mirroring the former FlussLakeSupportsPushDown-
V2Filters behavior — so that accepted predicates are reported back to Spark and
passed to the planner for lake-path filtering.

🤖 Generated with [Qoder][https://qoder.com]
@YannByron

Copy link
Copy Markdown
Contributor Author

@Yohahaha @fresh-borzoni Please take a look if have change, thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant