diff --git a/vortex-array/src/arc_swap_map.rs b/vortex-array/src/arc_swap_map.rs index aff46c22d73..f4004b189cf 100644 --- a/vortex-array/src/arc_swap_map.rs +++ b/vortex-array/src/arc_swap_map.rs @@ -50,6 +50,14 @@ impl ArcSwapMap { f(&self.inner.load()) } + /// Return the current snapshot as a shared [`Arc`], without copying the map. + /// + /// Unlike [`read`](Self::read), the returned snapshot outlives the call, so callers can + /// cache it and keep observing a consistent point-in-time view across many lookups. + pub(crate) fn load_full(&self) -> Arc> { + self.inner.load_full() + } + /// Replace the map with the result of applying `f` to a private copy. /// /// Writes are copy-on-write via [`ArcSwap::rcu`], so `f` may run more than diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index d6070ac1a4d..8fe78f4a0ad 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -26,7 +26,6 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_panic; -use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::VortexSession; @@ -43,6 +42,7 @@ use crate::memory::HostAllocatorRef; use crate::memory::MemorySessionExt; use crate::optimizer::ArrayOptimizer; use crate::optimizer::kernels::ArrayKernels; +use crate::optimizer::kernels::KernelSnapshot; use crate::stats::ArrayStats; use crate::stats::StatsSet; @@ -305,6 +305,16 @@ struct StackFrame { #[derive(Debug, Clone)] pub struct ExecutionCtx { session: VortexSession, + /// Snapshot of the session's [`ArrayKernels`] execute-parent kernels, resolved once at + /// construction. + /// + /// The registry is session-scoped and mutable through its public `register_*` methods, so + /// this context sees the kernels as registered when it was created; later registrations are + /// picked up by the next context (contexts are created per evaluation). Caching the snapshot + /// avoids a per-array-node session clone plus a sharded `DashMap` `RwLock` probe in the hot + /// `execute_until` loop, and avoids holding the session-variable read guard across kernel + /// invocation. `None` means the session had no [`ArrayKernels`] when the context was created. + kernels: Option, #[cfg(debug_assertions)] id: usize, #[cfg(debug_assertions)] @@ -314,8 +324,10 @@ pub struct ExecutionCtx { impl ExecutionCtx { /// Create a new execution context with the given session. pub fn new(session: VortexSession) -> Self { + let kernels = session.get_opt::().map(|k| k.snapshot()); Self { session, + kernels, #[cfg(debug_assertions)] id: { static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0); @@ -331,6 +343,12 @@ impl ExecutionCtx { &self.session } + /// Get the [`KernelSnapshot`] resolved once for this execution context, if the session had an + /// [`ArrayKernels`] registry. Cheap to clone (one `Arc` clone). + pub(crate) fn kernels(&self) -> Option<&KernelSnapshot> { + self.kernels.as_ref() + } + /// Get the session-scoped host allocator for this execution context. pub fn allocator(&self) -> HostAllocatorRef { self.session.allocator() @@ -424,8 +442,7 @@ impl Executable for ArrayRef { } } - let tmp_session = ctx.session().clone(); - let kernels = tmp_session.get_opt::(); + let kernels = ctx.kernels().cloned(); for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; @@ -542,7 +559,7 @@ fn execute_parent_for_child( parent: &ArrayRef, child: &ArrayRef, slot_idx: usize, - kernels: Option<&Ref>, + kernels: Option<&KernelSnapshot>, ctx: &mut ExecutionCtx, ) -> VortexResult> { if let Some(kernels) = kernels @@ -561,8 +578,7 @@ fn execute_parent_for_child( /// Try execute_parent on each occupied slot of the array. fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult> { - let tmp_session = ctx.session().clone(); - let kernels = tmp_session.get_opt::(); + let kernels = ctx.kernels().cloned(); for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; diff --git a/vortex-array/src/optimizer/kernels.rs b/vortex-array/src/optimizer/kernels.rs index 93407a2c42d..a94985d38a1 100644 --- a/vortex-array/src/optimizer/kernels.rs +++ b/vortex-array/src/optimizer/kernels.rs @@ -35,6 +35,7 @@ use vortex_session::SessionExt; use vortex_session::SessionVar; use vortex_session::registry::Id; use vortex_utils::aliases::DefaultHashBuilder; +use vortex_utils::aliases::hash_map::HashMap; use crate::ArrayRef; use crate::ExecutionCtx; @@ -194,6 +195,40 @@ impl ArrayKernels { pub fn find_execute_parent(&self, parent: Id, child: Id) -> Option> { self.execute_parent.get(&hash_fn_id(parent, child)) } + + /// Capture an owned, cheaply-cloneable [`KernelSnapshot`] of the currently-registered + /// execute-parent kernels. + /// + /// The underlying registry map is loaded once into an [`Arc`], so the snapshot is an `Arc` + /// clone (no map copy) that outlives the session-variable borrow. Registrations made after + /// the snapshot is taken are not visible through it. + pub(crate) fn snapshot(&self) -> KernelSnapshot { + KernelSnapshot { + execute_parent: self.execute_parent.load_full(), + } + } +} + +/// An owned, point-in-time view of the execute-parent kernels registered on an [`ArrayKernels`] +/// registry. +/// +/// Holding the registry map directly (rather than re-probing the session per array node) lets the +/// executor resolve [`ArrayKernels`] once per execution context. Cloning is one [`Arc`] clone. +#[derive(Debug, Clone)] +pub(crate) struct KernelSnapshot { + execute_parent: Arc>>, +} + +impl KernelSnapshot { + /// Look up the [`ExecuteParentFn`]s registered for `(parent, child)`. + pub(crate) fn find_execute_parent( + &self, + parent: Id, + child: Id, + ) -> Option> { + let id = hash_fn_id(parent, child); + self.execute_parent.get(&id).cloned() + } } fn hash_fn_id(parent: Id, child: Id) -> u64 {