diff --git a/block/src/async_io.rs b/block/src/async_io.rs index bbdd77779..52323bb32 100644 --- a/block/src/async_io.rs +++ b/block/src/async_io.rs @@ -109,4 +109,12 @@ pub trait AsyncIo: Send { fn alignment(&self) -> u64 { SECTOR_SIZE } + + /// Returns true when this implementation has request pairings in flight + /// that have not yet been acked to the guest. Only the mirroring + /// implementation tracks such pairings, plain backends always return + /// false. + fn has_inflight_requests(&self) -> bool { + false + } } diff --git a/block/src/error.rs b/block/src/error.rs index 645057005..5019bfff8 100644 --- a/block/src/error.rs +++ b/block/src/error.rs @@ -42,6 +42,18 @@ pub enum BlockErrorKind { NotFound, /// An internal counter or limit was exceeded. Overflow, + /// The file already exists, when disk creation was requested. + AlreadyExists, + /// A mirror operation was requested but no mirror is active for the device. + MirrorNotActive, + /// A completion was requested but the mirror has not reached the ready phase. + MirrorNotReady, + /// A mirror swap was requested but was unsuccessful. + MirrorSwap, + /// A mirror completion is already in progress. + MirrorCompletionInProgress, + /// A mirror operation was requested while the device is paused. + MirrorDevicePaused, } impl Display for BlockErrorKind { @@ -54,6 +66,14 @@ impl Display for BlockErrorKind { Self::OutOfBounds => write!(f, "Out of bounds"), Self::NotFound => write!(f, "Not found"), Self::Overflow => write!(f, "Overflow"), + Self::AlreadyExists => write!(f, "Already exists"), + Self::MirrorNotActive => write!(f, "No active mirror for the device"), + Self::MirrorNotReady => write!(f, "Mirror is not yet ready, cannot complete"), + Self::MirrorSwap => write!(f, "Failed to swap AsyncIO in virtqueue worker for mirror"), + Self::MirrorCompletionInProgress => write!(f, "Mirror completion already in progress"), + Self::MirrorDevicePaused => { + write!(f, "Mirror operation rejected: the device is paused") + } } } } diff --git a/block/src/factory.rs b/block/src/factory.rs index ffe65f7d9..3e1cd18b1 100644 --- a/block/src/factory.rs +++ b/block/src/factory.rs @@ -21,6 +21,7 @@ use crate::block_io_uring_is_supported; use crate::disk_file::AsyncFullDiskFile; use crate::error::{BlockError, BlockErrorKind, BlockResult}; use crate::fixed_vhd_disk::FixedVhdDisk; +use crate::qcow::{QcowFile, RawFile}; use crate::qcow_disk::QcowDisk; use crate::raw_disk::{RawBackend, RawDisk}; use crate::vhdx_sync::VhdxDiskSync; @@ -203,6 +204,47 @@ fn open_qcow2( )) } +/// Create a new disk image at `options.path` of the given image type +/// and logical `size`. The file must not exist yet. +pub fn create_disk( + options: &DiskOpenOptions<'_>, + image_type: ImageType, + size: u64, +) -> BlockResult<()> { + if options.path.exists() { + return Err(BlockError::from_kind(BlockErrorKind::AlreadyExists).with_path(options.path)); + } + let file = fs::OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(options.path) + .map_err(|e| { + BlockError::from_kind(BlockErrorKind::Io) + .with_path(options.path) + .with_source(e) + })?; + + match image_type { + ImageType::Raw => { + file.set_len(size) + .map_err(|e| BlockError::from(e).with_path(options.path))?; + } + ImageType::Qcow2 => { + let raw_file = RawFile::new(file.try_clone()?, options.direct); + QcowFile::new(raw_file, 3, size, options.sparse) + .map_err(|e| e.with_path(options.path))?; + } + _ => { + return Err( + BlockError::from_kind(BlockErrorKind::UnsupportedFeature).with_path(options.path) + ); + } + } + + Ok(()) +} + #[cfg(test)] mod unit_tests { use std::io::Write; diff --git a/block/src/lib.rs b/block/src/lib.rs index 9d688f5ff..5e58f69cf 100644 --- a/block/src/lib.rs +++ b/block/src/lib.rs @@ -20,6 +20,7 @@ pub mod fixed_vhd; pub mod fixed_vhd_async; pub mod fixed_vhd_disk; pub mod fixed_vhd_sync; +pub mod mirror; pub mod qcow; #[cfg(feature = "io_uring")] pub(crate) mod qcow_async; diff --git a/block/src/mirror.rs b/block/src/mirror.rs new file mode 100644 index 000000000..90eb5882e --- /dev/null +++ b/block/src/mirror.rs @@ -0,0 +1,1300 @@ +// Copyright © 2026 Cyberus Technology GmbH +// +// SPDX-License-Identifier: Apache-2.0 + +//! Blockdev-mirroring for virtio-blk devices. +//! +//! Mirrors guest writes to a destination disk while a background +//! worker copies existing data from source to destination. Once +//! both sides are in sync the device manager can complete the mirror, +//! switching the device to serve I/O from the destination. + +use std::collections::{BTreeMap, VecDeque}; +use std::os::fd::{AsRawFd, RawFd}; +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread::JoinHandle; +use std::{io, mem, thread}; + +use libc::{iovec, off_t}; +use log::warn; +use vmm_sys_util::epoll; +use vmm_sys_util::eventfd::EventFd; + +use crate::async_io::{AsyncIo, AsyncIoError, AsyncIoResult}; +use crate::disk_file::AsyncFullDiskFile; +use crate::error::BlockResult; +use crate::{BatchRequest, RequestType}; + +/// Block size for the copy worker, in which it copies data from +/// source to destination and holds the range lock. +pub const MIRROR_BLOCK_SIZE: usize = 512 * 1024; // 512 KiB + +/// Serializes overlapping byte ranges between the copy worker and the +/// per-queue mirror writes. +/// +/// Each party calls [`Self::lock_range`] before submitting I/O and +/// holds the returned [`RangeGuard`] until completion. A conflicting +/// request blocks on a `Condvar` until the held guard is dropped. +/// Lookups are O(log n) on the number of held ranges. +struct RangeLockManager { + /// Held ranges as `start -> end_exclusive`. The mutex makes the + /// overlap check and insert in [`Self::lock_range`] atomic with + /// respect to releases in [`RangeGuard::drop`]. + ranges: Mutex>, + /// Notified on guard drop. Waiters re-check their range. + cv: Condvar, +} + +impl RangeLockManager { + pub fn new() -> Arc { + Arc::new(Self { + ranges: Mutex::new(BTreeMap::new()), + cv: Condvar::new(), + }) + } + + /// Returns true if `[start, end)` overlaps any range in `ranges`. + fn overlaps_any(ranges: &BTreeMap, start: u64, end: u64) -> bool { + ranges + .range(..end) + .next_back() + .is_some_and(|(_, &e)| e > start) + } + + /// Acquires an exclusive lock on `[offset, offset + length)`. + /// Blocks while any held range overlaps. + fn lock_range(self: &Arc, offset: u64, length: u64) -> io::Result { + if length == 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Range length is zero", + )); + } + + let end = offset + .checked_add(length) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Range overflow"))?; + // Wait until no held range overlaps, then claim it. + let mut ranges = self + .cv + .wait_while(self.ranges.lock().unwrap(), |ranges| { + RangeLockManager::overlaps_any(ranges, offset, end) + }) + .unwrap(); + ranges.insert(offset, end); + + Ok(RangeGuard { + mgr: Arc::clone(self), + start: offset, + }) + } + + /// Acquires a [`RangeGuard`] covering the contiguous bytes from + /// `offset` through the end of `iovecs`. + fn lock_iovecs(self: &Arc, offset: off_t, iovecs: &[iovec]) -> io::Result { + let total_len = iovecs + .iter() + .try_fold(0u64, |acc, v| acc.checked_add(v.iov_len as u64)) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "iovec length overflow"))?; + + self.lock_range(offset as u64, total_len) + } +} + +/// RAII handle for a range held in a [`RangeLockManager`]. Drop +/// releases the range and wakes all waiters. +struct RangeGuard { + mgr: Arc, + start: u64, +} +impl Drop for RangeGuard { + fn drop(&mut self) { + let mut ranges = self.mgr.ranges.lock().unwrap(); + ranges.remove(&self.start); + self.mgr.cv.notify_all(); + } +} + +/// Phase of a mirror. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MirrorPhase { + /// Background copy is in progress. + Running, + /// All blocks copied. Source and destination are in sync. + Ready, + /// Switch-over to the destination is in progress. + Completing, + /// All virtqueues switched to the destination. + Completed, + /// Mirror cancellation is in progress. + Cancelling, + /// The mirror has failed. + Failed(String), +} + +/// State shared by the copy worker and the per-queue mirroring +/// `AsyncIo` handles. +/// +/// Held in an `Arc` so all threads see the same phase and progress +/// counters. +pub struct MirrorState { + /// Current phase of the mirror. + phase: Mutex, + range_locks: Arc, + copied_bytes: AtomicU64, + total_bytes: u64, +} + +impl MirrorState { + pub fn new(logical_disk_size: u64) -> Arc { + Arc::new(Self { + phase: Mutex::new(MirrorPhase::Running), + range_locks: RangeLockManager::new(), + copied_bytes: AtomicU64::new(0), + total_bytes: logical_disk_size, + }) + } + + /// Returns a snapshot of the current phase. + pub fn phase(&self) -> MirrorPhase { + self.phase.lock().unwrap().clone() + } + + /// Attempts a phase transition. Only the documented transitions are + /// applied. Any other attempt is ignored and logged. + /// + /// Allowed transitions: + /// ```text + /// Running -> Ready | Cancelling | Failed(_) + /// Ready -> Completing | Cancelling | Failed(_) + /// Completing -> Completed + /// Failed(_) -> Cancelling + /// ``` + /// Plus idempotent self-transitions. `Completed` and `Cancelling` are + /// terminal: the mirror handle is dropped out of them, after which + /// `Block::mirror_status` reports no active mirror. + pub fn transition_to_phase(&self, target: MirrorPhase) { + use MirrorPhase::*; + let mut current = self.phase.lock().unwrap(); + + // Ignore idempotent transitions to the current state + if mem::discriminant(&*current) == mem::discriminant(&target) { + return; + } + + let transition_allowed = matches!( + (&*current, &target), + (Running, Ready) + | (Running, Cancelling) + | (Running, Failed(_)) + | (Ready, Completing) + | (Ready, Cancelling) + | (Ready, Failed(_)) + | (Completing, Completed) + | (Failed(_), Cancelling) + ); + + if !transition_allowed { + warn!( + "Invalid mirror phase transition attempted: {:?} -> {:?}", + *current, target + ); + return; + } + + *current = target; + } + + pub fn status(&self) -> MirrorStatus { + MirrorStatus { + phase: self.phase(), + copied_bytes: self.copied_bytes.load(Ordering::Relaxed), + total_bytes: self.total_bytes, + } + } +} + +pub struct MirrorStatus { + pub phase: MirrorPhase, + pub copied_bytes: u64, + pub total_bytes: u64, +} + +/// Per-queue `AsyncIo` handle for a mirror. +pub struct MirroringAsyncIo { + source: Box, + destination: Box, + state: Arc, + /// Completions of inflight requests to be popped by `next_completed_request`. + inflight_completions: VecDeque<(u64, i32)>, + /// Reusable waiters parked on the source and destination notifier eventfds + /// while a mirrored write awaits its completions. Built once so each write + /// does not pay the epoll setup cost. + source_waiter: EpollWaiter, + dest_waiter: EpollWaiter, + /// Set once this virtqueue worker observes a failure. While true, the + /// virtqueue worker forwards only to the source and ignores the destination. + source_passthrough: bool, +} +impl MirroringAsyncIo { + #[allow(dead_code)] + /// Builds a [`MirroringAsyncIo`] for one virtqueue, wrapped in + /// `Box`. + /// + /// A mirrored write waits for both the source and destination completions + /// inside the write call, so this struct is the only reader of the + /// destination notifier. The virtqueue worker watches only the source + /// notifier, which it still needs to pick up read completions. + pub fn create( + source_disk: &dyn AsyncFullDiskFile, + destination_disk: &dyn AsyncFullDiskFile, + state: Arc, + ring_depth: u32, + ) -> BlockResult> { + let source = source_disk.create_async_io(ring_depth)?; + let destination = destination_disk.create_async_io(ring_depth)?; + let source_waiter = EpollWaiter::new(source.notifier().as_raw_fd())?; + let dest_waiter = EpollWaiter::new(destination.notifier().as_raw_fd())?; + + Ok(Box::new(MirroringAsyncIo { + source, + destination, + state, + inflight_completions: VecDeque::new(), + source_waiter, + dest_waiter, + source_passthrough: false, + })) + } + + /// Flip the mirror to the `Failed` phase. The operator must cancel to + /// clean up the destination and the copy worker. + fn fail(&mut self, reason: String) { + // Phase fails the mirror globally, passthrough is per worker, so other queues fail independently. + self.state.transition_to_phase(MirrorPhase::Failed(reason)); + self.source_passthrough = true; + } + + /// Helper that submits an `AsyncIo` request to both source and destination. + /// + /// Source error bubbles to the guest. Destination error fails the mirror + /// but is hidden from the guest, since `source` is the disk the guest sees. + fn mirror_request( + &mut self, + request_label: &str, + submit_source: S, + submit_destination: D, + ) -> AsyncIoResult<()> + where + S: FnOnce(&mut Box) -> AsyncIoResult<()>, + D: FnOnce(&mut Box) -> AsyncIoResult<()>, + { + submit_source(&mut self.source)?; + if let Err(e) = submit_destination(&mut self.destination) { + self.fail(format!("destination {request_label} submit failed: {e:?}")); + } + Ok(()) + } + + /// Block until `user_data`'s source (and, unless already degraded to + /// passthrough, destination) completion arrives, then queue the single + /// guest-visible `(user_data, src_result)`. Other completions seen while + /// waiting (e.g. an async read finishing) are stashed for later delivery. + fn wait_for_completions(&mut self, user_data: u64) -> io::Result<()> { + let src_result = Self::await_completion( + &mut self.source, + &self.source_waiter, + &mut self.inflight_completions, + user_data, + )?; + + if !self.source_passthrough { + match Self::await_completion( + &mut self.destination, + &self.dest_waiter, + &mut self.inflight_completions, + user_data, + ) { + // Destination reported an I/O error. + Ok(dest_result) if dest_result < 0 => self.fail(format!( + "destination completion failed: user_data={user_data}" + )), + Ok(_) => {} + // The destination wait itself failed (broken notifier or epoll). + // Hide it from the guest like any other destination failure. + Err(e) => self.fail(format!( + "destination wait failed for user_data={user_data}: {e}" + )), + } + } + + self.inflight_completions.push_back((user_data, src_result)); + let _ = self.source.notifier().write(1); + Ok(()) + } + + /// Drain `io` until `user_data`'s own completion appears and push + /// additional ones to `inflight_completions`. + fn await_completion( + io: &mut Box, + waiter: &EpollWaiter, + inflight_completions: &mut VecDeque<(u64, i32)>, + user_data: u64, + ) -> io::Result { + loop { + let (id, res) = waiter.next_completion(io)?; + if id == user_data { + return Ok(res); + } + inflight_completions.push_back((id, res)); + } + } +} + +impl AsyncIo for MirroringAsyncIo { + fn notifier(&self) -> &EventFd { + self.source.notifier() + } + + fn read_vectored( + &mut self, + offset: off_t, + iovecs: &[iovec], + user_data: u64, + ) -> AsyncIoResult<()> { + self.source.read_vectored(offset, iovecs, user_data) + } + + fn write_vectored( + &mut self, + offset: off_t, + iovecs: &[iovec], + user_data: u64, + ) -> AsyncIoResult<()> { + if self.source_passthrough { + return self.source.write_vectored(offset, iovecs, user_data); + } + + let _guard = self + .state + .range_locks + .lock_iovecs(offset, iovecs) + .map_err(AsyncIoError::WriteVectored)?; + + self.mirror_request( + "write_vectored", + |src| src.write_vectored(offset, iovecs, user_data), + |dst| dst.write_vectored(offset, iovecs, user_data), + )?; + + self.wait_for_completions(user_data) + .map_err(AsyncIoError::WriteVectored)?; + Ok(()) + } + + fn fsync(&mut self, user_data: Option) -> AsyncIoResult<()> { + if self.source_passthrough { + return self.source.fsync(user_data); + } + + self.mirror_request( + "fsync", + |src| src.fsync(user_data), + |dst| dst.fsync(user_data), + )?; + + // A tracked fsync (Some) waits for its completion. A barrier fsync (None) does not. + if let Some(user_data) = user_data { + self.wait_for_completions(user_data) + .map_err(AsyncIoError::Fsync)?; + } + Ok(()) + } + + fn punch_hole(&mut self, offset: u64, length: u64, user_data: u64) -> AsyncIoResult<()> { + if self.source_passthrough { + return self.source.punch_hole(offset, length, user_data); + } + + let _guard = self + .state + .range_locks + .lock_range(offset, length) + .map_err(AsyncIoError::PunchHole)?; + self.mirror_request( + "punch_hole", + |src| src.punch_hole(offset, length, user_data), + |dst| dst.punch_hole(offset, length, user_data), + )?; + + self.wait_for_completions(user_data) + .map_err(AsyncIoError::PunchHole)?; + Ok(()) + } + + fn write_zeroes(&mut self, offset: u64, length: u64, user_data: u64) -> AsyncIoResult<()> { + if self.source_passthrough { + return self.source.write_zeroes(offset, length, user_data); + } + + let _guard = self + .state + .range_locks + .lock_range(offset, length) + .map_err(AsyncIoError::WriteZeroes)?; + self.mirror_request( + "write_zeroes", + |src| src.write_zeroes(offset, length, user_data), + |dst| dst.write_zeroes(offset, length, user_data), + )?; + + self.wait_for_completions(user_data) + .map_err(AsyncIoError::WriteZeroes)?; + Ok(()) + } + + fn next_completed_request(&mut self) -> Option<(u64, i32)> { + // Mirrored writes are awaited synchronously, only reads and post-failure passthrough writes surface here. + while let Some((id, res)) = self.source.next_completed_request() { + self.inflight_completions.push_back((id, res)); + } + self.inflight_completions.pop_front() + } + + fn batch_requests_enabled(&self) -> bool { + if self.source_passthrough { + return self.source.batch_requests_enabled(); + } + + true + } + + fn submit_batch_requests(&mut self, batch_request: &[BatchRequest]) -> AsyncIoResult<()> { + if self.source_passthrough { + return self.source.submit_batch_requests(batch_request); + } + + for req in batch_request { + let result = match req.request_type { + RequestType::In => self.read_vectored(req.offset, &req.iovecs, req.user_data), + RequestType::Out => self.write_vectored(req.offset, &req.iovecs, req.user_data), + // Only In and Out are batched, see request.rs. + _ => unreachable!("Unexpected batch request type: {:?}", req.request_type), + }; + + // Push partial batch error to completions, vectored op has not + // pushed it to the inflight_completions queue. + if result.is_err() { + self.inflight_completions + .push_back((req.user_data, -libc::EIO)); + let _ = self.source.notifier().write(1); + } + } + Ok(()) + } + + fn alignment(&self) -> u64 { + if self.source_passthrough { + return self.source.alignment(); + } + + // Stricter alignment wins. Same iovec goes to both backends. + self.source.alignment().max(self.destination.alignment()) + } + + fn has_inflight_requests(&self) -> bool { + !self.inflight_completions.is_empty() + } +} + +/// Owns the copy worker thread's [`JoinHandle`]. The thread is joined +/// on [`Self::join`] or on drop, which blocks until the worker finishes +/// its current block, since cancellation is only observed between blocks. +pub struct CopyWorkerHandle { + join: Option>, +} + +impl CopyWorkerHandle { + /// Waits for the copy worker thread to finish. Idempotent: + /// subsequent calls return `Ok(())` without blocking. + pub fn join(&mut self) -> thread::Result<()> { + if let Some(t) = self.join.take() { + return t.join(); + } + + Ok(()) + } +} + +impl Drop for CopyWorkerHandle { + fn drop(&mut self) { + self.join().ok(); + } +} + +/// Background thread that copies existing source bytes to destination +/// in fixed-size blocks. Holds a [`RangeGuard`] across each block so +/// the virtqueue mirror writes cannot race the copy. +pub struct CopyWorker { + source_io: Box, + dest_io: Box, + dest_is_sparse: bool, + state: Arc, + /// Once allocated, the buffer is reused for all blocks to avoid repeated allocations. + buf: Vec, + /// Tracks the next user_data for request and completion notifications. + next_user_data: u64, + source_waiter: EpollWaiter, + dest_waiter: EpollWaiter, +} +impl CopyWorker { + /// Builds a worker on top of two async I/O handles. Queue depth 1 + /// is enough, as the worker is sequential. The caller must initialize the + /// destination disk. + /// + /// Start the worker thread with [`Self::spawn`]. + pub fn new( + source_disk: &dyn AsyncFullDiskFile, + destination_disk: &dyn AsyncFullDiskFile, + state: Arc, + block_size_bytes: usize, + ) -> BlockResult { + let source_io = source_disk.create_async_io(1)?; + let dest_io = destination_disk.create_async_io(1)?; + let source_waiter = EpollWaiter::new(source_io.notifier().as_raw_fd())?; + let dest_waiter = EpollWaiter::new(dest_io.notifier().as_raw_fd())?; + + Ok(Self { + source_io, + dest_io, + dest_is_sparse: destination_disk.supports_sparse_operations(), + state, + buf: vec![0; block_size_bytes], + next_user_data: 0, + source_waiter, + dest_waiter, + }) + } + + /// Spawns the worker on a named thread and returns its handle. + /// On error inside the thread, the migration phase transitions + /// to [`MirrorPhase::Failed`]. + pub fn spawn(self) -> io::Result { + let state = self.state.clone(); + let join = thread::Builder::new() + .name("blockdev-mirror-copy-worker".into()) + .spawn(move || { + let mut worker = self; + if let Err(e) = worker.run() { + state.transition_to_phase(MirrorPhase::Failed(format!( + "Copy worker failed: {e:?}" + ))); + } + })?; + + Ok(CopyWorkerHandle { join: Some(join) }) + } + + /// Drives the block-by-block copy for predefined [`MirrorState::total_bytes`], + /// then transitions the migration phase to [`MirrorPhase::Ready`]. + fn run(&mut self) -> io::Result<()> { + let total_size = self.state.total_bytes; + let max_length = self.buf.len() as u64; + let mut offset = 0; + + while offset < total_size { + // Return early on cancellation or failure. + if self.state.phase() != MirrorPhase::Running { + return Ok(()); + } + + let length = max_length.min(total_size - offset) as usize; + self.copy_block(offset, length)?; + offset += length as u64; + } + + self.state.transition_to_phase(MirrorPhase::Ready); + Ok(()) + } + + /// Copies `length` bytes at `offset` from source to destination. + /// + /// Holds a range lock for the duration so virtqueue mirror writes cannot race + /// the copy. Uses `self.buf` for the copy to avoid repeated allocations. + fn copy_block(&mut self, offset: u64, length: usize) -> io::Result<()> { + let _guard = self.state.range_locks.lock_range(offset, length as u64)?; + + // Create a single iovec for the requested block. + let iovecs = [iovec { + iov_base: self.buf.as_mut_ptr().cast(), + iov_len: length, + }]; + + // Read from source into buf. + self.buf[..length].fill(0); + let read_id = self.generate_user_data(); + self.source_io + .read_vectored(offset as off_t, &iovecs, read_id) + .map_err(|e| io::Error::other(format!("async io read_vectored failed: {e}")))?; + let (user_data, result) = self.source_waiter.next_completion(&mut self.source_io)?; + if result < 0 { + return Err(io::Error::from_raw_os_error(-result)); + } + debug_assert_eq!(user_data, read_id); + + let write_id = self.generate_user_data(); + if self.dest_is_sparse && self.buf[..length].iter().all(|&b| b == 0) { + // Source block is all zeros: punch a hole to keep the destination sparse. + self.dest_io + .punch_hole(offset, length as u64, write_id) + .map_err(|e| io::Error::other(format!("async io punch_hole failed: {e}")))?; + } else { + // Write buf to destination. + self.dest_io + .write_vectored(offset as off_t, &iovecs, write_id) + .map_err(|e| io::Error::other(format!("async io write_vectored failed: {e}")))?; + } + + let (user_data, result) = self.dest_waiter.next_completion(&mut self.dest_io)?; + if result < 0 { + return Err(io::Error::from_raw_os_error(-result)); + } + debug_assert_eq!(user_data, write_id); + + self.state + .copied_bytes + .fetch_add(length as u64, Ordering::Relaxed); + + Ok(()) + } + + /// Returns the current [`Self::next_user_data`] and increments it, wrapping on overflow. + fn generate_user_data(&mut self) -> u64 { + let user_data = self.next_user_data; + self.next_user_data = self.next_user_data.wrapping_add(1); + + user_data + } +} + +/// Handle returned by `Block::start_mirror`. The owner (typically the +/// device manager) keeps it alive for the duration of the mirror to +/// observe `MirrorState` and to retain the [`CopyWorker`] thread. +pub struct BlockMirrorHandle { + pub state: Arc, + pub copy_worker: CopyWorkerHandle, + pub destination: Box, + pub destination_path: PathBuf, +} + +/// Single-fd `epoll` wrapper. Built once per eventfd and reused for +/// every `wait()` call so the copy worker doesn't pay setup cost per +/// block. +/// +/// `wait()` blocks until the eventfd becomes readable. +struct EpollWaiter { + epoll: epoll::Epoll, +} + +impl EpollWaiter { + /// Creates a reusable `EpollWaiter` for the given eventfd. + fn new(event_fd: RawFd) -> io::Result { + let epoll = epoll::Epoll::new()?; + epoll.ctl( + epoll::ControlOperation::Add, + event_fd, + epoll::EpollEvent::new(epoll::EventSet::IN, 0), + )?; + Ok(Self { epoll }) + } + + /// Blocks until the event fd becomes readable. Retries on EINTR. + fn wait(&self) -> io::Result<()> { + let mut events = [epoll::EpollEvent::default(); 1]; + loop { + match self.epoll.wait(-1, &mut events) { + Ok(_) => return Ok(()), + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + } + } + + /// Blocks until `io` reports a completion, then returns it. + fn next_completion(&self, io: &mut Box) -> io::Result<(u64, i32)> { + loop { + if let Some(completion) = io.next_completed_request() { + return Ok(completion); + } + self.wait()?; + // Drain the eventfd so the next wait does not fire on a stale signal. + let _ = io.notifier().read()?; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Overlap is detected whether the held range precedes the query or starts + /// inside it. + #[test] + fn overlaps_detects_overlap() { + let mut preceding = BTreeMap::new(); + preceding.insert(10u64, 25u64); + assert!(RangeLockManager::overlaps_any(&preceding, 20, 30)); + + let mut starts_inside = BTreeMap::new(); + starts_inside.insert(10u64, 20u64); + starts_inside.insert(25u64, 30u64); + assert!(RangeLockManager::overlaps_any(&starts_inside, 21, 26)); + } + + #[test] + fn overlaps_disjoint_returns_false() { + let mut locked = BTreeMap::new(); + locked.insert(10u64, 20u64); + locked.insert(30u64, 40u64); + assert!(!RangeLockManager::overlaps_any(&locked, 22, 28)); + } + + #[test] + fn overlaps_touching_boundary_is_not_overlap() { + let mut locked = BTreeMap::new(); + locked.insert(10u64, 20u64); + assert!(!RangeLockManager::overlaps_any(&locked, 20, 30)); + } + + use std::collections::VecDeque; + use std::sync::mpsc; + use std::time::Duration; + + /// In-memory [`AsyncIo`] backend for driving [`MirroringAsyncIo`] in a unit + /// test without a real fd, io_uring, or the copy worker. Each submission is + /// recorded as an immediately-available completion and the notifier eventfd + /// is signaled, so a synchronous wait loop on the notifier observes it. + struct MockAsyncIo { + evt: EventFd, + completions: VecDeque<(u64, i32)>, + /// When set, the `write_vectored` submit at this 0-based index returns + /// an error instead of completing. Drives the destination-failure and + /// partial-batch paths. + fail_on_nth_write: Option, + writes_seen: usize, + } + + impl MockAsyncIo { + fn new() -> Self { + Self { + evt: EventFd::new(libc::EFD_NONBLOCK).unwrap(), + completions: VecDeque::new(), + fail_on_nth_write: None, + writes_seen: 0, + } + } + + /// Record a completion and wake any waiter parked on the notifier. + fn complete(&mut self, user_data: u64, result: i32) { + self.completions.push_back((user_data, result)); + self.evt.write(1).unwrap(); + } + } + + impl AsyncIo for MockAsyncIo { + fn notifier(&self) -> &EventFd { + &self.evt + } + fn read_vectored(&mut self, _o: off_t, _i: &[iovec], ud: u64) -> AsyncIoResult<()> { + self.complete(ud, 0); + Ok(()) + } + fn write_vectored(&mut self, _o: off_t, _i: &[iovec], ud: u64) -> AsyncIoResult<()> { + let index = self.writes_seen; + self.writes_seen += 1; + if self.fail_on_nth_write == Some(index) { + return Err(AsyncIoError::WriteVectored(io::Error::other( + "injected write submit failure", + ))); + } + self.complete(ud, 0); + Ok(()) + } + fn fsync(&mut self, ud: Option) -> AsyncIoResult<()> { + if let Some(ud) = ud { + self.complete(ud, 0); + } + Ok(()) + } + fn punch_hole(&mut self, _o: u64, _l: u64, ud: u64) -> AsyncIoResult<()> { + self.complete(ud, 0); + Ok(()) + } + fn write_zeroes(&mut self, _o: u64, _l: u64, ud: u64) -> AsyncIoResult<()> { + self.complete(ud, 0); + Ok(()) + } + fn next_completed_request(&mut self) -> Option<(u64, i32)> { + self.completions.pop_front() + } + } + + fn mirror_with_mocks() -> MirroringAsyncIo { + mirror_from( + MockAsyncIo::new(), + MockAsyncIo::new(), + MirrorState::new(1 << 20), + ) + } + + /// The one place to update when `MirroringAsyncIo`'s fields change. + fn mirror_from( + source: S, + destination: D, + state: Arc, + ) -> MirroringAsyncIo { + let source_waiter = EpollWaiter::new(source.notifier().as_raw_fd()).unwrap(); + let dest_waiter = EpollWaiter::new(destination.notifier().as_raw_fd()).unwrap(); + MirroringAsyncIo { + source: Box::new(source), + destination: Box::new(destination), + state, + inflight_completions: VecDeque::new(), + source_passthrough: false, + source_waiter, + dest_waiter, + } + } + + /// One iovec over `buf`. The mocks never read it, so it only needs to + /// outlive the submit call. + fn iov_of(buf: &[u8]) -> [iovec; 1] { + [iovec { + iov_base: buf.as_ptr() as *mut libc::c_void, + iov_len: buf.len(), + }] + } + + /// Runs `f` on a worker thread and fails the test if it does not finish + /// within `timeout`. Turns a submit-path deadlock into a clean failure + /// instead of a hung suite: the worker stays blocked, but the test thread + /// resumes after the timeout and panics. + fn run_with_watchdog(timeout: Duration, f: impl FnOnce() + Send + 'static) { + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + f(); + let _ = tx.send(()); + }); + if rx.recv_timeout(timeout).is_err() { + panic!("scenario did not finish within {timeout:?} (deadlock)"); + } + } + + /// Drains completions until `n` have arrived (or the budget is exhausted). + fn drain_n(mirror: &mut MirroringAsyncIo, n: usize) -> Vec { + let mut acked = Vec::new(); + for _ in 0..64 { + while let Some((user_data, result)) = mirror.next_completed_request() { + assert_eq!(result, 0, "unexpected error completion"); + acked.push(user_data); + } + if acked.len() >= n { + break; + } + } + acked + } + + /// Two overlapping guest writes submitted before either is reaped must both + /// complete in submission order without deadlocking. + #[test] + fn overlapping_writes_complete_in_order() { + run_with_watchdog(Duration::from_secs(5), || { + let mut mirror = mirror_with_mocks(); + let buf = [0u8; 4096]; + let iov = iov_of(&buf); + + mirror.write_vectored(0, &iov, 1).unwrap(); + mirror.write_vectored(0, &iov, 2).unwrap(); + + assert_eq!( + drain_n(&mut mirror, 2), + vec![1, 2], + "both overlapping writes complete in submission order" + ); + }); + } + + /// While the copy worker holds a range (simulated by holding a `RangeGuard` + /// on the shared lock manager), an overlapping guest write must block and + /// proceed only once the range is released. + #[test] + fn copy_worker_hold_serializes_overlapping_guest_write() { + let state = MirrorState::new(1 << 20); + // The "copy worker" holds [0, 4096). + let guard = state.range_locks.lock_range(0, 4096).unwrap(); + + let mut mirror = mirror_from(MockAsyncIo::new(), MockAsyncIo::new(), state.clone()); + + let (tx, rx) = mpsc::channel(); + let handle = thread::spawn(move || { + let buf = [0u8; 4096]; + let iov = iov_of(&buf); + mirror.write_vectored(0, &iov, 1).unwrap(); + tx.send(()).unwrap(); + }); + + // The held range must block the overlapping guest write. + assert!( + rx.recv_timeout(Duration::from_millis(200)).is_err(), + "guest write proceeded while the copy worker held the range" + ); + + // Releasing the range lets the write through. + drop(guard); + assert!( + rx.recv_timeout(Duration::from_secs(5)).is_ok(), + "guest write did not proceed after the range was released" + ); + handle.join().unwrap(); + } + + /// Reads are source-only passthrough (no range lock) and still complete. + #[test] + fn read_passes_through_to_source() { + run_with_watchdog(Duration::from_secs(5), || { + let mut mirror = mirror_with_mocks(); + let buf = [0u8; 4096]; + let iov = iov_of(&buf); + + mirror.read_vectored(0, &iov, 7).unwrap(); + + let mut got = None; + for _ in 0..64 { + if let Some(c) = mirror.next_completed_request() { + got = Some(c); + break; + } + } + assert_eq!(got, Some((7, 0)), "read completes via the source"); + }); + } + + /// A destination submit failure degrades the mirror to source passthrough: + /// the phase goes `Failed`, and both the failing write and a subsequent + /// write still complete to the guest off the source alone. + #[test] + fn destination_submit_failure_degrades_to_passthrough() { + run_with_watchdog(Duration::from_secs(5), || { + let mut dest = MockAsyncIo::new(); + dest.fail_on_nth_write = Some(0); + let mut mirror = mirror_from(MockAsyncIo::new(), dest, MirrorState::new(1 << 20)); + let buf = [0u8; 4096]; + let iov = iov_of(&buf); + + mirror.write_vectored(0, &iov, 1).unwrap(); + assert!( + matches!(mirror.state.phase(), MirrorPhase::Failed(_)), + "destination failure transitions the mirror to Failed" + ); + + // Subsequent write goes to the source only. + mirror.write_vectored(0, &iov, 2).unwrap(); + + let mut acked = drain_n(&mut mirror, 2); + acked.sort(); + assert_eq!(acked, vec![1, 2], "both writes complete off the source"); + }); + } + + /// Mock backend whose completions are withheld until [`Gate::release`], so a + /// test can hold a write parked in `wait_for_completions`. + struct GatedMockAsyncIo { + evt: EventFd, + inner: Arc>, + /// Notified on each submit, so a test can wait until the in-flight write + /// has reached this backend (and so already holds its range guard). + on_submit: mpsc::Sender<()>, + } + + struct GatedInner { + /// Submitted, not yet released. + pending: VecDeque<(u64, i32)>, + /// Released, deliverable via `next_completed_request`. + ready: VecDeque<(u64, i32)>, + } + + /// Releases a [`GatedMockAsyncIo`]'s withheld completions from another thread. + struct Gate { + evt: EventFd, + inner: Arc>, + } + + impl Gate { + fn release(&self) { + let mut inner = self.inner.lock().unwrap(); + while let Some(completion) = inner.pending.pop_front() { + inner.ready.push_back(completion); + } + self.evt.write(1).unwrap(); + } + } + + impl GatedMockAsyncIo { + fn new(on_submit: mpsc::Sender<()>) -> Self { + Self { + evt: EventFd::new(libc::EFD_NONBLOCK).unwrap(), + inner: Arc::new(Mutex::new(GatedInner { + pending: VecDeque::new(), + ready: VecDeque::new(), + })), + on_submit, + } + } + + fn gate(&self) -> Gate { + Gate { + evt: self.evt.try_clone().unwrap(), + inner: Arc::clone(&self.inner), + } + } + + fn submit(&self, user_data: u64) { + self.inner.lock().unwrap().pending.push_back((user_data, 0)); + let _ = self.on_submit.send(()); + } + } + + impl AsyncIo for GatedMockAsyncIo { + fn notifier(&self) -> &EventFd { + &self.evt + } + fn read_vectored(&mut self, _o: off_t, _i: &[iovec], ud: u64) -> AsyncIoResult<()> { + self.submit(ud); + Ok(()) + } + fn write_vectored(&mut self, _o: off_t, _i: &[iovec], ud: u64) -> AsyncIoResult<()> { + self.submit(ud); + Ok(()) + } + fn fsync(&mut self, ud: Option) -> AsyncIoResult<()> { + if let Some(ud) = ud { + self.submit(ud); + } + Ok(()) + } + fn punch_hole(&mut self, _o: u64, _l: u64, ud: u64) -> AsyncIoResult<()> { + self.submit(ud); + Ok(()) + } + fn write_zeroes(&mut self, _o: u64, _l: u64, ud: u64) -> AsyncIoResult<()> { + self.submit(ud); + Ok(()) + } + fn next_completed_request(&mut self) -> Option<(u64, i32)> { + self.inner.lock().unwrap().ready.pop_front() + } + } + + /// The range guard must stay held across the whole synchronous submit+wait, + /// not just acquisition. A regression to `let _ =` drops it early and lets + /// an overlapping `lock_range` acquire while the write is still in flight. + #[test] + fn guard_is_held_across_submit_and_wait() { + let state = MirrorState::new(1 << 20); + + // Source completes immediately; destination is gated, so the write parks + // waiting on the destination completion while holding the range lock. + let (submitted_tx, submitted_rx) = mpsc::channel(); + let dest = GatedMockAsyncIo::new(submitted_tx); + let gate = dest.gate(); + let mut mirror = mirror_from(MockAsyncIo::new(), dest, state.clone()); + + let writer = thread::spawn(move || { + let buf = [0u8; 4096]; + let iov = iov_of(&buf); + mirror.write_vectored(0, &iov, 1).unwrap(); + }); + + // The write reached the destination submit, so its range guard is held. + submitted_rx.recv().unwrap(); + + // An overlapping lock_range must block while the in-flight write holds it. + let locker_state = state.clone(); + let (locked_tx, locked_rx) = mpsc::channel(); + let locker = thread::spawn(move || { + let _g = locker_state.range_locks.lock_range(0, 4096).unwrap(); + locked_tx.send(()).unwrap(); + }); + assert!( + locked_rx.recv_timeout(Duration::from_millis(200)).is_err(), + "lock_range acquired while the in-flight write still held the range" + ); + + // Releasing the destination completion lets the write finish and drop its + // guard, which unblocks the overlapping lock_range. + gate.release(); + writer.join().unwrap(); + assert!( + locked_rx.recv_timeout(Duration::from_secs(5)).is_ok(), + "lock_range did not acquire after the write released the range" + ); + locker.join().unwrap(); + } + + /// A single-iovec `Out` batch entry backed by `buf`. + fn batch_write(offset: off_t, buf: &[u8], user_data: u64) -> BatchRequest { + BatchRequest { + offset, + iovecs: [iovec { + iov_base: buf.as_ptr() as *mut libc::c_void, + iov_len: buf.len(), + }] + .into_iter() + .collect(), + user_data, + request_type: RequestType::Out, + } + } + + /// A mid-batch submit failure must still return `Ok` with one completion per + /// entry (an error completion for the failed one). The worker records the + /// batch as in-flight only on `Ok`, so aborting with `Err` strands the + /// completions already queued for earlier entries. + #[test] + fn failed_batch_submit_accounts_every_request() { + // Second write (index 1) fails at submit on the source; the first + // already went through. + let mut source = MockAsyncIo::new(); + source.fail_on_nth_write = Some(1); + let mut mirror = mirror_from(source, MockAsyncIo::new(), MirrorState::new(1 << 20)); + let buf = [0u8; 4096]; + + let batch = [batch_write(0, &buf, 1), batch_write(4096, &buf, 2)]; + + mirror + .submit_batch_requests(&batch) + .expect("a mid-batch submit failure must not fail the whole batch"); + + let mut completions = Vec::new(); + while let Some(c) = mirror.next_completed_request() { + completions.push(c); + } + completions.sort_by_key(|(user_data, _)| *user_data); + + assert_eq!( + completions.len(), + 2, + "every batch entry owes exactly one completion" + ); + assert_eq!(completions[0], (1, 0), "first write completes successfully"); + assert_eq!(completions[1].0, 2, "second entry is still accounted"); + assert!( + completions[1].1 < 0, + "second entry carries an error result (reported IOERR), not an orphan" + ); + } + + /// The lifecycle advances Running -> Ready -> Completing -> Completed, each + /// state reached only from its documented predecessor. + #[test] + fn phase_advances_through_the_lifecycle() { + let state = MirrorState::new(1 << 20); + assert_eq!(state.phase(), MirrorPhase::Running); + state.transition_to_phase(MirrorPhase::Ready); + state.transition_to_phase(MirrorPhase::Completing); + state.transition_to_phase(MirrorPhase::Completed); + assert_eq!(state.phase(), MirrorPhase::Completed); + } + + /// A transition not in the table is ignored, leaving the phase unchanged. + #[test] + fn invalid_phase_transition_is_ignored() { + let state = MirrorState::new(1 << 20); + // Running -> Completed skips Ready and Completing, so it is rejected. + state.transition_to_phase(MirrorPhase::Completed); + assert_eq!(state.phase(), MirrorPhase::Running); + } + + /// `Completed` is terminal: no later transition takes effect. + #[test] + fn completed_phase_is_terminal() { + let state = MirrorState::new(1 << 20); + state.transition_to_phase(MirrorPhase::Ready); + state.transition_to_phase(MirrorPhase::Completing); + state.transition_to_phase(MirrorPhase::Completed); + state.transition_to_phase(MirrorPhase::Cancelling); + assert_eq!(state.phase(), MirrorPhase::Completed); + } + + /// A failure keeps its first reason (transitions compare only the variant) + /// and can still move to `Cancelling` for cleanup. + #[test] + fn failed_keeps_first_reason_then_cancels() { + let state = MirrorState::new(1 << 20); + state.transition_to_phase(MirrorPhase::Failed("first".into())); + state.transition_to_phase(MirrorPhase::Failed("second".into())); + assert_eq!(state.phase(), MirrorPhase::Failed("first".into())); + state.transition_to_phase(MirrorPhase::Cancelling); + assert_eq!(state.phase(), MirrorPhase::Cancelling); + } + + /// A tracked fsync (`Some`) flushes both backends and surfaces one guest + /// completion for its user_data. + #[test] + fn tracked_fsync_completes_to_guest() { + run_with_watchdog(Duration::from_secs(5), || { + let mut mirror = mirror_with_mocks(); + mirror.fsync(Some(5)).unwrap(); + assert_eq!(drain_n(&mut mirror, 1), vec![5]); + }); + } + + /// A barrier fsync (`None`) flushes both backends but owes the guest no + /// completion, so nothing surfaces. + #[test] + fn barrier_fsync_surfaces_no_completion() { + let mut mirror = mirror_with_mocks(); + mirror.fsync(None).unwrap(); + assert!(mirror.next_completed_request().is_none()); + } + + /// `write_zeroes` mirrors to both backends under the range lock and + /// surfaces one guest completion, like a write. + #[test] + fn write_zeroes_mirrors_and_completes() { + run_with_watchdog(Duration::from_secs(5), || { + let mut mirror = mirror_with_mocks(); + mirror.write_zeroes(0, 4096, 3).unwrap(); + assert_eq!(drain_n(&mut mirror, 1), vec![3]); + }); + } + + /// Once degraded to passthrough, every mutating op forwards to the source + /// alone and still completes, with no destination and no range lock. + #[test] + fn degraded_mirror_passes_all_ops_through_to_source() { + run_with_watchdog(Duration::from_secs(5), || { + let mut dest = MockAsyncIo::new(); + dest.fail_on_nth_write = Some(0); + let mut mirror = mirror_from(MockAsyncIo::new(), dest, MirrorState::new(1 << 20)); + let buf = [0u8; 4096]; + let iov = iov_of(&buf); + + // The first write fails on the destination and flips to passthrough. + mirror.write_vectored(0, &iov, 1).unwrap(); + assert!(matches!(mirror.state.phase(), MirrorPhase::Failed(_))); + + // Subsequent ops take the source-only passthrough branch. + mirror.fsync(Some(2)).unwrap(); + mirror.punch_hole(0, 4096, 3).unwrap(); + mirror.write_zeroes(0, 4096, 4).unwrap(); + + let mut acked = drain_n(&mut mirror, 4); + acked.sort(); + assert_eq!(acked, vec![1, 2, 3, 4]); + }); + } +} diff --git a/docs/disk_mirroring.md b/docs/disk_mirroring.md new file mode 100644 index 000000000..edad98db2 --- /dev/null +++ b/docs/disk_mirroring.md @@ -0,0 +1,175 @@ +# Disk Mirroring + +Disk mirroring copies a running VM's disk to another file on the host and +keeps the two in sync, so the disk image can be moved to a different backing +store without stopping the guest. It is the Cloud Hypervisor counterpart of +QEMU's `blockdev-mirror`. + +A typical use is rebalancing storage: when the share backing a disk image +fills up, the operator mirrors that disk onto a file on another share and +switches the VM over to it. + +## Overview + +Mirroring runs as a sequence of phases driven by four API calls: + +- `/vm.disk-mirror-start` begins mirroring a disk onto a destination path. +- `/vm.disk-mirror-status` reports the current phase and copy progress. +- `/vm.disk-mirror-complete` switches the VM over to the destination. +- `/vm.disk-mirror-cancel` aborts and keeps the VM on the source. + +```mermaid +stateDiagram-v2 + [*] --> running: disk-mirror-start + running --> ready: background copy finished + ready --> completing: disk-mirror-complete + completing --> completed: all queues switched + completed --> [*] + running --> cancelling: disk-mirror-cancel + ready --> cancelling: disk-mirror-cancel + failed --> cancelling: disk-mirror-cancel + running --> failed: destination I/O error + ready --> failed: destination I/O error + cancelling --> [*] +``` + +While `running`, a background worker copies the existing data block by block. +At the same time every guest write is forwarded to both disks, so once the +copy finishes the two are identical. Reaching `ready` means the two disks are +in sync and stay so until the operator completes or cancels. + +## Operator usage + +The examples use `curl` against the VMM's API socket. Replace the socket path +and the disk identifier with your own. The disk identifier is the device `id` +shown by `vm.info` (the same `id` used when the disk was configured or hot +added). + +### Start a mirror + +```console +curl --unix-socket /tmp/cloud-hypervisor.sock -i \ + -X PUT 'http://localhost/api/v1/vm.disk-mirror-start' \ + -H 'Content-Type: application/json' \ + -d '{"id": "_disk0", "destination_path": "/new/store/disk0.raw"}' +``` + +This switches the disk to a mirroring backend and starts the background copy. +The VM keeps serving I/O throughout. A `204` response means mirroring started. + +### Check progress + +```console +curl --unix-socket /tmp/cloud-hypervisor.sock \ + -X PUT 'http://localhost/api/v1/vm.disk-mirror-status' \ + -H 'Content-Type: application/json' \ + -d '{"id": "_disk0"}' +``` + +The response reports the phase and how far the copy has progressed: + +```json +{"phase": "running", "copied_bytes": 1073741824, "total_bytes": 4294967296} +``` + +`phase` is one of `running`, `ready`, `completing`, `completed`, +`cancelling`, or `failed`. A `failed` status also carries a `failure` field +describing what went wrong. Poll this endpoint until the phase becomes +`ready`. + +### Complete the mirror + +Once the phase is `ready`, switch the VM over to the destination: + +```console +curl --unix-socket /tmp/cloud-hypervisor.sock -i \ + -X PUT 'http://localhost/api/v1/vm.disk-mirror-complete' \ + -H 'Content-Type: application/json' \ + -d '{"id": "_disk0"}' +``` + +The call blocks until the switch-over finishes. On success (`204`) the VM +serves all I/O from the destination disk and the source disk can be removed. +Completion is only accepted from the `ready` phase. A `404` or `400` leaves the +mirror active, so you can fix the cause and retry. + +### Cancel the mirror + +At any time before completion the operator can abort and keep the VM on the +source disk: + +```console +curl --unix-socket /tmp/cloud-hypervisor.sock -i \ + -X PUT 'http://localhost/api/v1/vm.disk-mirror-cancel' \ + -H 'Content-Type: application/json' \ + -d '{"id": "_disk0"}' +``` + +The destination disk is released and the VM continues on the source. Cancel is +refused once completion has been requested, because by then a queue may +already be writing only to the destination. + +### Failure handling + +If the destination disk fails (for example its backing store becomes +unreachable), the mirror moves to `failed` and the affected queues fall back +to serving the guest from the source disk, so the guest keeps running on +intact data. The operator then cancels the failed mirror to release the +destination. + +### Unrecoverable errors + +Completing a mirror cannot be undone. Once the switch to the destination +begins, some virtqueues may already be writing only to the destination, so +there is no consistent state to roll back to. If a queue cannot be switched +over during completion, the VMM aborts. The alternative would leave the disk +half on the source and half on the destination and could lose acknowledged +writes. This is rare: it needs a queue worker to fail mid-swap (for example an +epoll registration error), or its switch-over command to be lost or +unacknowledged. + +### Conflicting operations + +While a mirror is active, the VMM rejects operations that would disturb it: +snapshotting, live migration, resizing the disk, removing the device, and +rebooting, shutting down, or deleting the VM. Complete or cancel the mirror +first. Pausing the VM is allowed, but a mirror cannot be started, completed, +or cancelled while the device is paused. + +## Implementation details + +Mirroring is built from two cooperating pieces and a range lock that keeps +them from corrupting each other: + +```mermaid +flowchart LR + guest[Guest] -->|read / write| mio[MirroringAsyncIo] + mio -->|reads, all writes| src[(Source disk)] + mio -->|writes only| dst[(Destination disk)] + cw[CopyWorker] -->|read block| src + cw -->|write block| dst + mio -.range lock.- rl((RangeLockManager)) + cw -.range lock.- rl +``` + +**CopyWorker.** A background thread copies the source disk to the destination +in 512 KiB blocks. A block that reads back as all zeros is punched as a hole +on the destination instead of being written, so sparse images stay sparse. The +worker updates the copied-byte counter that `vm.disk-mirror-status` reports, +and stops early once the phase becomes terminal. + +**MirroringAsyncIo.** When a mirror starts, each virtqueue worker's `AsyncIo` +backend is swapped for a `MirroringAsyncIo`. It forwards reads to the source +and forwards every mutating operation (`write_vectored`, `fsync`, +`punch_hole`, `write_zeroes`) to both the source and the destination. The +completions of the two sides are awaited inside the write call, so an error on +the destination can be handled before the guest sees the write as done. On a +destination error that queue degrades to source passthrough and the mirror +fails, rather than letting the guest diverge from intact data. + +**Range lock.** The CopyWorker and the guest writes can target overlapping +byte ranges at the same time. Each side takes an exclusive lock on the range +it is about to touch and holds it until its I/O completes, so a copy and a +guest write to the same region cannot interleave into an inconsistent result. +Lookups are over a small set of held ranges, so the lock is cheap in the +common non-overlapping case. diff --git a/fuzz/fuzz_targets/http_api.rs b/fuzz/fuzz_targets/http_api.rs index aa3841243..c2b2fa023 100644 --- a/fuzz/fuzz_targets/http_api.rs +++ b/fuzz/fuzz_targets/http_api.rs @@ -113,6 +113,22 @@ impl RequestHandler for StubApiRequestHandler { Ok(()) } + fn vm_disk_mirror_start(&mut self, _: String, _: PathBuf) -> Result<(), VmError> { + Ok(()) + } + + fn vm_disk_mirror_status(&mut self, _: String) -> Result>, VmError> { + Ok(None) + } + + fn vm_disk_mirror_complete(&mut self, _: String) -> Result<(), VmError> { + Ok(()) + } + + fn vm_disk_mirror_cancel(&mut self, _: String) -> Result<(), VmError> { + Ok(()) + } + #[cfg(target_arch = "x86_64")] fn vm_coredump(&mut self, _: &str) -> Result<(), VmError> { Ok(()) diff --git a/virtio-devices/src/block.rs b/virtio-devices/src/block.rs index 261091e49..16f85dc33 100644 --- a/virtio-devices/src/block.rs +++ b/virtio-devices/src/block.rs @@ -15,15 +15,20 @@ use std::ops::Deref; use std::os::unix::io::AsRawFd; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering}; -use std::sync::{Arc, Barrier}; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{Arc, Barrier, Mutex, mpsc}; use std::time::{Duration, Instant}; use std::{io, result, thread}; use anyhow::anyhow; use block::async_io::{AsyncIo, AsyncIoError}; use block::disk_file::AsyncFullDiskFile; -use block::error::BlockError; +use block::error::{BlockError, BlockErrorKind, BlockResult}; use block::fcntl::{LockError, LockGranularity, LockGranularityChoice, LockType, get_lock_state}; +use block::mirror::{ + BlockMirrorHandle, CopyWorker, CopyWorkerHandle, MIRROR_BLOCK_SIZE, MirrorPhase, MirrorState, + MirrorStatus, MirroringAsyncIo, +}; use block::{ ExecuteAsync, ExecuteError, MAX_DISCARD_WRITE_ZEROES_SEG, Request, RequestType, VirtioBlockConfig, build_serial, fcntl, @@ -63,6 +68,12 @@ const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2; // New 'wake up' event from the rate limiter const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3; +// A `BlockQueueCommand` has been queued for this worker to apply (e.g. swap disk_image). +const BLOCK_COMMAND_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 4; + +// Maximum duration to wait for a command to be acknowledged by the virtqueue worker. +const MIRROR_COMMAND_ACK_TIMEOUT: Duration = Duration::from_secs(5); + // latency scale, for reduce precision loss in calculate. const LATENCY_SCALE: u64 = 10000; @@ -112,10 +123,119 @@ pub enum Error { ConfigChange(#[source] io::Error), #[error("Disk resize failed")] DiskResize(#[source] BlockError), + #[error("Mirror is currently active")] + MirrorActive, + #[error("Failed applying mirror command: {0}")] + MirrorSwap(String), } pub type Result = result::Result; +/// Lifecycle command kind for a virtqueue worker. +#[derive(Debug)] +pub enum BlockQueueCommandKind { + /// Replace the plain source backend with a mirroring backend. + InstallMirror, + /// Replace the mirroring backend with a plain destination backend. + CompleteToDestination, + /// Replace the mirroring backend with a plain source backend. + CancelToSource, +} + +/// Acknowledgement sent by one virtqueue worker after handling a command. +pub struct BlockQueueAck { + /// ID of the command that is being acknowledged. + pub op_id: u64, + /// Result of applying the command inside the worker. + pub result: Result<()>, +} + +/// Command sent from `Block` to one virtqueue worker to change the worker's +/// active block I/O backend. +pub struct BlockQueueCommand { + /// Unique id for this lifecycle operation, used to match the succeeding acknowledgement. + pub op_id: u64, + /// Lifecycle action the worker should apply. + pub kind: BlockQueueCommandKind, + /// New async I/O backend that will replace the worker's current + /// `disk_image` after the old backend has drained. + /// + /// For start this is a `MirroringAsyncIo`. For cancel this is a plain + /// source `AsyncIo`. For completion this is a plain destination `AsyncIo`. + pub async_io: Box, + + /// Channel used by the worker to report that the command was applied or + /// failed. + pub ack: Sender, +} + +impl BlockQueueCommand { + pub fn install_mirror( + op_id: u64, + async_io: Box, + ack: Sender, + ) -> Self { + BlockQueueCommand { + op_id, + kind: BlockQueueCommandKind::InstallMirror, + async_io, + ack, + } + } + + pub fn complete_to_destination( + op_id: u64, + async_io: Box, + ack: Sender, + ) -> Self { + BlockQueueCommand { + op_id, + kind: BlockQueueCommandKind::CompleteToDestination, + async_io, + ack, + } + } + + /// Cancel mirroring and revert to the original source backend. + pub fn cancel_to_source( + op_id: u64, + async_io: Box, + ack: Sender, + ) -> Self { + BlockQueueCommand { + op_id, + kind: BlockQueueCommandKind::CancelToSource, + async_io, + ack, + } + } +} + +/// Per-virtqueue plumbing for swapping the worker's `disk_image` at +/// runtime. +/// +/// `cmd` and `evt` are shared with the API thread, which puts a +/// [`BlockQueueCommand`] into `cmd` (from [`Block::start_mirror`], +/// `complete_mirror`, or `cancel_mirror`) and writes to `evt` to wake the +/// worker. The worker takes the command and applies it. +pub struct BlockQueueCommandReceiver { + /// Next [`BlockQueueCommand`] to apply. Written by the API thread, + /// taken by the worker on `BLOCK_COMMAND_EVENT`. + pub cmd: Arc>>, + /// Wakes the worker after `cmd` is filled. Fires `BLOCK_COMMAND_EVENT` + /// on the worker's epoll set. + pub evt: EventFd, + /// Command taken from `cmd` and held until `disk_image` reports no + /// in-flight requests. Owned and accessed only by the worker. + pending_block_queue_command: Option, +} + +struct BlockQueueCommandSender { + cmd: Arc>>, + evt: EventFd, + queue_size: u16, +} + // latency will be records as microseconds, average latency // will be save as scaled value. #[derive(Clone)] @@ -193,6 +313,8 @@ struct BlockEpollHandler { host_cpus: Option>, acked_features: u64, disable_sector0_writes: bool, + /// Receives mirror lifecycle commands for this virtqueue worker. + cmd_receiver: Option, } fn has_feature(features: u64, feature_flag: u64) -> bool { @@ -249,7 +371,15 @@ impl BlockEpollHandler { if draining_active_requests.load(Ordering::SeqCst) { return Ok(()); } - + // Defer submitting new descriptors while a mirror swap is draining. + // The queue_evt is kicked at the end of the swap. + if self + .cmd_receiver + .as_ref() + .is_some_and(|m| m.pending_block_queue_command.is_some()) + { + return Ok(()); + } let queue = &mut self.queue; let queue_size = queue.size(); let mut batch_requests = Vec::new(); @@ -466,6 +596,86 @@ impl BlockEpollHandler { self.try_signal_used_queue() } + fn apply_block_queue_command( + disk_image: &mut Box, + command: BlockQueueCommand, + helper: &mut EpollHelper, + ) -> result::Result<(), Error> { + let BlockQueueCommand { + op_id: _, + kind: _, + async_io: new_disk_image, + ack: _, + } = command; + + let new_disk_fd = new_disk_image.notifier().as_raw_fd(); + let old_disk_fd = disk_image.notifier().as_raw_fd(); + + // Register the new backend's completion eventFd. + helper + .add_event(new_disk_fd, COMPLETION_EVENT) + .map_err(|e| { + Error::MirrorSwap(format!("Failed to register new disk notifier: {e:?}")) + })?; + + // Deregister the old backend's completion eventFd. + if let Err(e) = + helper.del_event_custom(old_disk_fd, COMPLETION_EVENT, epoll::Events::EPOLLIN) + { + // Rollback the new disk_image registration. + let _ = helper.del_event_custom(new_disk_fd, COMPLETION_EVENT, epoll::Events::EPOLLIN); + return Err(Error::MirrorSwap(format!( + "Failed to deregister old disk notifier: {e:?}" + ))); + } + + // Commit the swap. + *disk_image = new_disk_image; + + Ok(()) + } + + /// Applies a pending mirror update if one is staged and the current + /// `disk_image` has no in-flight requests. Returns `Ok(())` without + /// changes when either condition is not met. The next completion + /// event will trigger another attempt. + fn try_apply_pending_block_queue_command( + &mut self, + helper: &mut EpollHelper, + ) -> result::Result<(), EpollHelperError> { + // If any disk requests are in flight, we can't apply the pending command. + if !self.inflight_requests.is_empty() || self.disk_image.has_inflight_requests() { + return Ok(()); + } + + let Some(cmd_receiver) = self.cmd_receiver.as_mut() else { + return Ok(()); + }; + + let Some(command) = cmd_receiver.pending_block_queue_command.take() else { + return Ok(()); + }; + + let op_id = command.op_id; + let ack = command.ack.clone(); + + let result = Self::apply_block_queue_command(&mut self.disk_image, command, helper); + + let _ = ack.send(BlockQueueAck { op_id, result }); + + // While the command was pending, QUEUE_AVAIL_EVENT handling consumed the + // guest's kicks without submitting (see the guard in process_queue_submit). + // The guest won't kick again for descriptors it already queued, so process + // the avail ring now, whether the command succeeded or failed, or those + // requests stall until unrelated guest I/O arrives. + let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked()); + if !rate_limit_reached { + self.process_queue_submit_and_signal()?; + } + + Ok(()) + } + #[inline] fn find_inflight_request(&mut self, completed_head: u16) -> Result { // This loop neatly handles the fast path where the completions are @@ -682,6 +892,9 @@ impl BlockEpollHandler { if let Some(rate_limiter) = &self.rate_limiter { helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?; } + if let Some(cmd_receiver) = &self.cmd_receiver { + helper.add_event(cmd_receiver.evt.as_raw_fd(), BLOCK_COMMAND_EVENT)?; + } self.set_queue_thread_affinity(); helper.run(paused, paused_sync, self)?; @@ -692,7 +905,7 @@ impl BlockEpollHandler { impl EpollHelperHandler for BlockEpollHandler { fn handle_event( &mut self, - _helper: &mut EpollHelper, + helper: &mut EpollHelper, event: &epoll::Event, ) -> result::Result<(), EpollHelperError> { let ev_type = event.data as u16; @@ -726,6 +939,7 @@ impl EpollHelperHandler for BlockEpollHandler { if !rate_limit_reached { self.process_queue_submit_and_signal()?; } + self.try_apply_pending_block_queue_command(helper)?; } RATE_LIMITER_EVENT => { if let Some(rate_limiter) = &mut self.rate_limiter { @@ -744,6 +958,25 @@ impl EpollHelperHandler for BlockEpollHandler { ))); } } + BLOCK_COMMAND_EVENT => { + if let Some(cmd_receiver) = self.cmd_receiver.as_mut() { + cmd_receiver.evt.read().map_err(|e| { + EpollHelperError::HandleEvent(anyhow!( + "Failed to read block command event: {e:?}" + )) + })?; + if let Some(update) = cmd_receiver.cmd.lock().unwrap().take() + && let Some(stale) = + cmd_receiver.pending_block_queue_command.replace(update) + { + warn!( + "Replacing pending block queue command {:?} before it was applied", + stale.kind + ); + } + } + self.try_apply_pending_block_queue_command(helper)?; + } _ => { return Err(EpollHelperError::HandleEvent(anyhow!( "Unexpected event: {ev_type}" @@ -774,6 +1007,12 @@ pub struct Block { device_status: Arc, active_request_count: Arc, draining_active_requests: Arc, + /// Per-virtqueue mirror writer-side handles, populated at + /// activation. `Block::start_mirror` fills each slot with a + /// [`BlockQueueCommand`] and writes the corresponding evt. + queue_cmd_senders: Vec, + next_queue_cmd_op_id: u64, + mirror_handle: Option, } #[derive(Serialize, Deserialize)] @@ -940,6 +1179,9 @@ impl Block { device_status: Arc::new(AtomicU8::new(0)), active_request_count: Arc::new(AtomicUsize::new(0)), draining_active_requests: Arc::new(AtomicBool::new(false)), + queue_cmd_senders: Vec::new(), + next_queue_cmd_op_id: 1, + mirror_handle: None, }) } @@ -1090,6 +1332,10 @@ impl Block { return Err(Error::InvalidSize); } + if self.mirror_handle.is_some() { + return Err(Error::MirrorActive); + } + self.disk_image .resize(new_size) .map_err(Error::DiskResize)?; @@ -1109,6 +1355,314 @@ impl Block { .map_err(Error::ConfigChange) } + pub fn logical_size(&self) -> BlockResult { + self.disk_image.logical_size() + } + + /// Start mirroring the device's disk to `destination`. + /// + /// Each virtqueue worker swaps its `disk_image` to a new + /// [`MirroringAsyncIo`] that fans every mutating request out to both + /// backends. A background [`CopyWorker`] copies existing source bytes + /// to destination until all initial bytes are copied. + /// The [`MirroringAsyncIo`] stays in place until completion, keeping the device's + /// disk and `destination` in sync. + /// + /// Returns an error if the destination is smaller than the source, on + /// `logical_size()` failure, [`MirroringAsyncIo`] construction failure, or + /// copy worker spawn failure. + pub fn start_mirror( + &mut self, + destination: Box, + destination_path: PathBuf, + ) -> BlockResult<()> { + self.ensure_not_paused_for_mirror()?; + let source_size = self.disk_image.logical_size()?; + let dest_size = destination.logical_size()?; + if dest_size < source_size { + return Err(BlockError::new( + BlockErrorKind::Io, + io::Error::other(format!( + "mirror destination ({dest_size} bytes) is smaller than source ({source_size} bytes)" + )), + )); + } + + let state = MirrorState::new(source_size); + let op_id = self.next_mirror_op_id(); + let (ack_tx, ack_rx) = mpsc::channel(); + + let mut commands = Vec::with_capacity(self.queue_cmd_senders.len()); + for sender in &self.queue_cmd_senders { + let async_io = MirroringAsyncIo::create( + self.disk_image.as_ref(), + destination.as_ref(), + state.clone(), + sender.queue_size as u32, + )?; + commands.push(( + sender, + BlockQueueCommand::install_mirror(op_id, async_io, ack_tx.clone()), + )); + } + + drop(ack_tx); + + let install_result: BlockResult = (|| { + Self::send_mirror_queue_commands(commands)?; + Self::wait_for_mirror_queue_command_acks(op_id, &ack_rx, self.queue_cmd_senders.len())?; + CopyWorker::new( + self.disk_image.as_ref(), + destination.as_ref(), + state.clone(), + MIRROR_BLOCK_SIZE, + )? + .spawn() + .map_err(|e| BlockError::new(BlockErrorKind::Io, e)) + })(); + + let copy_worker = match install_result { + Ok(worker) => worker, + Err(e) => { + state.transition_to_phase(MirrorPhase::Failed(format!( + "mirror install failed: {e}" + ))); + + // Don't mask the install error on revert err. + if let Err(revert_err) = self.revert_queues_to_source() { + error!( + "failed to revert virtqueues to source after mirror install failure: {revert_err}" + ); + } + return Err(e); + } + }; + + self.mirror_handle = Some(BlockMirrorHandle { + state, + copy_worker, + destination, + destination_path, + }); + Ok(()) + } + + /// Switch the device's mirroring wrapper to the destination disk. + /// + /// Each virtqueue worker swaps its [`MirroringAsyncIo`] for a plain + /// [`AsyncIo`] on the destination through the same slot and eventfd + /// mechanism used to install the mirror. After this call the source + /// disk is no longer used by the VM and the operator can detach or + /// remove it. + /// + /// Returns [`BlockErrorKind::MirrorNotActive`] when no mirror is + /// active for the device, and [`BlockErrorKind::MirrorNotReady`] when + /// the copy worker has not yet reported the ready phase or the mirror + /// has since failed. Both errors return before any queue command is + /// sent, so the mirror handle is left in place and the caller can poll + /// the state and retry. + /// + /// # Panics + /// + /// Panics if a queue command cannot be sent or acknowledged after the + /// switch-over has started. At that point some queues may already write + /// to the destination only, and there is no revert that keeps + /// acknowledged writes, so aborting is preferred over data loss. + pub fn complete_mirror(&mut self) -> BlockResult { + self.ensure_not_paused_for_mirror()?; + let op_id = self.next_mirror_op_id(); + let (ack_tx, ack_rx) = mpsc::channel(); + + let handle = self + .mirror_handle + .as_ref() + .ok_or_else(|| BlockError::from_kind(BlockErrorKind::MirrorNotActive))?; + + // Only allow completing when the copy worker is in the ready phase or as a retry. + if !matches!( + handle.state.phase(), + MirrorPhase::Ready | MirrorPhase::Completing + ) { + return Err(BlockError::from_kind(BlockErrorKind::MirrorNotReady)); + } + + let mut commands = Vec::with_capacity(self.queue_cmd_senders.len()); + for sender in &self.queue_cmd_senders { + let async_io = handle + .destination + .create_async_io(sender.queue_size as u32)?; + commands.push(( + sender, + BlockQueueCommand::complete_to_destination(op_id, async_io, ack_tx.clone()), + )); + } + + drop(ack_tx); + + // A concurrent destination failure may have moved the mirror to + // Failed since the phase guard above. Confirm Completing took effect + // before sending any command, otherwise we would swap the device + // onto a failed mirror. + handle.state.transition_to_phase(MirrorPhase::Completing); + if handle.state.phase() != MirrorPhase::Completing { + return Err(BlockError::from_kind(BlockErrorKind::MirrorNotReady)); + } + + // Once the first command is sent a queue may write to the destination + // only, so a partial switch-over has no safe revert. We panic rather + // than risk losing acknowledged writes. + Self::send_mirror_queue_commands(commands).expect("mirror queue commands sent"); + Self::wait_for_mirror_queue_command_acks(op_id, &ack_rx, self.queue_cmd_senders.len()) + .expect("mirror queue command acks received"); + handle.state.transition_to_phase(MirrorPhase::Completed); + + // Pre-build succeeded, own the destination now and commit the completion. + let BlockMirrorHandle { + destination, + destination_path, + copy_worker: _, + state: _, + } = self.mirror_handle.take().unwrap(); + + self.disk_image = destination; + self.disk_path = destination_path.clone(); + Ok(destination_path) + } + + /// Fails with `MirrorDevicePaused` when the device is paused, since a + /// parked worker cannot apply a staged mirror command. + fn ensure_not_paused_for_mirror(&self) -> BlockResult<()> { + if self.common.paused.load(Ordering::SeqCst) { + return Err(BlockError::from_kind(BlockErrorKind::MirrorDevicePaused)); + } + Ok(()) + } + + fn next_mirror_op_id(&mut self) -> u64 { + let op_id = self.next_queue_cmd_op_id; + self.next_queue_cmd_op_id = self.next_queue_cmd_op_id.wrapping_add(1); + op_id + } + + fn mirror_swap_error(msg: impl Into) -> BlockError { + BlockError::new(BlockErrorKind::MirrorSwap, io::Error::other(msg.into())) + } + + fn send_mirror_queue_commands( + commands: Vec<(&BlockQueueCommandSender, BlockQueueCommand)>, + ) -> BlockResult<()> { + for (sender, command) in commands { + let mut slot = sender.cmd.lock().unwrap(); + + if slot.is_some() { + return Err(Self::mirror_swap_error("mirror command slot is occupied")); + } + + *slot = Some(command); + sender.evt.write(1).map_err(|e| { + Self::mirror_swap_error(format!("failed to notify mirror queue worker: {e}")) + })?; + } + + Ok(()) + } + + /// Wait for n acknowledgments of a mirror command with the given op_id on ack_rx, returning + /// an error if a timeout occurs or if any ack reports an error or mismatched op_id. + fn wait_for_mirror_queue_command_acks( + op_id: u64, + ack_rx: &Receiver, + expected_acks: usize, + ) -> BlockResult<()> { + for _ in 0..expected_acks { + let ack = ack_rx + .recv_timeout(MIRROR_COMMAND_ACK_TIMEOUT) + .map_err(|e| Self::mirror_swap_error(format!("mirror command ack timeout: {e}")))?; + + if ack.op_id != op_id { + return Err(Self::mirror_swap_error(format!( + "received mirror command ack for op_id {}, expected {}", + ack.op_id, op_id + ))); + } + + ack.result.map_err(|e| { + Self::mirror_swap_error(format!("mirror command failed in queue worker: {e}")) + })?; + } + + Ok(()) + } + + /// Swap every virtqueue worker back to a plain AsyncIo on the source disk. + fn revert_queues_to_source(&mut self) -> BlockResult<()> { + let op_id = self.next_mirror_op_id(); + let (ack_tx, ack_rx) = mpsc::channel(); + let mut commands = Vec::with_capacity(self.queue_cmd_senders.len()); + for sender in &self.queue_cmd_senders { + let async_io = self.disk_image.create_async_io(sender.queue_size as u32)?; + commands.push(( + sender, + BlockQueueCommand::cancel_to_source(op_id, async_io, ack_tx.clone()), + )); + } + drop(ack_tx); + Self::send_mirror_queue_commands(commands)?; + Self::wait_for_mirror_queue_command_acks(op_id, &ack_rx, self.queue_cmd_senders.len()) + } + + /// Cancel an active mirror and revert the device to the source disk. + /// + /// Transitions the mirror to [`MirrorPhase::Cancelling`] to mark that + /// cancellation has started, reverts every virtqueue worker to a plain + /// [`AsyncIo`] on the source, then drops the handle, which joins the + /// copy worker and releases the destination. + /// + /// Returns [`BlockErrorKind::MirrorNotActive`] when no mirror is active, + /// and [`BlockErrorKind::MirrorCompletionInProgress`] once a completion + /// has been attempted, because a queue may already write to the + /// destination only and reverting would lose acknowledged guest writes. + /// + /// If the revert fails the mirror stays in [`MirrorPhase::Cancelling`] + /// with the handle held, so calling this again retries the revert and + /// finishes the cancellation. + /// + /// Blocks until the copy worker finishes its current block and joins, + /// which can stall on a slow or hung destination. + pub fn cancel_mirror(&mut self) -> BlockResult<()> { + self.ensure_not_paused_for_mirror()?; + let state = self + .mirror_handle + .as_ref() + .ok_or_else(|| BlockError::from_kind(BlockErrorKind::MirrorNotActive))? + .state + .clone(); + + if !matches!( + state.phase(), + MirrorPhase::Running + | MirrorPhase::Ready + | MirrorPhase::Failed(_) + | MirrorPhase::Cancelling + ) { + return Err(BlockError::from_kind( + BlockErrorKind::MirrorCompletionInProgress, + )); + } + + state.transition_to_phase(MirrorPhase::Cancelling); + self.revert_queues_to_source()?; + + drop(self.mirror_handle.take().unwrap()); + + Ok(()) + } + + /// Returns a snapshot of the current mirror progress. + pub fn mirror_status(&self) -> Option { + self.mirror_handle.as_ref().map(|h| h.state.status()) + } + #[cfg(fuzzing)] pub fn wait_for_epoll_threads(&mut self) { self.common.wait_for_epoll_threads(); @@ -1190,6 +1744,12 @@ impl VirtioDevice for Block { let mut epoll_threads = Vec::new(); let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into()); + // Reset and pre-allocate per-virtqueue mirror handoffs. The + // writer-side (slot + evt) is kept on `Block`. The receiver-side + // is handed to the BlockEpollHandler. + self.queue_cmd_senders.clear(); + self.queue_cmd_senders.reserve(queues.len()); + for i in 0..queues.len() { let (_, mut queue, queue_evt) = queues.remove(0); queue.set_event_idx(event_idx); @@ -1198,6 +1758,26 @@ impl VirtioDevice for Block { let (kill_evt, pause_evt) = self.common.dup_eventfds(); let queue_idx = i as u16; + let queue_command: Arc>> = Arc::new(Mutex::new(None)); + let queue_command_evt = EventFd::new(libc::EFD_NONBLOCK).map_err(|e| { + error!("failed to create mirror eventfd: {e}"); + ActivateError::BadActivate + })?; + let mirror_handler_evt = queue_command_evt.try_clone().map_err(|e| { + error!("failed to clone mirror eventfd: {e}"); + ActivateError::BadActivate + })?; + let cmd_receiver = BlockQueueCommandReceiver { + cmd: Arc::clone(&queue_command), + evt: mirror_handler_evt, + pending_block_queue_command: None, + }; + self.queue_cmd_senders.push(BlockQueueCommandSender { + cmd: queue_command, + evt: queue_command_evt, + queue_size, + }); + let mut handler = BlockEpollHandler { queue_index: queue_idx, queue, @@ -1233,6 +1813,7 @@ impl VirtioDevice for Block { disable_sector0_writes: self.disable_sector0_writes, active_request_count: self.active_request_count.clone(), draining_active_requests: self.draining_active_requests.clone(), + cmd_receiver: Some(cmd_receiver), }; let paused = self.common.paused.clone(); @@ -1257,6 +1838,13 @@ impl VirtioDevice for Block { } fn reset(&mut self) { + // Cancel any active blockdev-mirror. + if self.mirror_handle.is_some() + && let Err(e) = self.cancel_mirror() + { + error!("failed to cancel disk mirror on device reset: {e}"); + } + self.common.reset(); self.draining_active_requests.store(false, Ordering::SeqCst); self.active_request_count.store(0, Ordering::SeqCst); @@ -1348,6 +1936,12 @@ impl Snapshottable for Block { } fn snapshot(&mut self) -> std::result::Result { + if self.mirror_handle.is_some() { + return Err(MigratableError::Snapshot(anyhow!( + "Cannot snapshot while mirror is active" + ))); + } + Snapshot::new_from_state(&self.state()) } } diff --git a/vmm/src/api/http/http_endpoint.rs b/vmm/src/api/http/http_endpoint.rs index 57aa6c446..6eb7ea94c 100644 --- a/vmm/src/api/http/http_endpoint.rs +++ b/vmm/src/api/http/http_endpoint.rs @@ -37,6 +37,7 @@ use std::fs::File; use std::sync::mpsc::Sender; +use block::error::BlockErrorKind; use log::info; use micro_http::{Body, Method, Request, Response, StatusCode, Version}; use vmm_sys_util::eventfd::EventFd; @@ -48,13 +49,16 @@ use crate::api::http::{EndpointHandler, HttpError, error_response}; use crate::api::{ AddDisk, ApiAction, ApiError, ApiRequest, NetConfig, VmAddDevice, VmAddFs, VmAddGenericVhostUser, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa, VmAddVsock, VmBoot, - VmCancelMigration, VmConfig, VmCounters, VmDelete, VmMigrationProgress, VmNmi, VmPause, + VmCancelMigration, VmConfig, VmCounters, VmDelete, VmDiskMirrorCancel, VmDiskMirrorCancelData, + VmDiskMirrorComplete, VmDiskMirrorCompleteData, VmDiskMirrorStart, VmDiskMirrorStartData, + VmDiskMirrorStatus, VmDiskMirrorStatusData, VmMigrationProgress, VmNmi, VmPause, VmPostMigrationAnnounce, VmPowerButton, VmReboot, VmReceiveMigration, VmReceiveMigrationData, VmRemoveDevice, VmResize, VmResizeDisk, VmResizeZone, VmRestore, VmResume, VmSendMigration, VmShutdown, VmSnapshot, }; use crate::config::RestoreConfig; use crate::cpu::Error as CpuError; +use crate::device_manager::DeviceManagerError; use crate::vm::Error as VmError; /// Helper module for attaching externally opened FDs to config objects. @@ -518,6 +522,121 @@ impl PutHandler for VmSendMigration { impl GetHandler for VmSendMigration {} +impl PutHandler for VmDiskMirrorStart { + fn handle_request( + &'static self, + api_notifier: EventFd, + api_sender: Sender, + body: &Option, + _files: Vec, + ) -> Result, HttpError> { + let body = body.as_ref().ok_or(HttpError::BadRequest)?; + let data: VmDiskMirrorStartData = serde_json::from_slice(body.raw())?; + + self.send(api_notifier, api_sender, data) + .map_err(|e| match &e { + ApiError::VmDiskMirrorStart(VmError::DeviceManager( + DeviceManagerError::UnknownDeviceId(_), + )) => HttpError::NotFound, + ApiError::VmDiskMirrorStart(VmError::DeviceManager( + DeviceManagerError::BlockMirrorDestAlreadyExists(_, _), + )) => HttpError::BadRequest, + ApiError::VmDiskMirrorStart(VmError::DeviceManager( + DeviceManagerError::BlockMirrorAlreadyActive(_), + )) => HttpError::BadRequest, + _ => HttpError::ApiError(e), + }) + } +} + +impl GetHandler for VmDiskMirrorStart {} + +impl PutHandler for VmDiskMirrorStatus { + fn handle_request( + &'static self, + api_notifier: EventFd, + api_sender: Sender, + body: &Option, + _files: Vec, + ) -> Result, HttpError> { + let body = body.as_ref().ok_or(HttpError::BadRequest)?; + let data: VmDiskMirrorStatusData = serde_json::from_slice(body.raw())?; + + self.send(api_notifier, api_sender, data) + .map_err(|e| match &e { + ApiError::VmDiskMirrorStatus(VmError::DeviceManager( + DeviceManagerError::UnknownDeviceId(_), + )) => HttpError::NotFound, + ApiError::VmDiskMirrorStatus(VmError::DeviceManager( + DeviceManagerError::BlockMirrorNotActive(_), + )) => HttpError::NotFound, + _ => HttpError::ApiError(e), + }) + } +} + +impl GetHandler for VmDiskMirrorStatus {} + +impl PutHandler for VmDiskMirrorComplete { + fn handle_request( + &'static self, + api_notifier: EventFd, + api_sender: Sender, + body: &Option, + _files: Vec, + ) -> Result, HttpError> { + let body = body.as_ref().ok_or(HttpError::BadRequest)?; + let data: VmDiskMirrorCompleteData = serde_json::from_slice(body.raw())?; + + self.send(api_notifier, api_sender, data) + .map_err(|e| match &e { + ApiError::VmDiskMirrorComplete(VmError::DeviceManager( + DeviceManagerError::UnknownDeviceId(_), + )) => HttpError::NotFound, + ApiError::VmDiskMirrorComplete(VmError::DeviceManager( + DeviceManagerError::BlockMirrorComplete(_, block_err), + )) => match block_err.kind() { + BlockErrorKind::MirrorNotActive => HttpError::NotFound, + BlockErrorKind::MirrorNotReady => HttpError::BadRequest, + _ => HttpError::ApiError(e), + }, + _ => HttpError::ApiError(e), + }) + } +} + +impl GetHandler for VmDiskMirrorComplete {} + +impl PutHandler for VmDiskMirrorCancel { + fn handle_request( + &'static self, + api_notifier: EventFd, + api_sender: Sender, + body: &Option, + _files: Vec, + ) -> Result, HttpError> { + let body = body.as_ref().ok_or(HttpError::BadRequest)?; + let data: VmDiskMirrorCancelData = serde_json::from_slice(body.raw())?; + + self.send(api_notifier, api_sender, data) + .map_err(|e| match &e { + ApiError::VmDiskMirrorCancel(VmError::DeviceManager( + DeviceManagerError::UnknownDeviceId(_), + )) => HttpError::NotFound, + ApiError::VmDiskMirrorCancel(VmError::DeviceManager( + DeviceManagerError::BlockMirrorCancel(_, block_err), + )) => match block_err.kind() { + BlockErrorKind::MirrorNotActive => HttpError::NotFound, + BlockErrorKind::MirrorCompletionInProgress => HttpError::BadRequest, + _ => HttpError::ApiError(e), + }, + _ => HttpError::ApiError(e), + }) + } +} + +impl GetHandler for VmDiskMirrorCancel {} + impl PutHandler for VmResize { fn handle_request( &'static self, diff --git a/vmm/src/api/http/mod.rs b/vmm/src/api/http/mod.rs index 5464ca87a..59097a582 100644 --- a/vmm/src/api/http/mod.rs +++ b/vmm/src/api/http/mod.rs @@ -30,9 +30,10 @@ use crate::api::VmCoredump; use crate::api::{ AddDisk, ApiError, ApiRequest, VmAddDevice, VmAddFs, VmAddGenericVhostUser, VmAddNet, VmAddPmem, VmAddUserDevice, VmAddVdpa, VmAddVsock, VmBoot, VmCancelMigration, VmCounters, - VmDelete, VmMigrationProgress, VmNmi, VmPause, VmPostMigrationAnnounce, VmPowerButton, - VmReboot, VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeDisk, VmResizeZone, VmRestore, - VmResume, VmSendMigration, VmShutdown, VmSnapshot, + VmDelete, VmDiskMirrorCancel, VmDiskMirrorComplete, VmDiskMirrorStart, VmDiskMirrorStatus, + VmMigrationProgress, VmNmi, VmPause, VmPostMigrationAnnounce, VmPowerButton, VmReboot, + VmReceiveMigration, VmRemoveDevice, VmResize, VmResizeDisk, VmResizeZone, VmRestore, VmResume, + VmSendMigration, VmShutdown, VmSnapshot, }; use crate::landlock::Landlock; use crate::seccomp_filters::{Thread, get_seccomp_filter}; @@ -141,6 +142,7 @@ pub trait EndpointHandler { error_response(e, StatusCode::BadRequest) } Err(e @ HttpError::TooManyRequests) => error_response(e, StatusCode::TooManyRequests), + Err(e @ HttpError::NotFound) => error_response(e, StatusCode::NotFound), Err(e) => error_response(e, StatusCode::InternalServerError), } } @@ -233,6 +235,22 @@ pub static HTTP_ROUTES: LazyLock = LazyLock::new(|| { endpoint!("/vm.delete"), Box::new(VmActionHandler::new(&VmDelete)), ); + r.routes.insert( + endpoint!("/vm.disk-mirror-start"), + Box::new(VmActionHandler::new(&VmDiskMirrorStart)), + ); + r.routes.insert( + endpoint!("/vm.disk-mirror-status"), + Box::new(VmActionHandler::new(&VmDiskMirrorStatus)), + ); + r.routes.insert( + endpoint!("/vm.disk-mirror-complete"), + Box::new(VmActionHandler::new(&VmDiskMirrorComplete)), + ); + r.routes.insert( + endpoint!("/vm.disk-mirror-cancel"), + Box::new(VmActionHandler::new(&VmDiskMirrorCancel)), + ); r.routes.insert(endpoint!("/vm.info"), Box::new(VmInfo {})); r.routes.insert( endpoint!("/vm.pause"), diff --git a/vmm/src/api/mod.rs b/vmm/src/api/mod.rs index 6d1f7c6d7..bfdcd01dd 100644 --- a/vmm/src/api/mod.rs +++ b/vmm/src/api/mod.rs @@ -40,6 +40,7 @@ use std::str::FromStr; use std::sync::mpsc::{RecvError, SendError, Sender, channel}; use std::time::Duration; +use block::mirror::{MirrorPhase, MirrorStatus}; use log::{info, trace}; use micro_http::Body; use option_parser::{OptionParser, OptionParserError, Toggle}; @@ -149,6 +150,20 @@ pub enum ApiError { #[error("The disk could not be resized")] VmResizeDisk(#[source] VmError), + /// Error starting disk mirror + #[error("Error starting disk mirror")] + VmDiskMirrorStart(#[source] VmError), + + #[error("Error reading disk mirror state")] + VmDiskMirrorStatus(#[source] VmError), + + #[error("Error completing disk mirror")] + VmDiskMirrorComplete(#[source] VmError), + + /// Error cancelling disk mirror + #[error("Error cancelling disk mirror")] + VmDiskMirrorCancel(#[source] VmError), + /// The memory zone could not be resized. #[error("The memory zone could not be resized")] VmResizeZone(#[source] VmError), @@ -235,6 +250,55 @@ pub struct VmInfoResponse { pub device_tree: Option, } +#[derive(Clone, Deserialize, Serialize, Default, Debug)] +pub struct VmDiskMirrorStartData { + pub id: String, + pub destination_path: PathBuf, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct VmDiskMirrorStatusData { + pub id: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct VmDiskMirrorCompleteData { + pub id: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct VmDiskMirrorCancelData { + pub id: String, +} + +#[derive(Clone, Debug, Serialize)] +pub struct VmDiskMirrorStatusResponse { + pub phase: String, // "running" | "ready" | "completing" | "completed" | "cancelling" | "failed" + pub copied_bytes: u64, + pub total_bytes: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub failure: Option, +} + +impl From for VmDiskMirrorStatusResponse { + fn from(s: MirrorStatus) -> Self { + let (phase, failure) = match s.phase { + MirrorPhase::Running => ("running".to_owned(), None), + MirrorPhase::Ready => ("ready".to_owned(), None), + MirrorPhase::Cancelling => ("cancelling".to_owned(), None), + MirrorPhase::Failed(reason) => ("failed".to_owned(), Some(reason)), + MirrorPhase::Completing => ("completing".to_owned(), None), + MirrorPhase::Completed => ("completed".to_owned(), None), + }; + Self { + phase, + copied_bytes: s.copied_bytes, + total_bytes: s.total_bytes, + failure, + } + } +} + #[derive(Clone, Deserialize, Serialize)] pub struct VmmPingResponse { pub build_version: String, @@ -752,6 +816,16 @@ pub trait RequestHandler { fn vm_resize_zone(&mut self, id: String, desired_ram: u64) -> Result<(), VmError>; fn vm_resize_disk(&mut self, id: String, desired_size: u64) -> Result<(), VmError>; + fn vm_disk_mirror_start( + &mut self, + id: String, + destination_path: PathBuf, + ) -> Result<(), VmError>; + + fn vm_disk_mirror_status(&mut self, id: String) -> Result>, VmError>; + fn vm_disk_mirror_complete(&mut self, id: String) -> Result<(), VmError>; + + fn vm_disk_mirror_cancel(&mut self, id: String) -> Result<(), VmError>; fn vm_add_device(&mut self, device_cfg: DeviceConfig) -> Result>, VmError>; @@ -1379,6 +1453,124 @@ impl ApiAction for VmDelete { get_response_body(self, api_evt, api_sender, data) } } +pub struct VmDiskMirrorStart; +impl ApiAction for VmDiskMirrorStart { + type RequestBody = VmDiskMirrorStartData; + type ResponseBody = Option; + + fn request(&self, data: Self::RequestBody, response_sender: Sender) -> ApiRequest { + Box::new(move |vmm| { + let response = vmm + .vm_disk_mirror_start(data.id, data.destination_path) + .map_err(ApiError::VmDiskMirrorStart) + .map(|_| ApiResponsePayload::Empty); + + response_sender + .send(response) + .map_err(VmmError::ApiResponseSend)?; + + Ok(false) + }) + } + + fn send( + &self, + api_evt: EventFd, + api_sender: Sender, + data: Self::RequestBody, + ) -> ApiResult { + get_response_body(self, api_evt, api_sender, data) + } +} + +pub struct VmDiskMirrorStatus; +impl ApiAction for VmDiskMirrorStatus { + type RequestBody = VmDiskMirrorStatusData; + type ResponseBody = Option; + + fn request(&self, data: Self::RequestBody, response_sender: Sender) -> ApiRequest { + Box::new(move |vmm| { + let response = vmm + .vm_disk_mirror_status(data.id) + .map_err(ApiError::VmDiskMirrorStatus) + .map(ApiResponsePayload::VmAction); + + response_sender + .send(response) + .map_err(VmmError::ApiResponseSend)?; + Ok(false) + }) + } + + fn send( + &self, + api_evt: EventFd, + api_sender: Sender, + data: Self::RequestBody, + ) -> ApiResult { + get_response_body(self, api_evt, api_sender, data) + } +} + +pub struct VmDiskMirrorComplete; + +impl ApiAction for VmDiskMirrorComplete { + type RequestBody = VmDiskMirrorCompleteData; + type ResponseBody = Option; + + fn request(&self, data: Self::RequestBody, response_sender: Sender) -> ApiRequest { + Box::new(move |vmm| { + let response = vmm + .vm_disk_mirror_complete(data.id) + .map_err(ApiError::VmDiskMirrorComplete) + .map(|_| ApiResponsePayload::Empty); + + response_sender + .send(response) + .map_err(VmmError::ApiResponseSend)?; + Ok(false) + }) + } + + fn send( + &self, + api_evt: EventFd, + api_sender: Sender, + data: Self::RequestBody, + ) -> ApiResult { + get_response_body(self, api_evt, api_sender, data) + } +} + +pub struct VmDiskMirrorCancel; + +impl ApiAction for VmDiskMirrorCancel { + type RequestBody = VmDiskMirrorCancelData; + type ResponseBody = Option; + + fn request(&self, data: Self::RequestBody, response_sender: Sender) -> ApiRequest { + Box::new(move |vmm| { + let response = vmm + .vm_disk_mirror_cancel(data.id) + .map_err(ApiError::VmDiskMirrorCancel) + .map(|_| ApiResponsePayload::Empty); + + response_sender + .send(response) + .map_err(VmmError::ApiResponseSend)?; + Ok(false) + }) + } + + fn send( + &self, + api_evt: EventFd, + api_sender: Sender, + data: Self::RequestBody, + ) -> ApiResult { + get_response_body(self, api_evt, api_sender, data) + } +} pub struct VmInfo; diff --git a/vmm/src/api/openapi/cloud-hypervisor.yaml b/vmm/src/api/openapi/cloud-hypervisor.yaml index fd5ccec53..3eb0cd01b 100644 --- a/vmm/src/api/openapi/cloud-hypervisor.yaml +++ b/vmm/src/api/openapi/cloud-hypervisor.yaml @@ -510,6 +510,88 @@ paths: 500: description: The VM migration could not be sent. + /vm.disk-mirror-start: + put: + summary: Start mirroring a disk to a destination + requestBody: + description: The disk to mirror and the destination path + content: + application/json: + schema: + $ref: "#/components/schemas/VmDiskMirrorStartData" + required: true + responses: + 204: + description: Disk mirroring was successfully started. + 400: + description: A mirror is already active for the disk, or the destination is not usable. + 404: + description: No disk with the given identifier was found. + 500: + description: Disk mirroring could not be started. + + /vm.disk-mirror-status: + put: + summary: Query the status of a disk mirror + requestBody: + description: The identifier of the mirrored disk + content: + application/json: + schema: + $ref: "#/components/schemas/VmDiskMirrorStatusData" + required: true + responses: + 200: + description: The current status of the disk mirror. + content: + application/json: + schema: + $ref: "#/components/schemas/VmDiskMirrorStatusResponse" + 404: + description: No disk with the given identifier was found, or no mirror is active for it. + 500: + description: The disk mirror status could not be retrieved. + + /vm.disk-mirror-complete: + put: + summary: Complete a disk mirror and switch to the destination + requestBody: + description: The identifier of the mirrored disk + content: + application/json: + schema: + $ref: "#/components/schemas/VmDiskMirrorCompleteData" + required: true + responses: + 204: + description: The disk mirror was completed and the device now uses the destination. + 400: + description: The mirror is not ready to complete. + 404: + description: No disk with the given identifier was found, or no mirror is active for it. + 500: + description: The disk mirror could not be completed. + + /vm.disk-mirror-cancel: + put: + summary: Cancel a disk mirror and keep the source disk + requestBody: + description: The identifier of the mirrored disk + content: + application/json: + schema: + $ref: "#/components/schemas/VmDiskMirrorCancelData" + required: true + responses: + 204: + description: The disk mirror was cancelled and the device keeps using the source. + 400: + description: The mirror cannot be cancelled because completion is already in progress. + 404: + description: No disk with the given identifier was found, or no mirror is active for it. + 500: + description: The disk mirror could not be cancelled. + components: schemas: VmmPingResponse: @@ -1564,3 +1646,63 @@ components: type: string access: type: string + + VmDiskMirrorStartData: + required: + - id + - destination_path + type: object + properties: + id: + type: string + destination_path: + type: string + + VmDiskMirrorStatusData: + required: + - id + type: object + properties: + id: + type: string + + VmDiskMirrorStatusResponse: + required: + - phase + - copied_bytes + - total_bytes + type: object + properties: + phase: + type: string + enum: + - running + - ready + - completing + - completed + - cancelling + - failed + copied_bytes: + type: integer + format: int64 + total_bytes: + type: integer + format: int64 + failure: + type: string + + VmDiskMirrorCompleteData: + required: + - id + type: object + properties: + id: + type: string + + VmDiskMirrorCancelData: + required: + - id + type: object + properties: + id: + type: string diff --git a/vmm/src/device_manager.rs b/vmm/src/device_manager.rs index 60e07f002..094609b6d 100644 --- a/vmm/src/device_manager.rs +++ b/vmm/src/device_manager.rs @@ -35,7 +35,8 @@ use arch::{DeviceType, MmioDeviceInfo}; use arch::{NumaNodes, layout}; use block::ImageType; use block::error::BlockError; -use block::factory::{DiskOpenOptions, open_disk}; +use block::factory::{DiskOpenOptions, create_disk, open_disk}; +use block::mirror::MirrorStatus; #[cfg(target_arch = "riscv64")] use devices::aia; #[cfg(target_arch = "x86_64")] @@ -679,6 +680,39 @@ pub enum DeviceManagerError { specified: ImageType, detected: ImageType, }, + + /// No block mirroring is active for the current device. + #[error("No block mirroring is active for the current disk with identifier: {0}")] + BlockMirrorNotActive(String), + + /// Mirroring is already active for the current device. + #[error( + "Failed to start block mirroring for the disk with identifier: {0} as mirroring is already active" + )] + BlockMirrorAlreadyActive(String), + + /// The mirror destination path is already backing one of the VM's disks. + #[error("Cannot mirror to '{0}': it is already in use as a disk image by this VM")] + BlockMirrorDestinationInUse(String), + + /// Cannot perform given action, as the device is currently performing a block mirroring operation. + #[error( + "Failed to perform the requested action for the disk with identifier: {0} as it is currently performing a block mirroring operation" + )] + BlockMirrorActive(String), + + /// The block mirroring destination path already exists. + #[error( + "The block mirroring destination path already exists for the disk with identifier: {0} at path: {1}" + )] + BlockMirrorDestAlreadyExists(String, String), + + #[error("Failed to complete block mirror for disk {0}: {1}")] + BlockMirrorComplete(String, #[source] BlockError), + + /// Cancelling the block mirror failed. + #[error("Failed to cancel the block mirror for the disk with identifier: {0}")] + BlockMirrorCancel(String, #[source] BlockError), } pub type DeviceManagerResult = result::Result; @@ -4761,16 +4795,20 @@ impl DeviceManager { // Release advisory locks by dropping all references. // Linux automatically releases all locks of that file if the last open FD is closed. { - let maybe_block_device_index = self + if let Some(index) = self .block_devices .iter() - .enumerate() - .find(|(_, dev)| { - let dev = dev.lock().unwrap(); - dev.id() == id - }) - .map(|(i, _)| i); - if let Some(index) = maybe_block_device_index { + .position(|dev| dev.lock().unwrap().id() == id) + { + // Deny removal of active mirroring block device. + if self.block_devices[index] + .lock() + .unwrap() + .mirror_status() + .is_some() + { + return Err(DeviceManagerError::BlockMirrorActive(id.to_string())); + } let _ = self.block_devices.swap_remove(index); } } @@ -5320,6 +5358,164 @@ impl DeviceManager { } } + /// Start mirroring the disk identified by `device_id` to a new + /// file at `dest_path`. + /// + /// The destination file must not exist yet. It is created with the + /// same image format and backend flags as the source disk, sized to + /// match the source's logical size, and handed to the virtio block + /// device which mirrors later guest writes out to both backends + /// while a background worker copies the existing source contents. + /// + /// Returns an error if no disk with the given identifier is attached + /// to the VM, or the destination cannot be created or opened. + pub fn mirror_disk(&self, device_id: &str, dest_path: &Path) -> DeviceManagerResult<()> { + for dev in &self.block_devices { + let mut disk = dev.lock().unwrap(); + if disk.id() != device_id { + continue; + } + + if let Some(status) = disk.mirror_status() { + return Err(DeviceManagerError::BlockMirrorAlreadyActive(format!( + "{device_id} is in phase {:?}, cancel the mirror before starting a new one", + status.phase + ))); + } + + // Refuse a destination that already backs one of this VM's disks, comparing canonicalized paths. + let canon = |p: &Path| std::fs::canonicalize(p).unwrap_or_else(|_| p.to_path_buf()); + let dest_canon = canon(dest_path); + let dest_in_use = self + .config + .lock() + .unwrap() + .disks + .iter() + .flatten() + .filter_map(|d| d.path.as_deref()) + .any(|src| canon(src) == dest_canon); + if dest_in_use { + return Err(DeviceManagerError::BlockMirrorDestinationInUse( + dest_path.display().to_string(), + )); + } + + let (options, image_type) = { + let cfg = self.config.lock().unwrap(); + let disks = cfg + .disks + .as_ref() + .ok_or_else(|| DeviceManagerError::UnknownDeviceId(device_id.to_string()))?; + + let src = disks + .iter() + .find(|d| d.pci_common.id.as_deref() == Some(device_id)) + .ok_or_else(|| DeviceManagerError::UnknownDeviceId(device_id.to_string()))?; + + ( + &DiskOpenOptions { + path: dest_path, + readonly: false, // ignore source's readonly, mirroring needs write access. + direct: src.direct, + sparse: src.sparse, + backing_files: src.backing_files, + disable_io_uring: src.disable_io_uring, + disable_aio: src.disable_aio, + }, + src.image_type, + ) + }; + + // TODO: make this configurable via request flags (create_disk, + // use_existing_disk). For now, create the destination only when it + // is missing and open it either way. + if !dest_path.exists() { + let logical_size = disk.logical_size().map_err(DeviceManagerError::Disk)?; + create_disk(options, image_type, logical_size).map_err(DeviceManagerError::Disk)?; + } + let dest_disk = open_disk(options).map_err(DeviceManagerError::Disk)?.disk; + + disk.start_mirror(dest_disk, dest_path.to_path_buf()) + .map_err(DeviceManagerError::Disk)?; + + return Ok(()); + } + + Err(DeviceManagerError::UnknownDeviceId(device_id.to_string())) + } + + /// Return the current state of the active mirror for the disk + /// identified by `device_id`. + /// + /// Returns an error if no disk with the given identifier is + /// attached to the VM, or if the disk has no active mirror. + pub fn mirror_disk_status(&self, device_id: &str) -> DeviceManagerResult { + for dev in &self.block_devices { + let disk = dev.lock().unwrap(); + + if disk.id() == device_id { + return disk.mirror_status().ok_or_else(|| { + DeviceManagerError::BlockMirrorNotActive(device_id.to_string()) + }); + } + } + + Err(DeviceManagerError::UnknownDeviceId(device_id.to_string())) + } + + /// Completes the active block mirror for the disk identified by `device_id`, + /// switching over to the destination disk. Errors if no disk with that + /// identifier is attached, if no mirror is active, or if the mirror is not + /// yet ready. + pub fn mirror_disk_complete(&self, device_id: &str) -> DeviceManagerResult<()> { + for dev in &self.block_devices { + let mut disk = dev.lock().unwrap(); + if disk.id() == device_id { + let new_path = disk.complete_mirror().map_err(|e| { + DeviceManagerError::BlockMirrorComplete(device_id.to_string(), e) + })?; + + // Repoint the config entry so a rebuild reopens the destination. + if let Some(cfg) = self + .config + .lock() + .unwrap() + .disks + .as_mut() + .and_then(|disks| { + disks + .iter_mut() + .find(|d| d.pci_common.id.as_deref() == Some(device_id)) + }) + { + cfg.path = Some(new_path); + } + + return Ok(()); + } + } + Err(DeviceManagerError::UnknownDeviceId(device_id.to_string())) + } + + /// Cancels the active block mirror for the disk identified by + /// `device_id`, reverting all virtqueue workers to the source disk + /// and releasing the destination. Errors if no disk with that + /// identifier is attached, if no mirror is active, if a completion has + /// already been attempted, or if reverting the virtqueue workers + /// fails. + pub fn mirror_disk_cancel(&self, device_id: &str) -> DeviceManagerResult<()> { + for dev in &self.block_devices { + let mut disk = dev.lock().unwrap(); + if disk.id() == device_id { + return disk + .cancel_mirror() + .map_err(|e| DeviceManagerError::BlockMirrorCancel(device_id.to_string(), e)); + } + } + Err(DeviceManagerError::UnknownDeviceId(device_id.to_string())) + } + /// Helps the environment converge quickly after a live migration by /// prompting devices to advertise the VM from its new host. /// @@ -5362,6 +5558,13 @@ impl DeviceManager { MAX_DELAY, ); } + + /// Returns true if there is an active mirror in any of the block devices, false otherwise. + pub fn any_active_block_mirrors(&self) -> bool { + self.block_devices + .iter() + .any(|dev| dev.lock().unwrap().mirror_status().is_some()) + } } /// Starts a thread that periodically performs the post-migration announcements. diff --git a/vmm/src/lib.rs b/vmm/src/lib.rs index c94eee40e..e847c7001 100644 --- a/vmm/src/lib.rs +++ b/vmm/src/lib.rs @@ -17,7 +17,6 @@ use std::fs::File; use std::io::{Read, Write, stdout}; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::panic::AssertUnwindSafe; -#[cfg(feature = "guest_debug")] use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvError, SendError, Sender}; @@ -63,8 +62,8 @@ use vmm_sys_util::signal::unblock_signal; use vmm_sys_util::sock_ctrl_msg::ScmSocket; use crate::api::{ - ApiRequest, ApiResponse, RequestHandler, TimeoutStrategy, VmInfoResponse, - VmReceiveMigrationData, VmSendMigrationData, VmmPingResponse, + ApiRequest, ApiResponse, RequestHandler, TimeoutStrategy, VmDiskMirrorStatusResponse, + VmInfoResponse, VmReceiveMigrationData, VmSendMigrationData, VmmPingResponse, }; use crate::config::{MemoryRestoreMode, RestoreConfig, add_to_config}; #[cfg(all(target_arch = "x86_64", feature = "guest_debug"))] @@ -2626,6 +2625,10 @@ impl RequestHandler for Vmm { fn vm_snapshot(&mut self, destination_url: &str) -> result::Result<(), VmError> { match self.vm { MaybeVmOwnership::Vmm(ref mut vm) => { + if vm.any_active_block_mirrors() { + return Err(VmError::ActiveBlockMirror); + } + // Drain console_info so that FDs are not reused let _ = self.console_info.take(); vm.snapshot() @@ -2722,6 +2725,11 @@ impl RequestHandler for Vmm { MaybeVmOwnership::Migration(_) => return Err(VmError::VmMigrating), MaybeVmOwnership::None => return Err(VmError::VmNotRunning), }; + + if vm.any_active_block_mirrors() { + return Err(VmError::ActiveBlockMirror); + } + // Drain console_info so that the FDs are not reused let _ = self.console_info.take(); let r = vm.shutdown(); @@ -2743,6 +2751,11 @@ impl RequestHandler for Vmm { MaybeVmOwnership::Migration(_) => return Err(VmError::VmMigrating), MaybeVmOwnership::None => return Err(VmError::VmNotRunning), }; + + if vm.any_active_block_mirrors() { + return Err(VmError::ActiveBlockMirror); + } + let config = vm.get_config(); vm.shutdown()?; self.vm = MaybeVmOwnership::None; @@ -2864,7 +2877,11 @@ impl RequestHandler for Vmm { } match &self.vm { - MaybeVmOwnership::Vmm(_vm) => { + MaybeVmOwnership::Vmm(vm) => { + if vm.any_active_block_mirrors() { + return Err(VmError::ActiveBlockMirror); + } + event!("vm", "deleted"); // If a VM is booted, we first try to shut it down. @@ -3370,8 +3387,14 @@ impl RequestHandler for Vmm { .context("Invalid send migration configuration") .map_err(MigratableError::MigrateSend)?; - match self.vm { - MaybeVmOwnership::Vmm(_) => (), + match &self.vm { + MaybeVmOwnership::Vmm(vm) => { + if vm.any_active_block_mirrors() { + return Err(MigratableError::MigrateSend(anyhow!( + "Cannot start migration with active disk mirrors" + ))); + } + } MaybeVmOwnership::Migration(_) => { return Err(MigratableError::MigrateSend(anyhow!( "There is already an ongoing migration" @@ -3508,6 +3531,55 @@ impl RequestHandler for Vmm { let lock = MIGRATION_PROGRESS_SNAPSHOT.lock().unwrap(); lock.clone() } + + fn vm_disk_mirror_start( + &mut self, + id: String, + destination_path: PathBuf, + ) -> result::Result<(), VmError> { + self.vm_config.as_ref().ok_or(VmError::VmNotCreated)?; + + match self.vm { + MaybeVmOwnership::Vmm(ref mut vm) => vm.mirror_disk(&id, &destination_path), + MaybeVmOwnership::Migration(_) => Err(VmError::VmMigrating), + MaybeVmOwnership::None => Err(VmError::DiskMirrorStart), + } + } + + fn vm_disk_mirror_status(&mut self, id: String) -> result::Result>, VmError> { + self.vm_config.as_ref().ok_or(VmError::VmNotCreated)?; + + match self.vm { + MaybeVmOwnership::Vmm(ref vm) => { + let status = vm.mirror_disk_status(&id)?; + let response: VmDiskMirrorStatusResponse = status.into(); + let json = serde_json::to_vec(&response).map_err(|_| VmError::DiskMirrorStatus)?; + Ok(Some(json)) + } + MaybeVmOwnership::Migration(_) => Err(VmError::VmMigrating), + MaybeVmOwnership::None => Err(VmError::DiskMirrorStatus), + } + } + + fn vm_disk_mirror_complete(&mut self, id: String) -> result::Result<(), VmError> { + self.vm_config.as_ref().ok_or(VmError::VmNotCreated)?; + + match self.vm { + MaybeVmOwnership::Vmm(ref mut vm) => vm.mirror_disk_complete(&id), + MaybeVmOwnership::Migration(_) => Err(VmError::VmMigrating), + MaybeVmOwnership::None => Err(VmError::DiskMirrorComplete), + } + } + + fn vm_disk_mirror_cancel(&mut self, id: String) -> result::Result<(), VmError> { + self.vm_config.as_ref().ok_or(VmError::VmNotCreated)?; + + match self.vm { + MaybeVmOwnership::Vmm(ref mut vm) => vm.mirror_disk_cancel(&id), + MaybeVmOwnership::Migration(_) => Err(VmError::VmMigrating), + MaybeVmOwnership::None => Err(VmError::DiskMirrorCancel), + } + } } const CPU_MANAGER_SNAPSHOT_ID: &str = "cpu-manager"; diff --git a/vmm/src/seccomp_filters.rs b/vmm/src/seccomp_filters.rs index 69c4a83f3..4b2192df2 100644 --- a/vmm/src/seccomp_filters.rs +++ b/vmm/src/seccomp_filters.rs @@ -890,6 +890,9 @@ fn vcpu_thread_rules( (libc::SYS_dup, vec![]), (libc::SYS_exit, vec![]), (libc::SYS_epoll_ctl, vec![]), + (libc::SYS_eventfd2, vec![]), + (libc::SYS_io_uring_setup, vec![]), + (libc::SYS_io_uring_register, vec![]), ( libc::SYS_fallocate, or![and![Cond::new( diff --git a/vmm/src/vm.rs b/vmm/src/vm.rs index d62173e5f..f9c127ad4 100644 --- a/vmm/src/vm.rs +++ b/vmm/src/vm.rs @@ -19,6 +19,7 @@ use std::mem::size_of; use std::num::Wrapping; use std::ops::Deref; use std::os::unix::net::UnixStream; +use std::path::Path; use std::sync::{Arc, Mutex}; #[cfg(not(target_arch = "riscv64"))] use std::time::Instant; @@ -34,6 +35,7 @@ use arch::x86_64::MAX_SUPPORTED_CPUS_LEGACY; #[cfg(feature = "tdx")] use arch::x86_64::tdx::TdvfSection; use arch::{EntryPoint, NumaNode, NumaNodes, get_host_cpu_phys_bits}; +use block::mirror::MirrorStatus; use devices::AcpiNotificationFlags; #[cfg(target_arch = "aarch64")] use devices::interrupt_controller; @@ -269,6 +271,21 @@ pub enum Error { #[error("Failed resizing a disk image")] ResizeDisk, + #[error("Failed to start disk mirror")] + DiskMirrorStart, + + #[error("Failed to read disk mirror state")] + DiskMirrorStatus, + + #[error("Failed to complete disk mirror")] + DiskMirrorComplete, + + #[error("Failed to cancel disk mirror")] + DiskMirrorCancel, + + #[error("At least one disk mirror is active")] + ActiveBlockMirror, + #[error("Cannot activate virtio devices")] ActivateVirtioDevices(#[source] DeviceManagerError), @@ -3301,6 +3318,49 @@ impl Vm { .map_err(Error::ErrorNmi); } + pub fn mirror_disk(&self, id: &str, dest_path: &Path) -> Result<()> { + self.device_manager + .lock() + .unwrap() + .mirror_disk(id, dest_path) + .map_err(Error::DeviceManager)?; + + Ok(()) + } + + pub fn mirror_disk_status(&self, id: &str) -> Result { + self.device_manager + .lock() + .unwrap() + .mirror_disk_status(id) + .map_err(Error::DeviceManager) + } + + pub fn mirror_disk_complete(&self, id: &str) -> Result<()> { + self.device_manager + .lock() + .unwrap() + .mirror_disk_complete(id) + .map_err(Error::DeviceManager)?; + Ok(()) + } + + pub fn mirror_disk_cancel(&self, id: &str) -> Result<()> { + self.device_manager + .lock() + .unwrap() + .mirror_disk_cancel(id) + .map_err(Error::DeviceManager) + } + + /// Returns true if there is an active mirror in any of the block devices, false otherwise. + pub fn any_active_block_mirrors(&self) -> bool { + self.device_manager + .lock() + .unwrap() + .any_active_block_mirrors() + } + /// Calls [`DeviceManager::post_migration_announce`]. pub fn post_migration_announce(&self) { self.device_manager