diff --git a/Cargo.lock b/Cargo.lock index b6f67682c4a..09506ddd6a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9819,6 +9819,7 @@ dependencies = [ "codspeed-divan-compat", "itertools 0.14.0", "num-traits", + "parquet-variant-compute", "pco", "rand 0.10.1", "rstest", @@ -9834,8 +9835,10 @@ dependencies = [ "vortex-error", "vortex-fastlanes", "vortex-fsst", + "vortex-json", "vortex-mask", "vortex-onpair", + "vortex-parquet-variant", "vortex-pco", "vortex-runend", "vortex-sequence", @@ -10466,6 +10469,7 @@ dependencies = [ "vortex-error", "vortex-file", "vortex-io", + "vortex-json", "vortex-layout", "vortex-mask", "vortex-proto", diff --git a/encodings/parquet-variant/Cargo.toml b/encodings/parquet-variant/Cargo.toml index 6b3900abb27..1b0efb88c83 100644 --- a/encodings/parquet-variant/Cargo.toml +++ b/encodings/parquet-variant/Cargo.toml @@ -27,6 +27,7 @@ prost = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-error = { workspace = true } +vortex-json = { workspace = true, optional = true } vortex-mask = { workspace = true } vortex-proto = { workspace = true } vortex-session = { workspace = true } @@ -38,3 +39,6 @@ vortex-array = { workspace = true, features = ["_test-harness"] } vortex-file = { workspace = true, features = ["tokio"] } vortex-io = { workspace = true, features = ["tokio"] } vortex-layout = { workspace = true } + +[features] +json = ["dep:vortex-json"] diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs index 9bee5c738d3..c3496e79eda 100644 --- a/encodings/parquet-variant/src/arrow.rs +++ b/encodings/parquet-variant/src/arrow.rs @@ -58,7 +58,7 @@ fn parquet_variant_storage_request(fields: &Fields) -> Option<(bool, bool)> { (has_metadata && (has_value || has_typed_value)).then_some((has_value, has_typed_value)) } -fn export_storage_to_target( +pub(crate) fn export_storage_to_target( parquet_array: &T, target_fields: &Fields, ctx: &mut ExecutionCtx, diff --git a/encodings/parquet-variant/src/json.rs b/encodings/parquet-variant/src/json.rs new file mode 100644 index 00000000000..af9f68adfa6 --- /dev/null +++ b/encodings/parquet-variant/src/json.rs @@ -0,0 +1,315 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! JSON extension wrappers for Parquet Variant storage. + +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::Field; +use vortex_array::Array; +use vortex_array::ArrayId; +use vortex_array::ArrayParts; +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::EmptyArrayData; +use vortex_array::EmptyMetadata; +use vortex_array::ExecutionCtx; +use vortex_array::ExecutionResult; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::VariantArray; +use vortex_array::arrays::variant::VariantArrayExt; +use vortex_array::arrow::FromArrowArray; +use vortex_array::buffer::BufferHandle; +use vortex_array::dtype::DType; +use vortex_array::dtype::extension::ExtDType; +use vortex_array::serde::ArrayChildren; +use vortex_array::validity::Validity; +use vortex_array::vtable::NotSupported; +use vortex_array::vtable::VTable; +use vortex_array::vtable::ValidityVTable; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_json::Json; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; + +use crate::ParquetVariant; +use crate::ParquetVariantArrayExt; + +mod variant_to_json_children { + pub const VARIANT: usize = 0; + pub const NUM_SLOTS: usize = 1; + pub const SLOT_NAMES: [&str; NUM_SLOTS] = ["variant"]; +} + +/// Array that exposes a Variant array as JSON strings. +#[derive(Debug, Clone)] +pub struct VariantToJson; + +/// A [`VariantToJson`]-encoded array. +pub type VariantToJsonArray = Array; + +impl VariantToJson { + /// Creates a JSON wrapper around a Variant-typed array. + pub fn try_new(variant: ArrayRef) -> VortexResult { + vortex_ensure!( + variant.dtype().is_variant(), + "VariantToJson expects a Variant array, got {}", + variant.dtype() + ); + + let storage_dtype = DType::Utf8(variant.dtype().nullability()); + let dtype = + DType::Extension(ExtDType::::try_new(EmptyMetadata, storage_dtype)?.erased()); + let len = variant.len(); + + Array::try_from_parts( + ArrayParts::new(VariantToJson, dtype, len, EmptyArrayData) + .with_slots(vec![Some(variant)].into()), + ) + } +} + +impl VTable for VariantToJson { + type TypedArrayData = EmptyArrayData; + type OperationsVTable = NotSupported; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.variant_to_json"); + *ID + } + + fn validate( + &self, + _data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option], + ) -> VortexResult<()> { + vortex_ensure!( + slots.len() == variant_to_json_children::NUM_SLOTS, + "VariantToJsonArray expects {} slots, got {}", + variant_to_json_children::NUM_SLOTS, + slots.len() + ); + let variant = slots[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; + + let DType::Extension(ext_dtype) = dtype else { + vortex_bail!("VariantToJsonArray dtype must be a JSON extension, got {dtype}"); + }; + vortex_ensure!( + ext_dtype.is::(), + "VariantToJsonArray dtype must be a JSON extension, got {dtype}" + ); + vortex_ensure!( + variant.dtype() == &DType::Variant(dtype.nullability()), + "VariantToJsonArray child dtype {} does not match JSON dtype nullability {}", + variant.dtype(), + dtype + ); + vortex_ensure!( + variant.len() == len, + "VariantToJsonArray child length {} does not match outer length {}", + variant.len(), + len + ); + + Ok(()) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 0 + } + + fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + vortex_panic!("VariantToJsonArray buffer index {idx} out of bounds") + } + + fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option { + None + } + + fn serialize( + _array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult>> { + Ok(Some(Vec::new())) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult> { + vortex_ensure!( + metadata.is_empty(), + "VariantToJsonArray metadata must be empty" + ); + vortex_ensure!( + buffers.is_empty(), + "VariantToJsonArray expects 0 buffers, got {}", + buffers.len() + ); + vortex_ensure!( + children.len() == variant_to_json_children::NUM_SLOTS, + "VariantToJsonArray expects {} children, got {}", + variant_to_json_children::NUM_SLOTS, + children.len() + ); + + let variant_dtype = DType::Variant(dtype.nullability()); + let variant = children.get(variant_to_json_children::VARIANT, &variant_dtype, len)?; + + Ok( + ArrayParts::new(self.clone(), dtype.clone(), len, EmptyArrayData) + .with_slots(vec![Some(variant)].into()), + ) + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + match variant_to_json_children::SLOT_NAMES.get(idx) { + Some(name) => (*name).to_string(), + None => vortex_panic!("VariantToJsonArray slot_name index {idx} out of bounds"), + } + } + + fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { + let variant = array.as_ref().slots()[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))?; + let variant = variant.clone().execute::(ctx)?; + vortex_ensure!( + variant.shredded().is_none(), + "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" + ); + + let parquet_variant = variant + .core_storage() + .as_opt::() + .ok_or_else(|| { + vortex_err!( + "VariantToJsonArray requires Parquet Variant core storage, got {}", + variant.core_storage().encoding_id() + ) + })?; + vortex_ensure!( + parquet_variant.typed_value_array().is_none(), + "VariantToJsonArray can only export unshredded Parquet Variant storage to JSON" + ); + let value = parquet_variant.value_array().ok_or_else(|| { + vortex_err!("VariantToJsonArray requires Parquet Variant value storage") + })?; + let arrow_variant = crate::arrow::export_storage_to_target( + &parquet_variant, + &vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new( + "value", + DataType::Binary, + value.dtype().is_nullable(), + )), + ] + .into(), + ctx, + )?; + let arrow_json = parquet_variant_compute::variant_to_json(&arrow_variant)?; + let storage = ArrayRef::from_arrow(&arrow_json, array.dtype().is_nullable())?; + + Ok(ExecutionResult::done( + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage)?.into_array(), + )) + } +} + +impl ValidityVTable for VariantToJson { + fn validity(array: ArrayView<'_, VariantToJson>) -> VortexResult { + array.slots()[variant_to_json_children::VARIANT] + .as_ref() + .ok_or_else(|| vortex_err!("VariantToJsonArray variant slot must be present"))? + .validity() + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::accessor::ArrayAccessor; + use vortex_array::arrays::ExtensionArray; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrays::extension::ExtensionArrayExt; + use vortex_array::arrow::ArrowSessionExt; + use vortex_array::session::ArraySession; + use vortex_session::VortexSession; + + use super::*; + + static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + crate::initialize(&session); + session + }); + + #[test] + fn variant_to_json_canonicalizes_to_json_extension() -> VortexResult<()> { + let values = [ + "0".to_string(), + r#"{"a":32}"#.to_string(), + r#""hello""#.to_string(), + "null".to_string(), + ]; + let storage = + VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + let source = + ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage.clone())?.into_array(); + + let mut exec_ctx = SESSION.create_execution_ctx(); + let arrow_array = { + let session = exec_ctx.session().clone(); + session + .arrow() + .execute_arrow(storage, None, &mut exec_ctx)? + }; + let arrow_variant = parquet_variant_compute::json_to_variant(&arrow_array)?; + let variant = ParquetVariant::from_arrow_variant(&arrow_variant)?; + + let wrapped = VariantToJson::try_new(variant)?; + assert_eq!(wrapped.dtype(), source.dtype()); + + let json = wrapped + .into_array() + .execute::(&mut exec_ctx)?; + assert_eq!(json.dtype(), source.dtype()); + assert!(json.storage_array().dtype().is_utf8()); + let json_storage = json + .storage_array() + .clone() + .execute::(&mut exec_ctx)?; + let actual = json_storage.with_iterator(|iter| { + iter.map(|value| value.map(<[u8]>::to_vec)) + .collect::>() + }); + let expected = values + .iter() + .map(|value| Some(value.as_bytes().to_vec())) + .collect::>(); + + assert_eq!(actual, expected); + + Ok(()) + } +} diff --git a/encodings/parquet-variant/src/lib.rs b/encodings/parquet-variant/src/lib.rs index 03d2a046442..98476e2a77a 100644 --- a/encodings/parquet-variant/src/lib.rs +++ b/encodings/parquet-variant/src/lib.rs @@ -26,6 +26,8 @@ mod array; mod arrow; +#[cfg(feature = "json")] +mod json; mod kernel; mod operations; mod validity; @@ -34,6 +36,10 @@ mod vtable; use std::sync::Arc; pub use array::ParquetVariantArrayExt; +#[cfg(feature = "json")] +pub use json::VariantToJson; +#[cfg(feature = "json")] +pub use json::VariantToJsonArray; use vortex_array::arrow::ArrowSessionExt; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -43,6 +49,8 @@ pub use vtable::ParquetVariantArray; /// Register Parquet Variant array and Arrow extension support with a session. pub fn initialize(session: &VortexSession) { session.arrays().register(ParquetVariant); + #[cfg(feature = "json")] + session.arrays().register(VariantToJson); session.arrow().register_exporter(Arc::new(ParquetVariant)); session.arrow().register_importer(Arc::new(ParquetVariant)); } diff --git a/vortex-btrblocks/Cargo.toml b/vortex-btrblocks/Cargo.toml index 1adb6508828..e3c38095614 100644 --- a/vortex-btrblocks/Cargo.toml +++ b/vortex-btrblocks/Cargo.toml @@ -16,6 +16,7 @@ version = { workspace = true } [dependencies] itertools = { workspace = true } num-traits = { workspace = true } +parquet-variant-compute = { workspace = true, optional = true } pco = { workspace = true, optional = true } rand = { workspace = true } rustc-hash = { workspace = true } @@ -29,11 +30,16 @@ vortex-decimal-byte-parts = { workspace = true } vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-fsst = { workspace = true } +vortex-json = { workspace = true, optional = true } vortex-mask = { workspace = true } vortex-onpair = { workspace = true, optional = true } +vortex-parquet-variant = { workspace = true, optional = true, features = [ + "json", +] } vortex-pco = { workspace = true, optional = true } vortex-runend = { workspace = true } vortex-sequence = { workspace = true } +vortex-session = { workspace = true, optional = true } vortex-sparse = { workspace = true } vortex-tensor = { workspace = true, optional = true } vortex-utils = { workspace = true } @@ -54,6 +60,12 @@ unstable_encodings = [ "dep:vortex-onpair", "vortex-zstd?/unstable_encodings", ] +parquet-variant = [ + "dep:vortex-parquet-variant", + "dep:parquet-variant-compute", + "dep:vortex-session", + "dep:vortex-json", +] pco = ["dep:pco", "dep:vortex-pco"] zstd = ["dep:vortex-zstd"] diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index c0a0eaeb5eb..b2f75c28546 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -17,6 +17,8 @@ use crate::schemes::float; use crate::schemes::integer; use crate::schemes::string; use crate::schemes::temporal; +#[cfg(feature = "parquet-variant")] +use crate::schemes::variant; /// All available compression schemes. /// @@ -65,6 +67,7 @@ pub const ALL_SCHEMES: &[&dyn Scheme] = &[ // Binary schemes. //////////////////////////////////////////////////////////////////////////////////////////////// &binary::BinaryDictScheme, + &binary::BinaryFSSTScheme, &binary::BinaryConstantScheme, // Decimal schemes. &decimal::DecimalScheme, @@ -158,6 +161,15 @@ impl BtrBlocksCompressorBuilder { builder } + /// Adds JSON-to-Parquet-Variant compression for [`vortex_json::Json`] extension arrays. + /// + /// This scheme is opt-in because it produces a Parquet Variant-backed wrapper encoding rather + /// than ordinary JSON string storage. + #[cfg(feature = "parquet-variant")] + pub fn with_json_to_variant(self) -> Self { + self.with_new_scheme(&variant::JsonToVariantScheme) + } + /// Adds the TurboQuant lossy vector quantization scheme. /// /// When enabled, [`Vector`] extension arrays are compressed using the TurboQuant algorithm @@ -193,6 +205,7 @@ impl BtrBlocksCompressorBuilder { string::StringDictScheme.id(), string::FSSTScheme.id(), binary::BinaryDictScheme.id(), + binary::BinaryFSSTScheme.id(), ]; #[cfg(feature = "unstable_encodings")] excluded.push(string::OnPairScheme.id()); @@ -238,4 +251,13 @@ mod tests { let builder = BtrBlocksCompressorBuilder::default(); assert_eq!(builder.schemes.len(), ALL_SCHEMES.len()); } + + #[cfg(feature = "parquet-variant")] + #[test] + fn json_to_variant_builder_method_adds_scheme() { + let builder = BtrBlocksCompressorBuilder::empty().with_json_to_variant(); + + assert_eq!(builder.schemes.len(), 1); + assert_eq!(builder.schemes[0].id(), variant::JsonToVariantScheme.id()); + } } diff --git a/vortex-btrblocks/src/schemes/binary/fsst.rs b/vortex-btrblocks/src/schemes/binary/fsst.rs new file mode 100644 index 00000000000..fdbdeaf1225 --- /dev/null +++ b/vortex-btrblocks/src/schemes/binary/fsst.rs @@ -0,0 +1,189 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! FSST binary compression. + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::primitive::PrimitiveArrayExt; +use vortex_array::arrays::varbin::VarBinArrayExt; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_error::VortexResult; +use vortex_fsst::FSST; +use vortex_fsst::FSSTArrayExt; +use vortex_fsst::fsst_compress; +use vortex_fsst::fsst_train_compressor; + +use crate::ArrayAndStats; +use crate::CascadingCompressor; +use crate::CompressorContext; +use crate::Scheme; +use crate::SchemeExt; + +/// FSST compression for binary values. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub struct BinaryFSSTScheme; + +impl Scheme for BinaryFSSTScheme { + fn scheme_name(&self) -> &'static str { + "vortex.binary.fsst" + } + + fn matches(&self, canonical: &Canonical) -> bool { + canonical.dtype().is_binary() + } + + /// Children: lengths=0, code_offsets=1. + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let binary = data.array_as_varbinview().into_owned(); + let compressor_fsst = fsst_train_compressor(&binary); + let fsst = fsst_compress( + &binary, + binary.len(), + binary.dtype(), + &compressor_fsst, + exec_ctx, + ); + + let uncompressed_lengths_primitive = fsst + .uncompressed_lengths() + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)?; + let compressed_original_lengths = compressor.compress_child( + &uncompressed_lengths_primitive.into_array(), + &compress_ctx, + self.id(), + 0, + exec_ctx, + )?; + + let codes_offsets_primitive = fsst + .codes() + .offsets() + .clone() + .execute::(exec_ctx)? + .narrow(exec_ctx)?; + let compressed_codes_offsets = compressor.compress_child( + &codes_offsets_primitive.into_array(), + &compress_ctx, + self.id(), + 1, + exec_ctx, + )?; + let compressed_codes = VarBinArray::try_new( + compressed_codes_offsets, + fsst.codes().bytes().clone(), + fsst.codes().dtype().clone(), + fsst.codes().validity()?, + )?; + + let fsst = FSST::try_new( + fsst.dtype().clone(), + fsst.symbols().clone(), + fsst.symbol_lengths().clone(), + compressed_codes, + compressed_original_lengths, + exec_ctx, + )?; + + Ok(fsst.into_array()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::LazyLock; + + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::assert_arrays_eq; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::session::ArraySession; + use vortex_error::VortexResult; + use vortex_fsst::FSST; + use vortex_session::VortexSession; + + use crate::BtrBlocksCompressor; + + static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::()); + + fn binary_fsst_data() -> VarBinViewArray { + VarBinViewArray::from_iter( + (0..1024).map(|idx| { + Some(format!("variant-key-{idx:04}-invoice-total-line-items").into_bytes()) + }), + DType::Binary(Nullability::NonNullable), + ) + } + + #[test] + fn default_compressor_uses_fsst_for_binary_data() -> VortexResult<()> { + let array = binary_fsst_data().into_array(); + let compressed = + BtrBlocksCompressor::default().compress(&array, &mut SESSION.create_execution_ctx())?; + + assert!( + compressed.is::(), + "expected binary data to be FSST-compressed, got {}", + compressed.encoding_id(), + ); + assert!(compressed.nbytes() < array.nbytes()); + + let decompressed = + compressed.execute::(&mut SESSION.create_execution_ctx())?; + assert_arrays_eq!(array, decompressed); + + Ok(()) + } + + #[cfg(feature = "zstd")] + #[test] + fn compact_compressor_uses_zstd_for_binary_data() -> VortexResult<()> { + let array = binary_fsst_data().into_array(); + let compressed = crate::BtrBlocksCompressorBuilder::default() + .with_compact() + .build() + .compress(&array, &mut SESSION.create_execution_ctx())?; + + assert!( + compressed.is::(), + "expected compact binary data to be Zstd-compressed, got {}", + compressed.encoding_id(), + ); + assert!(compressed.nbytes() < array.nbytes()); + + let decompressed = + compressed.execute::(&mut SESSION.create_execution_ctx())?; + assert_arrays_eq!(array, decompressed); + + Ok(()) + } +} diff --git a/vortex-btrblocks/src/schemes/binary/mod.rs b/vortex-btrblocks/src/schemes/binary/mod.rs index 44f09f2daa3..f9885f67255 100644 --- a/vortex-btrblocks/src/schemes/binary/mod.rs +++ b/vortex-btrblocks/src/schemes/binary/mod.rs @@ -3,11 +3,13 @@ //! Binary compression schemes. +mod fsst; #[cfg(feature = "zstd")] mod zstd; #[cfg(all(feature = "zstd", feature = "unstable_encodings"))] mod zstd_buffers; +pub use fsst::BinaryFSSTScheme; // Re-export builtin schemes from vortex-compressor. pub use vortex_compressor::builtins::BinaryConstantScheme; pub use vortex_compressor::builtins::BinaryDictScheme; diff --git a/vortex-btrblocks/src/schemes/mod.rs b/vortex-btrblocks/src/schemes/mod.rs index 16123429e86..7d0619b6080 100644 --- a/vortex-btrblocks/src/schemes/mod.rs +++ b/vortex-btrblocks/src/schemes/mod.rs @@ -5,12 +5,13 @@ pub mod binary; pub mod bool; +pub mod decimal; pub mod float; pub mod integer; pub mod string; - -pub mod decimal; pub mod temporal; +#[cfg(feature = "parquet-variant")] +pub mod variant; pub(crate) mod patches; diff --git a/vortex-btrblocks/src/schemes/variant/json_to_variant.rs b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs new file mode 100644 index 00000000000..9557a1e9fee --- /dev/null +++ b/vortex-btrblocks/src/schemes/variant/json_to_variant.rs @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::arrow::ArrowSessionExt; +use vortex_compressor::ctx::CompressorContext; +use vortex_compressor::estimate::CompressionEstimate; +use vortex_compressor::estimate::DeferredEstimate; +use vortex_compressor::scheme::Scheme; +use vortex_compressor::scheme::SchemeExt; +use vortex_compressor::stats::ArrayAndStats; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_json::Json; +use vortex_parquet_variant::ParquetVariant; +use vortex_parquet_variant::ParquetVariantArrayExt; +use vortex_parquet_variant::VariantToJson; + +use crate::CascadingCompressor; + +/// Compression scheme that converts JSON string extension arrays to Parquet Variant arrays. +/// +/// When decompressed, the resulting JSON array might not be byte-to-byte identical, as this +/// compression doesn't maintain whitespaces. +#[derive(Debug)] +pub struct JsonToVariantScheme; + +/// Child indices for recursively compressed Parquet Variant binary children. +mod parquet_variant_children { + /// The Parquet Variant metadata child. + pub const METADATA: usize = 0; + /// The raw Parquet Variant value child. + pub const VALUE: usize = 1; +} + +impl Scheme for JsonToVariantScheme { + fn scheme_name(&self) -> &'static str { + "vortex.variant.pq.json" + } + + fn matches(&self, canonical: &Canonical) -> bool { + let Canonical::Extension(ext_array) = canonical else { + return false; + }; + + ext_array.ext_dtype().is::() + } + + fn num_children(&self) -> usize { + 2 + } + + fn expected_compression_ratio( + &self, + _data: &ArrayAndStats, + _compress_ctx: CompressorContext, + _exec_ctx: &mut ExecutionCtx, + ) -> CompressionEstimate { + CompressionEstimate::Deferred(DeferredEstimate::Sample) + } + + fn compress( + &self, + compressor: &CascadingCompressor, + data: &ArrayAndStats, + compress_ctx: CompressorContext, + exec_ctx: &mut ExecutionCtx, + ) -> VortexResult { + let array = data.array().clone().execute::(exec_ctx)?; + let output_nullability = array.dtype().nullability(); + let storage = array.storage_array().clone(); + + if !storage.dtype().is_utf8() { + vortex_bail!("storage must be utf8"); + } + + let arrow_array = { + let session = exec_ctx.session().clone(); + let arrow = session.arrow(); + arrow.execute_arrow(storage, None, exec_ctx)? + }; + + let array = parquet_variant_compute::json_to_variant(&arrow_array)?; + let parquet_variant = + ParquetVariant::from_arrow_variant(&array)?.downcast::(); + + let compressed_metadata = compressor.compress_child( + parquet_variant.metadata_array(), + &compress_ctx, + self.id(), + parquet_variant_children::METADATA, + exec_ctx, + )?; + let compressed_value = parquet_variant + .value_array() + .map(|value| { + compressor.compress_child( + value, + &compress_ctx, + self.id(), + parquet_variant_children::VALUE, + exec_ctx, + ) + }) + .transpose()?; + + // `VariantToJson` can only decode unshredded storage, so producing a typed_value child + // here would create an array that fails at decompression time. + vortex_ensure!( + parquet_variant.typed_value_array().is_none(), + "JsonToVariantScheme does not support shredded (typed_value) Parquet Variant storage" + ); + + let variant_validity = parquet_variant + .validity()? + .union_nullability(output_nullability); + let variant = ParquetVariant::try_new( + variant_validity, + compressed_metadata, + compressed_value, + None, + )? + .into_array(); + + Ok(VariantToJson::try_new(variant)?.into_array()) + } +} diff --git a/vortex-btrblocks/src/schemes/variant/mod.rs b/vortex-btrblocks/src/schemes/variant/mod.rs new file mode 100644 index 00000000000..138ad3b2c88 --- /dev/null +++ b/vortex-btrblocks/src/schemes/variant/mod.rs @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Variant compression schemes. + +mod json_to_variant; + +pub use json_to_variant::JsonToVariantScheme; + +#[cfg(test)] +mod tests; diff --git a/vortex-btrblocks/src/schemes/variant/tests.rs b/vortex-btrblocks/src/schemes/variant/tests.rs new file mode 100644 index 00000000000..ec53cccc9e6 --- /dev/null +++ b/vortex-btrblocks/src/schemes/variant/tests.rs @@ -0,0 +1,319 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::LazyLock; + +use rand::RngExt; +use rand::SeedableRng; +use rand::rngs::StdRng; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::ExtensionArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::extension::ExtensionArrayExt; +use vortex_array::session::ArraySession; +use vortex_array::validity::Validity; +use vortex_compressor::builtins::BinaryDictScheme; +use vortex_compressor::builtins::IntConstantScheme; +use vortex_compressor::builtins::StringConstantScheme; +use vortex_compressor::builtins::StringDictScheme; +use vortex_json::Json; +use vortex_parquet_variant::VariantToJson; +use vortex_session::VortexSession; + +use super::*; +use crate::CascadingCompressor; +use crate::schemes::binary::BinaryFSSTScheme; +use crate::schemes::integer::BitPackingScheme; +use crate::schemes::integer::FoRScheme; +use crate::schemes::integer::RunEndScheme; +use crate::schemes::integer::SequenceScheme; +use crate::schemes::integer::SparseScheme; +use crate::schemes::integer::ZigZagScheme; +use crate::schemes::string::FSSTScheme; + +static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::empty().with::(); + vortex_parquet_variant::initialize(&session); + session +}); + +fn json_data() -> Vec { + let mut rng = StdRng::seed_from_u64(0); + const ACCOUNT_KEYS: &[&str] = &["account_id", "customer_id", "tenant_id", "buyer_id"]; + const REGION_KEYS: &[&str] = &["region", "market", "country"]; + const REGIONS: &[&str] = &["us-east", "us-west", "eu", "apac", "latam"]; + const STATUS_KEYS: &[&str] = &["status", "payment_state", "lifecycle_state"]; + const STATUSES: &[&str] = &["draft", "open", "paid", "void", "past_due"]; + const AMOUNT_KEYS: &[&str] = &["discount", "tax", "shipping", "credit"]; + const FLAG_KEYS: &[&str] = &["autopay", "fraud_review", "priority", "disputed"]; + const TAGS: &[&str] = &["renewal", "manual", "usage", "trial", "enterprise"]; + + (0..1024) + .map(|_| { + let mut fields = vec![ + format!( + r#""{}":"acct_{:04x}""#, + ACCOUNT_KEYS[rng.random_range(0..ACCOUNT_KEYS.len())], + rng.random::(), + ), + format!( + r#""invoice_total":{}.{:02}"#, + rng.random_range(10_u32..100_000), + rng.random_range(0_u32..100), + ), + format!(r#""line_items":{}"#, rng.random_range(1_u32..250)), + ]; + + if rng.random_bool(0.85) { + fields.push(format!( + r#""{}":"{}""#, + STATUS_KEYS[rng.random_range(0..STATUS_KEYS.len())], + STATUSES[rng.random_range(0..STATUSES.len())], + )); + } + if rng.random_bool(0.75) { + fields.push(format!( + r#""{}":"{}""#, + REGION_KEYS[rng.random_range(0..REGION_KEYS.len())], + REGIONS[rng.random_range(0..REGIONS.len())], + )); + } + if rng.random_bool(0.55) { + fields.push(format!( + r#""{}":{}.{:02}"#, + AMOUNT_KEYS[rng.random_range(0..AMOUNT_KEYS.len())], + rng.random_range(0_u32..2_500), + rng.random_range(0_u32..100), + )); + } + if rng.random_bool(0.40) { + fields.push(format!( + r#""{}":{}"#, + FLAG_KEYS[rng.random_range(0..FLAG_KEYS.len())], + rng.random_bool(0.5), + )); + } + if rng.random_bool(0.30) { + fields.push(format!( + r#""tags":["{}","{}"]"#, + TAGS[rng.random_range(0..TAGS.len())], + TAGS[rng.random_range(0..TAGS.len())], + )); + } + + format!("{{{}}}", fields.join(",")) + }) + .collect() +} + +fn json_array(values: &[String]) -> vortex_error::VortexResult { + let storage = VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array(); + Ok( + ExtensionArray::try_new_from_vtable(Json, vortex_array::EmptyMetadata, storage)? + .into_array(), + ) +} + +fn parquet_variant_child_compressor() -> CascadingCompressor { + CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &BinaryFSSTScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) +} + +#[test] +fn json_to_variant_scheme_wraps_output_as_json() -> vortex_error::VortexResult<()> { + let array = json_array(&json_data())?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert_eq!(compressed.dtype(), array.dtype()); + assert!(compressed.is::()); + + let json = compressed.execute::(&mut exec_ctx)?; + assert_eq!(json.dtype(), array.dtype()); + assert!(json.storage_array().dtype().is_utf8()); + + Ok(()) +} + +#[test] +fn preserves_nullable_json_dtype_for_all_valid_storage() -> vortex_error::VortexResult<()> { + let values = [r#"{"a":1}"#, r#"{"b":2}"#]; + let storage = VarBinViewArray::from_iter_str(values); + let parts = storage.into_data_parts(); + let storage = VarBinViewArray::new_handle( + parts.views, + parts.buffers, + parts.dtype.as_nullable(), + Validity::AllValid, + ) + .into_array(); + + let array = ExtensionArray::try_new_from_vtable(Json, vortex_array::EmptyMetadata, storage)? + .into_array(); + + assert!(array.dtype().is_nullable()); + + let variant_compressor = CascadingCompressor::new(vec![&JsonToVariantScheme]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert_eq!(compressed.dtype(), array.dtype()); + + let json = compressed.execute::(&mut exec_ctx)?; + assert_eq!(json.dtype(), array.dtype()); + + Ok(()) +} + +fn print_comparison_output(array: &ArrayRef, string_compressed: &ArrayRef, compressed: &ArrayRef) { + let compressed_ratio = array.nbytes() as f64 / compressed.nbytes() as f64; + let compressed_array_ratio = string_compressed.nbytes() as f64 / compressed.nbytes() as f64; + println!( + "Compression sizes: input={} bytes, compressed string={} bytes, compressed output={} bytes", + array.nbytes(), + string_compressed.nbytes(), + compressed.nbytes(), + ); + println!("Compressed output ratio: {compressed_ratio:.2}x\n"); + println!("Compressed string / compressed output ratio: {compressed_array_ratio:.2}x\n"); + println!("JSON input encoding tree:\n{}", array.tree_display()); + println!( + "String-compressed encoding tree:\n{}", + string_compressed.tree_display() + ); + println!( + "Compressed output encoding tree:\n{}", + compressed.tree_display() + ); +} + +#[test] +fn parquet_variant_compresses_repeated_json_keys() -> vortex_error::VortexResult<()> { + let array = json_array(&json_data())?; + + let string_compressor = + CascadingCompressor::new(vec![&StringDictScheme, &StringConstantScheme]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert!( + variant_compressed.nbytes() < string_compressed.nbytes(), + "Parquet Variant conversion should compress repeated JSON keys: \ + variant={} bytes, input={} bytes", + variant_compressed.nbytes(), + string_compressed.nbytes(), + ); + + print_comparison_output(&array, &string_compressed, &variant_compressed); + + Ok(()) +} + +#[test] +fn recursively_compresses_parquet_variant_binary_children() -> vortex_error::VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + let uncompressed_children = + CascadingCompressor::new(vec![&JsonToVariantScheme]).compress(&array, &mut exec_ctx)?; + + let variant_compressor = parquet_variant_child_compressor(); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + assert!( + compressed.nbytes() < uncompressed_children.nbytes(), + "recursive child compression should reduce Parquet Variant size: compressed={} bytes, uncompressed_children={} bytes", + compressed.nbytes(), + uncompressed_children.nbytes(), + ); + assert_eq!(compressed.dtype(), array.dtype()); + Ok(()) +} + +#[test] +fn binary_fsst_improves_parquet_variant_child_compression() -> vortex_error::VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + let mut exec_ctx = SESSION.create_execution_ctx(); + let without_binary_fsst = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &IntConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]) + .compress(&array, &mut exec_ctx)?; + + let mut exec_ctx = SESSION.create_execution_ctx(); + let with_binary_fsst = parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?; + + assert!( + with_binary_fsst.nbytes() < without_binary_fsst.nbytes(), + "binary FSST should improve Parquet Variant child compression: with={} bytes, without={} bytes", + with_binary_fsst.nbytes(), + without_binary_fsst.nbytes(), + ); + + Ok(()) +} + +#[test] +fn prefers_smaller_extension_storage_over_variant_scheme() -> vortex_error::VortexResult<()> { + let array: ArrayRef = json_array(&json_data())?; + + let string_compressor = CascadingCompressor::new(vec![ + &StringDictScheme, + &FSSTScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let string_compressed = string_compressor.compress(&array, &mut exec_ctx)?; + + let variant_compressor = CascadingCompressor::new(vec![ + &JsonToVariantScheme, + &BinaryDictScheme, + &BinaryFSSTScheme, + &IntConstantScheme, + &StringConstantScheme, + &FoRScheme, + &SparseScheme, + &BitPackingScheme, + &RunEndScheme, + &SequenceScheme, + &ZigZagScheme, + ]); + let mut exec_ctx = SESSION.create_execution_ctx(); + let compressed = variant_compressor.compress(&array, &mut exec_ctx)?; + + print_comparison_output(&array, &string_compressed, &compressed); + + Ok(()) +}