Skip to content

[python] Support query auth (row filter & column masking) for REST catalog#8136

Open
MgjLLL wants to merge 3 commits into
apache:masterfrom
MgjLLL:python-query-auth
Open

[python] Support query auth (row filter & column masking) for REST catalog#8136
MgjLLL wants to merge 3 commits into
apache:masterfrom
MgjLLL:python-query-auth

Conversation

@MgjLLL

@MgjLLL MgjLLL commented Jun 5, 2026

Copy link
Copy Markdown

Purpose

Adds query-auth support to the Python client so it honors the row-level filter and column masking rules returned by a REST catalog, matching the existing JVM client behavior.

When the new option query-auth.enabled is set to true, before producing a Plan the client calls POST /v1/.../databases/{db}/tables/{tb}/auth with the projected fields, receives { filter, columnMasking }, and applies them on the read path:

  • RESTApi.auth_table_query issues the call (new request/response models AuthTableQueryRequest / AuthTableQueryResponse, new path in ResourcePaths.auth_table).
  • TableQueryAuth / TableQueryAuthResult (catalog/table_query_auth.py) wrap the result and convert each split to a QueryAuthSplit.
  • predicate_json_parser (common/predicate_json_parser.py) parses Paimon predicate JSON into a PyArrow compute filter (EQ/NEQ/LT/LTEQ/GT/GTEQ/IS_NULL/IS_NOT_NULL/IN/NOT_IN/STARTS_WITH/ENDS_WITH/CONTAINS/AND/OR/NOT).
  • AuthFilterReader / AuthMaskingReader / ColumnProjectReader (read/reader/auth_masking_reader.py) implement row filtering, column masking transforms (NULL, FIELD_REF, CAST, UPPER, LOWER, CONCAT, CONCAT_WS) and final projection back to the user's requested columns.
  • read_builder / stream_read_builder / table_read / table_scan / file_store_table / catalog_environment / rest_catalog are wired to invoke the auth call and pull extra fields required only by the auth filter.

Behavior is gated by the new CoreOptions.QUERY_AUTH_ENABLED (query-auth.enabled, default false), so existing users see no change.

Tests

Three new test files (994+ lines, all passing locally under pytest):

  • paimon-python/pypaimon/tests/predicate_json_parser_test.py — covers each predicate kind, nested AND/OR/NOT, type coercion, null handling, and extract_referenced_fields.
  • paimon-python/pypaimon/tests/auth_masking_reader_test.py — covers each masking transform, missing-field validation, and projection back to the user-requested columns.
  • paimon-python/pypaimon/tests/table_query_auth_test.py — end-to-end coverage: REST catalog calls auth_table_query, the result is plumbed into the plan, splits become QueryAuthSplit, and reads return filtered + masked rows.

Local check:

cd paimon-python
python -m pytest pypaimon/tests/predicate_json_parser_test.py \
                  pypaimon/tests/auth_masking_reader_test.py \
                  pypaimon/tests/table_query_auth_test.py -q
flake8 --config dev/cfg.ini pypaimon/  # 已在改动范围内通过

API and Format

  • New catalog option: query-auth.enabled (boolean, default false).
  • New REST endpoint consumed by the client: POST /v1/{prefix}/databases/{db}/tables/{tb}/auth. Request { "select": [...] }, response { "filter": [<predicate-json>...], "columnMasking": { <col>: <transform-json>, ... } }. The contract follows the existing Java client; no server-side change is required for catalogs that already implement query auth.
  • No change to existing user-facing Python APIs. New types (AuthTableQueryRequest, AuthTableQueryResponse, TableQueryAuth, TableQueryAuthResult, QueryAuthSplit, AuthFilterReader, AuthMaskingReader, ColumnProjectReader) are additive and live under existing modules.
  • File format / on-disk layout: unchanged.

Documentation

The new option query-auth.enabled should be reflected in the Python configuration reference. Happy to add the docs entry in this PR or in a follow-up — please advise.

This closes #8135

…talog

Adds query-auth support to the Python client so it honors the row-level
filter and column masking rules returned by a REST catalog, matching the
existing JVM client behavior.

