Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions encodings/experimental/onpair/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ vortex-array = { workspace = true, features = ["_test-harness"] }
[[bench]]
name = "decode"
harness = false

[[bench]]
name = "like"
harness = false
191 changes: 191 additions & 0 deletions encodings/experimental/onpair/benches/like.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
//
//! LIKE-pushdown microbenchmarks for the OnPair Vortex array.
//!
//! Compares two ways of evaluating `LIKE 'prefix%'` / `LIKE '%needle%'` on an
//! OnPair-encoded URL column:
//!
//! * `pushdown` — the compressed-domain per-code DFA kernel
//! (`<OnPair as LikeKernel>::like`), which never decompresses.
//! * `fallback` — what the engine does when no pushdown kernel handles the
//! predicate: canonicalise to `VarBinViewArray` (decompress) and run the
//! standard string LIKE.
//!
//! Both arms produce the same boolean result; the delta is the cost the
//! pushdown avoids.

#![allow(
clippy::cast_possible_truncation,
clippy::cast_lossless,
clippy::panic,
clippy::tests_outside_test_module,
clippy::unwrap_used,
clippy::expect_used
)]

use std::sync::LazyLock;

use divan::Bencher;
use divan::black_box;
use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::VarBinArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::scalar_fn::ScalarFnFactoryExt;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::scalar_fn::fns::like::Like;
use vortex_array::scalar_fn::fns::like::LikeKernel;
use vortex_array::scalar_fn::fns::like::LikeOptions;
use vortex_array::session::ArraySession;
use vortex_onpair::DEFAULT_DICT12_CONFIG;
use vortex_onpair::OnPair;
use vortex_onpair::OnPairArray;
use vortex_onpair::onpair_compress;
use vortex_session::VortexSession;

fn main() {
divan::main();
}

const N_ROWS: usize = 200_000;

static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

fn ctx() -> ExecutionCtx {
SESSION.create_execution_ctx()
}

