Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions vortex-array/src/arc_swap_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ impl<K, V> ArcSwapMap<K, V> {
f(&self.inner.load())
}

/// Return the current snapshot as a shared [`Arc`], without copying the map.
///
/// Unlike [`read`](Self::read), the returned snapshot outlives the call, so callers can
/// cache it and keep observing a consistent point-in-time view across many lookups.
///
/// Writes on this type are documented as copy-on-write (they replace the map rather than
/// mutating it in place), so a snapshot obtained here is not expected to observe writes that
/// happen after this call; call `load_full` again to see newer state.
pub(crate) fn load_full(&self) -> Arc<HashMap<K, V>> {
// Delegate to the inner container, which hands back the shared `Arc` directly.
self.inner.load_full()
}

/// Replace the map with the result of applying `f` to a private copy.
///
/// Writes are copy-on-write via [`ArcSwap::rcu`], so `f` may run more than
Expand Down
28 changes: 22 additions & 6 deletions vortex-array/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_panic;
use vortex_session::Ref;
use vortex_session::SessionExt;
use vortex_session::VortexSession;

Expand All @@ -43,6 +42,7 @@ use crate::memory::HostAllocatorRef;
use crate::memory::MemorySessionExt;
use crate::optimizer::ArrayOptimizer;
use crate::optimizer::kernels::ArrayKernels;
use crate::optimizer::kernels::KernelSnapshot;
use crate::stats::ArrayStats;
use crate::stats::StatsSet;

Expand Down Expand Up @@ -305,6 +305,16 @@ struct StackFrame {
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
session: VortexSession,
/// Snapshot of the session's [`ArrayKernels`] execute-parent kernels, resolved once at
/// construction.
///
/// The registry is session-scoped and mutable through its public `register_*` methods, so
/// this context sees the kernels as registered when it was created; later registrations are
/// picked up by the next context (contexts are created per evaluation). Caching the snapshot
/// avoids a per-array-node session clone plus a sharded `DashMap` `RwLock` probe in the hot
/// `execute_until` loop, and avoids holding the session-variable read guard across kernel
/// invocation. `None` means the session had no [`ArrayKernels`] when the context was created.
kernels: Option<KernelSnapshot>,
#[cfg(debug_assertions)]
id: usize,
#[cfg(debug_assertions)]
Expand All @@ -314,8 +324,10 @@ pub struct ExecutionCtx {
impl ExecutionCtx {
/// Create a new execution context with the given session.
pub fn new(session: VortexSession) -> Self {
let kernels = session.get_opt::<ArrayKernels>().map(|k| k.snapshot());
Self {
session,
kernels,
#[cfg(debug_assertions)]
id: {
static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
Expand All @@ -331,6 +343,12 @@ impl ExecutionCtx {
&self.session
}

/// Get the [`KernelSnapshot`] resolved once for this execution context, if the session had an
/// [`ArrayKernels`] registry. Cheap to clone (one `Arc` clone).
///
/// Returns `None` when no snapshot is stored on this context. The returned reference borrows
/// from `self`, so callers that go on to use the context mutably should clone the snapshot
/// first (e.g. `ctx.kernels().cloned()`).
pub(crate) fn kernels(&self) -> Option<&KernelSnapshot> {
// Borrow the stored snapshot instead of cloning; the caller decides whether to clone.
self.kernels.as_ref()
}

/// Get the session-scoped host allocator for this execution context.
pub fn allocator(&self) -> HostAllocatorRef {
self.session.allocator()
Expand Down Expand Up @@ -424,8 +442,7 @@ impl Executable for ArrayRef {
}
}

let tmp_session = ctx.session().clone();
let kernels = tmp_session.get_opt::<ArrayKernels>();
let kernels = ctx.kernels().cloned();

for (slot_idx, slot) in array.slots().iter().enumerate() {
let Some(child) = slot else { continue };
Expand Down Expand Up @@ -542,7 +559,7 @@ fn execute_parent_for_child(
parent: &ArrayRef,
child: &ArrayRef,
slot_idx: usize,
kernels: Option<&Ref<ArrayKernels>>,
kernels: Option<&KernelSnapshot>,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
if let Some(kernels) = kernels
Expand All @@ -561,8 +578,7 @@ fn execute_parent_for_child(

/// Try execute_parent on each occupied slot of the array.
fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
let tmp_session = ctx.session().clone();
let kernels = tmp_session.get_opt::<ArrayKernels>();
let kernels = ctx.kernels().cloned();

for (slot_idx, slot) in array.slots().iter().enumerate() {
let Some(child) = slot else { continue };
Expand Down
35 changes: 35 additions & 0 deletions vortex-array/src/optimizer/kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use vortex_session::SessionExt;
use vortex_session::SessionVar;
use vortex_session::registry::Id;
use vortex_utils::aliases::DefaultHashBuilder;
use vortex_utils::aliases::hash_map::HashMap;

use crate::ArrayRef;
use crate::ExecutionCtx;
Expand Down Expand Up @@ -194,6 +195,40 @@ impl ArrayKernels {
pub fn find_execute_parent(&self, parent: Id, child: Id) -> Option<Arc<[ExecuteParentFn]>> {
    // Kernels are stored under the combined id of the `(parent, child)` pair.
    let key = hash_fn_id(parent, child);
    self.execute_parent.get(&key)
}

/// Take a [`KernelSnapshot`] of the execute-parent kernels registered right now.
///
/// The snapshot shares the registry's current map through an [`Arc`] rather than copying it,
/// so it is cheap to clone and stays usable after the session-variable borrow has ended.
/// Kernels registered after this call are not visible through the returned snapshot.
pub(crate) fn snapshot(&self) -> KernelSnapshot {
    // One load of the shared map; no entries are copied.
    let execute_parent = self.execute_parent.load_full();
    KernelSnapshot { execute_parent }
}
}

/// An owned, point-in-time view of the execute-parent kernels registered on an [`ArrayKernels`]
/// registry.
///
/// Holding the registry map directly (rather than re-probing the session per array node) lets the
/// executor resolve [`ArrayKernels`] once per execution context. Cloning is one [`Arc`] clone.
#[derive(Debug, Clone)]
pub(crate) struct KernelSnapshot {
/// Shared handle to the execute-parent map captured when the snapshot was taken. Entries are
/// looked up by the id that `hash_fn_id(parent, child)` produces, and each entry holds the
/// kernels registered under that id.
execute_parent: Arc<HashMap<ExecuteParentFnId, Arc<[ExecuteParentFn]>>>,
}

impl KernelSnapshot {
    /// Return the [`ExecuteParentFn`]s this snapshot holds for the `(parent, child)` pair, or
    /// `None` if the snapshot has no entry for it.
    pub(crate) fn find_execute_parent(
        &self,
        parent: Id,
        child: Id,
    ) -> Option<Arc<[ExecuteParentFn]>> {
        // Cloning the entry only bumps the `Arc` refcount; the kernel slice itself is shared.
        self.execute_parent
            .get(&hash_fn_id(parent, child))
            .cloned()
    }
}

fn hash_fn_id(parent: Id, child: Id) -> u64 {
Expand Down
Loading