When the new option `query-auth.enabled` is set to true, the client
calls `POST /v1/.../databases/{db}/tables/{tb}/auth` before producing a
plan, receives `{ filter, columnMasking }`, and applies them on the
read path:

  * `predicate_json_parser` parses Paimon predicate JSON into a
    PyArrow compute filter (EQ/NEQ/LT/LTEQ/GT/GTEQ/IS_NULL/IS_NOT_NULL/
    IN/NOT_IN/STARTS_WITH/ENDS_WITH/CONTAINS/AND/OR/NOT).
  * `AuthFilterReader` / `AuthMaskingReader` / `ColumnProjectReader`
    perform row filtering, column masking transforms (NULL, FIELD_REF,
    CAST, UPPER, LOWER, CONCAT, CONCAT_WS) and final projection back to
    the user's requested columns.
  * `TableQueryAuth` / `TableQueryAuthResult` wrap the result and
    convert each split to a `QueryAuthSplit`.

Behavior is gated by `CoreOptions.QUERY_AUTH_ENABLED` (default false),
so existing users see no change.
@JingsongLi

Copy link
Copy Markdown
Contributor

I found a few correctness issues in the query-auth paths introduced here:

  1. paimon-python/pypaimon/read/datasource/split_provider.py:127 constructs ReadBuilder(self._ensure_table()) directly. That bypasses FileStoreTable.new_read_builder(), which is where the REST query auth is injected. As a result, pypaimon.ray.read_paimon(...) can read REST tables without applying server-side row filters or column masking. I think this should either call self._ensure_table().new_read_builder() or explicitly pass the table's query auth into the builder, with a Ray regression test for row filtering/masking.

  2. paimon-python/pypaimon/read/stream_read_builder.py:117 stores _query_auth, but new_streaming_scan() does not pass it into AsyncStreamingTableScan. The plans returned from streaming_table_scan.py:322 and streaming_table_scan.py:386 also do not go through auth_result.convert_plan(). So table.new_stream_read_builder() skips row filters and column masking for both the initial scan and later delta/changelog scans. The streaming scan should preserve and apply query auth before returning each plan.

  3. The auth reader wrappers currently assume the inner reader supports read_arrow_batch() (paimon-python/pypaimon/read/reader/auth_masking_reader.py:38 and :66). For primary-key tables with non-raw-convertible splits, TableRead can create a MergeFileSplitRead, whose create_reader() returns the normal row RecordReader path rather than a RecordBatchReader. Wrapping that in AuthFilterReader/AuthMaskingReader will fail with AttributeError when query auth is enabled. This needs either a row-reader auth path, conversion to a batch-capable reader before wrapping, or routing/rejecting these splits explicitly.

- Ray: use table.new_read_builder() instead of direct ReadBuilder()
- Streaming: pass query_auth to AsyncStreamingTableScan, apply to all plans
- Merge reader: add RecordReaderToBatchAdapter for primary-key tables
- Parallel: use _create_reader_for_split, add raw_convertible proxy
@MgjLLL

MgjLLL commented Jun 8, 2026

Copy link
Copy Markdown
Author

Fixes for issues raised by @JingsongLi, plus one additional issue found during analysis.
8 files changed, +188 -13 lines.

Fix 1: Ray read path bypasses auth (split_provider.py)

_ensure_planned() directly constructed ReadBuilder(table), bypassing the query_auth injection from FileStoreTable.new_read_builder().

Fix: Changed to self._ensure_table().new_read_builder(). Removed the direct ReadBuilder import.

Fix 2: Streaming read path skips auth entirely (streaming_table_scan.py, stream_read_builder.py)

StreamReadBuilder stored _query_auth but never passed it to AsyncStreamingTableScan. All three plan creation methods (initial/follow-up/catch-up) produced plans without auth wrapping.

Fix:

  • Added query_auth parameter to AsyncStreamingTableScan.__init__()
  • Added _apply_auth(plan) method that calls query_auth(select)convert_plan(plan)
  • Applied auth to _create_initial_plan, _create_follow_up_plan, _create_catch_up_plan
  • Extracted _create_initial_plan_raw() to avoid double auth in catch-up path
  • StreamReadBuilder.new_streaming_scan() now passes query_auth

Fix 3: Primary-key table merge reader incompatible with auth wrappers (auth_masking_reader.py, table_read.py)

MergeFileSplitRead.create_reader() returns a RecordReader (row-level), but AuthFilterReader/AuthMaskingReader require RecordBatchReader (batch-level with read_arrow_batch()). Java has a unified RecordReader<InternalRow> so this
mismatch doesn't exist in JVM.

Fix:

  • Added RecordReaderToBatchAdapter(RecordBatchReader) that collects OffsetRow tuples from read_batch()/next() and converts them to pa.RecordBatch
  • _authed_reader() now detects non-RecordBatchReader and wraps with the adapter before applying auth wrappers

Fix 4: Parallel read path bypasses auth + missing raw_convertible proxy (table_read.py, query_auth_split.py)

