From 25e6bbff54fda01742b5b2564bc1010549827061 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Thu, 11 Jun 2026 13:16:05 +0800 Subject: [PATCH 1/7] feat(pool): warm-sandbox daemon + `pool run` client (P1 MVP) The `pool` command pre-warmed VMs but nothing consumed them (stop/status were stubs; no run-path wiring; no keepalive, so pooled VMs could exit). Complete it into the low-risk keepalive+exec MVP from docs/cow-snapshot-fork-design.md: - `pool start` now boots the pooled VMs with a keepalive main (sleep loop) so they stay up with their exec server ready, and serves a Unix socket. - New `pool run -- CMD` client: connects, the daemon acquires a pre-booted VM, runs CMD via the existing guest exec server (no cold boot), returns stdout/stderr/exit code, and destroys the used VM; the pool replenishes a fresh one in the background. Removes cold boot from the hot path without touching guest-init's lifecycle (unlike the full deferred-main-spawn, which is deferred as the higher-risk P2). Requests are served sequentially for now (one sandbox at a time); concurrency is a follow-up. Protocol: length-prefixed JSON over the Unix socket. --- src/cli/src/commands/pool.rs | 287 ++++++++++++++++++++++++++++++----- 1 file changed, 251 insertions(+), 36 deletions(-) diff --git a/src/cli/src/commands/pool.rs b/src/cli/src/commands/pool.rs index 520b4b7..d3411b9 100644 --- a/src/cli/src/commands/pool.rs +++ b/src/cli/src/commands/pool.rs @@ -1,19 +1,28 @@ -//! `a3s-box pool` — Warm VM pool management. +//! `a3s-box pool` — Warm VM pool daemon + client. //! -//! Pre-boots MicroVMs so that `run` can acquire an already-ready VM -//! instead of waiting for the full boot sequence (~200ms → ~0ms). +//! Pre-boots keepalive MicroVMs of one image so a command can run in an +//! already-ready sandbox instead of paying a full cold boot. `pool start` is the +//! daemon (pre-warms a pool and serves requests over a Unix socket); `pool run` +//! is the client (runs a command in a fresh warm sandbox via the guest exec +//! server, no cold boot). This is the low-risk keepalive+exec MVP from +//! docs/cow-snapshot-fork-design.md — it removes cold boot from the hot path +//! without touching guest-init's lifecycle. //! //! Subcommands: -//! pool start --size N --image IMAGE Start the warm pool -//! pool stop Drain and stop the pool -//! pool status Show idle count, hit rate, stats +//! pool start --image IMAGE --size N [--socket P] Daemon: pre-warm + serve +//! pool run [--socket P] -- CMD... Client: run CMD in a sandbox +//! pool stop / pool status Discoverability helpers use clap::{Parser, Subcommand}; +use serde::{Deserialize, Serialize}; use a3s_box_core::config::{BoxConfig, PoolConfig}; use a3s_box_core::event::EventEmitter; use a3s_box_runtime::pool::{PoolStats, WarmPool}; +/// Default Unix socket the `pool` daemon listens on. +const DEFAULT_SOCKET: &str = "/tmp/a3s-box-pool.sock"; + /// Manage the warm VM pool. #[derive(Parser)] pub struct PoolArgs { @@ -24,8 +33,10 @@ pub struct PoolArgs { /// Pool subcommands. #[derive(Subcommand)] pub enum PoolAction { - /// Start the warm pool (pre-boot VMs in the background) + /// Start the warm pool daemon (pre-boot VMs + serve `pool run` over a socket) Start(PoolStartArgs), + /// Run a command in a fresh warm sandbox (client of `pool start`) + Run(PoolRunArgs), /// Drain and stop the warm pool Stop(PoolStopArgs), /// Show warm pool statistics @@ -51,11 +62,27 @@ pub struct PoolStartArgs { #[arg(long, default_value = "300")] pub ttl: u64, + /// Unix socket to serve `pool run` requests on + #[arg(long, default_value = DEFAULT_SOCKET)] + pub socket: String, + /// Output as JSON #[arg(long)] pub json: bool, } +/// Arguments for `pool run`. +#[derive(Parser)] +pub struct PoolRunArgs { + /// Unix socket of the `pool start` daemon + #[arg(long, default_value = DEFAULT_SOCKET)] + pub socket: String, + + /// Command and arguments to run in a fresh warm sandbox + #[arg(last = true, required = true)] + pub cmd: Vec, +} + /// Arguments for `pool stop`. #[derive(Parser)] pub struct PoolStopArgs { @@ -72,15 +99,40 @@ pub struct PoolStatusArgs { pub json: bool, } +/// Wire request/response for the `pool` Unix-socket protocol (length-prefixed JSON). +#[derive(Serialize, Deserialize)] +struct RunRequest { + cmd: Vec, +} + +#[derive(Serialize, Deserialize)] +struct RunResponse { + stdout: Vec, + stderr: Vec, + exit_code: i32, + error: Option, +} + /// Execute a pool command. pub async fn execute(args: PoolArgs) -> Result<(), Box> { match args.action { PoolAction::Start(a) => execute_start(a).await, + PoolAction::Run(a) => execute_run(a).await, PoolAction::Stop(a) => execute_stop(a).await, PoolAction::Status(a) => execute_status(a).await, } } +/// Keepalive main process so a pooled VM stays up with its exec server available; +/// the real `pool run` command runs via exec, not as this main. +fn keepalive_cmd() -> Vec { + vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "trap 'exit 0' TERM INT; while :; do sleep 3600; done".to_string(), + ] +} + async fn execute_start(args: PoolStartArgs) -> Result<(), Box> { if args.size == 0 { return Err("--size must be greater than 0".into()); @@ -97,8 +149,11 @@ async fn execute_start(args: PoolStartArgs) -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> { + use tokio::net::UnixListener; + + let _ = std::fs::remove_file(socket); + let listener = UnixListener::bind(socket)?; + if !json { + println!("Listening on {} (Ctrl-C to drain and stop)", socket); + } + + loop { + tokio::select! { + accepted = listener.accept() => { + let (mut stream, _) = accepted?; + if let Err(e) = handle_conn(pool, &mut stream).await { + tracing::warn!(error = %e, "pool connection failed"); + } + } + _ = tokio::signal::ctrl_c() => { + let _ = std::fs::remove_file(socket); + break; + } + } + } + Ok(()) +} + +#[cfg(not(windows))] +async fn handle_conn(pool: &WarmPool, stream: &mut tokio::net::UnixStream) -> std::io::Result<()> { + // 60s exec cap — generous for a sandbox command. + const EXEC_TIMEOUT_NS: u64 = 60_000_000_000; + + let req: RunRequest = serde_json::from_slice(&read_frame(stream).await?) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + + // Acquire a warm VM and run the command. Keep the VM so we can tear it down + // AFTER responding (a one-shot sandbox is discarded; the pool replenishes a + // fresh one) — the client's latency must not include VM teardown. + let mut used_vm = None; + let resp = match pool.acquire().await { + Ok(vm) => { + let resp = match vm.exec_command(req.cmd, EXEC_TIMEOUT_NS).await { + Ok(o) => RunResponse { + stdout: o.stdout, + stderr: o.stderr, + exit_code: o.exit_code, + error: None, + }, + Err(e) => RunResponse { + stdout: vec![], + stderr: vec![], + exit_code: -1, + error: Some(e.to_string()), + }, + }; + used_vm = Some(vm); + resp + } + Err(e) => RunResponse { + stdout: vec![], + stderr: vec![], + exit_code: -1, + error: Some(format!("acquire failed: {e}")), + }, + }; + + let bytes = serde_json::to_vec(&resp) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + write_frame(stream, &bytes).await?; + + // Tear down the used sandbox in the background so neither the client nor the + // daemon's accept loop blocks on it. + if let Some(mut vm) = used_vm { + tokio::spawn(async move { + let _ = vm.destroy().await; + }); + } + Ok(()) +} + +#[cfg(not(windows))] +async fn execute_run(args: PoolRunArgs) -> Result<(), Box> { + use std::io::Write; + use tokio::net::UnixStream; + + let mut stream = UnixStream::connect(&args.socket).await.map_err(|e| { + format!( + "Failed to connect to pool daemon at {} ({}). Is `a3s-box pool start` running?", + args.socket, e + ) + })?; + + write_frame( + &mut stream, + &serde_json::to_vec(&RunRequest { cmd: args.cmd })?, + ) + .await?; + let resp: RunResponse = serde_json::from_slice(&read_frame(&mut stream).await?)?; + + if let Some(err) = resp.error { + eprintln!("pool error: {err}"); + std::process::exit(1); + } + std::io::stdout().write_all(&resp.stdout)?; + std::io::stderr().write_all(&resp.stderr)?; + std::process::exit(resp.exit_code); +} +#[cfg(windows)] +async fn serve( + _pool: &WarmPool, + _socket: &str, + _json: bool, +) -> Result<(), Box> { + eprintln!( + "pool socket serving is not supported on Windows; pool stays pre-warmed until Ctrl-C." + ); + tokio::signal::ctrl_c().await?; Ok(()) } +#[cfg(windows)] +async fn execute_run(_args: PoolRunArgs) -> Result<(), Box> { + Err("`pool run` is not supported on Windows".into()) +} + +/// Length-prefixed (u32 LE) framing for the Unix-socket protocol. +#[cfg(not(windows))] +async fn write_frame( + w: &mut W, + data: &[u8], +) -> std::io::Result<()> { + w.write_all(&(data.len() as u32).to_le_bytes()).await?; + w.write_all(data).await?; + w.flush().await +} + +#[cfg(not(windows))] +async fn read_frame(r: &mut R) -> std::io::Result> { + let mut len = [0u8; 4]; + r.read_exact(&mut len).await?; + let mut buf = vec![0u8; u32::from_le_bytes(len) as usize]; + r.read_exact(&mut buf).await?; + Ok(buf) +} + async fn execute_stop(_args: PoolStopArgs) -> Result<(), Box> { // Pool stop is handled by sending SIGINT to the `pool start` process. - // This subcommand exists for discoverability and future daemon support. eprintln!("Send SIGINT (Ctrl-C) to the running `a3s-box pool start` process to drain and stop the pool."); Ok(()) } async fn execute_status(_args: PoolStatusArgs) -> Result<(), Box> { - // Pool status requires a running pool instance. In the current in-process - // model, status is printed by the `pool start` process itself. - // This subcommand is a placeholder for future daemon/IPC support. + // Status lives in the running daemon; expose via Prometheus for now. eprintln!("Pool status is shown by the running `a3s-box pool start` process."); eprintln!("Use Prometheus metrics (a3s_box_warm_pool_*) for live observability."); Ok(()) @@ -214,11 +417,39 @@ mod tests { fn test_format_stats_json_is_valid_structure() { let stats = sample_stats(); let json = format_stats_json("alpine:latest", &stats); - // Must start and end with braces assert!(json.starts_with('{')); assert!(json.ends_with('}')); } + #[test] + fn test_keepalive_cmd_is_a_sleep_loop() { + let c = keepalive_cmd(); + assert_eq!(c[0], "/bin/sh"); + assert!(c.last().unwrap().contains("sleep")); + } + + #[test] + fn test_run_request_response_roundtrip() { + let req = RunRequest { + cmd: vec!["echo".into(), "hi".into()], + }; + let bytes = serde_json::to_vec(&req).unwrap(); + let parsed: RunRequest = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(parsed.cmd, vec!["echo", "hi"]); + + let resp = RunResponse { + stdout: b"hi\n".to_vec(), + stderr: vec![], + exit_code: 0, + error: None, + }; + let rb = serde_json::to_vec(&resp).unwrap(); + let rp: RunResponse = serde_json::from_slice(&rb).unwrap(); + assert_eq!(rp.stdout, b"hi\n"); + assert_eq!(rp.exit_code, 0); + assert!(rp.error.is_none()); + } + #[tokio::test] async fn test_execute_start_size_zero_fails() { let args = PoolStartArgs { @@ -226,6 +457,7 @@ mod tests { size: 0, max: 5, ttl: 300, + socket: DEFAULT_SOCKET.to_string(), json: false, }; let result = execute_start(args).await; @@ -240,6 +472,7 @@ mod tests { size: 10, max: 5, ttl: 300, + socket: DEFAULT_SOCKET.to_string(), json: false, }; let result = execute_start(args).await; @@ -252,31 +485,13 @@ mod tests { #[tokio::test] async fn test_execute_stop_is_ok() { - let args = PoolStopArgs { json: false }; - // stop is a no-op (prints message), should not error - let result = execute_stop(args).await; + let result = execute_stop(PoolStopArgs { json: false }).await; assert!(result.is_ok()); } #[tokio::test] async fn test_execute_status_is_ok() { - let args = PoolStatusArgs { json: false }; - let result = execute_status(args).await; + let result = execute_status(PoolStatusArgs { json: false }).await; assert!(result.is_ok()); } - - #[test] - fn test_pool_start_args_defaults() { - // Verify default values match expected warm pool behavior - let args = PoolStartArgs { - image: "alpine:latest".to_string(), - size: 2, - max: 8, - ttl: 300, - json: false, - }; - assert_eq!(args.size, 2); - assert_eq!(args.max, 8); - assert_eq!(args.ttl, 300); - } } From 108c34a2006ebc76bdd286e7892d10554ff87701 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Thu, 11 Jun 2026 14:17:11 +0800 Subject: [PATCH 2/7] test(pool): cover the Unix-socket wire protocol Add CI-runnable tests for the framing + request/response handshake (previously only POC-verified on KVM): frame roundtrip, full client/server protocol over a real Unix socket with a stub server, and truncated-stream error handling. --- src/cli/src/commands/pool.rs | 69 ++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/src/cli/src/commands/pool.rs b/src/cli/src/commands/pool.rs index d3411b9..1a44e23 100644 --- a/src/cli/src/commands/pool.rs +++ b/src/cli/src/commands/pool.rs @@ -494,4 +494,73 @@ mod tests { let result = execute_status(PoolStatusArgs { json: false }).await; assert!(result.is_ok()); } + + #[cfg(not(windows))] + #[tokio::test] + async fn test_frame_roundtrip() { + // write_frame then read_frame must return the exact bytes. + let (mut a, mut b) = tokio::io::duplex(4096); + let payload = serde_json::to_vec(&RunRequest { + cmd: vec!["echo".into(), "hi there".into()], + }) + .unwrap(); + write_frame(&mut a, &payload).await.unwrap(); + let got = read_frame(&mut b).await.unwrap(); + let parsed: RunRequest = serde_json::from_slice(&got).unwrap(); + assert_eq!(parsed.cmd, vec!["echo", "hi there"]); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn test_socket_request_response_protocol() { + // Exercise the full client/server wire protocol over a real Unix socket + // (the exact framing `serve` and `pool run` use), with a stub server + // standing in for the VM pool's acquire+exec. + use tokio::net::{UnixListener, UnixStream}; + + let dir = tempfile::tempdir().unwrap(); + let sock = dir.path().join("pool.sock"); + let listener = UnixListener::bind(&sock).unwrap(); + + let server = tokio::spawn(async move { + let (mut s, _) = listener.accept().await.unwrap(); + let req: RunRequest = + serde_json::from_slice(&read_frame(&mut s).await.unwrap()).unwrap(); + let resp = RunResponse { + stdout: format!("ran {:?}", req.cmd).into_bytes(), + stderr: vec![], + exit_code: 0, + error: None, + }; + write_frame(&mut s, &serde_json::to_vec(&resp).unwrap()) + .await + .unwrap(); + }); + + let mut client = UnixStream::connect(&sock).await.unwrap(); + let req = RunRequest { + cmd: vec!["ls".into(), "-la".into()], + }; + write_frame(&mut client, &serde_json::to_vec(&req).unwrap()) + .await + .unwrap(); + let resp: RunResponse = + serde_json::from_slice(&read_frame(&mut client).await.unwrap()).unwrap(); + + assert_eq!(resp.exit_code, 0); + assert!(resp.error.is_none()); + assert!(String::from_utf8_lossy(&resp.stdout).contains("ls")); + server.await.unwrap(); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn test_read_frame_truncated_errors() { + // A truncated stream must error, not hang or panic. + use tokio::io::AsyncWriteExt; + let (mut a, mut b) = tokio::io::duplex(64); + a.write_all(&[1u8, 0]).await.unwrap(); // partial 4-byte length prefix + drop(a); + assert!(read_frame(&mut b).await.is_err()); + } } From d0615d519b51afd3828e568c686ee7603adc8c68 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Thu, 11 Jun 2026 14:28:55 +0800 Subject: [PATCH 3/7] feat(pool): concurrent request serving + real-VM e2e test - serve() now handles each `pool run` concurrently (Arc + spawned task per connection) instead of one-at-a-time, so independent sandboxes don't queue. Added WarmPool::drain_idle(&self) so the pool can be shut down from behind the Arc (signal_shutdown stops the replenisher; drain_idle destroys idle VMs); in-flight requests keep their own acquired VM. - Added a host-backed e2e test (test_real_pool_warm_run, #[ignore]): spawns the daemon, waits for its socket, runs a command in a warm sandbox and asserts the output, then fires 3 concurrent `pool run`s and asserts all succeed. Adds a spawn_background helper to the test harness. --- src/cli/src/commands/pool.rs | 31 +++++----- src/cli/tests/host_smoke.rs | 95 +++++++++++++++++++++++++++++++ src/cli/tests/support/mod.rs | 16 ++++++ src/runtime/src/pool/warm_pool.rs | 22 +++++++ 4 files changed, 151 insertions(+), 13 deletions(-) diff --git a/src/cli/src/commands/pool.rs b/src/cli/src/commands/pool.rs index 1a44e23..55ad9c0 100644 --- a/src/cli/src/commands/pool.rs +++ b/src/cli/src/commands/pool.rs @@ -159,7 +159,8 @@ async fn execute_start(args: PoolStartArgs) -> Result<(), Box Result<(), Box, socket: &str, json: bool, ) -> Result<(), Box> { @@ -207,12 +204,20 @@ async fn serve( tokio::select! { accepted = listener.accept() => { let (mut stream, _) = accepted?; - if let Err(e) = handle_conn(pool, &mut stream).await { - tracing::warn!(error = %e, "pool connection failed"); - } + let pool = pool.clone(); + tokio::spawn(async move { + if let Err(e) = handle_conn(&pool, &mut stream).await { + tracing::warn!(error = %e, "pool connection failed"); + } + }); } _ = tokio::signal::ctrl_c() => { let _ = std::fs::remove_file(socket); + if !json { + println!("Draining warm pool..."); + } + pool.signal_shutdown(); + let _ = pool.drain_idle().await; break; } } diff --git a/src/cli/tests/host_smoke.rs b/src/cli/tests/host_smoke.rs index 5f13957..4571ae7 100644 --- a/src/cli/tests/host_smoke.rs +++ b/src/cli/tests/host_smoke.rs @@ -472,3 +472,98 @@ fn test_real_compose_smoke() { let ps = cli.ok(&["ps", "-a"]); assert!(!ps.contains(service_box)); } + +/// Warm-pool daemon end-to-end: `pool start` pre-warms VMs, then `pool run` +/// executes a command in a fresh warm sandbox via the guest exec server (no cold +/// boot). Also exercises CONCURRENT `pool run`s — the daemon serves them +/// concurrently. Host-backed (needs KVM + a runnable image). +#[test] +#[ignore] +fn test_real_pool_warm_run() { + let cli = CliTest::new(); + let image = host_smoke_image(); + seed_runnable_alpine_image(&cli, &image); + let socket = cli + .home_path() + .join("pool.sock") + .to_str() + .expect("utf8 socket path") + .to_string(); + + // Daemon: pre-warm a small pool and listen on the socket. + let mut daemon = cli.spawn_background(&[ + "pool", + "start", + "--image", + image.as_str(), + "--size", + "3", + "--max", + "6", + "--socket", + socket.as_str(), + ]); + + // The daemon binds the socket once WarmPool::start returns; wait for it. + let sock_path = cli.home_path().join("pool.sock"); + let start = std::time::Instant::now(); + while !sock_path.exists() { + if start.elapsed() > Duration::from_secs(120) { + let _ = daemon.kill(); + panic!("pool daemon never created its socket"); + } + if let Ok(Some(status)) = daemon.try_wait() { + panic!("pool daemon exited early: {status}"); + } + std::thread::sleep(Duration::from_millis(200)); + } + // Give the pool a moment to have at least one idle VM ready. + std::thread::sleep(Duration::from_secs(5)); + + // Single warm run. + let (out, err, ok) = cli.output(&[ + "pool", + "run", + "--socket", + socket.as_str(), + "--", + "echo", + "pool-e2e-ok", + ]); + assert!(ok, "pool run failed.\nstdout:\n{out}\nstderr:\n{err}"); + assert!(out.contains("pool-e2e-ok"), "unexpected output: {out:?}"); + + // Concurrent runs — the daemon serves them concurrently. + std::thread::scope(|s| { + let handles: Vec<_> = (0..3) + .map(|i| { + let cli = &cli; + let socket = socket.as_str(); + s.spawn(move || { + let tag = format!("conc-{i}"); + let (out, _e, ok) = cli.output(&[ + "pool", + "run", + "--socket", + socket, + "--", + "echo", + tag.as_str(), + ]); + (ok, out, tag) + }) + }) + .collect(); + for h in handles { + let (ok, out, tag) = h.join().expect("concurrent run thread panicked"); + assert!(ok, "concurrent pool run failed: {out}"); + assert!( + out.contains(&tag), + "concurrent run output {out:?} missing {tag}" + ); + } + }); + + let _ = daemon.kill(); + let _ = daemon.wait(); +} diff --git a/src/cli/tests/support/mod.rs b/src/cli/tests/support/mod.rs index c20de64..11680fb 100644 --- a/src/cli/tests/support/mod.rs +++ b/src/cli/tests/support/mod.rs @@ -128,6 +128,22 @@ impl CliTest { ) } + /// Spawn `a3s-box ` as a background process (e.g. a daemon), inheriting + /// the test env plus `A3S_HOME`. The caller owns the `Child` and must kill it. + pub fn spawn_background(&self, args: &[&str]) -> std::process::Child { + eprintln!(" $ a3s-box {} &", args.join(" ")); + self.command(args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap_or_else(|e| { + panic!( + "failed to spawn background `a3s-box {}`: {e}", + args.join(" ") + ) + }) + } + pub fn output_with_stdin(&self, args: &[&str], stdin: &[u8]) -> (String, String, bool) { eprintln!(" $ printf ... | a3s-box {}", args.join(" ")); diff --git a/src/runtime/src/pool/warm_pool.rs b/src/runtime/src/pool/warm_pool.rs index 2adfc36..372ef86 100644 --- a/src/runtime/src/pool/warm_pool.rs +++ b/src/runtime/src/pool/warm_pool.rs @@ -313,6 +313,28 @@ impl WarmPool { Ok(()) } + /// Destroy all idle VMs without consuming the pool (`&self`), so it can be + /// shut down from behind an `Arc` (e.g. a daemon serving concurrent requests). + /// Pair with [`Self::signal_shutdown`] first to stop the background replenisher; + /// its task then exits on its own (it watches the shutdown channel). + pub async fn drain_idle(&self) -> Result<()> { + let mut idle = self.idle.lock().await; + let count = idle.len(); + for warm_vm in idle.drain(..) { + let mut vm = warm_vm.vm; + if let Err(e) = vm.destroy().await { + tracing::warn!( + box_id = %vm.box_id(), + error = %e, + "Failed to destroy pooled VM during drain_idle" + ); + } + } + self.stats.lock().await.idle_count = 0; + tracing::info!(destroyed = count, "Warm pool idle VMs drained"); + Ok(()) + } + /// Remove and destroy specific idle VMs by their box IDs. /// /// Used when `fill_to_min` partially fails and needs to rollback From 2c23a553e8b29b859829b550c225ec56164c2b5a Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Thu, 11 Jun 2026 14:52:14 +0800 Subject: [PATCH 4/7] feat(pool): multi-image multiplexing (lazy per-image pools) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit One daemon can now serve sandboxes of different images. Added a PoolRegistry keyed by image that lazily starts (and pre-warms) a WarmPool on first use: - `pool start --image X` is now optional and sets the DEFAULT image; the daemon also warms a pool for any other image requested via `pool run --image Y` on first use. - `pool run [--image Y] -- CMD`: the request carries an optional image (defaults to the daemon's). RunRequest gains a `#[serde(default)] image` field — wire back-compat (older clients / default-image daemons omit it). - Shutdown drains idle VMs across all pools (drain_all). Extends the e2e test (test_real_pool_warm_run) with a lazy second image via `pool run --image`. Turns the single-image MVP into a real sandbox controller. --- src/cli/src/commands/pool.rs | 216 ++++++++++++++++++++++++----------- src/cli/tests/host_smoke.rs | 26 ++++- 2 files changed, 175 insertions(+), 67 deletions(-) diff --git a/src/cli/src/commands/pool.rs b/src/cli/src/commands/pool.rs index 55ad9c0..b13fa2e 100644 --- a/src/cli/src/commands/pool.rs +++ b/src/cli/src/commands/pool.rs @@ -46,9 +46,10 @@ pub enum PoolAction { /// Arguments for `pool start`. #[derive(Parser)] pub struct PoolStartArgs { - /// OCI image to pre-boot (e.g. alpine:latest) + /// Image to pre-warm (optional). Sandboxes default to this image; `pool run` + /// may request any other image, which the daemon warms on first use. #[arg(long)] - pub image: String, + pub image: Option, /// Number of VMs to keep pre-booted (min_idle) #[arg(long, default_value = "2")] @@ -78,6 +79,11 @@ pub struct PoolRunArgs { #[arg(long, default_value = DEFAULT_SOCKET)] pub socket: String, + /// Image to run in (defaults to the daemon's --image). The daemon warms a + /// pool for this image on first use. + #[arg(long)] + pub image: Option, + /// Command and arguments to run in a fresh warm sandbox #[arg(last = true, required = true)] pub cmd: Vec, @@ -102,6 +108,9 @@ pub struct PoolStatusArgs { /// Wire request/response for the `pool` Unix-socket protocol (length-prefixed JSON). #[derive(Serialize, Deserialize)] struct RunRequest { + /// Image to run in; `None` means use the daemon's default image. + #[serde(default)] + image: Option, cmd: Vec, } @@ -133,6 +142,62 @@ fn keepalive_cmd() -> Vec { ] } +/// A registry of warm pools keyed by image, created lazily on first use, so one +/// daemon can serve sandboxes of different images. +struct PoolRegistry { + pools: tokio::sync::Mutex>>, + default_image: Option, + size: usize, + max: usize, + ttl: u64, +} + +impl PoolRegistry { + /// The warm pool for `image`, lazily started (and pre-warmed in the background) + /// on first use. `WarmPool::start` returns once the replenisher is spawned, so + /// holding the map lock across it is brief. + async fn get_or_create(&self, image: &str) -> Result, String> { + let mut pools = self.pools.lock().await; + if let Some(pool) = pools.get(image) { + return Ok(pool.clone()); + } + let pool_config = PoolConfig { + enabled: true, + min_idle: self.size, + max_size: self.max, + idle_ttl_secs: self.ttl, + ..Default::default() + }; + let box_config = BoxConfig { + image: image.to_string(), + cmd: keepalive_cmd(), + pool: pool_config.clone(), + ..Default::default() + }; + let pool = std::sync::Arc::new( + WarmPool::start(pool_config, box_config, EventEmitter::new(256)) + .await + .map_err(|e| e.to_string())?, + ); + pools.insert(image.to_string(), pool.clone()); + Ok(pool) + } + + /// Resolve the image for a request: the requested one, else the daemon default. + fn resolve_image(&self, requested: Option) -> Option { + requested.or_else(|| self.default_image.clone()) + } + + /// Stop replenishment and destroy idle VMs across all pools (shutdown). + async fn drain_all(&self) { + let pools = self.pools.lock().await; + for pool in pools.values() { + pool.signal_shutdown(); + let _ = pool.drain_idle().await; + } + } +} + async fn execute_start(args: PoolStartArgs) -> Result<(), Box> { if args.size == 0 { return Err("--size must be greater than 0".into()); @@ -141,41 +206,42 @@ async fn execute_start(args: PoolStartArgs) -> Result<(), Box println!("{}", format_stats_json(image, stats)), + None => println!( + r#"{{"default_image":null,"max":{},"socket":"{}"}}"#, + args.max, args.socket + ), + } } else { println!("Warm pool started"); - println!(" image: {}", args.image); - println!(" min_idle: {}", args.size); + match &args.image { + Some(i) => println!(" default image: {i} (pre-warming {})", args.size), + None => println!(" default image: (none — `pool run` must pass --image)"), + } println!(" max: {}", args.max); println!(" ttl: {}s", args.ttl); - println!(" idle: {}", stats.idle_count); println!(" socket: {}", args.socket); } - serve(pool, &args.socket, args.json).await?; + serve(registry, &args.socket, args.json).await?; if !args.json { println!("Done."); @@ -188,7 +254,7 @@ async fn execute_start(args: PoolStartArgs) -> Result<(), Box, + registry: std::sync::Arc, socket: &str, json: bool, ) -> Result<(), Box> { @@ -204,9 +270,9 @@ async fn serve( tokio::select! { accepted = listener.accept() => { let (mut stream, _) = accepted?; - let pool = pool.clone(); + let registry = registry.clone(); tokio::spawn(async move { - if let Err(e) = handle_conn(&pool, &mut stream).await { + if let Err(e) = handle_conn(®istry, &mut stream).await { tracing::warn!(error = %e, "pool connection failed"); } }); @@ -214,10 +280,9 @@ async fn serve( _ = tokio::signal::ctrl_c() => { let _ = std::fs::remove_file(socket); if !json { - println!("Draining warm pool..."); + println!("Draining warm pools..."); } - pool.signal_shutdown(); - let _ = pool.drain_idle().await; + registry.drain_all().await; break; } } @@ -226,41 +291,51 @@ async fn serve( } #[cfg(not(windows))] -async fn handle_conn(pool: &WarmPool, stream: &mut tokio::net::UnixStream) -> std::io::Result<()> { +fn err_resp(msg: impl Into) -> RunResponse { + RunResponse { + stdout: vec![], + stderr: vec![], + exit_code: -1, + error: Some(msg.into()), + } +} + +#[cfg(not(windows))] +async fn handle_conn( + registry: &PoolRegistry, + stream: &mut tokio::net::UnixStream, +) -> std::io::Result<()> { // 60s exec cap — generous for a sandbox command. const EXEC_TIMEOUT_NS: u64 = 60_000_000_000; let req: RunRequest = serde_json::from_slice(&read_frame(stream).await?) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; - // Acquire a warm VM and run the command. Keep the VM so we can tear it down - // AFTER responding (a one-shot sandbox is discarded; the pool replenishes a - // fresh one) — the client's latency must not include VM teardown. + // Resolve the image, get-or-create its pool, acquire a warm VM, run the + // command. Keep the VM so we tear it down AFTER responding (a one-shot sandbox + // is discarded; the pool replenishes a fresh one) — the client's latency must + // not include VM teardown. let mut used_vm = None; - let resp = match pool.acquire().await { - Ok(vm) => { - let resp = match vm.exec_command(req.cmd, EXEC_TIMEOUT_NS).await { - Ok(o) => RunResponse { - stdout: o.stdout, - stderr: o.stderr, - exit_code: o.exit_code, - error: None, - }, - Err(e) => RunResponse { - stdout: vec![], - stderr: vec![], - exit_code: -1, - error: Some(e.to_string()), - }, - }; - used_vm = Some(vm); - resp - } - Err(e) => RunResponse { - stdout: vec![], - stderr: vec![], - exit_code: -1, - error: Some(format!("acquire failed: {e}")), + let resp = match registry.resolve_image(req.image) { + None => err_resp("no image: pass --image or start the daemon with --image"), + Some(image) => match registry.get_or_create(&image).await { + Err(e) => err_resp(format!("pool for {image}: {e}")), + Ok(pool) => match pool.acquire().await { + Err(e) => err_resp(format!("acquire failed: {e}")), + Ok(vm) => { + let resp = match vm.exec_command(req.cmd, EXEC_TIMEOUT_NS).await { + Ok(o) => RunResponse { + stdout: o.stdout, + stderr: o.stderr, + exit_code: o.exit_code, + error: None, + }, + Err(e) => err_resp(e.to_string()), + }; + used_vm = Some(vm); + resp + } + }, }, }; @@ -292,7 +367,10 @@ async fn execute_run(args: PoolRunArgs) -> Result<(), Box write_frame( &mut stream, - &serde_json::to_vec(&RunRequest { cmd: args.cmd })?, + &serde_json::to_vec(&RunRequest { + image: args.image, + cmd: args.cmd, + })?, ) .await?; let resp: RunResponse = serde_json::from_slice(&read_frame(&mut stream).await?)?; @@ -308,7 +386,7 @@ async fn execute_run(args: PoolRunArgs) -> Result<(), Box #[cfg(windows)] async fn serve( - _pool: &WarmPool, + _registry: std::sync::Arc, _socket: &str, _json: bool, ) -> Result<(), Box> { @@ -436,11 +514,17 @@ mod tests { #[test] fn test_run_request_response_roundtrip() { let req = RunRequest { + image: Some("alpine:latest".into()), cmd: vec!["echo".into(), "hi".into()], }; let bytes = serde_json::to_vec(&req).unwrap(); let parsed: RunRequest = serde_json::from_slice(&bytes).unwrap(); assert_eq!(parsed.cmd, vec!["echo", "hi"]); + assert_eq!(parsed.image.as_deref(), Some("alpine:latest")); + + // image is optional on the wire (older clients / default-image daemons). + let no_img: RunRequest = serde_json::from_slice(br#"{"cmd":["ls"]}"#).unwrap(); + assert!(no_img.image.is_none()); let resp = RunResponse { stdout: b"hi\n".to_vec(), @@ -458,7 +542,7 @@ mod tests { #[tokio::test] async fn test_execute_start_size_zero_fails() { let args = PoolStartArgs { - image: "alpine:latest".to_string(), + image: Some("alpine:latest".to_string()), size: 0, max: 5, ttl: 300, @@ -473,7 +557,7 @@ mod tests { #[tokio::test] async fn test_execute_start_size_exceeds_max_fails() { let args = PoolStartArgs { - image: "alpine:latest".to_string(), + image: Some("alpine:latest".to_string()), size: 10, max: 5, ttl: 300, @@ -506,6 +590,7 @@ mod tests { // write_frame then read_frame must return the exact bytes. let (mut a, mut b) = tokio::io::duplex(4096); let payload = serde_json::to_vec(&RunRequest { + image: None, cmd: vec!["echo".into(), "hi there".into()], }) .unwrap(); @@ -544,6 +629,7 @@ mod tests { let mut client = UnixStream::connect(&sock).await.unwrap(); let req = RunRequest { + image: Some("alpine:latest".into()), cmd: vec!["ls".into(), "-la".into()], }; write_frame(&mut client, &serde_json::to_vec(&req).unwrap()) diff --git a/src/cli/tests/host_smoke.rs b/src/cli/tests/host_smoke.rs index 4571ae7..7b8380e 100644 --- a/src/cli/tests/host_smoke.rs +++ b/src/cli/tests/host_smoke.rs @@ -475,8 +475,9 @@ fn test_real_compose_smoke() { /// Warm-pool daemon end-to-end: `pool start` pre-warms VMs, then `pool run` /// executes a command in a fresh warm sandbox via the guest exec server (no cold -/// boot). Also exercises CONCURRENT `pool run`s — the daemon serves them -/// concurrently. Host-backed (needs KVM + a runnable image). +/// boot). Also exercises CONCURRENT `pool run`s (served concurrently) and a +/// LAZY second image via `pool run --image`. Host-backed (needs KVM + a runnable +/// image). #[test] #[ignore] fn test_real_pool_warm_run() { @@ -564,6 +565,27 @@ fn test_real_pool_warm_run() { } }); + // Multi-image: a second image the daemon was NOT started with — its pool is + // created lazily on the first `pool run --image`. + let second = format!("coverage-pool-second:{}", unique_tag("img2")); + cli.ok(&["tag", &image, &second]); + let (out2, err2, ok2) = cli.output(&[ + "pool", + "run", + "--socket", + socket.as_str(), + "--image", + second.as_str(), + "--", + "echo", + "multiimg-ok", + ]); + assert!( + ok2, + "multi-image pool run failed.\nstdout:\n{out2}\nstderr:\n{err2}" + ); + assert!(out2.contains("multiimg-ok"), "unexpected output: {out2:?}"); + let _ = daemon.kill(); let _ = daemon.wait(); } From eee02579d7b2330791e2e32d7fdb83e3717f7acc Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Thu, 11 Jun 2026 15:06:34 +0800 Subject: [PATCH 5/7] feat(pool): pre-warm a fleet at startup via --warm image[=count] So the common sandbox images are warm-ready instead of cold on first request. `pool start --warm python:3=4,node:20` pre-warms each listed image at startup (count defaults to --size); any other image is still warmed lazily on first use. - Added parse_warm_spec (image[=count], whitespace-tolerant; unit-tested). - PoolRegistry::get_or_create_with_size lets a pre-warm use a per-image count; the lazy path keeps the daemon default size. - e2e test now starts the daemon with --warm =2 and runs that image, exercising startup pre-warm end-to-end. --- src/cli/src/commands/pool.rs | 80 +++++++++++++++++++++++++++++++++--- src/cli/tests/host_smoke.rs | 17 ++++---- 2 files changed, 85 insertions(+), 12 deletions(-) diff --git a/src/cli/src/commands/pool.rs b/src/cli/src/commands/pool.rs index b13fa2e..f214b3e 100644 --- a/src/cli/src/commands/pool.rs +++ b/src/cli/src/commands/pool.rs @@ -67,6 +67,11 @@ pub struct PoolStartArgs { #[arg(long, default_value = DEFAULT_SOCKET)] pub socket: String, + /// Extra images to pre-warm at startup, `image[=count]` (count defaults to + /// --size). Repeat or comma-separate: `--warm python:3=4,node:20`. + #[arg(long, value_delimiter = ',')] + pub warm: Vec, + /// Output as JSON #[arg(long)] pub json: bool, @@ -142,6 +147,24 @@ fn keepalive_cmd() -> Vec { ] } +/// Parse a `--warm` entry of the form `image[=count]` (count defaults to `default_size`). +fn parse_warm_spec(entry: &str, default_size: usize) -> Result<(String, usize), String> { + match entry.split_once('=') { + Some((image, count)) => { + let image = image.trim(); + if image.is_empty() { + return Err(format!("missing image in '{entry}'")); + } + let count: usize = count + .trim() + .parse() + .map_err(|_| format!("invalid warm count in '{entry}'"))?; + Ok((image.to_string(), count)) + } + None => Ok((entry.trim().to_string(), default_size)), + } +} + /// A registry of warm pools keyed by image, created lazily on first use, so one /// daemon can serve sandboxes of different images. struct PoolRegistry { @@ -154,17 +177,21 @@ struct PoolRegistry { impl PoolRegistry { /// The warm pool for `image`, lazily started (and pre-warmed in the background) - /// on first use. `WarmPool::start` returns once the replenisher is spawned, so - /// holding the map lock across it is brief. - async fn get_or_create(&self, image: &str) -> Result, String> { + /// on first use, with `min_idle = size`. `WarmPool::start` returns once the + /// replenisher is spawned, so holding the map lock across it is brief. + async fn get_or_create_with_size( + &self, + image: &str, + size: usize, + ) -> Result, String> { let mut pools = self.pools.lock().await; if let Some(pool) = pools.get(image) { return Ok(pool.clone()); } let pool_config = PoolConfig { enabled: true, - min_idle: self.size, - max_size: self.max, + min_idle: size, + max_size: self.max.max(size), idle_ttl_secs: self.ttl, ..Default::default() }; @@ -183,6 +210,11 @@ impl PoolRegistry { Ok(pool) } + /// Lazy pool for `image` at the daemon's default size. + async fn get_or_create(&self, image: &str) -> Result, String> { + self.get_or_create_with_size(image, self.size).await + } + /// Resolve the image for a request: the requested one, else the daemon default. fn resolve_image(&self, requested: Option) -> Option { requested.or_else(|| self.default_image.clone()) @@ -222,6 +254,17 @@ async fn execute_start(args: PoolStartArgs) -> Result<(), Box = Vec::new(); + for entry in &args.warm { + let (image, count) = parse_warm_spec(entry, args.size)?; + if count == 0 { + return Err(format!("--warm count must be > 0 (in '{entry}')").into()); + } + registry.get_or_create_with_size(&image, count).await?; + warmed_extra.push((image, count)); + } + if args.json { match &default_stats { Some((image, stats)) => println!("{}", format_stats_json(image, stats)), @@ -236,6 +279,9 @@ async fn execute_start(args: PoolStartArgs) -> Result<(), Box println!(" default image: {i} (pre-warming {})", args.size), None => println!(" default image: (none — `pool run` must pass --image)"), } + for (image, count) in &warmed_extra { + println!(" pre-warmed: {image} (size {count})"); + } println!(" max: {}", args.max); println!(" ttl: {}s", args.ttl); println!(" socket: {}", args.socket); @@ -511,6 +557,28 @@ mod tests { assert!(c.last().unwrap().contains("sleep")); } + #[test] + fn test_parse_warm_spec() { + // image=count + assert_eq!( + parse_warm_spec("python:3=4", 2).unwrap(), + ("python:3".to_string(), 4) + ); + // bare image → default size + assert_eq!( + parse_warm_spec("node:20", 7).unwrap(), + ("node:20".to_string(), 7) + ); + // whitespace tolerated + assert_eq!( + parse_warm_spec(" alpine = 3 ", 2).unwrap(), + ("alpine".to_string(), 3) + ); + // bad count / empty image error out + assert!(parse_warm_spec("alpine=notanum", 2).is_err()); + assert!(parse_warm_spec("=4", 2).is_err()); + } + #[test] fn test_run_request_response_roundtrip() { let req = RunRequest { @@ -547,6 +615,7 @@ mod tests { max: 5, ttl: 300, socket: DEFAULT_SOCKET.to_string(), + warm: vec![], json: false, }; let result = execute_start(args).await; @@ -562,6 +631,7 @@ mod tests { max: 5, ttl: 300, socket: DEFAULT_SOCKET.to_string(), + warm: vec![], json: false, }; let result = execute_start(args).await; diff --git a/src/cli/tests/host_smoke.rs b/src/cli/tests/host_smoke.rs index 7b8380e..3865623 100644 --- a/src/cli/tests/host_smoke.rs +++ b/src/cli/tests/host_smoke.rs @@ -476,14 +476,18 @@ fn test_real_compose_smoke() { /// Warm-pool daemon end-to-end: `pool start` pre-warms VMs, then `pool run` /// executes a command in a fresh warm sandbox via the guest exec server (no cold /// boot). Also exercises CONCURRENT `pool run`s (served concurrently) and a -/// LAZY second image via `pool run --image`. Host-backed (needs KVM + a runnable -/// image). +/// SECOND image pre-warmed at startup via `--warm`, run with `pool run --image`. +/// Host-backed (needs KVM + a runnable image). #[test] #[ignore] fn test_real_pool_warm_run() { let cli = CliTest::new(); let image = host_smoke_image(); seed_runnable_alpine_image(&cli, &image); + // A second image (retag of the first) the daemon pre-warms at startup via --warm. + let second = format!("coverage-pool-second:{}", unique_tag("img2")); + cli.ok(&["tag", &image, &second]); + let warm_spec = format!("{second}=2"); let socket = cli .home_path() .join("pool.sock") @@ -491,7 +495,7 @@ fn test_real_pool_warm_run() { .expect("utf8 socket path") .to_string(); - // Daemon: pre-warm a small pool and listen on the socket. + // Daemon: pre-warm the default pool + a second image via --warm; listen on the socket. let mut daemon = cli.spawn_background(&[ "pool", "start", @@ -501,6 +505,8 @@ fn test_real_pool_warm_run() { "3", "--max", "6", + "--warm", + warm_spec.as_str(), "--socket", socket.as_str(), ]); @@ -565,10 +571,7 @@ fn test_real_pool_warm_run() { } }); - // Multi-image: a second image the daemon was NOT started with — its pool is - // created lazily on the first `pool run --image`. - let second = format!("coverage-pool-second:{}", unique_tag("img2")); - cli.ok(&["tag", &image, &second]); + // Multi-image: run in the second image, pre-warmed at startup via --warm. let (out2, err2, ok2) = cli.output(&[ "pool", "run", From 49ba32e00763547984b64c093cc939c347bf9169 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Thu, 11 Jun 2026 15:21:26 +0800 Subject: [PATCH 6/7] feat(pool): live `pool status` over the socket (per-image stats) `pool status` was a stub pointing at Prometheus; now it queries the running daemon over the Unix socket and prints per-image pool stats (idle / created / acquired / evicted), or `--json`. - Wire protocol is now a tagged `Request` envelope ({"op":"run",...} / {"op":"status"}) so the daemon can dispatch; `pool run` sends Run, `pool status` sends Status. RunResponse unchanged; new StatusResponse/ImageStat. - PoolRegistry::stats() snapshots every image's WarmPool stats, sorted. - `pool status` gains --socket. e2e test now asserts status lists both warmed images. 14 unit tests (added Request-envelope tagging + no-daemon error). --- src/cli/src/commands/pool.rs | 154 ++++++++++++++++++++++++++++++++--- src/cli/tests/host_smoke.rs | 14 +++- 2 files changed, 152 insertions(+), 16 deletions(-) diff --git a/src/cli/src/commands/pool.rs b/src/cli/src/commands/pool.rs index f214b3e..0c22b0e 100644 --- a/src/cli/src/commands/pool.rs +++ b/src/cli/src/commands/pool.rs @@ -105,12 +105,26 @@ pub struct PoolStopArgs { /// Arguments for `pool status`. #[derive(Parser)] pub struct PoolStatusArgs { + /// Unix socket of the `pool start` daemon + #[arg(long, default_value = DEFAULT_SOCKET)] + pub socket: String, + /// Output as JSON #[arg(long)] pub json: bool, } -/// Wire request/response for the `pool` Unix-socket protocol (length-prefixed JSON). +/// Wire protocol for the `pool` Unix socket (length-prefixed JSON). +/// +/// Client→daemon request: run a command, or query status. Tagged so the daemon +/// can dispatch; the client parses the response type matching what it sent. +#[derive(Serialize, Deserialize)] +#[serde(tag = "op", rename_all = "snake_case")] +enum Request { + Run(RunRequest), + Status, +} + #[derive(Serialize, Deserialize)] struct RunRequest { /// Image to run in; `None` means use the daemon's default image. @@ -127,6 +141,21 @@ struct RunResponse { error: Option, } +/// Live stats for one image's warm pool. +#[derive(Serialize, Deserialize)] +struct ImageStat { + image: String, + idle: usize, + total_created: u64, + total_acquired: u64, + total_evicted: u64, +} + +#[derive(Serialize, Deserialize)] +struct StatusResponse { + images: Vec, +} + /// Execute a pool command. pub async fn execute(args: PoolArgs) -> Result<(), Box> { match args.action { @@ -228,6 +257,24 @@ impl PoolRegistry { let _ = pool.drain_idle().await; } } + + /// Snapshot live per-image stats, sorted by image name. + async fn stats(&self) -> Vec { + let pools = self.pools.lock().await; + let mut out = Vec::with_capacity(pools.len()); + for (image, pool) in pools.iter() { + let s = pool.stats().await; + out.push(ImageStat { + image: image.clone(), + idle: s.idle_count, + total_created: s.total_created, + total_acquired: s.total_acquired, + total_evicted: s.total_evicted, + }); + } + out.sort_by(|a, b| a.image.cmp(&b.image)); + out + } } async fn execute_start(args: PoolStartArgs) -> Result<(), Box> { @@ -354,22 +401,35 @@ async fn handle_conn( // 60s exec cap — generous for a sandbox command. const EXEC_TIMEOUT_NS: u64 = 60_000_000_000; - let req: RunRequest = serde_json::from_slice(&read_frame(stream).await?) + let req: Request = serde_json::from_slice(&read_frame(stream).await?) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + // `status` is a simple query — answer and return. + let run = match req { + Request::Status => { + let resp = StatusResponse { + images: registry.stats().await, + }; + let bytes = serde_json::to_vec(&resp) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + return write_frame(stream, &bytes).await; + } + Request::Run(run) => run, + }; + // Resolve the image, get-or-create its pool, acquire a warm VM, run the // command. Keep the VM so we tear it down AFTER responding (a one-shot sandbox // is discarded; the pool replenishes a fresh one) — the client's latency must // not include VM teardown. let mut used_vm = None; - let resp = match registry.resolve_image(req.image) { + let resp = match registry.resolve_image(run.image) { None => err_resp("no image: pass --image or start the daemon with --image"), Some(image) => match registry.get_or_create(&image).await { Err(e) => err_resp(format!("pool for {image}: {e}")), Ok(pool) => match pool.acquire().await { Err(e) => err_resp(format!("acquire failed: {e}")), Ok(vm) => { - let resp = match vm.exec_command(req.cmd, EXEC_TIMEOUT_NS).await { + let resp = match vm.exec_command(run.cmd, EXEC_TIMEOUT_NS).await { Ok(o) => RunResponse { stdout: o.stdout, stderr: o.stderr, @@ -413,10 +473,10 @@ async fn execute_run(args: PoolRunArgs) -> Result<(), Box write_frame( &mut stream, - &serde_json::to_vec(&RunRequest { + &serde_json::to_vec(&Request::Run(RunRequest { image: args.image, cmd: args.cmd, - })?, + }))?, ) .await?; let resp: RunResponse = serde_json::from_slice(&read_frame(&mut stream).await?)?; @@ -474,13 +534,44 @@ async fn execute_stop(_args: PoolStopArgs) -> Result<(), Box Result<(), Box> { - // Status lives in the running daemon; expose via Prometheus for now. - eprintln!("Pool status is shown by the running `a3s-box pool start` process."); - eprintln!("Use Prometheus metrics (a3s_box_warm_pool_*) for live observability."); +#[cfg(not(windows))] +async fn execute_status(args: PoolStatusArgs) -> Result<(), Box> { + use tokio::net::UnixStream; + + let mut stream = UnixStream::connect(&args.socket).await.map_err(|e| { + format!( + "Failed to connect to pool daemon at {} ({}). Is `a3s-box pool start` running?", + args.socket, e + ) + })?; + + write_frame(&mut stream, &serde_json::to_vec(&Request::Status)?).await?; + let resp: StatusResponse = serde_json::from_slice(&read_frame(&mut stream).await?)?; + + if args.json { + println!("{}", serde_json::to_string(&resp.images)?); + } else if resp.images.is_empty() { + println!("No warm pools yet (no images warmed)."); + } else { + println!( + "{:<40} {:>5} {:>8} {:>9} {:>8}", + "IMAGE", "IDLE", "CREATED", "ACQUIRED", "EVICTED" + ); + for s in &resp.images { + println!( + "{:<40} {:>5} {:>8} {:>9} {:>8}", + s.image, s.idle, s.total_created, s.total_acquired, s.total_evicted + ); + } + } Ok(()) } +#[cfg(windows)] +async fn execute_status(_args: PoolStatusArgs) -> Result<(), Box> { + Err("`pool status` is not supported on Windows".into()) +} + /// Format pool stats as a JSON string. fn format_stats_json(image: &str, stats: &PoolStats) -> String { let hit_rate = if stats.total_acquired > 0 { @@ -648,10 +739,47 @@ mod tests { assert!(result.is_ok()); } + #[cfg(not(windows))] #[tokio::test] - async fn test_execute_status_is_ok() { - let result = execute_status(PoolStatusArgs { json: false }).await; - assert!(result.is_ok()); + async fn test_execute_status_no_daemon_errors() { + // With no daemon listening, status fails with a connect hint (not a panic). + let result = execute_status(PoolStatusArgs { + socket: "/tmp/a3s-box-pool-does-not-exist.sock".to_string(), + json: false, + }) + .await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("pool daemon")); + } + + #[test] + fn test_request_envelope_tagging() { + // Run carries an op tag + the flattened RunRequest; Status is a bare tag. + let run = serde_json::to_string(&Request::Run(RunRequest { + image: Some("alpine".into()), + cmd: vec!["echo".into(), "hi".into()], + })) + .unwrap(); + assert!(run.contains(r#""op":"run""#)); + assert!(run.contains(r#""cmd":["echo","hi"]"#)); + + let status = serde_json::to_string(&Request::Status).unwrap(); + assert_eq!(status, r#"{"op":"status"}"#); + + // StatusResponse round-trips. + let sr = StatusResponse { + images: vec![ImageStat { + image: "alpine".into(), + idle: 2, + total_created: 5, + total_acquired: 3, + total_evicted: 1, + }], + }; + let parsed: StatusResponse = + serde_json::from_slice(&serde_json::to_vec(&sr).unwrap()).unwrap(); + assert_eq!(parsed.images[0].image, "alpine"); + assert_eq!(parsed.images[0].idle, 2); } #[cfg(not(windows))] diff --git a/src/cli/tests/host_smoke.rs b/src/cli/tests/host_smoke.rs index 3865623..b1dd913 100644 --- a/src/cli/tests/host_smoke.rs +++ b/src/cli/tests/host_smoke.rs @@ -475,9 +475,9 @@ fn test_real_compose_smoke() { /// Warm-pool daemon end-to-end: `pool start` pre-warms VMs, then `pool run` /// executes a command in a fresh warm sandbox via the guest exec server (no cold -/// boot). Also exercises CONCURRENT `pool run`s (served concurrently) and a -/// SECOND image pre-warmed at startup via `--warm`, run with `pool run --image`. -/// Host-backed (needs KVM + a runnable image). +/// boot). Also exercises CONCURRENT `pool run`s (served concurrently), a SECOND +/// image pre-warmed at startup via `--warm` (run with `pool run --image`), and +/// `pool status` over the socket. Host-backed (needs KVM + a runnable image). #[test] #[ignore] fn test_real_pool_warm_run() { @@ -589,6 +589,14 @@ fn test_real_pool_warm_run() { ); assert!(out2.contains("multiimg-ok"), "unexpected output: {out2:?}"); + // Status: the daemon reports its per-image pools over the socket. + let status = cli.ok(&["pool", "status", "--socket", socket.as_str()]); + assert!(status.contains("IDLE"), "status header missing:\n{status}"); + assert!( + status.contains(image.as_str()) && status.contains(second.as_str()), + "status should list both warmed images:\n{status}" + ); + let _ = daemon.kill(); let _ = daemon.wait(); } From 379fec067c42d0ea03daa4d023b514da8a05290b Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Thu, 11 Jun 2026 15:34:24 +0800 Subject: [PATCH 7/7] =?UTF-8?q?feat(pool):=20backpressure=20=E2=80=94=20bo?= =?UTF-8?q?und=20concurrent=20in-flight=20sandboxes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WarmPool::acquire boots a VM on a pool miss with NO max_size cap (max is only enforced in release/replenish), so a burst of concurrent `pool run`s would boot unbounded VMs and exhaust the host. Add a per-image semaphore (permits = max_size) acquired before pool.acquire and released only after the VM is torn down: excess requests queue for a slot instead of exploding. - PoolEntry { pool, sem }; the registry hands out the entry, handle_conn holds the owned permit through the backgrounded destroy. - Unit test asserts peak concurrency never exceeds the permit count. --- src/cli/src/commands/pool.rs | 134 +++++++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 39 deletions(-) diff --git a/src/cli/src/commands/pool.rs b/src/cli/src/commands/pool.rs index 0c22b0e..71b0a0a 100644 --- a/src/cli/src/commands/pool.rs +++ b/src/cli/src/commands/pool.rs @@ -194,10 +194,20 @@ fn parse_warm_spec(entry: &str, default_size: usize) -> Result<(String, usize), } } +/// One image's warm pool plus a semaphore bounding concurrent in-flight sandboxes. +/// `WarmPool::acquire` boots on a pool miss with no `max_size` cap, so without this +/// a burst of `pool run`s would boot unbounded VMs; the permit makes excess +/// requests queue instead. +#[derive(Clone)] +struct PoolEntry { + pool: std::sync::Arc, + sem: std::sync::Arc, +} + /// A registry of warm pools keyed by image, created lazily on first use, so one /// daemon can serve sandboxes of different images. struct PoolRegistry { - pools: tokio::sync::Mutex>>, + pools: tokio::sync::Mutex>, default_image: Option, size: usize, max: usize, @@ -205,22 +215,20 @@ struct PoolRegistry { } impl PoolRegistry { - /// The warm pool for `image`, lazily started (and pre-warmed in the background) + /// The pool entry for `image`, lazily started (and pre-warmed in the background) /// on first use, with `min_idle = size`. `WarmPool::start` returns once the - /// replenisher is spawned, so holding the map lock across it is brief. - async fn get_or_create_with_size( - &self, - image: &str, - size: usize, - ) -> Result, String> { + /// replenisher is spawned, so holding the map lock across it is brief. The + /// concurrency semaphore is sized to the pool's `max_size`. + async fn get_or_create_with_size(&self, image: &str, size: usize) -> Result { let mut pools = self.pools.lock().await; - if let Some(pool) = pools.get(image) { - return Ok(pool.clone()); + if let Some(entry) = pools.get(image) { + return Ok(entry.clone()); } + let max_size = self.max.max(size); let pool_config = PoolConfig { enabled: true, min_idle: size, - max_size: self.max.max(size), + max_size, idle_ttl_secs: self.ttl, ..Default::default() }; @@ -235,12 +243,16 @@ impl PoolRegistry { .await .map_err(|e| e.to_string())?, ); - pools.insert(image.to_string(), pool.clone()); - Ok(pool) + let entry = PoolEntry { + pool, + sem: std::sync::Arc::new(tokio::sync::Semaphore::new(max_size)), + }; + pools.insert(image.to_string(), entry.clone()); + Ok(entry) } /// Lazy pool for `image` at the daemon's default size. - async fn get_or_create(&self, image: &str) -> Result, String> { + async fn get_or_create(&self, image: &str) -> Result { self.get_or_create_with_size(image, self.size).await } @@ -252,9 +264,9 @@ impl PoolRegistry { /// Stop replenishment and destroy idle VMs across all pools (shutdown). async fn drain_all(&self) { let pools = self.pools.lock().await; - for pool in pools.values() { - pool.signal_shutdown(); - let _ = pool.drain_idle().await; + for entry in pools.values() { + entry.pool.signal_shutdown(); + let _ = entry.pool.drain_idle().await; } } @@ -262,8 +274,8 @@ impl PoolRegistry { async fn stats(&self) -> Vec { let pools = self.pools.lock().await; let mut out = Vec::with_capacity(pools.len()); - for (image, pool) in pools.iter() { - let s = pool.stats().await; + for (image, entry) in pools.iter() { + let s = entry.pool.stats().await; out.push(ImageStat { image: image.clone(), idle: s.idle_count, @@ -295,8 +307,8 @@ async fn execute_start(args: PoolStartArgs) -> Result<(), Box err_resp("no image: pass --image or start the daemon with --image"), Some(image) => match registry.get_or_create(&image).await { Err(e) => err_resp(format!("pool for {image}: {e}")), - Ok(pool) => match pool.acquire().await { - Err(e) => err_resp(format!("acquire failed: {e}")), - Ok(vm) => { - let resp = match vm.exec_command(run.cmd, EXEC_TIMEOUT_NS).await { - Ok(o) => RunResponse { - stdout: o.stdout, - stderr: o.stderr, - exit_code: o.exit_code, - error: None, - }, - Err(e) => err_resp(e.to_string()), - }; - used_vm = Some(vm); - resp + Ok(entry) => { + // Backpressure: wait for a slot so a burst doesn't boot unbounded VMs. + let permit = entry + .sem + .clone() + .acquire_owned() + .await + .expect("pool semaphore is never closed"); + match entry.pool.acquire().await { + Err(e) => err_resp(format!("acquire failed: {e}")), + Ok(vm) => { + let resp = match vm.exec_command(run.cmd, EXEC_TIMEOUT_NS).await { + Ok(o) => RunResponse { + stdout: o.stdout, + stderr: o.stderr, + exit_code: o.exit_code, + error: None, + }, + Err(e) => err_resp(e.to_string()), + }; + used = Some((vm, permit)); + resp + } } - }, + } }, }; @@ -450,10 +473,11 @@ async fn handle_conn( write_frame(stream, &bytes).await?; // Tear down the used sandbox in the background so neither the client nor the - // daemon's accept loop blocks on it. - if let Some(mut vm) = used_vm { + // daemon's accept loop blocks on it; release the concurrency permit afterwards. + if let Some((mut vm, permit)) = used { tokio::spawn(async move { let _ = vm.destroy().await; + drop(permit); }); } Ok(()) @@ -670,6 +694,38 @@ mod tests { assert!(parse_warm_spec("=4", 2).is_err()); } + #[tokio::test] + async fn test_backpressure_bounds_concurrency() { + // The contract PoolEntry relies on: a permit (held until teardown) caps + // concurrent in-flight sandboxes to the semaphore size, so a burst queues + // instead of all running at once. + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + let sem = Arc::new(tokio::sync::Semaphore::new(2)); + let live = Arc::new(AtomicUsize::new(0)); + let peak = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::new(); + for _ in 0..6 { + let (sem, live, peak) = (sem.clone(), live.clone(), peak.clone()); + handles.push(tokio::spawn(async move { + let _permit = sem.acquire_owned().await.unwrap(); + let now = live.fetch_add(1, Ordering::SeqCst) + 1; + peak.fetch_max(now, Ordering::SeqCst); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + live.fetch_sub(1, Ordering::SeqCst); + })); + } + for h in handles { + h.await.unwrap(); + } + assert!( + peak.load(Ordering::SeqCst) <= 2, + "concurrency exceeded the permit limit" + ); + } + #[test] fn test_run_request_response_roundtrip() { let req = RunRequest {