From cffd764e99c1a52dc72a08d6e7f7e69bdd0522c3 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 29 May 2026 15:18:39 +0100 Subject: [PATCH 01/11] some stuff Signed-off-by: Adam Gutglick --- Cargo.lock | 2 + encodings/parquet-variant/Cargo.toml | 1 + vortex-btrblocks/Cargo.toml | 11 +- vortex-btrblocks/src/builder.rs | 2 + vortex-btrblocks/src/lib.rs | 2 + vortex-btrblocks/src/schemes/mod.rs | 3 +- vortex-btrblocks/src/variant/mod.rs | 427 +++++++++++++++++++++++++++ 7 files changed, 445 insertions(+), 3 deletions(-) create mode 100644 vortex-btrblocks/src/variant/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b6f67682c4a..c47de7f153e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9819,6 +9819,7 @@ dependencies = [ "codspeed-divan-compat", "itertools 0.14.0", "num-traits", + "parquet-variant-compute", "pco", "rand 0.10.1", "rstest", @@ -9836,6 +9837,7 @@ dependencies = [ "vortex-fsst", "vortex-mask", "vortex-onpair", + "vortex-parquet-variant", "vortex-pco", "vortex-runend", "vortex-sequence", diff --git a/encodings/parquet-variant/Cargo.toml b/encodings/parquet-variant/Cargo.toml index 6b3900abb27..cb5a8492fea 100644 --- a/encodings/parquet-variant/Cargo.toml +++ b/encodings/parquet-variant/Cargo.toml @@ -12,6 +12,7 @@ readme = { workspace = true } repository = { workspace = true } rust-version = { workspace = true } version = { workspace = true } +publish = true [lints] workspace = true diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 1adb6508828..69722215faf 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -16,6 +16,7 @@ version = { workspace = true } [dependencies] itertools = { workspace = true } num-traits = { workspace = true } +parquet-variant-compute = { workspace = true, optional = true } pco = { workspace = true, optional = true } rand = { workspace = true } rustc-hash = { workspace = true } @@ -31,6 +32,7 @@ vortex-fastlanes = { workspace = true } vortex-fsst = { workspace = true } vortex-mask = { workspace = true } vortex-onpair = { workspace = true, optional = true } +vortex-parquet-variant = { workspace = true, optional = true } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } @@ -42,22 +44,29 @@ vortex-zstd = { workspace = true, optional = true } [dev-dependencies] divan = { workspace = true } +rand = { workspace = true } rstest = { workspace = true } test-with = { workspace = true } vortex-array = { workspace = true, features = ["_test-harness"] } vortex-session = { workspace = true } [features] -# This feature enabled unstable encodings for which we don't guarantee stability. + unstable_encodings = [ "dep:vortex-tensor", "dep:vortex-onpair", "vortex-zstd?/unstable_encodings", ] +parquet-variant = [ + "dep:vortex-parquet-variant", + "dep:parquet-variant-compute", + "zstd", +] pco = ["dep:pco", "dep:vortex-pco"] zstd = ["dep:vortex-zstd"] [lints] +# This feature enabled unstable encodings for which we don't guarantee stability. workspace = true [[bench]] diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index c0a0eaeb5eb..79de8ee4f8b 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -70,6 +70,8 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ &decimal::DecimalScheme, // Temporal schemes. &temporal::TemporalScheme, + // Binary schemes + &binary::BinaryDictScheme, ]; /// Builder for creating configured [`BtrBlocksCompressor`] instances. diff --git a/vortex-btrblocks/src/lib.rs b/vortex-btrblocks/src/lib.rs index 39db05246a6..6001d77684f 100644 --- a/vortex-btrblocks/src/lib.rs +++ b/vortex-btrblocks/src/lib.rs @@ -58,6 +58,8 @@ mod builder; mod canonical_compressor; /// Compression scheme implementations. pub mod schemes; +#[cfg(feature = "parquet-variant")] +pub mod variant; // Re-export framework types from vortex-compressor for backwards compatibility. // Btrblocks-specific exports. diff --git a/vortex-btrblocks/src/schemes/mod.rs b/vortex-btrblocks/src/schemes/mod.rs index 16123429e86..8b8629d3f0a 100644 --- a/vortex-btrblocks/src/schemes/mod.rs +++ b/vortex-btrblocks/src/schemes/mod.rs @@ -5,11 +5,10 @@ pub mod binary; pub mod bool; +pub mod decimal; pub mod float; pub mod integer; pub mod string; - -pub mod decimal; pub mod temporal; pub(crate) mod patches; diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs new file mode 100644 index 00000000000..2defc928e46 --- /dev/null +++ b/vortex-btrblocks/src/variant/mod.rs @@ -0,0 +1,427 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Compression scheme for JSON data into binary variant representation + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrow::ArrowSessionExt; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::dtype::extension::ExtId; +use vortex_array::dtype::extension::ExtVTable; +use vortex_array::extension::EmptyMetadata; +use vortex_array::scalar::ScalarValue; +use vortex_compressor::ctx::CompressorContext; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::scheme::Scheme; +use vortex_compressor::scheme::SchemeExt; +use vortex_compressor::stats::ArrayAndStats; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_parquet_variant::ParquetVariant; +use vortex_parquet_variant::ParquetVariantArrayExt; + +use crate::CascadingCompressor; + +/// Compression scheme that converts JSON string extension arrays to Parquet Variant arrays. +#[derive(Debug)] +pub struct JsonToVariantScheme; + +/// Child indices for recursively compressed Parquet Variant binary children. +mod parquet_variant_children { + /// The Parquet Variant metadata child. + pub const METADATA: usize = 0; + /// The raw Parquet Variant value child. + pub const VALUE: usize = 1; +} + +/// JSON logical type backed by UTF-8 string storage. +#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] +pub struct Json; + +impl ExtVTable for Json { + type Metadata = EmptyMetadata; + type NativeValue<'a> = &'a str; + + fn id(&self) -> ExtId { + ExtId::new("vortex.json") + } + + fn serialize_metadata(&self, _metadata: &Self::Metadata) -> VortexResult> { + Ok(Vec::new()) + } + + fn deserialize_metadata(&self, metadata: &[u8]) -> VortexResult { + vortex_ensure!(metadata.is_empty(), "JSON metadata must be empty"); + Ok(EmptyMetadata) + } + + fn validate_dtype(ext_dtype: &ExtDType) -> VortexResult<()> { + vortex_ensure!( + ext_dtype.storage_dtype().is_utf8(), + "JSON storage dtype must be utf8, got {}", + ext_dtype.storage_dtype() + ); + Ok(()) + } + + fn unpack_native<'a>( + _ext_dtype: &'a ExtDType, + storage_value: &'a ScalarValue, + ) -> VortexResult> { + let ScalarValue::Utf8(value) = storage_value else { + vortex_bail!("JSON storage scalar must be utf8, got {storage_value}"); + }; + Ok(value.as_str()) + } +} + +impl Scheme for JsonToVariantScheme { + fn scheme_name(&self) -> &'static str { + "json_to_variant" + } + + fn matches(&self, canonical: &Canonical) -> bool { + let Canonical::Extension(ext_array) = canonical else { + return false; + }; + + ext_array.ext_dtype().is::() + } + + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let array = data.array().clone().execute::(exec_ctx)?; + let storage = array.storage_array().clone(); + + if !storage.dtype().is_utf8() { + vortex_bail!("storage must be utf8"); + } + + let arrow_array = { + let session = exec_ctx.session().clone(); + let arrow = session.arrow(); + arrow.execute_arrow(storage, None, exec_ctx)? + }; + + let array = parquet_variant_compute::json_to_variant(&arrow_array)?; + + let parquet_variant = + ParquetVariant::from_arrow_variant(&array)?.downcast::(); + + let compressed_metadata = compressor.compress_child( + parquet_variant.metadata_array(), + &compress_ctx, + self.id(), + parquet_variant_children::METADATA, + exec_ctx, + )?; + let compressed_value = parquet_variant + .value_array() + .map(|value| { + compressor.compress_child( + value, + &compress_ctx, + self.id(), + parquet_variant_children::VALUE, + exec_ctx, + ) + }) + .transpose()?; + + ParquetVariant::try_new( + parquet_variant.validity()?, + compressed_metadata, + compressed_value, + parquet_variant.typed_value_array().cloned(), + ) + .map(IntoArray::into_array) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use rand::RngExt; + use rand::SeedableRng; + use rand::rngs::StdRng; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::Extension; + use vortex_array::arrays::ExtensionArray; + use vortex_array::arrays::VarBinView; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrays::extension::ExtensionArrayExt; + use vortex_array::session::ArraySession; + use vortex_compressor::builtins::BinaryDictScheme; + use vortex_compressor::builtins::IntConstantScheme; + use vortex_compressor::builtins::StringConstantScheme; + use vortex_compressor::builtins::StringDictScheme; + use vortex_session::VortexSession; + use vortex_zstd::Zstd; + + use super::*; + use crate::schemes::integer::BitPackingScheme; + use crate::schemes::integer::FoRScheme; + use crate::schemes::integer::RunEndScheme; + use crate::schemes::integer::SequenceScheme; + use crate::schemes::integer::SparseScheme; + use crate::schemes::integer::ZigZagScheme; + use crate::schemes::string::FSSTScheme; + use crate::schemes::string::ZstdScheme; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn json_data() -> Vec { + let mut rng = StdRng::seed_from_u64(0); + const ACCOUNT_KEYS: &[&str] = &["account_id", "customer_id", "tenant_id", "buyer_id"]; + const REGION_KEYS: &[&str] = &["region", "market", "country"]; + const REGIONS: &[&str] = &["us-east", "us-west", "eu", "apac", "latam"]; + const STATUS_KEYS: &[&str] = &["status", "payment_state", "lifecycle_state"]; + const STATUSES: &[&str] = &["draft", "open", "paid", "void", "past_due"]; + const AMOUNT_KEYS: &[&str] = &["discount", "tax", "shipping", "credit"]; + const FLAG_KEYS: &[&str] = &["autopay", "fraud_review", "priority", "disputed"]; + const TAGS: &[&str] = &["renewal", "manual", "usage", "trial", "enterprise"]; + + (0..1024) + .map(|_| { + let mut fields = vec![ + format!( + r#""{}":"acct_{:04x}""#, + ACCOUNT_KEYS[rng.random_range(0..ACCOUNT_KEYS.len())], + rng.random::(), + ), + format!( + r#""invoice_total":{}.{:02}"#, + rng.random_range(10_u32..100_000), + rng.random_range(0_u32..100), + ), + format!(r#""line_items":{}"#, rng.random_range(1_u32..250)), + ]; + + if rng.random_bool(0.85) { + fields.push(format!( + r#""{}":"{}""#, + STATUS_KEYS[rng.random_range(0..STATUS_KEYS.len())], + STATUSES[rng.random_range(0..STATUSES.len())], + )); + } + if rng.random_bool(0.75) { + fields.push(format!( + r#""{}":"{}""#, + REGION_KEYS[rng.random_range(0..REGION_KEYS.len())], + REGIONS[rng.random_range(0..REGIONS.len())], + )); + } + if rng.random_bool(0.55) { + fields.push(format!( + r#""{}":{}.{:02}"#, + AMOUNT_KEYS[rng.random_range(0..AMOUNT_KEYS.len())], + rng.random_range(0_u32..2_500), + rng.random_range(0_u32..100), + )); + } + if rng.random_bool(0.40) { + fields.push(format!( + r#""{}":{}"#, + FLAG_KEYS[rng.random_range(0..FLAG_KEYS.len())], + rng.random_bool(0.5), + )); + } + if rng.random_bool(0.30) { + fields.push(format!( + r#""tags":["{}","{}"]"#, + TAGS[rng.random_range(0..TAGS.len())], + TAGS[rng.random_range(0..TAGS.len())], + )); + } + + format!("{{{}}}", fields.join(",")) + }) + .collect() + } + + fn json_array(values: &[String]) -> VortexResult { + let storage = + VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + Ok(ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array()) + } + + fn parquet_variant_child_compressor() -> CascadingCompressor { + CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) + } + + fn print_comparison_output( + array: &ArrayRef, + string_compressed: &ArrayRef, + compressed: &ArrayRef, + ) { + let compressed_ratio = array.nbytes() as f64 / compressed.nbytes() as f64; + let compressed_array_ratio = string_compressed.nbytes() as f64 / compressed.nbytes() as f64; + println!( + "Compression sizes: input={} bytes, compressed string={} bytes, compressed output={} bytes", + array.nbytes(), + string_compressed.nbytes(), + compressed.nbytes(), + ); + println!("Compressed output ratio: {compressed_ratio:.2}x\n"); + println!("Compressed string / compressed output ratio: {compressed_array_ratio:.2}x\n"); + println!("JSON input encoding tree:\n{}", array.tree_display()); + println!( + "String-compressed encoding tree:\n{}", + string_compressed.tree_display() + ); + println!( + "Compressed output encoding tree:\n{}", + compressed.tree_display() + ); + } + + #[test] + fn parquet_variant_compresses_repeated_json_keys() -> VortexResult<()> { + let array = json_array(&json_data())?; + + let string_compressor = + CascadingCompressor::new(vec![&StringDictScheme, &StringConstantScheme]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let variant_data = ArrayAndStats::new(array.clone(), Default::default()); + let variant_compressed = JsonToVariantScheme.compress( + &variant_compressor, + &variant_data, + CompressorContext::new(), + &mut exec_ctx, + )?; + + assert!( + variant_compressed.is::(), + "expected ParquetVariant output, got encoding {} with dtype {} and {} bytes", + variant_compressed.encoding_id(), + variant_compressed.dtype(), + variant_compressed.nbytes() + ); + assert!( + variant_compressed.nbytes() < string_compressed.nbytes(), + "Parquet Variant conversion should compress repeated JSON keys: \ + variant={} bytes, input={} bytes", + variant_compressed.nbytes(), + string_compressed.nbytes(), + ); + + print_comparison_output(&array, &string_compressed, &variant_compressed); + + Ok(()) + } + + #[test] + fn recursively_compresses_parquet_variant_binary_children() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let variant_data = ArrayAndStats::new(array.clone(), Default::default()); + let compressed = JsonToVariantScheme.compress( + &variant_compressor, + &variant_data, + CompressorContext::new(), + &mut exec_ctx, + )?; + let parquet_variant = compressed.clone().downcast::(); + + assert!( + !parquet_variant.metadata_array().is::(), + "expected Parquet Variant metadata child to be compressed, got {}", + parquet_variant.metadata_array().encoding_id(), + ); + assert!(parquet_variant.value_array().is_some()); + assert!(parquet_variant.typed_value_array().is_none()); + + Ok(()) + } + + #[test] + fn prefers_smaller_extension_storage_over_variant_scheme() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let string_compressor = CascadingCompressor::new(vec![ + &StringDictScheme, + &FSSTScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &FSSTScheme, + &ZstdScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + let extension = compressed.clone().downcast::(); + let storage = extension.storage_array(); + assert!( + storage.is::(), + "expected JSON extension storage fallback to use zstd, got {}", + storage.encoding_id(), + ); + + print_comparison_output(&array, &string_compressed, &compressed); + + Ok(()) + } +} From a92530a302cb11cf872c1e7753f4c57543549499 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 29 May 2026 16:20:07 +0100 Subject: [PATCH 02/11] binary compression Signed-off-by: Adam Gutglick --- vortex-btrblocks/src/builder.rs | 4 +- vortex-btrblocks/src/schemes/binary/fsst.rs | 189 ++++++++++++++++++++ vortex-btrblocks/src/schemes/binary/mod.rs | 2 + vortex-btrblocks/src/variant/mod.rs | 78 +++++--- 4 files changed, 249 insertions(+), 24 deletions(-) create mode 100644 vortex-btrblocks/src/schemes/binary/fsst.rs diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index 79de8ee4f8b..a09c4cc3a3a 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -65,13 +65,12 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ // Binary schemes. //////////////////////////////////////////////////////////////////////////////////////////////// &binary::BinaryDictScheme, + &binary::BinaryFSSTScheme, &binary::BinaryConstantScheme, // Decimal schemes. &decimal::DecimalScheme, // Temporal schemes. &temporal::TemporalScheme, - // Binary schemes - &binary::BinaryDictScheme, ]; /// Builder for creating configured [`BtrBlocksCompressor`] instances. @@ -195,6 +194,7 @@ impl BtrBlocksCompressorBuilder { string::StringDictScheme.id(), string::FSSTScheme.id(), binary::BinaryDictScheme.id(), + binary::BinaryFSSTScheme.id(), ]; #[cfg(feature = "unstable_encodings")] excluded.push(string::OnPairScheme.id()); diff --git a/vortex-btrblocks/src/schemes/binary/fsst.rs b/vortex-btrblocks/src/schemes/binary/fsst.rs new file mode 100644 index 00000000000..fdbdeaf1225 --- /dev/null +++ b/vortex-btrblocks/src/schemes/binary/fsst.rs @@ -0,0 +1,189 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! FSST binary compression. + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::primitive::PrimitiveArrayExt; +use vortex_array::arrays::varbin::VarBinArrayExt; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_error::VortexResult; +use vortex_fsst::FSST; +use vortex_fsst::FSSTArrayExt; +use vortex_fsst::fsst_compress; +use vortex_fsst::fsst_train_compressor; + +use crate::ArrayAndStats; +use crate::CascadingCompressor; +use crate::CompressorContext; +use crate::Scheme; +use crate::SchemeExt; + +/// FSST compression for binary values. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BinaryFSSTScheme; + +impl Scheme for BinaryFSSTScheme { + fn scheme_name(&self) -> &'static str { + "vortex.binary.fsst" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_binary() + } + + /// Children: lengths=0, code_offsets=1. + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let binary = data.array_as_varbinview().into_owned(); + let compressor_fsst = fsst_train_compressor(&binary); + let fsst = fsst_compress( + &binary, + binary.len(), + binary.dtype(), + &compressor_fsst, + exec_ctx, + ); + + let uncompressed_lengths_primitive = fsst + .uncompressed_lengths() + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)?; + let compressed_original_lengths = compressor.compress_child( + &uncompressed_lengths_primitive.into_array(), + &compress_ctx, + self.id(), + 0, + exec_ctx, + )?; + + let codes_offsets_primitive = fsst + .codes() + .offsets() + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)?; + let compressed_codes_offsets = compressor.compress_child( + &codes_offsets_primitive.into_array(), + &compress_ctx, + self.id(), + 1, + exec_ctx, + )?; + let compressed_codes = VarBinArray::try_new( + compressed_codes_offsets, + fsst.codes().bytes().clone(), + fsst.codes().dtype().clone(), + fsst.codes().validity()?, + )?; + + let fsst = FSST::try_new( + fsst.dtype().clone(), + fsst.symbols().clone(), + fsst.symbol_lengths().clone(), + compressed_codes, + compressed_original_lengths, + exec_ctx, + )?; + + Ok(fsst.into_array()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::assert_arrays_eq; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::session::ArraySession; + use vortex_error::VortexResult; + use vortex_fsst::FSST; + use vortex_session::VortexSession; + + use crate::BtrBlocksCompressor; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn binary_fsst_data() -> VarBinViewArray { + VarBinViewArray::from_iter( + (0..1024).map(|idx| { + Some(format!("variant-key-{idx:04}-invoice-total-line-items").into_bytes()) + }), + DType::Binary(Nullability::NonNullable), + ) + } + + #[test] + fn default_compressor_uses_fsst_for_binary_data() -> VortexResult<()> { + let array = binary_fsst_data().into_array(); + let compressed = + BtrBlocksCompressor::default().compress(&array, &mut SESSION.create_execution_ctx())?; + + assert!( + compressed.is::(), + "expected binary data to be FSST-compressed, got {}", + compressed.encoding_id(), + ); + assert!(compressed.nbytes() < array.nbytes()); + + let decompressed = + compressed.execute::(&mut SESSION.create_execution_ctx())?; + assert_arrays_eq!(array, decompressed); + + Ok(()) + } + + #[cfg(feature = "zstd")] + #[test] + fn compact_compressor_uses_zstd_for_binary_data() -> VortexResult<()> { + let array = binary_fsst_data().into_array(); + let compressed = crate::BtrBlocksCompressorBuilder::default() + .with_compact() + .build() + .compress(&array, &mut SESSION.create_execution_ctx())?; + + assert!( + compressed.is::(), + "expected compact binary data to be Zstd-compressed, got {}", + compressed.encoding_id(), + ); + assert!(compressed.nbytes() < array.nbytes()); + + let decompressed = + compressed.execute::(&mut SESSION.create_execution_ctx())?; + assert_arrays_eq!(array, decompressed); + + Ok(()) + } +} diff --git a/vortex-btrblocks/src/schemes/binary/mod.rs b/vortex-btrblocks/src/schemes/binary/mod.rs index 44f09f2daa3..f9885f67255 100644 --- a/vortex-btrblocks/src/schemes/binary/mod.rs +++ b/vortex-btrblocks/src/schemes/binary/mod.rs @@ -3,11 +3,13 @@ //! Binary compression schemes. +mod fsst; #[cfg(feature = "zstd")] mod zstd; #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] mod zstd_buffers; +pub use fsst::BinaryFSSTScheme; // Re-export builtin schemes from vortex-compressor. pub use vortex_compressor::builtins::BinaryConstantScheme; pub use vortex_compressor::builtins::BinaryDictScheme; diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs index 2defc928e46..12a97ab425c 100644 --- a/vortex-btrblocks/src/variant/mod.rs +++ b/vortex-btrblocks/src/variant/mod.rs @@ -182,10 +182,12 @@ mod tests { use vortex_compressor::builtins::IntConstantScheme; use vortex_compressor::builtins::StringConstantScheme; use vortex_compressor::builtins::StringDictScheme; + use vortex_fsst::FSST; use vortex_session::VortexSession; use vortex_zstd::Zstd; use super::*; + use crate::schemes::binary::BinaryFSSTScheme; use crate::schemes::integer::BitPackingScheme; use crate::schemes::integer::FoRScheme; use crate::schemes::integer::RunEndScheme; @@ -277,6 +279,7 @@ mod tests { CascadingCompressor::new(vec![ &JsonToVariantScheme, &BinaryDictScheme, + &BinaryFSSTScheme, &IntConstantScheme, &FoRScheme, &SparseScheme, @@ -324,13 +327,7 @@ mod tests { let variant_compressor = parquet_variant_child_compressor(); let mut exec_ctx = SESSION.create_execution_ctx(); - let variant_data = ArrayAndStats::new(array.clone(), Default::default()); - let variant_compressed = JsonToVariantScheme.compress( - &variant_compressor, - &variant_data, - CompressorContext::new(), - &mut exec_ctx, - )?; + let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?; assert!( variant_compressed.is::(), @@ -358,13 +355,7 @@ mod tests { let variant_compressor = parquet_variant_child_compressor(); let mut exec_ctx = SESSION.create_execution_ctx(); - let variant_data = ArrayAndStats::new(array.clone(), Default::default()); - let compressed = JsonToVariantScheme.compress( - &variant_compressor, - &variant_data, - CompressorContext::new(), - &mut exec_ctx, - )?; + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; let parquet_variant = compressed.clone().downcast::(); assert!( @@ -378,6 +369,48 @@ mod tests { Ok(()) } + #[test] + fn binary_fsst_improves_parquet_variant_child_compression() -> VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + let mut exec_ctx = SESSION.create_execution_ctx(); + let without_binary_fsst = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) + .compress(&array, &mut exec_ctx)?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + let with_binary_fsst = + parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?; + let parquet_variant = with_binary_fsst.clone().downcast::(); + + assert!( + with_binary_fsst.nbytes() < without_binary_fsst.nbytes(), + "binary FSST should improve Parquet Variant child compression: with={} bytes, without={} bytes", + with_binary_fsst.nbytes(), + without_binary_fsst.nbytes(), + ); + assert!( + parquet_variant + .value_array() + .is_some_and(|value| value.is::()), + "expected Parquet Variant value child to use binary FSST, got {}", + parquet_variant.value_array().map_or_else( + || "missing".to_string(), + |value| value.encoding_id().to_string() + ), + ); + + Ok(()) + } + #[test] fn prefers_smaller_extension_storage_over_variant_scheme() -> VortexResult<()> { let array: ArrayRef = json_array(&json_data())?; @@ -400,7 +433,8 @@ mod tests { &JsonToVariantScheme, &BinaryDictScheme, &FSSTScheme, - &ZstdScheme, + &BinaryFSSTScheme, + // &ZstdScheme, &IntConstantScheme, &StringConstantScheme, &FoRScheme, @@ -412,13 +446,13 @@ mod tests { ]); let mut exec_ctx = SESSION.create_execution_ctx(); let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - let extension = compressed.clone().downcast::(); - let storage = extension.storage_array(); - assert!( - storage.is::(), - "expected JSON extension storage fallback to use zstd, got {}", - storage.encoding_id(), - ); + // let extension = compressed.clone().downcast::(); + // let storage = extension.storage_array(); + // assert!( + // storage.is::(), + // "expected JSON extension storage fallback to use zstd, got {}", + // storage.encoding_id(), + // ); print_comparison_output(&array, &string_compressed, &compressed); From dad5de3769c8116f732e485e9855289401d12a20 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Fri, 29 May 2026 17:38:13 +0100 Subject: [PATCH 03/11] stash work Signed-off-by: Adam Gutglick --- Cargo.lock | 2 + vortex-btrblocks/Cargo.toml | 6 + vortex-btrblocks/src/variant/mod.rs | 306 +++++++++++++++++++++++++++- 3 files changed, 311 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c47de7f153e..ac9b69219d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9816,6 +9816,8 @@ dependencies = [ name = "vortex-btrblocks" version = "0.1.0" dependencies = [ + "arrow-array", + "arrow-schema", "codspeed-divan-compat", "itertools 0.14.0", "num-traits", diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 69722215faf..e1c2bc38794 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -14,6 +14,8 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] +arrow-array = { workspace = true, optional = true } +arrow-schema = { workspace = true, optional = true } itertools = { workspace = true } num-traits = { workspace = true } parquet-variant-compute = { workspace = true, optional = true } @@ -36,6 +38,7 @@ vortex-parquet-variant = { workspace = true, optional = true } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } +vortex-session = { workspace = true, optional = true } vortex-sparse = { workspace = true } vortex-tensor = { workspace = true, optional = true } vortex-utils = { workspace = true } @@ -58,8 +61,11 @@ unstable_encodings = [ "vortex-zstd?/unstable_encodings", ] parquet-variant = [ + "dep:arrow-array", + "dep:arrow-schema", "dep:vortex-parquet-variant", "dep:parquet-variant-compute", + "dep:vortex-session", "zstd", ] pco = ["dep:pco", "dep:vortex-pco"] diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs index 12a97ab425c..0b577c387e7 100644 --- a/vortex-btrblocks/src/variant/mod.rs +++ b/vortex-btrblocks/src/variant/mod.rs @@ -3,18 +3,41 @@ //! Compression scheme for JSON data into binary variant representation +use std::sync::Arc; + +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::StructArray as ArrowStructArray; +use arrow_schema::DataType; +use arrow_schema::Field; +use vortex_array::Array; +use vortex_array::ArrayId; +use vortex_array::ArrayParts; use vortex_array::ArrayRef; +use vortex_array::ArrayView; use vortex_array::Canonical; +use vortex_array::EmptyArrayData; use vortex_array::ExecutionCtx; +use vortex_array::ExecutionResult; use vortex_array::IntoArray; use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::VariantArray; use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrays::variant::VariantArrayExt; use vortex_array::arrow::ArrowSessionExt; +use vortex_array::arrow::FromArrowArray; +use vortex_array::arrow::to_arrow_null_buffer; +use vortex_array::buffer::BufferHandle; +use vortex_array::dtype::DType; use vortex_array::dtype::extension::ExtDType; use vortex_array::dtype::extension::ExtId; use vortex_array::dtype::extension::ExtVTable; use vortex_array::extension::EmptyMetadata; use vortex_array::scalar::ScalarValue; +use vortex_array::serde::ArrayChildren; +use vortex_array::validity::Validity; +use vortex_array::vtable::NotSupported; +use vortex_array::vtable::VTable; +use vortex_array::vtable::ValidityVTable; use vortex_compressor::ctx::CompressorContext; use vortex_compressor::estimate::CompressionEstimate; use vortex_compressor::estimate::DeferredEstimate; @@ -24,8 +47,12 @@ use vortex_compressor::stats::ArrayAndStats; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; use vortex_parquet_variant::ParquetVariant; use vortex_parquet_variant::ParquetVariantArrayExt; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; use crate::CascadingCompressor; @@ -41,6 +68,12 @@ mod parquet_variant_children { pub const VALUE: usize = 1; } +mod variant_to_json_children { + pub const VARIANT: usize = 0; + pub const NUM_SLOTS: usize = 1; + pub const SLOT_NAMES: [&str; NUM_SLOTS] = ["variant"]; +} + /// JSON logical type backed by UTF-8 string storage. #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] pub struct Json; @@ -82,6 +115,227 @@ impl ExtVTable for Json { } } +/// Array that exposes a Variant array as JSON strings. +#[derive(Debug, Clone)] +pub struct VariantToJson; + +/// A [`VariantToJson`]-encoded array. +pub type VariantToJsonArray = Array; + +impl VariantToJson { + /// Creates a JSON wrapper around a Variant-typed array. + pub fn try_new(variant: ArrayRef) -> VortexResult { + vortex_ensure!( + variant.dtype().is_variant(), + "VariantToJson expects a Variant array, got {}", + variant.dtype() + ); + + let storage_dtype = DType::Utf8(variant.dtype().nullability()); + let dtype = + DType::Extension(ExtDType::::try_new(EmptyMetadata, storage_dtype)?.erased()); + let len = variant.len(); + + Array::try_from_parts( + ArrayParts::new(VariantToJson, dtype, len, EmptyArrayData) + .with_slots(vec![Some(variant)].into()), + ) + } +} + +impl VTable for VariantToJson { + type TypedArrayData = EmptyArrayData; + type OperationsVTable = NotSupported; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.variant_to_json"); + *ID + } + + fn validate( + &self, + _data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option], + ) -> VortexResult<()> { + vortex_ensure!( + slots.len() == variant_to_json_children::NUM_SLOTS, + "VariantToJsonArray expects {} slots, got {}", + variant_to_json_children::NUM_SLOTS, + slots.len() + ); + let variant = slots[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; + + let DType::Extension(ext_dtype) = dtype else { + vortex_bail!("VariantToJsonArray dtype must be a JSON extension, got {dtype}"); + }; + vortex_ensure!( + ext_dtype.is::(), + "VariantToJsonArray dtype must be a JSON extension, got {dtype}" + ); + vortex_ensure!( + variant.dtype() == &DType::Variant(dtype.nullability()), + "VariantToJsonArray child dtype {} does not match JSON dtype nullability {}", + variant.dtype(), + dtype + ); + vortex_ensure!( + variant.len() == len, + "VariantToJsonArray child length {} does not match outer length {}", + variant.len(), + len + ); + + Ok(()) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 0 + } + + fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + vortex_panic!("VariantToJsonArray buffer index {idx} out of bounds") + } + + fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option { + None + } + + fn serialize( + _array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult>> { + Ok(Some(Vec::new())) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult> { + vortex_ensure!( + metadata.is_empty(), + "VariantToJsonArray metadata must be empty" + ); + vortex_ensure!( + buffers.is_empty(), + "VariantToJsonArray expects 0 buffers, got {}", + buffers.len() + ); + vortex_ensure!( + children.len() == variant_to_json_children::NUM_SLOTS, + "VariantToJsonArray expects {} children, got {}", + variant_to_json_children::NUM_SLOTS, + children.len() + ); + + let variant_dtype = DType::Variant(dtype.nullability()); + let variant = children.get(variant_to_json_children::VARIANT, &variant_dtype, len)?; + + Ok( + ArrayParts::new(self.clone(), dtype.clone(), len, EmptyArrayData) + .with_slots(vec![Some(variant)].into()), + ) + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + match variant_to_json_children::SLOT_NAMES.get(idx) { + Some(name) => (*name).to_string(), + None => vortex_panic!("VariantToJsonArray slot_name index {idx} out of bounds"), + } + } + + fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { + let variant = array.as_ref().slots()[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; + let variant = variant.clone().execute::(ctx)?; + vortex_ensure!( + variant.shredded().is_none(), + "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" + ); + + let parquet_variant = variant + .core_storage() + .as_opt::() + .ok_or_else(|| { + vortex_err!( + "VariantToJsonArray requires Parquet Variant core storage, got {}", + variant.core_storage().encoding_id() + ) + })?; + let arrow_variant = parquet_variant_to_json_arrow(parquet_variant, ctx)?; + let arrow_json = parquet_variant_compute::variant_to_json(&arrow_variant)?; + let storage = ArrayRef::from_arrow(&arrow_json, array.dtype().is_nullable())?; + + Ok(ExecutionResult::done( + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array(), + )) + } +} + +impl ValidityVTable for VariantToJson { + fn validity(array: ArrayView<'_, VariantToJson>) -> VortexResult { + array.slots()[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))? + .validity() + } +} + +fn parquet_variant_to_json_arrow( + parquet_variant: ArrayView<'_, ParquetVariant>, + ctx: &mut ExecutionCtx, +) -> VortexResult { + vortex_ensure!( + parquet_variant.typed_value_array().is_none(), + "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" + ); + let value = parquet_variant + .value_array() + .ok_or_else(|| vortex_err!("VariantToJsonArray requires Parquet Variant value storage"))?; + + let metadata_arrow = { + let target = Field::new("", DataType::Binary, false); + let session = ctx.session().clone(); + session.arrow().execute_arrow( + parquet_variant.metadata_array().clone(), + Some(&target), + ctx, + )? + }; + let value_arrow = { + let target = Field::new("", DataType::Binary, value.dtype().is_nullable()); + let session = ctx.session().clone(); + session + .arrow() + .execute_arrow(value.clone(), Some(&target), ctx)? + }; + let fields = vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new( + "value", + DataType::Binary, + value.dtype().is_nullable(), + )), + ]; + let nulls = to_arrow_null_buffer(parquet_variant.validity()?, parquet_variant.len(), ctx)?; + + Ok(Arc::new(ArrowStructArray::try_new( + fields.into(), + vec![metadata_arrow, value_arrow], + nulls, + )?)) +} + impl Scheme for JsonToVariantScheme { fn scheme_name(&self) -> &'static str { "json_to_variant" @@ -172,6 +426,7 @@ mod tests { use rand::rngs::StdRng; use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; + use vortex_array::accessor::ArrayAccessor; use vortex_array::arrays::Extension; use vortex_array::arrays::ExtensionArray; use vortex_array::arrays::VarBinView; @@ -184,7 +439,6 @@ mod tests { use vortex_compressor::builtins::StringDictScheme; use vortex_fsst::FSST; use vortex_session::VortexSession; - use vortex_zstd::Zstd; use super::*; use crate::schemes::binary::BinaryFSSTScheme; @@ -195,7 +449,6 @@ mod tests { use crate::schemes::integer::SparseScheme; use crate::schemes::integer::ZigZagScheme; use crate::schemes::string::FSSTScheme; - use crate::schemes::string::ZstdScheme; static SESSION: LazyLock = LazyLock::new(|| VortexSession::empty().with::()); @@ -275,6 +528,53 @@ mod tests { Ok(ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array()) } + #[test] + fn variant_to_json_canonicalizes_to_json_extension() -> VortexResult<()> { + let values = vec![ + "0".to_string(), + r#"{"a":32}"#.to_string(), + r#""hello""#.to_string(), + "null".to_string(), + ]; + let source = json_array(&values)?; + let source_ext = source.as_::(); + let storage = source_ext.storage_array().clone(); + + let mut exec_ctx = SESSION.create_execution_ctx(); + let arrow_array = { + let session = exec_ctx.session().clone(); + session + .arrow() + .execute_arrow(storage, None, &mut exec_ctx)? + }; + let arrow_variant = parquet_variant_compute::json_to_variant(&arrow_array)?; + let variant = ParquetVariant::from_arrow_variant(&arrow_variant)?; + + let wrapped = VariantToJson::try_new(variant)?; + assert_eq!(wrapped.dtype(), source.dtype()); + + let json = wrapped + .into_array() + .execute::(&mut exec_ctx)?; + assert!(json.ext_dtype().is::()); + let json_storage = json + .storage_array() + .clone() + .execute::(&mut exec_ctx)?; + let actual = json_storage.with_iterator(|iter| { + iter.map(|value| value.map(<[u8]>::to_vec)) + .collect::>() + }); + let expected = values + .iter() + .map(|value| Some(value.as_bytes().to_vec())) + .collect::>(); + + assert_eq!(actual, expected); + + Ok(()) + } + fn parquet_variant_child_compressor() -> CascadingCompressor { CascadingCompressor::new(vec![ &JsonToVariantScheme, @@ -356,7 +656,7 @@ mod tests { let variant_compressor = parquet_variant_child_compressor(); let mut exec_ctx = SESSION.create_execution_ctx(); let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - let parquet_variant = compressed.clone().downcast::(); + let parquet_variant = compressed.downcast::(); assert!( !parquet_variant.metadata_array().is::(), From a749bde64792ba0ba7220b5268f615b52a224bda Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 2 Jun 2026 14:19:33 +0100 Subject: [PATCH 04/11] more compressor pieces Signed-off-by: Adam Gutglick --- vortex-btrblocks/src/variant/mod.rs | 80 +++++++++++++++-------------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs index 0b577c387e7..20678547c59 100644 --- a/vortex-btrblocks/src/variant/mod.rs +++ b/vortex-btrblocks/src/variant/mod.rs @@ -16,6 +16,7 @@ use vortex_array::ArrayRef; use vortex_array::ArrayView; use vortex_array::Canonical; use vortex_array::EmptyArrayData; +use vortex_array::EmptyMetadata; use vortex_array::ExecutionCtx; use vortex_array::ExecutionResult; use vortex_array::IntoArray; @@ -31,7 +32,6 @@ use vortex_array::dtype::DType; use vortex_array::dtype::extension::ExtDType; use vortex_array::dtype::extension::ExtId; use vortex_array::dtype::extension::ExtVTable; -use vortex_array::extension::EmptyMetadata; use vortex_array::scalar::ScalarValue; use vortex_array::serde::ArrayChildren; use vortex_array::validity::Validity; @@ -407,13 +407,15 @@ impl Scheme for JsonToVariantScheme { }) .transpose()?; - ParquetVariant::try_new( + let variant = ParquetVariant::try_new( parquet_variant.validity()?, compressed_metadata, compressed_value, parquet_variant.typed_value_array().cloned(), - ) - .map(IntoArray::into_array) + )? + .into_array(); + + Ok(VariantToJson::try_new(variant)?.into_array()) } } @@ -427,9 +429,7 @@ mod tests { use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; use vortex_array::accessor::ArrayAccessor; - use vortex_array::arrays::Extension; use vortex_array::arrays::ExtensionArray; - use vortex_array::arrays::VarBinView; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::extension::ExtensionArrayExt; use vortex_array::session::ArraySession; @@ -437,7 +437,6 @@ mod tests { use vortex_compressor::builtins::IntConstantScheme; use vortex_compressor::builtins::StringConstantScheme; use vortex_compressor::builtins::StringDictScheme; - use vortex_fsst::FSST; use vortex_session::VortexSession; use super::*; @@ -530,15 +529,16 @@ mod tests { #[test] fn variant_to_json_canonicalizes_to_json_extension() -> VortexResult<()> { - let values = vec![ + let values = [ "0".to_string(), r#"{"a":32}"#.to_string(), r#""hello""#.to_string(), "null".to_string(), ]; - let source = json_array(&values)?; - let source_ext = source.as_::(); - let storage = source_ext.storage_array().clone(); + let storage = + VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + let source = + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage.clone())?.into_array(); let mut exec_ctx = SESSION.create_execution_ctx(); let arrow_array = { @@ -556,7 +556,8 @@ mod tests { let json = wrapped .into_array() .execute::(&mut exec_ctx)?; - assert!(json.ext_dtype().is::()); + assert_eq!(json.dtype(), source.dtype()); + assert!(json.storage_array().dtype().is_utf8()); let json_storage = json .storage_array() .clone() @@ -590,6 +591,23 @@ mod tests { ]) } + #[test] + fn json_to_variant_scheme_wraps_output_as_json() -> VortexResult<()> { + let array = json_array(&json_data())?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert_eq!(compressed.dtype(), array.dtype()); + + let json = compressed.execute::(&mut exec_ctx)?; + assert_eq!(json.dtype(), array.dtype()); + assert!(json.storage_array().dtype().is_utf8()); + + Ok(()) + } + fn print_comparison_output( array: &ArrayRef, string_compressed: &ArrayRef, @@ -629,13 +647,6 @@ mod tests { let mut exec_ctx = SESSION.create_execution_ctx(); let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - assert!( - variant_compressed.is::(), - "expected ParquetVariant output, got encoding {} with dtype {} and {} bytes", - variant_compressed.encoding_id(), - variant_compressed.dtype(), - variant_compressed.nbytes() - ); assert!( variant_compressed.nbytes() < string_compressed.nbytes(), "Parquet Variant conversion should compress repeated JSON keys: \ @@ -653,19 +664,21 @@ mod tests { fn recursively_compresses_parquet_variant_binary_children() -> VortexResult<()> { let array: ArrayRef = json_array(&json_data())?; + let mut exec_ctx = SESSION.create_execution_ctx(); + let uncompressed_children = + CascadingCompressor::new(vec![&JsonToVariantScheme]).compress(&array, &mut exec_ctx)?; + let variant_compressor = parquet_variant_child_compressor(); let mut exec_ctx = SESSION.create_execution_ctx(); let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - let parquet_variant = compressed.downcast::(); assert!( - !parquet_variant.metadata_array().is::(), - "expected Parquet Variant metadata child to be compressed, got {}", - parquet_variant.metadata_array().encoding_id(), + compressed.nbytes() < uncompressed_children.nbytes(), + "recursive child compression should reduce Parquet Variant size: compressed={} bytes, uncompressed_children={} bytes", + compressed.nbytes(), + uncompressed_children.nbytes(), ); - assert!(parquet_variant.value_array().is_some()); - assert!(parquet_variant.typed_value_array().is_none()); - + assert_eq!(compressed.dtype(), array.dtype()); Ok(()) } @@ -689,7 +702,6 @@ mod tests { let mut exec_ctx = SESSION.create_execution_ctx(); let with_binary_fsst = parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?; - let parquet_variant = with_binary_fsst.clone().downcast::(); assert!( with_binary_fsst.nbytes() < without_binary_fsst.nbytes(), @@ -697,16 +709,6 @@ mod tests { with_binary_fsst.nbytes(), without_binary_fsst.nbytes(), ); - assert!( - parquet_variant - .value_array() - .is_some_and(|value| value.is::()), - "expected Parquet Variant value child to use binary FSST, got {}", - parquet_variant.value_array().map_or_else( - || "missing".to_string(), - |value| value.encoding_id().to_string() - ), - ); Ok(()) } @@ -732,9 +734,9 @@ mod tests { let variant_compressor = CascadingCompressor::new(vec![ &JsonToVariantScheme, &BinaryDictScheme, - &FSSTScheme, + // &FSSTScheme, &BinaryFSSTScheme, - // &ZstdScheme, + // &crate::schemes::binary::BinaryZstdScheme, &IntConstantScheme, &StringConstantScheme, &FoRScheme, From 834da0e51c10493fe50c4f45304d468b92e2feb7 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 9 Jun 2026 12:16:23 +0100 Subject: [PATCH 05/11] deps Signed-off-by: Adam Gutglick --- Cargo.lock | 1 + vortex-btrblocks/Cargo.toml | 5 +-- vortex-btrblocks/src/variant/mod.rs | 56 ++--------------------------- 3 files changed, 7 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac9b69219d0..5d779f6de41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9837,6 +9837,7 @@ dependencies = [ "vortex-error", "vortex-fastlanes", "vortex-fsst", + "vortex-json", "vortex-mask", "vortex-onpair", "vortex-parquet-variant", diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index e1c2bc38794..3e8e869c736 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -32,6 +32,7 @@ vortex-decimal-byte-parts = { workspace = true } vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-fsst = { workspace = true } +vortex-json = { workspace = true, optional = true } vortex-mask = { workspace = true } vortex-onpair = { workspace = true, optional = true } vortex-parquet-variant = { workspace = true, optional = true } @@ -54,7 +55,7 @@ vortex-array = { workspace = true, features = ["_test-harness"] } vortex-session = { workspace = true } [features] - +# This feature enabled unstable encodings for which we don't guarantee stability. unstable_encodings = [ "dep:vortex-tensor", "dep:vortex-onpair", @@ -66,13 +67,13 @@ parquet-variant = [ "dep:vortex-parquet-variant", "dep:parquet-variant-compute", "dep:vortex-session", + "dep:vortex-json", "zstd", ] pco = ["dep:pco", "dep:vortex-pco"] zstd = ["dep:vortex-zstd"] [lints] -# This feature enabled unstable encodings for which we don't guarantee stability. workspace = true [[bench]] diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs index 20678547c59..344e0a7cd85 100644 --- a/vortex-btrblocks/src/variant/mod.rs +++ b/vortex-btrblocks/src/variant/mod.rs @@ -30,9 +30,6 @@ use vortex_array::arrow::to_arrow_null_buffer; use vortex_array::buffer::BufferHandle; use vortex_array::dtype::DType; use vortex_array::dtype::extension::ExtDType; -use vortex_array::dtype::extension::ExtId; -use vortex_array::dtype::extension::ExtVTable; -use vortex_array::scalar::ScalarValue; use vortex_array::serde::ArrayChildren; use vortex_array::validity::Validity; use vortex_array::vtable::NotSupported; @@ -49,6 +46,7 @@ use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_err; use vortex_error::vortex_panic; +use vortex_json::Json; use vortex_parquet_variant::ParquetVariant; use vortex_parquet_variant::ParquetVariantArrayExt; use vortex_session::VortexSession; @@ -74,47 +72,6 @@ mod variant_to_json_children { pub const SLOT_NAMES: [&str; NUM_SLOTS] = ["variant"]; } -/// JSON logical type backed by UTF-8 string storage. -#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] -pub struct Json; - -impl ExtVTable for Json { - type Metadata = EmptyMetadata; - type NativeValue<'a> = &'a str; - - fn id(&self) -> ExtId { - ExtId::new("vortex.json") - } - - fn serialize_metadata(&self, _metadata: &Self::Metadata) -> VortexResult> { - Ok(Vec::new()) - } - - fn deserialize_metadata(&self, metadata: &[u8]) -> VortexResult { - vortex_ensure!(metadata.is_empty(), "JSON metadata must be empty"); - Ok(EmptyMetadata) - } - - fn validate_dtype(ext_dtype: &ExtDType) -> VortexResult<()> { - vortex_ensure!( - ext_dtype.storage_dtype().is_utf8(), - "JSON storage dtype must be utf8, got {}", - ext_dtype.storage_dtype() - ); - Ok(()) - } - - fn unpack_native<'a>( - _ext_dtype: &'a ExtDType, - storage_value: &'a ScalarValue, - ) -> VortexResult> { - let ScalarValue::Utf8(value) = storage_value else { - vortex_bail!("JSON storage scalar must be utf8, got {storage_value}"); - }; - Ok(value.as_str()) - } -} - /// Array that exposes a Variant array as JSON strings. #[derive(Debug, Clone)] pub struct VariantToJson; @@ -440,6 +397,7 @@ mod tests { use vortex_session::VortexSession; use super::*; + use crate::schemes::binary; use crate::schemes::binary::BinaryFSSTScheme; use crate::schemes::integer::BitPackingScheme; use crate::schemes::integer::FoRScheme; @@ -734,9 +692,8 @@ mod tests { let variant_compressor = CascadingCompressor::new(vec![ &JsonToVariantScheme, &BinaryDictScheme, - // &FSSTScheme, &BinaryFSSTScheme, - // &crate::schemes::binary::BinaryZstdScheme, + &binary::ZstdScheme, &IntConstantScheme, &StringConstantScheme, &FoRScheme, @@ -748,13 +705,6 @@ mod tests { ]); let mut exec_ctx = SESSION.create_execution_ctx(); let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - // let extension = compressed.clone().downcast::(); - // let storage = extension.storage_array(); - // assert!( - // storage.is::(), - // "expected JSON extension storage fallback to use zstd, got {}", - // storage.encoding_id(), - // ); print_comparison_output(&array, &string_compressed, &compressed); From 26403279b4eb084b8a0036362c4e333bd795f7b9 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 9 Jun 2026 12:44:02 +0100 Subject: [PATCH 06/11] re-organize Signed-off-by: Adam Gutglick --- Cargo.lock | 3 +- encodings/parquet-variant/Cargo.toml | 4 + encodings/parquet-variant/src/arrow.rs | 2 +- encodings/parquet-variant/src/json.rs | 315 ++++++++ encodings/parquet-variant/src/lib.rs | 8 + vortex-btrblocks/Cargo.toml | 6 +- vortex-btrblocks/src/builder.rs | 20 + vortex-btrblocks/src/lib.rs | 2 - vortex-btrblocks/src/schemes/mod.rs | 2 + .../src/schemes/variant/json_to_variant.rs | 118 +++ vortex-btrblocks/src/schemes/variant/mod.rs | 11 + vortex-btrblocks/src/schemes/variant/tests.rs | 290 +++++++ vortex-btrblocks/src/variant/mod.rs | 713 ------------------ 13 files changed, 771 insertions(+), 723 deletions(-) create mode 100644 encodings/parquet-variant/src/json.rs create mode 100644 vortex-btrblocks/src/schemes/variant/json_to_variant.rs create mode 100644 vortex-btrblocks/src/schemes/variant/mod.rs create mode 100644 vortex-btrblocks/src/schemes/variant/tests.rs delete mode 100644 vortex-btrblocks/src/variant/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 5d779f6de41..09506ddd6a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9816,8 +9816,6 @@ dependencies = [ name = "vortex-btrblocks" version = "0.1.0" dependencies = [ - "arrow-array", - "arrow-schema", "codspeed-divan-compat", "itertools 0.14.0", "num-traits", @@ -10471,6 +10469,7 @@ dependencies = [ "vortex-error", "vortex-file", "vortex-io", + "vortex-json", "vortex-layout", "vortex-mask", "vortex-proto", diff --git a/encodings/parquet-variant/Cargo.toml b/encodings/parquet-variant/Cargo.toml index cb5a8492fea..5d3021302a4 100644 --- a/encodings/parquet-variant/Cargo.toml +++ b/encodings/parquet-variant/Cargo.toml @@ -31,6 +31,7 @@ vortex-error = { workspace = true } vortex-mask = { workspace = true } vortex-proto = { workspace = true } vortex-session = { workspace = true } +vortex-json = { workspace = true, optional = true } [dev-dependencies] rstest = { workspace = true } @@ -39,3 +40,6 @@ vortex-array = { workspace = true, features = ["_test-harness"] } vortex-file = { workspace = true, features = ["tokio"] } vortex-io = { workspace = true, features = ["tokio"] } vortex-layout = { workspace = true } + +[features] +json = ["dep:vortex-json"] diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs index 9bee5c738d3..c3496e79eda 100644 --- a/encodings/parquet-variant/src/arrow.rs +++ b/encodings/parquet-variant/src/arrow.rs @@ -58,7 +58,7 @@ fn parquet_variant_storage_request(fields: &Fields) -> Option<(bool, bool)> { (has_metadata && (has_value || has_typed_value)).then_some((has_value, has_typed_value)) } -fn export_storage_to_target( +pub(crate) fn export_storage_to_target( parquet_array: &T, target_fields: &Fields, ctx: &mut ExecutionCtx, diff --git a/encodings/parquet-variant/src/json.rs b/encodings/parquet-variant/src/json.rs new file mode 100644 index 00000000000..af9f68adfa6 --- /dev/null +++ b/encodings/parquet-variant/src/json.rs @@ -0,0 +1,315 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! JSON extension wrappers for Parquet Variant storage. + +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::Field; +use vortex_array::Array; +use vortex_array::ArrayId; +use vortex_array::ArrayParts; +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::EmptyArrayData; +use vortex_array::EmptyMetadata; +use vortex_array::ExecutionCtx; +use vortex_array::ExecutionResult; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::VariantArray; +use vortex_array::arrays::variant::VariantArrayExt; +use vortex_array::arrow::FromArrowArray; +use vortex_array::buffer::BufferHandle; +use vortex_array::dtype::DType; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::serde::ArrayChildren; +use vortex_array::validity::Validity; +use vortex_array::vtable::NotSupported; +use vortex_array::vtable::VTable; +use vortex_array::vtable::ValidityVTable; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_json::Json; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; + +use crate::ParquetVariant; +use crate::ParquetVariantArrayExt; + +mod variant_to_json_children { + pub const VARIANT: usize = 0; + pub const NUM_SLOTS: usize = 1; + pub const SLOT_NAMES: [&str; NUM_SLOTS] = ["variant"]; +} + +/// Array that exposes a Variant array as JSON strings. +#[derive(Debug, Clone)] +pub struct VariantToJson; + +/// A [`VariantToJson`]-encoded array. +pub type VariantToJsonArray = Array; + +impl VariantToJson { + /// Creates a JSON wrapper around a Variant-typed array. + pub fn try_new(variant: ArrayRef) -> VortexResult { + vortex_ensure!( + variant.dtype().is_variant(), + "VariantToJson expects a Variant array, got {}", + variant.dtype() + ); + + let storage_dtype = DType::Utf8(variant.dtype().nullability()); + let dtype = + DType::Extension(ExtDType::::try_new(EmptyMetadata, storage_dtype)?.erased()); + let len = variant.len(); + + Array::try_from_parts( + ArrayParts::new(VariantToJson, dtype, len, EmptyArrayData) + .with_slots(vec![Some(variant)].into()), + ) + } +} + +impl VTable for VariantToJson { + type TypedArrayData = EmptyArrayData; + type OperationsVTable = NotSupported; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.variant_to_json"); + *ID + } + + fn validate( + &self, + _data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option], + ) -> VortexResult<()> { + vortex_ensure!( + slots.len() == variant_to_json_children::NUM_SLOTS, + "VariantToJsonArray expects {} slots, got {}", + variant_to_json_children::NUM_SLOTS, + slots.len() + ); + let variant = slots[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; + + let DType::Extension(ext_dtype) = dtype else { + vortex_bail!("VariantToJsonArray dtype must be a JSON extension, got {dtype}"); + }; + vortex_ensure!( + ext_dtype.is::(), + "VariantToJsonArray dtype must be a JSON extension, got {dtype}" + ); + vortex_ensure!( + variant.dtype() == &DType::Variant(dtype.nullability()), + "VariantToJsonArray child dtype {} does not match JSON dtype nullability {}", + variant.dtype(), + dtype + ); + vortex_ensure!( + variant.len() == len, + "VariantToJsonArray child length {} does not match outer length {}", + variant.len(), + len + ); + + Ok(()) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 0 + } + + fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + vortex_panic!("VariantToJsonArray buffer index {idx} out of bounds") + } + + fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option { + None + } + + fn serialize( + _array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult>> { + Ok(Some(Vec::new())) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult> { + vortex_ensure!( + metadata.is_empty(), + "VariantToJsonArray metadata must be empty" + ); + vortex_ensure!( + buffers.is_empty(), + "VariantToJsonArray expects 0 buffers, got {}", + buffers.len() + ); + vortex_ensure!( + children.len() == variant_to_json_children::NUM_SLOTS, + "VariantToJsonArray expects {} children, got {}", + variant_to_json_children::NUM_SLOTS, + children.len() + ); + + let variant_dtype = DType::Variant(dtype.nullability()); + let variant = children.get(variant_to_json_children::VARIANT, &variant_dtype, len)?; + + Ok( + ArrayParts::new(self.clone(), dtype.clone(), len, EmptyArrayData) + .with_slots(vec![Some(variant)].into()), + ) + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + match variant_to_json_children::SLOT_NAMES.get(idx) { + Some(name) => (*name).to_string(), + None => vortex_panic!("VariantToJsonArray slot_name index {idx} out of bounds"), + } + } + + fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { + let variant = array.as_ref().slots()[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; + let variant = variant.clone().execute::(ctx)?; + vortex_ensure!( + variant.shredded().is_none(), + "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" + ); + + let parquet_variant = variant + .core_storage() + .as_opt::() + .ok_or_else(|| { + vortex_err!( + "VariantToJsonArray requires Parquet Variant core storage, got {}", + variant.core_storage().encoding_id() + ) + })?; + vortex_ensure!( + parquet_variant.typed_value_array().is_none(), + "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" + ); + let value = parquet_variant.value_array().ok_or_else(|| { + vortex_err!("VariantToJsonArray requires Parquet Variant value storage") + })?; + let arrow_variant = crate::arrow::export_storage_to_target( + &parquet_variant, + &vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new( + "value", + DataType::Binary, + value.dtype().is_nullable(), + )), + ] + .into(), + ctx, + )?; + let arrow_json = parquet_variant_compute::variant_to_json(&arrow_variant)?; + let storage = ArrayRef::from_arrow(&arrow_json, array.dtype().is_nullable())?; + + Ok(ExecutionResult::done( + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array(), + )) + } +} + +impl ValidityVTable for VariantToJson { + fn validity(array: ArrayView<'_, VariantToJson>) -> VortexResult { + array.slots()[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))? + .validity() + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::accessor::ArrayAccessor; + use vortex_array::arrays::ExtensionArray; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrays::extension::ExtensionArrayExt; + use vortex_array::arrow::ArrowSessionExt; + use vortex_array::session::ArraySession; + use vortex_session::VortexSession; + + use super::*; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + crate::initialize(&session); + session + }); + + #[test] + fn variant_to_json_canonicalizes_to_json_extension() -> VortexResult<()> { + let values = [ + "0".to_string(), + r#"{"a":32}"#.to_string(), + r#""hello""#.to_string(), + "null".to_string(), + ]; + let storage = + VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + let source = + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage.clone())?.into_array(); + + let mut exec_ctx = SESSION.create_execution_ctx(); + let arrow_array = { + let session = exec_ctx.session().clone(); + session + .arrow() + .execute_arrow(storage, None, &mut exec_ctx)? + }; + let arrow_variant = parquet_variant_compute::json_to_variant(&arrow_array)?; + let variant = ParquetVariant::from_arrow_variant(&arrow_variant)?; + + let wrapped = VariantToJson::try_new(variant)?; + assert_eq!(wrapped.dtype(), source.dtype()); + + let json = wrapped + .into_array() + .execute::(&mut exec_ctx)?; + assert_eq!(json.dtype(), source.dtype()); + assert!(json.storage_array().dtype().is_utf8()); + let json_storage = json + .storage_array() + .clone() + .execute::(&mut exec_ctx)?; + let actual = json_storage.with_iterator(|iter| { + iter.map(|value| value.map(<[u8]>::to_vec)) + .collect::>() + }); + let expected = values + .iter() + .map(|value| Some(value.as_bytes().to_vec())) + .collect::>(); + + assert_eq!(actual, expected); + + Ok(()) + } +} diff --git a/encodings/parquet-variant/src/lib.rs b/encodings/parquet-variant/src/lib.rs index 03d2a046442..98476e2a77a 100644 --- a/encodings/parquet-variant/src/lib.rs +++ b/encodings/parquet-variant/src/lib.rs @@ -26,6 +26,8 @@ mod array; mod arrow; +#[cfg(feature = "json")] +mod json; mod kernel; mod operations; mod validity; @@ -34,6 +36,10 @@ mod vtable; use std::sync::Arc; pub use array::ParquetVariantArrayExt; +#[cfg(feature = "json")] +pub use json::VariantToJson; +#[cfg(feature = "json")] +pub use json::VariantToJsonArray; use vortex_array::arrow::ArrowSessionExt; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -43,6 +49,8 @@ pub use vtable::ParquetVariantArray; /// Register Parquet Variant array and Arrow extension support with a session. pub fn initialize(session: &VortexSession) { session.arrays().register(ParquetVariant); + #[cfg(feature = "json")] + session.arrays().register(VariantToJson); session.arrow().register_exporter(Arc::new(ParquetVariant)); session.arrow().register_importer(Arc::new(ParquetVariant)); } diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 3e8e869c736..200df38f6a5 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -14,8 +14,6 @@ rust-version = { workspace = true } version = { workspace = true } [dependencies] -arrow-array = { workspace = true, optional = true } -arrow-schema = { workspace = true, optional = true } itertools = { workspace = true } num-traits = { workspace = true } parquet-variant-compute = { workspace = true, optional = true } @@ -35,7 +33,7 @@ vortex-fsst = { workspace = true } vortex-json = { workspace = true, optional = true } vortex-mask = { workspace = true } vortex-onpair = { workspace = true, optional = true } -vortex-parquet-variant = { workspace = true, optional = true } +vortex-parquet-variant = { workspace = true, optional = true, features = ["json"] } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } @@ -62,8 +60,6 @@ unstable_encodings = [ "vortex-zstd?/unstable_encodings", ] parquet-variant = [ - "dep:arrow-array", - "dep:arrow-schema", "dep:vortex-parquet-variant", "dep:parquet-variant-compute", "dep:vortex-session", diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index a09c4cc3a3a..b2f75c28546 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -17,6 +17,8 @@ use crate::schemes::float; use crate::schemes::integer; use crate::schemes::string; use crate::schemes::temporal; +#[cfg(feature = "parquet-variant")] +use crate::schemes::variant; /// All available compression schemes. /// @@ -159,6 +161,15 @@ impl BtrBlocksCompressorBuilder { builder } + /// Adds JSON-to-Parquet-Variant compression for [`vortex_json::Json`] extension arrays. + /// + /// This scheme is opt-in because it produces a Parquet Variant-backed wrapper encoding rather + /// than ordinary JSON string storage. + #[cfg(feature = "parquet-variant")] + pub fn with_json_to_variant(self) -> Self { + self.with_new_scheme(&variant::JsonToVariantScheme) + } + /// Adds the TurboQuant lossy vector quantization scheme. /// /// When enabled, [`Vector`] extension arrays are compressed using the TurboQuant algorithm @@ -240,4 +251,13 @@ mod tests { let builder = BtrBlocksCompressorBuilder::default(); assert_eq!(builder.schemes.len(), ALL_SCHEMES.len()); } + + #[cfg(feature = "parquet-variant")] + #[test] + fn json_to_variant_builder_method_adds_scheme() { + let builder = BtrBlocksCompressorBuilder::empty().with_json_to_variant(); + + assert_eq!(builder.schemes.len(), 1); + assert_eq!(builder.schemes[0].id(), variant::JsonToVariantScheme.id()); + } } diff --git a/vortex-btrblocks/src/lib.rs b/vortex-btrblocks/src/lib.rs index 6001d77684f..39db05246a6 100644 --- a/vortex-btrblocks/src/lib.rs +++ b/vortex-btrblocks/src/lib.rs @@ -58,8 +58,6 @@ mod builder; mod canonical_compressor; /// Compression scheme implementations. pub mod schemes; -#[cfg(feature = "parquet-variant")] -pub mod variant; // Re-export framework types from vortex-compressor for backwards compatibility. // Btrblocks-specific exports. diff --git a/vortex-btrblocks/src/schemes/mod.rs b/vortex-btrblocks/src/schemes/mod.rs index 8b8629d3f0a..7d0619b6080 100644 --- a/vortex-btrblocks/src/schemes/mod.rs +++ b/vortex-btrblocks/src/schemes/mod.rs @@ -10,6 +10,8 @@ pub mod float; pub mod integer; pub mod string; pub mod temporal; +#[cfg(feature = "parquet-variant")] +pub mod variant; pub(crate) mod patches; diff --git a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs new file mode 100644 index 00000000000..50d4bc5ba2a --- /dev/null +++ b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs @@ -0,0 +1,118 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrow::ArrowSessionExt; +use vortex_compressor::ctx::CompressorContext; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::scheme::Scheme; +use vortex_compressor::scheme::SchemeExt; +use vortex_compressor::stats::ArrayAndStats; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_json::Json; +use vortex_parquet_variant::ParquetVariant; +use vortex_parquet_variant::ParquetVariantArrayExt; +use vortex_parquet_variant::VariantToJson; + +use crate::CascadingCompressor; + +/// Compression scheme that converts JSON string extension arrays to Parquet Variant arrays. +#[derive(Debug)] +pub struct JsonToVariantScheme; + +/// Child indices for recursively compressed Parquet Variant binary children. +mod parquet_variant_children { + /// The Parquet Variant metadata child. + pub const METADATA: usize = 0; + /// The raw Parquet Variant value child. + pub const VALUE: usize = 1; +} + +impl Scheme for JsonToVariantScheme { + fn scheme_name(&self) -> &'static str { + "json_to_variant" + } + + fn matches(&self, canonical: &Canonical) -> bool { + let Canonical::Extension(ext_array) = canonical else { + return false; + }; + + ext_array.ext_dtype().is::() + } + + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let array = data.array().clone().execute::(exec_ctx)?; + let storage = array.storage_array().clone(); + + if !storage.dtype().is_utf8() { + vortex_bail!("storage must be utf8"); + } + + let arrow_array = { + let session = exec_ctx.session().clone(); + let arrow = session.arrow(); + arrow.execute_arrow(storage, None, exec_ctx)? + }; + + let array = parquet_variant_compute::json_to_variant(&arrow_array)?; + let parquet_variant = + ParquetVariant::from_arrow_variant(&array)?.downcast::(); + + let compressed_metadata = compressor.compress_child( + parquet_variant.metadata_array(), + &compress_ctx, + self.id(), + parquet_variant_children::METADATA, + exec_ctx, + )?; + let compressed_value = parquet_variant + .value_array() + .map(|value| { + compressor.compress_child( + value, + &compress_ctx, + self.id(), + parquet_variant_children::VALUE, + exec_ctx, + ) + }) + .transpose()?; + + let variant = ParquetVariant::try_new( + parquet_variant.validity()?, + compressed_metadata, + compressed_value, + parquet_variant.typed_value_array().cloned(), + )? + .into_array(); + + Ok(VariantToJson::try_new(variant)?.into_array()) + } +} diff --git a/vortex-btrblocks/src/schemes/variant/mod.rs b/vortex-btrblocks/src/schemes/variant/mod.rs new file mode 100644 index 00000000000..138ad3b2c88 --- /dev/null +++ b/vortex-btrblocks/src/schemes/variant/mod.rs @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Variant compression schemes. + +mod json_to_variant; + +pub use json_to_variant::JsonToVariantScheme; + +#[cfg(test)] +mod tests; diff --git a/vortex-btrblocks/src/schemes/variant/tests.rs b/vortex-btrblocks/src/schemes/variant/tests.rs new file mode 100644 index 00000000000..71758f1ee0b --- /dev/null +++ b/vortex-btrblocks/src/schemes/variant/tests.rs @@ -0,0 +1,290 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::LazyLock; + +use rand::RngExt; +use rand::SeedableRng; +use rand::rngs::StdRng; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::session::ArraySession; +use vortex_compressor::builtins::BinaryDictScheme; +use vortex_compressor::builtins::IntConstantScheme; +use vortex_compressor::builtins::StringConstantScheme; +use vortex_compressor::builtins::StringDictScheme; +use vortex_json::Json; +use vortex_parquet_variant::VariantToJson; +use vortex_session::VortexSession; + +use super::*; +use crate::CascadingCompressor; +use crate::schemes::binary; +use crate::schemes::binary::BinaryFSSTScheme; +use crate::schemes::integer::BitPackingScheme; +use crate::schemes::integer::FoRScheme; +use crate::schemes::integer::RunEndScheme; +use crate::schemes::integer::SequenceScheme; +use crate::schemes::integer::SparseScheme; +use crate::schemes::integer::ZigZagScheme; +use crate::schemes::string::FSSTScheme; + +static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + vortex_parquet_variant::initialize(&session); + session +}); + +fn json_data() -> Vec { + let mut rng = StdRng::seed_from_u64(0); + const ACCOUNT_KEYS: &[&str] = &["account_id", "customer_id", "tenant_id", "buyer_id"]; + const REGION_KEYS: &[&str] = &["region", "market", "country"]; + const REGIONS: &[&str] = &["us-east", "us-west", "eu", "apac", "latam"]; + const STATUS_KEYS: &[&str] = &["status", "payment_state", "lifecycle_state"]; + const STATUSES: &[&str] = &["draft", "open", "paid", "void", "past_due"]; + const AMOUNT_KEYS: &[&str] = &["discount", "tax", "shipping", "credit"]; + const FLAG_KEYS: &[&str] = &["autopay", "fraud_review", "priority", "disputed"]; + const TAGS: &[&str] = &["renewal", "manual", "usage", "trial", "enterprise"]; + + (0..1024) + .map(|_| { + let mut fields = vec![ + format!( + r#""{}":"acct_{:04x}""#, + ACCOUNT_KEYS[rng.random_range(0..ACCOUNT_KEYS.len())], + rng.random::(), + ), + format!( + r#""invoice_total":{}.{:02}"#, + rng.random_range(10_u32..100_000), + rng.random_range(0_u32..100), + ), + format!(r#""line_items":{}"#, rng.random_range(1_u32..250)), + ]; + + if rng.random_bool(0.85) { + fields.push(format!( + r#""{}":"{}""#, + STATUS_KEYS[rng.random_range(0..STATUS_KEYS.len())], + STATUSES[rng.random_range(0..STATUSES.len())], + )); + } + if rng.random_bool(0.75) { + fields.push(format!( + r#""{}":"{}""#, + REGION_KEYS[rng.random_range(0..REGION_KEYS.len())], + REGIONS[rng.random_range(0..REGIONS.len())], + )); + } + if rng.random_bool(0.55) { + fields.push(format!( + r#""{}":{}.{:02}"#, + AMOUNT_KEYS[rng.random_range(0..AMOUNT_KEYS.len())], + rng.random_range(0_u32..2_500), + rng.random_range(0_u32..100), + )); + } + if rng.random_bool(0.40) { + fields.push(format!( + r#""{}":{}"#, + FLAG_KEYS[rng.random_range(0..FLAG_KEYS.len())], + rng.random_bool(0.5), + )); + } + if rng.random_bool(0.30) { + fields.push(format!( + r#""tags":["{}","{}"]"#, + TAGS[rng.random_range(0..TAGS.len())], + TAGS[rng.random_range(0..TAGS.len())], + )); + } + + format!("{{{}}}", fields.join(",")) + }) + .collect() +} + +fn json_array(values: &[String]) -> vortex_error::VortexResult { + let storage = VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + Ok( + ExtensionArray::try_new_from_vtable(Json, vortex_array::EmptyMetadata, storage)? + .into_array(), + ) +} + +fn parquet_variant_child_compressor() -> CascadingCompressor { + CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &BinaryFSSTScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) +} + +#[test] +fn json_to_variant_scheme_wraps_output_as_json() -> vortex_error::VortexResult<()> { + let array = json_array(&json_data())?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert_eq!(compressed.dtype(), array.dtype()); + assert!(compressed.is::()); + + let json = compressed.execute::(&mut exec_ctx)?; + assert_eq!(json.dtype(), array.dtype()); + assert!(json.storage_array().dtype().is_utf8()); + + Ok(()) +} + +fn print_comparison_output(array: &ArrayRef, string_compressed: &ArrayRef, compressed: &ArrayRef) { + let compressed_ratio = array.nbytes() as f64 / compressed.nbytes() as f64; + let compressed_array_ratio = string_compressed.nbytes() as f64 / compressed.nbytes() as f64; + println!( + "Compression sizes: input={} bytes, compressed string={} bytes, compressed output={} bytes", + array.nbytes(), + string_compressed.nbytes(), + compressed.nbytes(), + ); + println!("Compressed output ratio: {compressed_ratio:.2}x\n"); + println!("Compressed string / compressed output ratio: {compressed_array_ratio:.2}x\n"); + println!("JSON input encoding tree:\n{}", array.tree_display()); + println!( + "String-compressed encoding tree:\n{}", + string_compressed.tree_display() + ); + println!( + "Compressed output encoding tree:\n{}", + compressed.tree_display() + ); +} + +#[test] +fn parquet_variant_compresses_repeated_json_keys() -> vortex_error::VortexResult<()> { + let array = json_array(&json_data())?; + + let string_compressor = + CascadingCompressor::new(vec![&StringDictScheme, &StringConstantScheme]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert!( + variant_compressed.nbytes() < string_compressed.nbytes(), + "Parquet Variant conversion should compress repeated JSON keys: \ + variant={} bytes, input={} bytes", + variant_compressed.nbytes(), + string_compressed.nbytes(), + ); + + print_comparison_output(&array, &string_compressed, &variant_compressed); + + Ok(()) +} + +#[test] +fn recursively_compresses_parquet_variant_binary_children() -> vortex_error::VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + let uncompressed_children = + CascadingCompressor::new(vec![&JsonToVariantScheme]).compress(&array, &mut exec_ctx)?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert!( + compressed.nbytes() < uncompressed_children.nbytes(), + "recursive child compression should reduce Parquet Variant size: compressed={} bytes, uncompressed_children={} bytes", + compressed.nbytes(), + uncompressed_children.nbytes(), + ); + assert_eq!(compressed.dtype(), array.dtype()); + Ok(()) +} + +#[test] +fn binary_fsst_improves_parquet_variant_child_compression() -> vortex_error::VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + let mut exec_ctx = SESSION.create_execution_ctx(); + let without_binary_fsst = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) + .compress(&array, &mut exec_ctx)?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + let with_binary_fsst = parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?; + + assert!( + with_binary_fsst.nbytes() < without_binary_fsst.nbytes(), + "binary FSST should improve Parquet Variant child compression: with={} bytes, without={} bytes", + with_binary_fsst.nbytes(), + without_binary_fsst.nbytes(), + ); + + Ok(()) +} + +#[test] +fn prefers_smaller_extension_storage_over_variant_scheme() -> vortex_error::VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let string_compressor = CascadingCompressor::new(vec![ + &StringDictScheme, + &FSSTScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &BinaryFSSTScheme, + &binary::ZstdScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + print_comparison_output(&array, &string_compressed, &compressed); + + Ok(()) +} diff --git a/vortex-btrblocks/src/variant/mod.rs b/vortex-btrblocks/src/variant/mod.rs deleted file mode 100644 index 344e0a7cd85..00000000000 --- a/vortex-btrblocks/src/variant/mod.rs +++ /dev/null @@ -1,713 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! Compression scheme for JSON data into binary variant representation - -use std::sync::Arc; - -use arrow_array::ArrayRef as ArrowArrayRef; -use arrow_array::StructArray as ArrowStructArray; -use arrow_schema::DataType; -use arrow_schema::Field; -use vortex_array::Array; -use vortex_array::ArrayId; -use vortex_array::ArrayParts; -use vortex_array::ArrayRef; -use vortex_array::ArrayView; -use vortex_array::Canonical; -use vortex_array::EmptyArrayData; -use vortex_array::EmptyMetadata; -use vortex_array::ExecutionCtx; -use vortex_array::ExecutionResult; -use vortex_array::IntoArray; -use vortex_array::arrays::ExtensionArray; -use vortex_array::arrays::VariantArray; -use vortex_array::arrays::extension::ExtensionArrayExt; -use vortex_array::arrays::variant::VariantArrayExt; -use vortex_array::arrow::ArrowSessionExt; -use vortex_array::arrow::FromArrowArray; -use vortex_array::arrow::to_arrow_null_buffer; -use vortex_array::buffer::BufferHandle; -use vortex_array::dtype::DType; -use vortex_array::dtype::extension::ExtDType; -use vortex_array::serde::ArrayChildren; -use vortex_array::validity::Validity; -use vortex_array::vtable::NotSupported; -use vortex_array::vtable::VTable; -use vortex_array::vtable::ValidityVTable; -use vortex_compressor::ctx::CompressorContext; -use vortex_compressor::estimate::CompressionEstimate; -use vortex_compressor::estimate::DeferredEstimate; -use vortex_compressor::scheme::Scheme; -use vortex_compressor::scheme::SchemeExt; -use vortex_compressor::stats::ArrayAndStats; -use vortex_error::VortexResult; -use vortex_error::vortex_bail; -use vortex_error::vortex_ensure; -use vortex_error::vortex_err; -use vortex_error::vortex_panic; -use vortex_json::Json; -use vortex_parquet_variant::ParquetVariant; -use vortex_parquet_variant::ParquetVariantArrayExt; -use vortex_session::VortexSession; -use vortex_session::registry::CachedId; - -use crate::CascadingCompressor; - -/// Compression scheme that converts JSON string extension arrays to Parquet Variant arrays. -#[derive(Debug)] -pub struct JsonToVariantScheme; - -/// Child indices for recursively compressed Parquet Variant binary children. -mod parquet_variant_children { - /// The Parquet Variant metadata child. - pub const METADATA: usize = 0; - /// The raw Parquet Variant value child. - pub const VALUE: usize = 1; -} - -mod variant_to_json_children { - pub const VARIANT: usize = 0; - pub const NUM_SLOTS: usize = 1; - pub const SLOT_NAMES: [&str; NUM_SLOTS] = ["variant"]; -} - -/// Array that exposes a Variant array as JSON strings. -#[derive(Debug, Clone)] -pub struct VariantToJson; - -/// A [`VariantToJson`]-encoded array. -pub type VariantToJsonArray = Array; - -impl VariantToJson { - /// Creates a JSON wrapper around a Variant-typed array. - pub fn try_new(variant: ArrayRef) -> VortexResult { - vortex_ensure!( - variant.dtype().is_variant(), - "VariantToJson expects a Variant array, got {}", - variant.dtype() - ); - - let storage_dtype = DType::Utf8(variant.dtype().nullability()); - let dtype = - DType::Extension(ExtDType::::try_new(EmptyMetadata, storage_dtype)?.erased()); - let len = variant.len(); - - Array::try_from_parts( - ArrayParts::new(VariantToJson, dtype, len, EmptyArrayData) - .with_slots(vec![Some(variant)].into()), - ) - } -} - -impl VTable for VariantToJson { - type TypedArrayData = EmptyArrayData; - type OperationsVTable = NotSupported; - type ValidityVTable = Self; - - fn id(&self) -> ArrayId { - static ID: CachedId = CachedId::new("vortex.variant_to_json"); - *ID - } - - fn validate( - &self, - _data: &Self::TypedArrayData, - dtype: &DType, - len: usize, - slots: &[Option], - ) -> VortexResult<()> { - vortex_ensure!( - slots.len() == variant_to_json_children::NUM_SLOTS, - "VariantToJsonArray expects {} slots, got {}", - variant_to_json_children::NUM_SLOTS, - slots.len() - ); - let variant = slots[variant_to_json_children::VARIANT] - .as_ref() - .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; - - let DType::Extension(ext_dtype) = dtype else { - vortex_bail!("VariantToJsonArray dtype must be a JSON extension, got {dtype}"); - }; - vortex_ensure!( - ext_dtype.is::(), - "VariantToJsonArray dtype must be a JSON extension, got {dtype}" - ); - vortex_ensure!( - variant.dtype() == &DType::Variant(dtype.nullability()), - "VariantToJsonArray child dtype {} does not match JSON dtype nullability {}", - variant.dtype(), - dtype - ); - vortex_ensure!( - variant.len() == len, - "VariantToJsonArray child length {} does not match outer length {}", - variant.len(), - len - ); - - Ok(()) - } - - fn nbuffers(_array: ArrayView<'_, Self>) -> usize { - 0 - } - - fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { - vortex_panic!("VariantToJsonArray buffer index {idx} out of bounds") - } - - fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option { - None - } - - fn serialize( - _array: ArrayView<'_, Self>, - _session: &VortexSession, - ) -> VortexResult>> { - Ok(Some(Vec::new())) - } - - fn deserialize( - &self, - dtype: &DType, - len: usize, - metadata: &[u8], - buffers: &[BufferHandle], - children: &dyn ArrayChildren, - _session: &VortexSession, - ) -> VortexResult> { - vortex_ensure!( - metadata.is_empty(), - "VariantToJsonArray metadata must be empty" - ); - vortex_ensure!( - buffers.is_empty(), - "VariantToJsonArray expects 0 buffers, got {}", - buffers.len() - ); - vortex_ensure!( - children.len() == variant_to_json_children::NUM_SLOTS, - "VariantToJsonArray expects {} children, got {}", - variant_to_json_children::NUM_SLOTS, - children.len() - ); - - let variant_dtype = DType::Variant(dtype.nullability()); - let variant = children.get(variant_to_json_children::VARIANT, &variant_dtype, len)?; - - Ok( - ArrayParts::new(self.clone(), dtype.clone(), len, EmptyArrayData) - .with_slots(vec![Some(variant)].into()), - ) - } - - fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { - match variant_to_json_children::SLOT_NAMES.get(idx) { - Some(name) => (*name).to_string(), - None => vortex_panic!("VariantToJsonArray slot_name index {idx} out of bounds"), - } - } - - fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { - let variant = array.as_ref().slots()[variant_to_json_children::VARIANT] - .as_ref() - .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; - let variant = variant.clone().execute::(ctx)?; - vortex_ensure!( - variant.shredded().is_none(), - "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" - ); - - let parquet_variant = variant - .core_storage() - .as_opt::() - .ok_or_else(|| { - vortex_err!( - "VariantToJsonArray requires Parquet Variant core storage, got {}", - variant.core_storage().encoding_id() - ) - })?; - let arrow_variant = parquet_variant_to_json_arrow(parquet_variant, ctx)?; - let arrow_json = parquet_variant_compute::variant_to_json(&arrow_variant)?; - let storage = ArrayRef::from_arrow(&arrow_json, array.dtype().is_nullable())?; - - Ok(ExecutionResult::done( - ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array(), - )) - } -} - -impl ValidityVTable for VariantToJson { - fn validity(array: ArrayView<'_, VariantToJson>) -> VortexResult { - array.slots()[variant_to_json_children::VARIANT] - .as_ref() - .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))? - .validity() - } -} - -fn parquet_variant_to_json_arrow( - parquet_variant: ArrayView<'_, ParquetVariant>, - ctx: &mut ExecutionCtx, -) -> VortexResult { - vortex_ensure!( - parquet_variant.typed_value_array().is_none(), - "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" - ); - let value = parquet_variant - .value_array() - .ok_or_else(|| vortex_err!("VariantToJsonArray requires Parquet Variant value storage"))?; - - let metadata_arrow = { - let target = Field::new("", DataType::Binary, false); - let session = ctx.session().clone(); - session.arrow().execute_arrow( - parquet_variant.metadata_array().clone(), - Some(&target), - ctx, - )? - }; - let value_arrow = { - let target = Field::new("", DataType::Binary, value.dtype().is_nullable()); - let session = ctx.session().clone(); - session - .arrow() - .execute_arrow(value.clone(), Some(&target), ctx)? - }; - let fields = vec![ - Arc::new(Field::new("metadata", DataType::Binary, false)), - Arc::new(Field::new( - "value", - DataType::Binary, - value.dtype().is_nullable(), - )), - ]; - let nulls = to_arrow_null_buffer(parquet_variant.validity()?, parquet_variant.len(), ctx)?; - - Ok(Arc::new(ArrowStructArray::try_new( - fields.into(), - vec![metadata_arrow, value_arrow], - nulls, - )?)) -} - -impl Scheme for JsonToVariantScheme { - fn scheme_name(&self) -> &'static str { - "json_to_variant" - } - - fn matches(&self, canonical: &Canonical) -> bool { - let Canonical::Extension(ext_array) = canonical else { - return false; - }; - - ext_array.ext_dtype().is::() - } - - fn num_children(&self) -> usize { - 2 - } - - fn expected_compression_ratio( - &self, - _data: &ArrayAndStats, - _compress_ctx: CompressorContext, - _exec_ctx: &mut ExecutionCtx, - ) -> CompressionEstimate { - CompressionEstimate::Deferred(DeferredEstimate::Sample) - } - - fn compress( - &self, - compressor: &CascadingCompressor, - data: &ArrayAndStats, - compress_ctx: CompressorContext, - exec_ctx: &mut ExecutionCtx, - ) -> VortexResult { - let array = data.array().clone().execute::(exec_ctx)?; - let storage = array.storage_array().clone(); - - if !storage.dtype().is_utf8() { - vortex_bail!("storage must be utf8"); - } - - let arrow_array = { - let session = exec_ctx.session().clone(); - let arrow = session.arrow(); - arrow.execute_arrow(storage, None, exec_ctx)? - }; - - let array = parquet_variant_compute::json_to_variant(&arrow_array)?; - - let parquet_variant = - ParquetVariant::from_arrow_variant(&array)?.downcast::(); - - let compressed_metadata = compressor.compress_child( - parquet_variant.metadata_array(), - &compress_ctx, - self.id(), - parquet_variant_children::METADATA, - exec_ctx, - )?; - let compressed_value = parquet_variant - .value_array() - .map(|value| { - compressor.compress_child( - value, - &compress_ctx, - self.id(), - parquet_variant_children::VALUE, - exec_ctx, - ) - }) - .transpose()?; - - let variant = ParquetVariant::try_new( - parquet_variant.validity()?, - compressed_metadata, - compressed_value, - parquet_variant.typed_value_array().cloned(), - )? - .into_array(); - - Ok(VariantToJson::try_new(variant)?.into_array()) - } -} - -#[cfg(test)] -mod tests { - use std::sync::LazyLock; - - use rand::RngExt; - use rand::SeedableRng; - use rand::rngs::StdRng; - use vortex_array::IntoArray; - use vortex_array::VortexSessionExecute; - use vortex_array::accessor::ArrayAccessor; - use vortex_array::arrays::ExtensionArray; - use vortex_array::arrays::VarBinViewArray; - use vortex_array::arrays::extension::ExtensionArrayExt; - use vortex_array::session::ArraySession; - use vortex_compressor::builtins::BinaryDictScheme; - use vortex_compressor::builtins::IntConstantScheme; - use vortex_compressor::builtins::StringConstantScheme; - use vortex_compressor::builtins::StringDictScheme; - use vortex_session::VortexSession; - - use super::*; - use crate::schemes::binary; - use crate::schemes::binary::BinaryFSSTScheme; - use crate::schemes::integer::BitPackingScheme; - use crate::schemes::integer::FoRScheme; - use crate::schemes::integer::RunEndScheme; - use crate::schemes::integer::SequenceScheme; - use crate::schemes::integer::SparseScheme; - use crate::schemes::integer::ZigZagScheme; - use crate::schemes::string::FSSTScheme; - - static SESSION: LazyLock = - LazyLock::new(|| VortexSession::empty().with::()); - - fn json_data() -> Vec { - let mut rng = StdRng::seed_from_u64(0); - const ACCOUNT_KEYS: &[&str] = &["account_id", "customer_id", "tenant_id", "buyer_id"]; - const REGION_KEYS: &[&str] = &["region", "market", "country"]; - const REGIONS: &[&str] = &["us-east", "us-west", "eu", "apac", "latam"]; - const STATUS_KEYS: &[&str] = &["status", "payment_state", "lifecycle_state"]; - const STATUSES: &[&str] = &["draft", "open", "paid", "void", "past_due"]; - const AMOUNT_KEYS: &[&str] = &["discount", "tax", "shipping", "credit"]; - const FLAG_KEYS: &[&str] = &["autopay", "fraud_review", "priority", "disputed"]; - const TAGS: &[&str] = &["renewal", "manual", "usage", "trial", "enterprise"]; - - (0..1024) - .map(|_| { - let mut fields = vec![ - format!( - r#""{}":"acct_{:04x}""#, - ACCOUNT_KEYS[rng.random_range(0..ACCOUNT_KEYS.len())], - rng.random::(), - ), - format!( - r#""invoice_total":{}.{:02}"#, - rng.random_range(10_u32..100_000), - rng.random_range(0_u32..100), - ), - format!(r#""line_items":{}"#, rng.random_range(1_u32..250)), - ]; - - if rng.random_bool(0.85) { - fields.push(format!( - r#""{}":"{}""#, - STATUS_KEYS[rng.random_range(0..STATUS_KEYS.len())], - STATUSES[rng.random_range(0..STATUSES.len())], - )); - } - if rng.random_bool(0.75) { - fields.push(format!( - r#""{}":"{}""#, - REGION_KEYS[rng.random_range(0..REGION_KEYS.len())], - REGIONS[rng.random_range(0..REGIONS.len())], - )); - } - if rng.random_bool(0.55) { - fields.push(format!( - r#""{}":{}.{:02}"#, - AMOUNT_KEYS[rng.random_range(0..AMOUNT_KEYS.len())], - rng.random_range(0_u32..2_500), - rng.random_range(0_u32..100), - )); - } - if rng.random_bool(0.40) { - fields.push(format!( - r#""{}":{}"#, - FLAG_KEYS[rng.random_range(0..FLAG_KEYS.len())], - rng.random_bool(0.5), - )); - } - if rng.random_bool(0.30) { - fields.push(format!( - r#""tags":["{}","{}"]"#, - TAGS[rng.random_range(0..TAGS.len())], - TAGS[rng.random_range(0..TAGS.len())], - )); - } - - format!("{{{}}}", fields.join(",")) - }) - .collect() - } - - fn json_array(values: &[String]) -> VortexResult { - let storage = - VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); - Ok(ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array()) - } - - #[test] - fn variant_to_json_canonicalizes_to_json_extension() -> VortexResult<()> { - let values = [ - "0".to_string(), - r#"{"a":32}"#.to_string(), - r#""hello""#.to_string(), - "null".to_string(), - ]; - let storage = - VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); - let source = - ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage.clone())?.into_array(); - - let mut exec_ctx = SESSION.create_execution_ctx(); - let arrow_array = { - let session = exec_ctx.session().clone(); - session - .arrow() - .execute_arrow(storage, None, &mut exec_ctx)? - }; - let arrow_variant = parquet_variant_compute::json_to_variant(&arrow_array)?; - let variant = ParquetVariant::from_arrow_variant(&arrow_variant)?; - - let wrapped = VariantToJson::try_new(variant)?; - assert_eq!(wrapped.dtype(), source.dtype()); - - let json = wrapped - .into_array() - .execute::(&mut exec_ctx)?; - assert_eq!(json.dtype(), source.dtype()); - assert!(json.storage_array().dtype().is_utf8()); - let json_storage = json - .storage_array() - .clone() - .execute::(&mut exec_ctx)?; - let actual = json_storage.with_iterator(|iter| { - iter.map(|value| value.map(<[u8]>::to_vec)) - .collect::>() - }); - let expected = values - .iter() - .map(|value| Some(value.as_bytes().to_vec())) - .collect::>(); - - assert_eq!(actual, expected); - - Ok(()) - } - - fn parquet_variant_child_compressor() -> CascadingCompressor { - CascadingCompressor::new(vec![ - &JsonToVariantScheme, - &BinaryDictScheme, - &BinaryFSSTScheme, - &IntConstantScheme, - &FoRScheme, - &SparseScheme, - &BitPackingScheme, - &RunEndScheme, - &SequenceScheme, - &ZigZagScheme, - ]) - } - - #[test] - fn json_to_variant_scheme_wraps_output_as_json() -> VortexResult<()> { - let array = json_array(&json_data())?; - - let variant_compressor = parquet_variant_child_compressor(); - let mut exec_ctx = SESSION.create_execution_ctx(); - let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - - assert_eq!(compressed.dtype(), array.dtype()); - - let json = compressed.execute::(&mut exec_ctx)?; - assert_eq!(json.dtype(), array.dtype()); - assert!(json.storage_array().dtype().is_utf8()); - - Ok(()) - } - - fn print_comparison_output( - array: &ArrayRef, - string_compressed: &ArrayRef, - compressed: &ArrayRef, - ) { - let compressed_ratio = array.nbytes() as f64 / compressed.nbytes() as f64; - let compressed_array_ratio = string_compressed.nbytes() as f64 / compressed.nbytes() as f64; - println!( - "Compression sizes: input={} bytes, compressed string={} bytes, compressed output={} bytes", - array.nbytes(), - string_compressed.nbytes(), - compressed.nbytes(), - ); - println!("Compressed output ratio: {compressed_ratio:.2}x\n"); - println!("Compressed string / compressed output ratio: {compressed_array_ratio:.2}x\n"); - println!("JSON input encoding tree:\n{}", array.tree_display()); - println!( - "String-compressed encoding tree:\n{}", - string_compressed.tree_display() - ); - println!( - "Compressed output encoding tree:\n{}", - compressed.tree_display() - ); - } - - #[test] - fn parquet_variant_compresses_repeated_json_keys() -> VortexResult<()> { - let array = json_array(&json_data())?; - - let string_compressor = - CascadingCompressor::new(vec![&StringDictScheme, &StringConstantScheme]); - let mut exec_ctx = SESSION.create_execution_ctx(); - let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; - - let variant_compressor = parquet_variant_child_compressor(); - let mut exec_ctx = SESSION.create_execution_ctx(); - let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - - assert!( - variant_compressed.nbytes() < string_compressed.nbytes(), - "Parquet Variant conversion should compress repeated JSON keys: \ - variant={} bytes, input={} bytes", - variant_compressed.nbytes(), - string_compressed.nbytes(), - ); - - print_comparison_output(&array, &string_compressed, &variant_compressed); - - Ok(()) - } - - #[test] - fn recursively_compresses_parquet_variant_binary_children() -> VortexResult<()> { - let array: ArrayRef = json_array(&json_data())?; - - let mut exec_ctx = SESSION.create_execution_ctx(); - let uncompressed_children = - CascadingCompressor::new(vec![&JsonToVariantScheme]).compress(&array, &mut exec_ctx)?; - - let variant_compressor = parquet_variant_child_compressor(); - let mut exec_ctx = SESSION.create_execution_ctx(); - let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - - assert!( - compressed.nbytes() < uncompressed_children.nbytes(), - "recursive child compression should reduce Parquet Variant size: compressed={} bytes, uncompressed_children={} bytes", - compressed.nbytes(), - uncompressed_children.nbytes(), - ); - assert_eq!(compressed.dtype(), array.dtype()); - Ok(()) - } - - #[test] - fn binary_fsst_improves_parquet_variant_child_compression() -> VortexResult<()> { - let array: ArrayRef = json_array(&json_data())?; - let mut exec_ctx = SESSION.create_execution_ctx(); - let without_binary_fsst = CascadingCompressor::new(vec![ - &JsonToVariantScheme, - &BinaryDictScheme, - &IntConstantScheme, - &FoRScheme, - &SparseScheme, - &BitPackingScheme, - &RunEndScheme, - &SequenceScheme, - &ZigZagScheme, - ]) - .compress(&array, &mut exec_ctx)?; - - let mut exec_ctx = SESSION.create_execution_ctx(); - let with_binary_fsst = - parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?; - - assert!( - with_binary_fsst.nbytes() < without_binary_fsst.nbytes(), - "binary FSST should improve Parquet Variant child compression: with={} bytes, without={} bytes", - with_binary_fsst.nbytes(), - without_binary_fsst.nbytes(), - ); - - Ok(()) - } - - #[test] - fn prefers_smaller_extension_storage_over_variant_scheme() -> VortexResult<()> { - let array: ArrayRef = json_array(&json_data())?; - - let string_compressor = CascadingCompressor::new(vec![ - &StringDictScheme, - &FSSTScheme, - &IntConstantScheme, - &StringConstantScheme, - &FoRScheme, - &BitPackingScheme, - &RunEndScheme, - &SequenceScheme, - &ZigZagScheme, - ]); - let mut exec_ctx = SESSION.create_execution_ctx(); - let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; - - let variant_compressor = CascadingCompressor::new(vec![ - &JsonToVariantScheme, - &BinaryDictScheme, - &BinaryFSSTScheme, - &binary::ZstdScheme, - &IntConstantScheme, - &StringConstantScheme, - &FoRScheme, - &SparseScheme, - &BitPackingScheme, - &RunEndScheme, - &SequenceScheme, - &ZigZagScheme, - ]); - let mut exec_ctx = SESSION.create_execution_ctx(); - let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; - - print_comparison_output(&array, &string_compressed, &compressed); - - Ok(()) - } -} From 3a89190d42d5ce4d48639d0821c70cc658b74387 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 9 Jun 2026 13:23:50 +0100 Subject: [PATCH 07/11] Keep validity Signed-off-by: Adam Gutglick --- .../src/schemes/variant/json_to_variant.rs | 6 ++- vortex-btrblocks/src/schemes/variant/tests.rs | 50 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs index 50d4bc5ba2a..d2c46ec6515 100644 --- a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs +++ b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs @@ -69,6 +69,7 @@ impl Scheme for JsonToVariantScheme { exec_ctx: &mut ExecutionCtx, ) -> VortexResult { let array = data.array().clone().execute::(exec_ctx)?; + let output_nullability = array.dtype().nullability(); let storage = array.storage_array().clone(); if !storage.dtype().is_utf8() { @@ -105,8 +106,11 @@ impl Scheme for JsonToVariantScheme { }) .transpose()?; + let variant_validity = parquet_variant + .validity()? + .union_nullability(output_nullability); let variant = ParquetVariant::try_new( - parquet_variant.validity()?, + variant_validity, compressed_metadata, compressed_value, parquet_variant.typed_value_array().cloned(), diff --git a/vortex-btrblocks/src/schemes/variant/tests.rs b/vortex-btrblocks/src/schemes/variant/tests.rs index 71758f1ee0b..61cfdf675f8 100644 --- a/vortex-btrblocks/src/schemes/variant/tests.rs +++ b/vortex-btrblocks/src/schemes/variant/tests.rs @@ -13,6 +13,7 @@ use vortex_array::arrays::ExtensionArray; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::extension::ExtensionArrayExt; use vortex_array::session::ArraySession; +use vortex_array::validity::Validity; use vortex_compressor::builtins::BinaryDictScheme; use vortex_compressor::builtins::IntConstantScheme; use vortex_compressor::builtins::StringConstantScheme; @@ -116,6 +117,25 @@ fn json_array(values: &[String]) -> vortex_error::VortexResult { ) } +fn all_valid_nullable_json_array( + values: impl IntoIterator, +) -> vortex_error::VortexResult { + let storage = VarBinViewArray::from_iter_str(values); + let parts = storage.into_data_parts(); + let storage = VarBinViewArray::new_handle( + parts.views, + parts.buffers, + parts.dtype.as_nullable(), + Validity::AllValid, + ) + .into_array(); + + Ok( + ExtensionArray::try_new_from_vtable(Json, vortex_array::EmptyMetadata, storage)? + .into_array(), + ) +} + fn parquet_variant_child_compressor() -> CascadingCompressor { CascadingCompressor::new(vec![ &JsonToVariantScheme, @@ -149,6 +169,36 @@ fn json_to_variant_scheme_wraps_output_as_json() -> vortex_error::VortexResult<( Ok(()) } +#[test] +fn preserves_nullable_json_dtype_for_all_valid_storage() -> vortex_error::VortexResult<()> { + let values = [r#"{"a":1}"#, r#"{"b":2}"#]; + let storage = VarBinViewArray::from_iter_str(values); + let parts = storage.into_data_parts(); + let storage = VarBinViewArray::new_handle( + parts.views, + parts.buffers, + parts.dtype.as_nullable(), + Validity::AllValid, + ) + .into_array(); + + let array = ExtensionArray::try_new_from_vtable(Json, vortex_array::EmptyMetadata, storage)? + .into_array(); + + assert!(array.dtype().is_nullable()); + + let variant_compressor = CascadingCompressor::new(vec![&JsonToVariantScheme]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert_eq!(compressed.dtype(), array.dtype()); + + let json = compressed.execute::(&mut exec_ctx)?; + assert_eq!(json.dtype(), array.dtype()); + + Ok(()) +} + fn print_comparison_output(array: &ArrayRef, string_compressed: &ArrayRef, compressed: &ArrayRef) { let compressed_ratio = array.nbytes() as f64 / compressed.nbytes() as f64; let compressed_array_ratio = string_compressed.nbytes() as f64 / compressed.nbytes() as f64; From f5f6854a74d5da9d9735a305ad319563f28fa7ed Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 9 Jun 2026 13:28:03 +0100 Subject: [PATCH 08/11] comment Signed-off-by: Adam Gutglick --- vortex-btrblocks/src/schemes/variant/json_to_variant.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs index d2c46ec6515..17b4f9e2628 100644 --- a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs +++ b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs @@ -24,6 +24,9 @@ use vortex_parquet_variant::VariantToJson; use crate::CascadingCompressor; /// Compression scheme that converts JSON string extension arrays to Parquet Variant arrays. +/// +/// When decompressed, the resulting JSON array might not be byte-to-byte identical, as this +/// compression doesn't maintain whitespaces. #[derive(Debug)] pub struct JsonToVariantScheme; From e7d5465ccc2232679513d5bd04f7f33d574176f1 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 9 Jun 2026 13:39:37 +0100 Subject: [PATCH 09/11] fmt Signed-off-by: Adam Gutglick --- vortex-btrblocks/Cargo.toml | 4 +++- vortex-btrblocks/src/schemes/variant/tests.rs | 19 ------------------- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 200df38f6a5..e24f7266347 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -33,7 +33,9 @@ vortex-fsst = { workspace = true } vortex-json = { workspace = true, optional = true } vortex-mask = { workspace = true } vortex-onpair = { workspace = true, optional = true } -vortex-parquet-variant = { workspace = true, optional = true, features = ["json"] } +vortex-parquet-variant = { workspace = true, optional = true, features = [ + "json", +] } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } diff --git a/vortex-btrblocks/src/schemes/variant/tests.rs b/vortex-btrblocks/src/schemes/variant/tests.rs index 61cfdf675f8..036e5633710 100644 --- a/vortex-btrblocks/src/schemes/variant/tests.rs +++ b/vortex-btrblocks/src/schemes/variant/tests.rs @@ -117,25 +117,6 @@ fn json_array(values: &[String]) -> vortex_error::VortexResult { ) } -fn all_valid_nullable_json_array( - values: impl IntoIterator, -) -> vortex_error::VortexResult { - let storage = VarBinViewArray::from_iter_str(values); - let parts = storage.into_data_parts(); - let storage = VarBinViewArray::new_handle( - parts.views, - parts.buffers, - parts.dtype.as_nullable(), - Validity::AllValid, - ) - .into_array(); - - Ok( - ExtensionArray::try_new_from_vtable(Json, vortex_array::EmptyMetadata, storage)? - .into_array(), - ) -} - fn parquet_variant_child_compressor() -> CascadingCompressor { CascadingCompressor::new(vec![ &JsonToVariantScheme, From 71306c8961a8715e84fc68f4cb76f33bfda72d8a Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 9 Jun 2026 14:27:30 +0100 Subject: [PATCH 10/11] follow ups Signed-off-by: Adam Gutglick --- encodings/parquet-variant/Cargo.toml | 3 +-- vortex-btrblocks/Cargo.toml | 2 -- .../src/schemes/variant/json_to_variant.rs | 21 +++++++++++++++++-- vortex-btrblocks/src/schemes/variant/tests.rs | 2 -- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/encodings/parquet-variant/Cargo.toml b/encodings/parquet-variant/Cargo.toml index 5d3021302a4..1b0efb88c83 100644 --- a/encodings/parquet-variant/Cargo.toml +++ b/encodings/parquet-variant/Cargo.toml @@ -12,7 +12,6 @@ readme = { workspace = true } repository = { workspace = true } rust-version = { workspace = true } version = { workspace = true } -publish = true [lints] workspace = true @@ -28,10 +27,10 @@ prost = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-error = { workspace = true } +vortex-json = { workspace = true, optional = true } vortex-mask = { workspace = true } vortex-proto = { workspace = true } vortex-session = { workspace = true } -vortex-json = { workspace = true, optional = true } [dev-dependencies] rstest = { workspace = true } diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index e24f7266347..e3c38095614 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -48,7 +48,6 @@ vortex-zstd = { workspace = true, optional = true } [dev-dependencies] divan = { workspace = true } -rand = { workspace = true } rstest = { workspace = true } test-with = { workspace = true } vortex-array = { workspace = true, features = ["_test-harness"] } @@ -66,7 +65,6 @@ parquet-variant = [ "dep:parquet-variant-compute", "dep:vortex-session", "dep:vortex-json", - "zstd", ] pco = ["dep:pco", "dep:vortex-pco"] zstd = ["dep:vortex-zstd"] diff --git a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs index 17b4f9e2628..a32433e3c79 100644 --- a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs +++ b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs @@ -36,11 +36,13 @@ mod parquet_variant_children { pub const METADATA: usize = 0; /// The raw Parquet Variant value child. pub const VALUE: usize = 1; + /// The raw Parquet Variant typed_value child. + pub const TYPED_VALUE: usize = 2; } impl Scheme for JsonToVariantScheme { fn scheme_name(&self) -> &'static str { - "json_to_variant" + "vortex.variant.pq.json" } fn matches(&self, canonical: &Canonical) -> bool { @@ -109,6 +111,21 @@ impl Scheme for JsonToVariantScheme { }) .transpose()?; + // This is currently always none, but we should make sure to compress it + // if it exists + let typed_value = parquet_variant + .typed_value_array() + .map(|typed| { + compressor.compress_child( + typed, + &compress_ctx, + self.id(), + parquet_variant_children::TYPED_VALUE, + exec_ctx, + ) + }) + .transpose()?; + let variant_validity = parquet_variant .validity()? .union_nullability(output_nullability); @@ -116,7 +133,7 @@ impl Scheme for JsonToVariantScheme { variant_validity, compressed_metadata, compressed_value, - parquet_variant.typed_value_array().cloned(), + typed_value, )? .into_array(); diff --git a/vortex-btrblocks/src/schemes/variant/tests.rs b/vortex-btrblocks/src/schemes/variant/tests.rs index 036e5633710..ec53cccc9e6 100644 --- a/vortex-btrblocks/src/schemes/variant/tests.rs +++ b/vortex-btrblocks/src/schemes/variant/tests.rs @@ -24,7 +24,6 @@ use vortex_session::VortexSession; use super::*; use crate::CascadingCompressor; -use crate::schemes::binary; use crate::schemes::binary::BinaryFSSTScheme; use crate::schemes::integer::BitPackingScheme; use crate::schemes::integer::FoRScheme; @@ -302,7 +301,6 @@ fn prefers_smaller_extension_storage_over_variant_scheme() -> vortex_error::Vort &JsonToVariantScheme, &BinaryDictScheme, &BinaryFSSTScheme, - &binary::ZstdScheme, &IntConstantScheme, &StringConstantScheme, &FoRScheme, From b79f254b2073f0975ab295018564294565ac55b3 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 11 Jun 2026 15:37:38 +0100 Subject: [PATCH 11/11] Uncycle dependencies Signed-off-by: Adam Gutglick --- .../src/schemes/variant/json_to_variant.rs | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs index a32433e3c79..9557a1e9fee 100644 --- a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs +++ b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs @@ -36,8 +36,6 @@ mod parquet_variant_children { pub const METADATA: usize = 0; /// The raw Parquet Variant value child. pub const VALUE: usize = 1; - /// The raw Parquet Variant typed_value child. - pub const TYPED_VALUE: usize = 2; } impl Scheme for JsonToVariantScheme { @@ -111,20 +109,12 @@ impl Scheme for JsonToVariantScheme { }) .transpose()?; - // This is currently always none, but we should make sure to compress it - // if it exists - let typed_value = parquet_variant - .typed_value_array() - .map(|typed| { - compressor.compress_child( - typed, - &compress_ctx, - self.id(), - parquet_variant_children::TYPED_VALUE, - exec_ctx, - ) - }) - .transpose()?; + // `VariantToJson` can only decode unshredded storage, so producing a typed_value child + // here would create an array that fails at decompression time. + vortex_ensure!( + parquet_variant.typed_value_array().is_none(), + "JsonToVariantScheme does not support shredded (typed_value) Parquet Variant storage" + ); let variant_validity = parquet_variant .validity()? @@ -133,7 +123,7 @@ impl Scheme for JsonToVariantScheme { variant_validity, compressed_metadata, compressed_value, - typed_value, + None, )? .into_array();