_read_one_split_to_batches() called _create_split_read(split).create_reader() directly, bypassing QueryAuthSplit detection. Additionally, QueryAuthSplit lacked a raw_convertible property proxy, causing AttributeError when accessed.

Fix:

  • Changed to self._create_reader_for_split(split) which correctly handles QueryAuthSplit
  • Added raw_convertible property proxy to QueryAuthSplit

New Tests (for fixes)

  • TestQueryAuthSplitRawConvertible (2 tests) — verifies raw_convertible proxy for true/false
  • TestRecordReaderToBatchAdapter (5 tests) — basic conversion, multi-batch, empty reader, close delegation, integration with AuthFilterReader

All 90 auth-related tests pass. Full test suite: 318/324 pass (6 pre-existing failures unrelated to this PR).

@MgjLLL

MgjLLL commented Jun 8, 2026

Copy link
Copy Markdown
Author

I found a few correctness issues in the query-auth paths introduced here:

  1. paimon-python/pypaimon/read/datasource/split_provider.py:127 constructs ReadBuilder(self._ensure_table()) directly. That bypasses FileStoreTable.new_read_builder(), which is where the REST query auth is injected. As a result, pypaimon.ray.read_paimon(...) can read REST tables without applying server-side row filters or column masking. I think this should either call self._ensure_table().new_read_builder() or explicitly pass the table's query auth into the builder, with a Ray regression test for row filtering/masking.
  2. paimon-python/pypaimon/read/stream_read_builder.py:117 stores _query_auth, but new_streaming_scan() does not pass it into AsyncStreamingTableScan. The plans returned from streaming_table_scan.py:322 and streaming_table_scan.py:386 also do not go through auth_result.convert_plan(). So table.new_stream_read_builder() skips row filters and column masking for both the initial scan and later delta/changelog scans. The streaming scan should preserve and apply query auth before returning each plan.
  3. The auth reader wrappers currently assume the inner reader supports read_arrow_batch() (paimon-python/pypaimon/read/reader/auth_masking_reader.py:38 and :66). For primary-key tables with non-raw-convertible splits, TableRead can create a MergeFileSplitRead, whose create_reader() returns the normal row RecordReader path rather than a RecordBatchReader. Wrapping that in AuthFilterReader/AuthMaskingReader will fail with AttributeError when query auth is enabled. This needs either a row-reader auth path, conversion to a batch-capable reader before wrapping, or routing/rejecting these splits explicitly.

@JingsongLi All 3 issues fixed (+ 1 additional parallel path bypass found during analysis). See updated PR description. PTAL.

Return None instead of a local lambda from table_query_auth() when
auth is disabled, since pickle cannot serialize local lambdas. This
fixes serializable_test and ray_sink_test failures.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
if not self.filter and not self.column_masking:
return plan
auth_splits = [QueryAuthSplit(split, self) for split in plan.splits()]
return Plan(auth_splits)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java TableScan.Plan does not carry a snapshot id, but Python Plan does and the update / row-id update paths use it as check_from_snapshot. Wrapping the plan here drops plan.snapshot_id, so a query-auth table planned from a non-empty snapshot becomes snapshot_id=None; table_update then emits commit messages with -1, which disables the row-id conflict checks (and related global-index update checks). Please preserve the original plan metadata, e.g. Plan(auth_splits, snapshot_id=plan.snapshot_id).


return reader

def _create_split_read_with_read_type(self, split, read_type):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This auth-specific construction bypasses the normal PK read path above. In _create_split_read, PK tables inject missing sequence.field columns into the inner read type and then project them back out, matching the Java withReadType + outer projection behavior. Here, if query auth is enabled and the user projects id,val from a PK table with sequence.field=ts, MergeFileSplitRead is built without ts; that can either fail with sequence.field ... not found or merge by file sequence instead of the configured user sequence. Please reuse the existing _create_split_read widening/project-back logic for effective_read_type, or factor it so the auth path cannot drift from the normal PK path.

elif function == "LIKE":
raw = literals[0]
escaped = re.escape(raw)
pattern = escaped.replace("%", ".*").replace("_", ".")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not match the JVM LIKE semantics. Java treats backslash as the default escape character before expanding % / _, so a policy predicate like LIKE admin\\_% matches admin_foo and not adminXfoo. Escaping the whole string first and then replacing every % / _ makes escaped wildcards behave as wildcards (or requires a literal backslash), so Python can allow/deny different rows from the Java client for the same auth filter. Please port the Java Like.sqlToRegexLike behavior, including invalid escape handling.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] [python] Support query auth (row filter & column masking) for REST catalog

2 participants