diff --git a/src/cli/src/commands/pool.rs b/src/cli/src/commands/pool.rs index 520b4b7..71b0a0a 100644 --- a/src/cli/src/commands/pool.rs +++ b/src/cli/src/commands/pool.rs @@ -1,19 +1,28 @@ -//! `a3s-box pool` — Warm VM pool management. +//! `a3s-box pool` — Warm VM pool daemon + client. //! -//! Pre-boots MicroVMs so that `run` can acquire an already-ready VM -//! instead of waiting for the full boot sequence (~200ms → ~0ms). +//! Pre-boots keepalive MicroVMs of one image so a command can run in an +//! already-ready sandbox instead of paying a full cold boot. `pool start` is the +//! daemon (pre-warms a pool and serves requests over a Unix socket); `pool run` +//! is the client (runs a command in a fresh warm sandbox via the guest exec +//! server, no cold boot). This is the low-risk keepalive+exec MVP from +//! docs/cow-snapshot-fork-design.md — it removes cold boot from the hot path +//! without touching guest-init's lifecycle. //! //! Subcommands: -//! pool start --size N --image IMAGE Start the warm pool -//! pool stop Drain and stop the pool -//! pool status Show idle count, hit rate, stats +//! pool start --image IMAGE --size N [--socket P] Daemon: pre-warm + serve +//! pool run [--socket P] -- CMD... Client: run CMD in a sandbox +//! pool stop / pool status Discoverability helpers use clap::{Parser, Subcommand}; +use serde::{Deserialize, Serialize}; use a3s_box_core::config::{BoxConfig, PoolConfig}; use a3s_box_core::event::EventEmitter; use a3s_box_runtime::pool::{PoolStats, WarmPool}; +/// Default Unix socket the `pool` daemon listens on. +const DEFAULT_SOCKET: &str = "/tmp/a3s-box-pool.sock"; + /// Manage the warm VM pool. #[derive(Parser)] pub struct PoolArgs { @@ -24,8 +33,10 @@ pub struct PoolArgs { /// Pool subcommands. #[derive(Subcommand)] pub enum PoolAction { - /// Start the warm pool (pre-boot VMs in the background) + /// Start the warm pool daemon (pre-boot VMs + serve `pool run` over a socket) Start(PoolStartArgs), + /// Run a command in a fresh warm sandbox (client of `pool start`) + Run(PoolRunArgs), /// Drain and stop the warm pool Stop(PoolStopArgs), /// Show warm pool statistics @@ -35,9 +46,10 @@ pub enum PoolAction { /// Arguments for `pool start`. #[derive(Parser)] pub struct PoolStartArgs { - /// OCI image to pre-boot (e.g. alpine:latest) + /// Image to pre-warm (optional). Sandboxes default to this image; `pool run` + /// may request any other image, which the daemon warms on first use. #[arg(long)] - pub image: String, + pub image: Option, /// Number of VMs to keep pre-booted (min_idle) #[arg(long, default_value = "2")] @@ -51,11 +63,37 @@ pub struct PoolStartArgs { #[arg(long, default_value = "300")] pub ttl: u64, + /// Unix socket to serve `pool run` requests on + #[arg(long, default_value = DEFAULT_SOCKET)] + pub socket: String, + + /// Extra images to pre-warm at startup, `image[=count]` (count defaults to + /// --size). Repeat or comma-separate: `--warm python:3=4,node:20`. + #[arg(long, value_delimiter = ',')] + pub warm: Vec, + /// Output as JSON #[arg(long)] pub json: bool, } +/// Arguments for `pool run`. +#[derive(Parser)] +pub struct PoolRunArgs { + /// Unix socket of the `pool start` daemon + #[arg(long, default_value = DEFAULT_SOCKET)] + pub socket: String, + + /// Image to run in (defaults to the daemon's --image). The daemon warms a + /// pool for this image on first use. + #[arg(long)] + pub image: Option, + + /// Command and arguments to run in a fresh warm sandbox + #[arg(last = true, required = true)] + pub cmd: Vec, +} + /// Arguments for `pool stop`. #[derive(Parser)] pub struct PoolStopArgs { @@ -67,20 +105,190 @@ pub struct PoolStopArgs { /// Arguments for `pool status`. #[derive(Parser)] pub struct PoolStatusArgs { + /// Unix socket of the `pool start` daemon + #[arg(long, default_value = DEFAULT_SOCKET)] + pub socket: String, + /// Output as JSON #[arg(long)] pub json: bool, } +/// Wire protocol for the `pool` Unix socket (length-prefixed JSON). +/// +/// Client→daemon request: run a command, or query status. Tagged so the daemon +/// can dispatch; the client parses the response type matching what it sent. +#[derive(Serialize, Deserialize)] +#[serde(tag = "op", rename_all = "snake_case")] +enum Request { + Run(RunRequest), + Status, +} + +#[derive(Serialize, Deserialize)] +struct RunRequest { + /// Image to run in; `None` means use the daemon's default image. + #[serde(default)] + image: Option, + cmd: Vec, +} + +#[derive(Serialize, Deserialize)] +struct RunResponse { + stdout: Vec, + stderr: Vec, + exit_code: i32, + error: Option, +} + +/// Live stats for one image's warm pool. +#[derive(Serialize, Deserialize)] +struct ImageStat { + image: String, + idle: usize, + total_created: u64, + total_acquired: u64, + total_evicted: u64, +} + +#[derive(Serialize, Deserialize)] +struct StatusResponse { + images: Vec, +} + /// Execute a pool command. pub async fn execute(args: PoolArgs) -> Result<(), Box> { match args.action { PoolAction::Start(a) => execute_start(a).await, + PoolAction::Run(a) => execute_run(a).await, PoolAction::Stop(a) => execute_stop(a).await, PoolAction::Status(a) => execute_status(a).await, } } +/// Keepalive main process so a pooled VM stays up with its exec server available; +/// the real `pool run` command runs via exec, not as this main. +fn keepalive_cmd() -> Vec { + vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "trap 'exit 0' TERM INT; while :; do sleep 3600; done".to_string(), + ] +} + +/// Parse a `--warm` entry of the form `image[=count]` (count defaults to `default_size`). +fn parse_warm_spec(entry: &str, default_size: usize) -> Result<(String, usize), String> { + match entry.split_once('=') { + Some((image, count)) => { + let image = image.trim(); + if image.is_empty() { + return Err(format!("missing image in '{entry}'")); + } + let count: usize = count + .trim() + .parse() + .map_err(|_| format!("invalid warm count in '{entry}'"))?; + Ok((image.to_string(), count)) + } + None => Ok((entry.trim().to_string(), default_size)), + } +} + +/// One image's warm pool plus a semaphore bounding concurrent in-flight sandboxes. +/// `WarmPool::acquire` boots on a pool miss with no `max_size` cap, so without this +/// a burst of `pool run`s would boot unbounded VMs; the permit makes excess +/// requests queue instead. +#[derive(Clone)] +struct PoolEntry { + pool: std::sync::Arc, + sem: std::sync::Arc, +} + +/// A registry of warm pools keyed by image, created lazily on first use, so one +/// daemon can serve sandboxes of different images. +struct PoolRegistry { + pools: tokio::sync::Mutex>, + default_image: Option, + size: usize, + max: usize, + ttl: u64, +} + +impl PoolRegistry { + /// The pool entry for `image`, lazily started (and pre-warmed in the background) + /// on first use, with `min_idle = size`. `WarmPool::start` returns once the + /// replenisher is spawned, so holding the map lock across it is brief. The + /// concurrency semaphore is sized to the pool's `max_size`. + async fn get_or_create_with_size(&self, image: &str, size: usize) -> Result { + let mut pools = self.pools.lock().await; + if let Some(entry) = pools.get(image) { + return Ok(entry.clone()); + } + let max_size = self.max.max(size); + let pool_config = PoolConfig { + enabled: true, + min_idle: size, + max_size, + idle_ttl_secs: self.ttl, + ..Default::default() + }; + let box_config = BoxConfig { + image: image.to_string(), + cmd: keepalive_cmd(), + pool: pool_config.clone(), + ..Default::default() + }; + let pool = std::sync::Arc::new( + WarmPool::start(pool_config, box_config, EventEmitter::new(256)) + .await + .map_err(|e| e.to_string())?, + ); + let entry = PoolEntry { + pool, + sem: std::sync::Arc::new(tokio::sync::Semaphore::new(max_size)), + }; + pools.insert(image.to_string(), entry.clone()); + Ok(entry) + } + + /// Lazy pool for `image` at the daemon's default size. + async fn get_or_create(&self, image: &str) -> Result { + self.get_or_create_with_size(image, self.size).await + } + + /// Resolve the image for a request: the requested one, else the daemon default. + fn resolve_image(&self, requested: Option) -> Option { + requested.or_else(|| self.default_image.clone()) + } + + /// Stop replenishment and destroy idle VMs across all pools (shutdown). + async fn drain_all(&self) { + let pools = self.pools.lock().await; + for entry in pools.values() { + entry.pool.signal_shutdown(); + let _ = entry.pool.drain_idle().await; + } + } + + /// Snapshot live per-image stats, sorted by image name. + async fn stats(&self) -> Vec { + let pools = self.pools.lock().await; + let mut out = Vec::with_capacity(pools.len()); + for (image, entry) in pools.iter() { + let s = entry.pool.stats().await; + out.push(ImageStat { + image: image.clone(), + idle: s.idle_count, + total_created: s.total_created, + total_acquired: s.total_acquired, + total_evicted: s.total_evicted, + }); + } + out.sort_by(|a, b| a.image.cmp(&b.image)); + out + } +} + async fn execute_start(args: PoolStartArgs) -> Result<(), Box> { if args.size == 0 { return Err("--size must be greater than 0".into()); @@ -89,66 +297,305 @@ async fn execute_start(args: PoolStartArgs) -> Result<(), Box = Vec::new(); + for entry in &args.warm { + let (image, count) = parse_warm_spec(entry, args.size)?; + if count == 0 { + return Err(format!("--warm count must be > 0 (in '{entry}')").into()); + } + registry.get_or_create_with_size(&image, count).await?; + warmed_extra.push((image, count)); + } if args.json { - println!("{}", format_stats_json(&args.image, &stats)); + match &default_stats { + Some((image, stats)) => println!("{}", format_stats_json(image, stats)), + None => println!( + r#"{{"default_image":null,"max":{},"socket":"{}"}}"#, + args.max, args.socket + ), + } } else { println!("Warm pool started"); - println!(" image: {}", args.image); - println!(" min_idle: {}", args.size); + match &args.image { + Some(i) => println!(" default image: {i} (pre-warming {})", args.size), + None => println!(" default image: (none — `pool run` must pass --image)"), + } + for (image, count) in &warmed_extra { + println!(" pre-warmed: {image} (size {count})"); + } println!(" max: {}", args.max); println!(" ttl: {}s", args.ttl); - println!(" idle: {}", stats.idle_count); + println!(" socket: {}", args.socket); } - // Keep pool alive until signal - tokio::signal::ctrl_c().await?; + serve(registry, &args.socket, args.json).await?; - if !args.json { - println!("\nDraining warm pool..."); - } - pool.drain().await?; if !args.json { println!("Done."); } + Ok(()) +} + +/// Accept `pool run` connections until Ctrl-C, serving each request concurrently +/// so independent sandboxes don't queue behind one another. On shutdown, stop the +/// replenisher and destroy idle VMs (in-flight requests keep their own acquired VM). +#[cfg(not(windows))] +async fn serve( + registry: std::sync::Arc, + socket: &str, + json: bool, +) -> Result<(), Box> { + use tokio::net::UnixListener; + + let _ = std::fs::remove_file(socket); + let listener = UnixListener::bind(socket)?; + if !json { + println!("Listening on {} (Ctrl-C to drain and stop)", socket); + } + loop { + tokio::select! { + accepted = listener.accept() => { + let (mut stream, _) = accepted?; + let registry = registry.clone(); + tokio::spawn(async move { + if let Err(e) = handle_conn(®istry, &mut stream).await { + tracing::warn!(error = %e, "pool connection failed"); + } + }); + } + _ = tokio::signal::ctrl_c() => { + let _ = std::fs::remove_file(socket); + if !json { + println!("Draining warm pools..."); + } + registry.drain_all().await; + break; + } + } + } Ok(()) } +#[cfg(not(windows))] +fn err_resp(msg: impl Into) -> RunResponse { + RunResponse { + stdout: vec![], + stderr: vec![], + exit_code: -1, + error: Some(msg.into()), + } +} + +#[cfg(not(windows))] +async fn handle_conn( + registry: &PoolRegistry, + stream: &mut tokio::net::UnixStream, +) -> std::io::Result<()> { + // 60s exec cap — generous for a sandbox command. + const EXEC_TIMEOUT_NS: u64 = 60_000_000_000; + + let req: Request = serde_json::from_slice(&read_frame(stream).await?) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + + // `status` is a simple query — answer and return. + let run = match req { + Request::Status => { + let resp = StatusResponse { + images: registry.stats().await, + }; + let bytes = serde_json::to_vec(&resp) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + return write_frame(stream, &bytes).await; + } + Request::Run(run) => run, + }; + + // Resolve the image, get-or-create its pool, acquire a warm VM, run the + // command. Keep the VM so we tear it down AFTER responding (a one-shot sandbox + // is discarded; the pool replenishes a fresh one) — the client's latency must + // not include VM teardown. + // Holds (vm, permit) until after the response: the permit bounds concurrent + // in-flight sandboxes and is released only once the VM is torn down. + let mut used = None; + let resp = match registry.resolve_image(run.image) { + None => err_resp("no image: pass --image or start the daemon with --image"), + Some(image) => match registry.get_or_create(&image).await { + Err(e) => err_resp(format!("pool for {image}: {e}")), + Ok(entry) => { + // Backpressure: wait for a slot so a burst doesn't boot unbounded VMs. + let permit = entry + .sem + .clone() + .acquire_owned() + .await + .expect("pool semaphore is never closed"); + match entry.pool.acquire().await { + Err(e) => err_resp(format!("acquire failed: {e}")), + Ok(vm) => { + let resp = match vm.exec_command(run.cmd, EXEC_TIMEOUT_NS).await { + Ok(o) => RunResponse { + stdout: o.stdout, + stderr: o.stderr, + exit_code: o.exit_code, + error: None, + }, + Err(e) => err_resp(e.to_string()), + }; + used = Some((vm, permit)); + resp + } + } + } + }, + }; + + let bytes = serde_json::to_vec(&resp) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + write_frame(stream, &bytes).await?; + + // Tear down the used sandbox in the background so neither the client nor the + // daemon's accept loop blocks on it; release the concurrency permit afterwards. + if let Some((mut vm, permit)) = used { + tokio::spawn(async move { + let _ = vm.destroy().await; + drop(permit); + }); + } + Ok(()) +} + +#[cfg(not(windows))] +async fn execute_run(args: PoolRunArgs) -> Result<(), Box> { + use std::io::Write; + use tokio::net::UnixStream; + + let mut stream = UnixStream::connect(&args.socket).await.map_err(|e| { + format!( + "Failed to connect to pool daemon at {} ({}). Is `a3s-box pool start` running?", + args.socket, e + ) + })?; + + write_frame( + &mut stream, + &serde_json::to_vec(&Request::Run(RunRequest { + image: args.image, + cmd: args.cmd, + }))?, + ) + .await?; + let resp: RunResponse = serde_json::from_slice(&read_frame(&mut stream).await?)?; + + if let Some(err) = resp.error { + eprintln!("pool error: {err}"); + std::process::exit(1); + } + std::io::stdout().write_all(&resp.stdout)?; + std::io::stderr().write_all(&resp.stderr)?; + std::process::exit(resp.exit_code); +} + +#[cfg(windows)] +async fn serve( + _registry: std::sync::Arc, + _socket: &str, + _json: bool, +) -> Result<(), Box> { + eprintln!( + "pool socket serving is not supported on Windows; pool stays pre-warmed until Ctrl-C." + ); + tokio::signal::ctrl_c().await?; + Ok(()) +} + +#[cfg(windows)] +async fn execute_run(_args: PoolRunArgs) -> Result<(), Box> { + Err("`pool run` is not supported on Windows".into()) +} + +/// Length-prefixed (u32 LE) framing for the Unix-socket protocol. +#[cfg(not(windows))] +async fn write_frame( + w: &mut W, + data: &[u8], +) -> std::io::Result<()> { + w.write_all(&(data.len() as u32).to_le_bytes()).await?; + w.write_all(data).await?; + w.flush().await +} + +#[cfg(not(windows))] +async fn read_frame(r: &mut R) -> std::io::Result> { + let mut len = [0u8; 4]; + r.read_exact(&mut len).await?; + let mut buf = vec![0u8; u32::from_le_bytes(len) as usize]; + r.read_exact(&mut buf).await?; + Ok(buf) +} + async fn execute_stop(_args: PoolStopArgs) -> Result<(), Box> { // Pool stop is handled by sending SIGINT to the `pool start` process. - // This subcommand exists for discoverability and future daemon support. eprintln!("Send SIGINT (Ctrl-C) to the running `a3s-box pool start` process to drain and stop the pool."); Ok(()) } -async fn execute_status(_args: PoolStatusArgs) -> Result<(), Box> { - // Pool status requires a running pool instance. In the current in-process - // model, status is printed by the `pool start` process itself. - // This subcommand is a placeholder for future daemon/IPC support. - eprintln!("Pool status is shown by the running `a3s-box pool start` process."); - eprintln!("Use Prometheus metrics (a3s_box_warm_pool_*) for live observability."); +#[cfg(not(windows))] +async fn execute_status(args: PoolStatusArgs) -> Result<(), Box> { + use tokio::net::UnixStream; + + let mut stream = UnixStream::connect(&args.socket).await.map_err(|e| { + format!( + "Failed to connect to pool daemon at {} ({}). Is `a3s-box pool start` running?", + args.socket, e + ) + })?; + + write_frame(&mut stream, &serde_json::to_vec(&Request::Status)?).await?; + let resp: StatusResponse = serde_json::from_slice(&read_frame(&mut stream).await?)?; + + if args.json { + println!("{}", serde_json::to_string(&resp.images)?); + } else if resp.images.is_empty() { + println!("No warm pools yet (no images warmed)."); + } else { + println!( + "{:<40} {:>5} {:>8} {:>9} {:>8}", + "IMAGE", "IDLE", "CREATED", "ACQUIRED", "EVICTED" + ); + for s in &resp.images { + println!( + "{:<40} {:>5} {:>8} {:>9} {:>8}", + s.image, s.idle, s.total_created, s.total_acquired, s.total_evicted + ); + } + } Ok(()) } +#[cfg(windows)] +async fn execute_status(_args: PoolStatusArgs) -> Result<(), Box> { + Err("`pool status` is not supported on Windows".into()) +} + /// Format pool stats as a JSON string. fn format_stats_json(image: &str, stats: &PoolStats) -> String { let hit_rate = if stats.total_acquired > 0 { @@ -214,18 +661,108 @@ mod tests { fn test_format_stats_json_is_valid_structure() { let stats = sample_stats(); let json = format_stats_json("alpine:latest", &stats); - // Must start and end with braces assert!(json.starts_with('{')); assert!(json.ends_with('}')); } + #[test] + fn test_keepalive_cmd_is_a_sleep_loop() { + let c = keepalive_cmd(); + assert_eq!(c[0], "/bin/sh"); + assert!(c.last().unwrap().contains("sleep")); + } + + #[test] + fn test_parse_warm_spec() { + // image=count + assert_eq!( + parse_warm_spec("python:3=4", 2).unwrap(), + ("python:3".to_string(), 4) + ); + // bare image → default size + assert_eq!( + parse_warm_spec("node:20", 7).unwrap(), + ("node:20".to_string(), 7) + ); + // whitespace tolerated + assert_eq!( + parse_warm_spec(" alpine = 3 ", 2).unwrap(), + ("alpine".to_string(), 3) + ); + // bad count / empty image error out + assert!(parse_warm_spec("alpine=notanum", 2).is_err()); + assert!(parse_warm_spec("=4", 2).is_err()); + } + + #[tokio::test] + async fn test_backpressure_bounds_concurrency() { + // The contract PoolEntry relies on: a permit (held until teardown) caps + // concurrent in-flight sandboxes to the semaphore size, so a burst queues + // instead of all running at once. + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + let sem = Arc::new(tokio::sync::Semaphore::new(2)); + let live = Arc::new(AtomicUsize::new(0)); + let peak = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::new(); + for _ in 0..6 { + let (sem, live, peak) = (sem.clone(), live.clone(), peak.clone()); + handles.push(tokio::spawn(async move { + let _permit = sem.acquire_owned().await.unwrap(); + let now = live.fetch_add(1, Ordering::SeqCst) + 1; + peak.fetch_max(now, Ordering::SeqCst); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + live.fetch_sub(1, Ordering::SeqCst); + })); + } + for h in handles { + h.await.unwrap(); + } + assert!( + peak.load(Ordering::SeqCst) <= 2, + "concurrency exceeded the permit limit" + ); + } + + #[test] + fn test_run_request_response_roundtrip() { + let req = RunRequest { + image: Some("alpine:latest".into()), + cmd: vec!["echo".into(), "hi".into()], + }; + let bytes = serde_json::to_vec(&req).unwrap(); + let parsed: RunRequest = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(parsed.cmd, vec!["echo", "hi"]); + assert_eq!(parsed.image.as_deref(), Some("alpine:latest")); + + // image is optional on the wire (older clients / default-image daemons). + let no_img: RunRequest = serde_json::from_slice(br#"{"cmd":["ls"]}"#).unwrap(); + assert!(no_img.image.is_none()); + + let resp = RunResponse { + stdout: b"hi\n".to_vec(), + stderr: vec![], + exit_code: 0, + error: None, + }; + let rb = serde_json::to_vec(&resp).unwrap(); + let rp: RunResponse = serde_json::from_slice(&rb).unwrap(); + assert_eq!(rp.stdout, b"hi\n"); + assert_eq!(rp.exit_code, 0); + assert!(rp.error.is_none()); + } + #[tokio::test] async fn test_execute_start_size_zero_fails() { let args = PoolStartArgs { - image: "alpine:latest".to_string(), + image: Some("alpine:latest".to_string()), size: 0, max: 5, ttl: 300, + socket: DEFAULT_SOCKET.to_string(), + warm: vec![], json: false, }; let result = execute_start(args).await; @@ -236,10 +773,12 @@ mod tests { #[tokio::test] async fn test_execute_start_size_exceeds_max_fails() { let args = PoolStartArgs { - image: "alpine:latest".to_string(), + image: Some("alpine:latest".to_string()), size: 10, max: 5, ttl: 300, + socket: DEFAULT_SOCKET.to_string(), + warm: vec![], json: false, }; let result = execute_start(args).await; @@ -252,31 +791,121 @@ mod tests { #[tokio::test] async fn test_execute_stop_is_ok() { - let args = PoolStopArgs { json: false }; - // stop is a no-op (prints message), should not error - let result = execute_stop(args).await; + let result = execute_stop(PoolStopArgs { json: false }).await; assert!(result.is_ok()); } + #[cfg(not(windows))] #[tokio::test] - async fn test_execute_status_is_ok() { - let args = PoolStatusArgs { json: false }; - let result = execute_status(args).await; - assert!(result.is_ok()); + async fn test_execute_status_no_daemon_errors() { + // With no daemon listening, status fails with a connect hint (not a panic). + let result = execute_status(PoolStatusArgs { + socket: "/tmp/a3s-box-pool-does-not-exist.sock".to_string(), + json: false, + }) + .await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("pool daemon")); } #[test] - fn test_pool_start_args_defaults() { - // Verify default values match expected warm pool behavior - let args = PoolStartArgs { - image: "alpine:latest".to_string(), - size: 2, - max: 8, - ttl: 300, - json: false, + fn test_request_envelope_tagging() { + // Run carries an op tag + the flattened RunRequest; Status is a bare tag. + let run = serde_json::to_string(&Request::Run(RunRequest { + image: Some("alpine".into()), + cmd: vec!["echo".into(), "hi".into()], + })) + .unwrap(); + assert!(run.contains(r#""op":"run""#)); + assert!(run.contains(r#""cmd":["echo","hi"]"#)); + + let status = serde_json::to_string(&Request::Status).unwrap(); + assert_eq!(status, r#"{"op":"status"}"#); + + // StatusResponse round-trips. + let sr = StatusResponse { + images: vec![ImageStat { + image: "alpine".into(), + idle: 2, + total_created: 5, + total_acquired: 3, + total_evicted: 1, + }], + }; + let parsed: StatusResponse = + serde_json::from_slice(&serde_json::to_vec(&sr).unwrap()).unwrap(); + assert_eq!(parsed.images[0].image, "alpine"); + assert_eq!(parsed.images[0].idle, 2); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn test_frame_roundtrip() { + // write_frame then read_frame must return the exact bytes. + let (mut a, mut b) = tokio::io::duplex(4096); + let payload = serde_json::to_vec(&RunRequest { + image: None, + cmd: vec!["echo".into(), "hi there".into()], + }) + .unwrap(); + write_frame(&mut a, &payload).await.unwrap(); + let got = read_frame(&mut b).await.unwrap(); + let parsed: RunRequest = serde_json::from_slice(&got).unwrap(); + assert_eq!(parsed.cmd, vec!["echo", "hi there"]); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn test_socket_request_response_protocol() { + // Exercise the full client/server wire protocol over a real Unix socket + // (the exact framing `serve` and `pool run` use), with a stub server + // standing in for the VM pool's acquire+exec. + use tokio::net::{UnixListener, UnixStream}; + + let dir = tempfile::tempdir().unwrap(); + let sock = dir.path().join("pool.sock"); + let listener = UnixListener::bind(&sock).unwrap(); + + let server = tokio::spawn(async move { + let (mut s, _) = listener.accept().await.unwrap(); + let req: RunRequest = + serde_json::from_slice(&read_frame(&mut s).await.unwrap()).unwrap(); + let resp = RunResponse { + stdout: format!("ran {:?}", req.cmd).into_bytes(), + stderr: vec![], + exit_code: 0, + error: None, + }; + write_frame(&mut s, &serde_json::to_vec(&resp).unwrap()) + .await + .unwrap(); + }); + + let mut client = UnixStream::connect(&sock).await.unwrap(); + let req = RunRequest { + image: Some("alpine:latest".into()), + cmd: vec!["ls".into(), "-la".into()], }; - assert_eq!(args.size, 2); - assert_eq!(args.max, 8); - assert_eq!(args.ttl, 300); + write_frame(&mut client, &serde_json::to_vec(&req).unwrap()) + .await + .unwrap(); + let resp: RunResponse = + serde_json::from_slice(&read_frame(&mut client).await.unwrap()).unwrap(); + + assert_eq!(resp.exit_code, 0); + assert!(resp.error.is_none()); + assert!(String::from_utf8_lossy(&resp.stdout).contains("ls")); + server.await.unwrap(); + } + + #[cfg(not(windows))] + #[tokio::test] + async fn test_read_frame_truncated_errors() { + // A truncated stream must error, not hang or panic. + use tokio::io::AsyncWriteExt; + let (mut a, mut b) = tokio::io::duplex(64); + a.write_all(&[1u8, 0]).await.unwrap(); // partial 4-byte length prefix + drop(a); + assert!(read_frame(&mut b).await.is_err()); } } diff --git a/src/cli/tests/host_smoke.rs b/src/cli/tests/host_smoke.rs index 5f13957..b1dd913 100644 --- a/src/cli/tests/host_smoke.rs +++ b/src/cli/tests/host_smoke.rs @@ -472,3 +472,131 @@ fn test_real_compose_smoke() { let ps = cli.ok(&["ps", "-a"]); assert!(!ps.contains(service_box)); } + +/// Warm-pool daemon end-to-end: `pool start` pre-warms VMs, then `pool run` +/// executes a command in a fresh warm sandbox via the guest exec server (no cold +/// boot). Also exercises CONCURRENT `pool run`s (served concurrently), a SECOND +/// image pre-warmed at startup via `--warm` (run with `pool run --image`), and +/// `pool status` over the socket. Host-backed (needs KVM + a runnable image). +#[test] +#[ignore] +fn test_real_pool_warm_run() { + let cli = CliTest::new(); + let image = host_smoke_image(); + seed_runnable_alpine_image(&cli, &image); + // A second image (retag of the first) the daemon pre-warms at startup via --warm. + let second = format!("coverage-pool-second:{}", unique_tag("img2")); + cli.ok(&["tag", &image, &second]); + let warm_spec = format!("{second}=2"); + let socket = cli + .home_path() + .join("pool.sock") + .to_str() + .expect("utf8 socket path") + .to_string(); + + // Daemon: pre-warm the default pool + a second image via --warm; listen on the socket. + let mut daemon = cli.spawn_background(&[ + "pool", + "start", + "--image", + image.as_str(), + "--size", + "3", + "--max", + "6", + "--warm", + warm_spec.as_str(), + "--socket", + socket.as_str(), + ]); + + // The daemon binds the socket once WarmPool::start returns; wait for it. + let sock_path = cli.home_path().join("pool.sock"); + let start = std::time::Instant::now(); + while !sock_path.exists() { + if start.elapsed() > Duration::from_secs(120) { + let _ = daemon.kill(); + panic!("pool daemon never created its socket"); + } + if let Ok(Some(status)) = daemon.try_wait() { + panic!("pool daemon exited early: {status}"); + } + std::thread::sleep(Duration::from_millis(200)); + } + // Give the pool a moment to have at least one idle VM ready. + std::thread::sleep(Duration::from_secs(5)); + + // Single warm run. + let (out, err, ok) = cli.output(&[ + "pool", + "run", + "--socket", + socket.as_str(), + "--", + "echo", + "pool-e2e-ok", + ]); + assert!(ok, "pool run failed.\nstdout:\n{out}\nstderr:\n{err}"); + assert!(out.contains("pool-e2e-ok"), "unexpected output: {out:?}"); + + // Concurrent runs — the daemon serves them concurrently. + std::thread::scope(|s| { + let handles: Vec<_> = (0..3) + .map(|i| { + let cli = &cli; + let socket = socket.as_str(); + s.spawn(move || { + let tag = format!("conc-{i}"); + let (out, _e, ok) = cli.output(&[ + "pool", + "run", + "--socket", + socket, + "--", + "echo", + tag.as_str(), + ]); + (ok, out, tag) + }) + }) + .collect(); + for h in handles { + let (ok, out, tag) = h.join().expect("concurrent run thread panicked"); + assert!(ok, "concurrent pool run failed: {out}"); + assert!( + out.contains(&tag), + "concurrent run output {out:?} missing {tag}" + ); + } + }); + + // Multi-image: run in the second image, pre-warmed at startup via --warm. + let (out2, err2, ok2) = cli.output(&[ + "pool", + "run", + "--socket", + socket.as_str(), + "--image", + second.as_str(), + "--", + "echo", + "multiimg-ok", + ]); + assert!( + ok2, + "multi-image pool run failed.\nstdout:\n{out2}\nstderr:\n{err2}" + ); + assert!(out2.contains("multiimg-ok"), "unexpected output: {out2:?}"); + + // Status: the daemon reports its per-image pools over the socket. + let status = cli.ok(&["pool", "status", "--socket", socket.as_str()]); + assert!(status.contains("IDLE"), "status header missing:\n{status}"); + assert!( + status.contains(image.as_str()) && status.contains(second.as_str()), + "status should list both warmed images:\n{status}" + ); + + let _ = daemon.kill(); + let _ = daemon.wait(); +} diff --git a/src/cli/tests/support/mod.rs b/src/cli/tests/support/mod.rs index c20de64..11680fb 100644 --- a/src/cli/tests/support/mod.rs +++ b/src/cli/tests/support/mod.rs @@ -128,6 +128,22 @@ impl CliTest { ) } + /// Spawn `a3s-box ` as a background process (e.g. a daemon), inheriting + /// the test env plus `A3S_HOME`. The caller owns the `Child` and must kill it. + pub fn spawn_background(&self, args: &[&str]) -> std::process::Child { + eprintln!(" $ a3s-box {} &", args.join(" ")); + self.command(args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap_or_else(|e| { + panic!( + "failed to spawn background `a3s-box {}`: {e}", + args.join(" ") + ) + }) + } + pub fn output_with_stdin(&self, args: &[&str], stdin: &[u8]) -> (String, String, bool) { eprintln!(" $ printf ... | a3s-box {}", args.join(" ")); diff --git a/src/runtime/src/pool/warm_pool.rs b/src/runtime/src/pool/warm_pool.rs index 2adfc36..372ef86 100644 --- a/src/runtime/src/pool/warm_pool.rs +++ b/src/runtime/src/pool/warm_pool.rs @@ -313,6 +313,28 @@ impl WarmPool { Ok(()) } + /// Destroy all idle VMs without consuming the pool (`&self`), so it can be + /// shut down from behind an `Arc` (e.g. a daemon serving concurrent requests). + /// Pair with [`Self::signal_shutdown`] first to stop the background replenisher; + /// its task then exits on its own (it watches the shutdown channel). + pub async fn drain_idle(&self) -> Result<()> { + let mut idle = self.idle.lock().await; + let count = idle.len(); + for warm_vm in idle.drain(..) { + let mut vm = warm_vm.vm; + if let Err(e) = vm.destroy().await { + tracing::warn!( + box_id = %vm.box_id(), + error = %e, + "Failed to destroy pooled VM during drain_idle" + ); + } + } + self.stats.lock().await.idle_count = 0; + tracing::info!(destroyed = count, "Warm pool idle VMs drained"); + Ok(()) + } + /// Remove and destroy specific idle VMs by their box IDs. /// /// Used when `fill_to_min` partially fails and needs to rollback