/// A ClickBench-shaped URL corpus: high lexical overlap, a handful of hosts and
/// path templates, so OnPair's learned dictionary applies.
static CORPUS: LazyLock<OnPairArray> = LazyLock::new(|| {
let hosts = [
"https://www.example.com",
"http://ads.adriver.ru",
"https://yandex.ru/search",
"https://www.google.com/maps",
"http://shop.bonprix.ru",
"https://video.yandex.ru/users",
];
let paths = [
"/page/",
"/product/",
"/category/",
"/search?text=",
"/index?id=",
"/cart/item/",
];
let rows: Vec<String> = (0..N_ROWS)
.map(|i| {
let h = hosts[i % hosts.len()];
let p = paths[(i / 7) % paths.len()];
format!("{h}{p}{}", i % 9973)
})
.collect();
let refs: Vec<Option<&str>> = rows.iter().map(|s| Some(s.as_str())).collect();
let varbin = VarBinArray::from_iter(refs, DType::Utf8(Nullability::NonNullable));
let len = varbin.len();
let dtype = varbin.dtype().clone();
onpair_compress(&varbin, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap()
});

/// A small (4k-row) corpus with a full-sized dictionary, so the per-call DFA
/// table construction dominates — the dict-encoded ClickBench regime, where the
/// kernel scans only the deduplicated values.
static CORPUS_SMALL: LazyLock<OnPairArray> = LazyLock::new(|| {
let rows: Vec<String> = (0..4000)
.map(|i| {
format!(
"https://host{}.example.com/path/{}/item/{}?ref={}",
i % 89,
(i * 7) % 1000,
(i * 13) % 5000,
i
)
})
.collect();
let refs: Vec<Option<&str>> = rows.iter().map(|s| Some(s.as_str())).collect();
let varbin = VarBinArray::from_iter(refs, DType::Utf8(Nullability::NonNullable));
let len = varbin.len();
let dtype = varbin.dtype().clone();
onpair_compress(&varbin, len, &dtype, DEFAULT_DICT12_CONFIG).unwrap()
});

fn pattern_array(pattern: &str) -> ArrayRef {
ConstantArray::new(pattern, CORPUS.len()).into_array()
}

/// Run the pushdown kernel on a given corpus (build + scan).
fn run_pushdown_on(corpus: &OnPairArray, pattern: &str) -> ArrayRef {
let mut ctx = ctx();
let p = ConstantArray::new(pattern, corpus.len()).into_array();
<OnPair as LikeKernel>::like(corpus.as_view(), &p, LikeOptions::default(), &mut ctx)
.unwrap()
.expect("OnPair pushdown should handle this pattern")
}

/// Build-dominated: needles range from common bytes (many relevant codes) to
/// rare bytes (almost all codes skipped).
#[divan::bench(args = ["%google%", "%example%", "%zqxj%"])]
fn contains_pushdown_buildheavy(bencher: Bencher, pattern: &str) {
bencher.bench(|| black_box(run_pushdown_on(&CORPUS_SMALL, pattern)));
}

/// Compressed-domain pushdown: the DFA kernel, no decompression.
fn run_pushdown(pattern: &ArrayRef) -> ArrayRef {
let mut ctx = ctx();
<OnPair as LikeKernel>::like(CORPUS.as_view(), pattern, LikeOptions::default(), &mut ctx)
.unwrap()
.expect("OnPair pushdown should handle this pattern")
}

/// Fallback: decompress to `VarBinViewArray`, then run the standard LIKE.
fn run_fallback(pattern: &ArrayRef) -> ArrayRef {
let mut ctx = ctx();
let canonical = CORPUS
.clone()
.into_array()
.execute::<VarBinViewArray>(&mut ctx)
.unwrap()
.into_array();
Like.try_new_array(
CORPUS.len(),
LikeOptions::default(),
[canonical, pattern.clone()],
)
.unwrap()
.into_array()
.execute::<Canonical>(&mut ctx)
.unwrap()
.into_array()
}

#[divan::bench(args = ["https://www.example.com%", "https://www.google.com/maps%"])]
fn prefix_pushdown(bencher: Bencher, pattern: &str) {
let p = pattern_array(pattern);
bencher.bench(|| black_box(run_pushdown(&p)));
}

#[divan::bench(args = ["https://www.example.com%", "https://www.google.com/maps%"])]
fn prefix_fallback(bencher: Bencher, pattern: &str) {
let p = pattern_array(pattern);
bencher.bench(|| black_box(run_fallback(&p)));
}

#[divan::bench(args = ["%yandex%", "%/search?text=%", "%bonprix%"])]
fn contains_pushdown(bencher: Bencher, pattern: &str) {
let p = pattern_array(pattern);
bencher.bench(|| black_box(run_pushdown(&p)));
}

#[divan::bench(args = ["%yandex%", "%/search?text=%", "%bonprix%"])]
fn contains_fallback(bencher: Bencher, pattern: &str) {
let p = pattern_array(pattern);
bencher.bench(|| black_box(run_fallback(&p)));
}
131 changes: 131 additions & 0 deletions encodings/experimental/onpair/src/compute/like.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! `LIKE` pushdown for OnPair, evaluating `prefix%` and `%needle%` patterns
//! directly on the compressed code stream via a per-code DFA (see [`crate::dfa`]).

use std::sync::LazyLock;
#[cfg(test)]
use std::sync::atomic::AtomicUsize;
#[cfg(test)]
use std::sync::atomic::Ordering;

use num_traits::AsPrimitive;
use vortex_array::ArrayRef;
use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::BoolArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::match_each_integer_ptype;
use vortex_array::scalar_fn::fns::like::LikeKernel;
use vortex_array::scalar_fn::fns::like::LikeOptions;
use vortex_error::VortexResult;

use crate::OnPair;
use crate::OnPairArraySlotsExt;
use crate::decode::collect_widened;
use crate::dfa::OnPairMatcher;

/// Test-only counter of how many times the kernel actually committed to the
/// compressed-domain pushdown (i.e. returned `Some` via the DFA path). Lets
/// tests assert that the pushdown genuinely fires through the execution engine,
/// including through wrappers like `Dict` and `Shared`.
#[cfg(test)]
pub(crate) static PUSHDOWN_HITS: AtomicUsize = AtomicUsize::new(0);

/// Escape hatch for measuring or debugging the compressed-domain LIKE pushdown:
/// set `VORTEX_ONPAIR_LIKE_PUSHDOWN=0` (or `off`/`false`) to force the kernel to
/// decline, falling back to canonical decompression + LIKE. Read once.
static PUSHDOWN_DISABLED: LazyLock<bool> = LazyLock::new(|| {
matches!(
std::env::var("VORTEX_ONPAIR_LIKE_PUSHDOWN").as_deref(),
Ok("0") | Ok("off") | Ok("false")
)
});

impl LikeKernel for OnPair {
fn like(
array: ArrayView<'_, Self>,
pattern: &ArrayRef,
options: LikeOptions,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
if *PUSHDOWN_DISABLED {
return Ok(None);
}

let Some(pattern_scalar) = pattern.as_constant() else {
return Ok(None);
};

if options.case_insensitive {
return Ok(None);
}

let pattern_bytes: &[u8] = if let Some(s) = pattern_scalar.as_utf8_opt() {
let Some(v) = s.value() else {
return Ok(None);
};
v.as_ref()
} else if let Some(b) = pattern_scalar.as_binary_opt() {
let Some(v) = b.value() else {
return Ok(None);
};
v
} else {
return Ok(None);
};

let dict_bytes = array.dict_bytes();
let dict_offsets = collect_widened::<u32>(array.dict_offsets(), ctx)?;

let Some(matcher) = OnPairMatcher::try_new(
dict_bytes.as_slice(),
dict_offsets.as_slice(),
pattern_bytes,
)?
else {
return Ok(None);
};

#[cfg(test)]
PUSHDOWN_HITS.fetch_add(1, Ordering::Relaxed);

let negated = options.negated;
let n = array.len();

// `codes_offsets` are per-row boundaries into the (possibly sliced)
// `codes` child. A sliced OnPair keeps the full `codes` child and only
// narrows these offsets, so `offsets[0]` may be > 0; slice the `codes`
// window to `[offsets[0], offsets[n])` before materialising it, exactly
// as the canonical decoder does.
let offsets = array
.codes_offsets()
.clone()
.execute::<PrimitiveArray>(ctx)?;
let (code_start, code_end): (usize, usize) =
match_each_integer_ptype!(offsets.ptype(), |T| {
let s = offsets.as_slice::<T>();
(
AsPrimitive::<usize>::as_(s[0]),
AsPrimitive::<usize>::as_(s[n]),
)
});

let codes = collect_widened::<u16>(&array.codes().slice(code_start..code_end)?, ctx)?;
let codes = codes.as_slice();

let result = match_each_integer_ptype!(offsets.ptype(), |T| {
let off = offsets.as_slice::<T>();
matcher.scan_to_bitbuf(n, off, code_start, codes, negated)
});

let validity = array
.array()
.validity()?
.union_nullability(pattern_scalar.dtype().nullability());

Ok(Some(BoolArray::new(result, validity).into_array()))
}
}
1 change: 1 addition & 0 deletions encodings/experimental/onpair/src/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@

mod cast;
mod filter;
pub(crate) mod like;
mod slice;
Loading
Loading