[feat] provide save/load checkpoint interfaces #124

Open
dodatboii wants to merge 6 commits into Ascend:main from dodatboii:dev_dump

@dodatboii

Contributor

Summary

  • Add tq.save_checkpoint(checkpoint_dir, *, include_storage, metadata) and tq.load_checkpoint(checkpoint_dir) as top-level public APIs
  • The controller and each storage unit write their state directly to file in-process (only a bool ACK goes through the Ray object store), avoiding large payload transfers over Ray
  • Save order: controller first, then storage units in parallel. This guarantees consistency without pausing the data flow, because storage unit data is always a superset of what the controller has confirmed
  • Atomic save via a .tmp directory that is renamed on success and deleted on failure, so no partial checkpoint is left on disk
  • Load validates the storage unit count before touching any state; restoration is position-based (global_idx % num_units), so storage unit IDs regenerated across restarts are handled correctly
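
For readers unfamiliar with the .tmp-directory pattern mentioned in the summary, here is a minimal sketch of how such an atomic save could look. It is not the code from this PR: `atomic_save` and the `write_fn` callback are hypothetical names, and the sketch assumes the target checkpoint directory does not exist yet.

```python
import os
import shutil


def atomic_save(checkpoint_dir, write_fn):
    """Write a checkpoint into <checkpoint_dir>.tmp, then rename it into place.

    write_fn is a hypothetical callback that writes every checkpoint file
    into the directory it receives.
    """
    tmp_dir = os.path.normpath(checkpoint_dir) + ".tmp"
    # Clear leftovers from an earlier attempt that crashed mid-write.
    shutil.rmtree(tmp_dir, ignore_errors=True)
    os.makedirs(tmp_dir)
    try:
        write_fn(tmp_dir)
        # Assumption: checkpoint_dir does not exist yet, so the rename
        # publishes the finished checkpoint in a single step.
        os.rename(tmp_dir, checkpoint_dir)
    except BaseException:
        # On any failure, remove the partial directory and re-raise.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise
```

With this shape, a reader of `checkpoint_dir` either sees a complete checkpoint or no directory at all, which is the property the "failed save leaves no partial directory" test checks.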

Checkpoint layout:

checkpoint_dir/
├── metadata.json          # timestamp, storage unit list, user metadata
├── controller_state.pkl   # TransferQueueController full state
└── storage_units/
    ├── su_0_<id>.pkl
    └── su_1_<id>.pkl
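
As an illustration of position-based restoration against this layout, the sketch below pairs each saved storage unit file with the current unit at the same position, ignoring the old IDs. The helper names are hypothetical, and the `su_<position>_<id>.pkl` naming and the `position` / `storage_unit_id` keys are inferred from the layout and the PR's docstrings rather than confirmed by the implementation.

```python
def unit_position(global_idx, num_units):
    """Position of the storage unit responsible for a sample (global_idx % num_units)."""
    return global_idx % num_units


def map_saved_to_current(su_info_list, current_unit_ids):
    """Map each current storage unit ID to the saved file at the same position.

    su_info_list: dicts with keys 'position' and 'storage_unit_id' (from metadata.json).
    current_unit_ids: IDs of the running storage units, ordered by position.
    """
    # Validate the count before touching any state.
    if len(su_info_list) != len(current_unit_ids):
        raise ValueError(
            f"checkpoint has {len(su_info_list)} storage units, "
            f"but {len(current_unit_ids)} are running"
        )
    mapping = {}
    for info in sorted(su_info_list, key=lambda i: i["position"]):
        pos = info["position"]
        # Old IDs only appear in the file name; the pairing itself uses position.
        mapping[current_unit_ids[pos]] = f"su_{pos}_{info['storage_unit_id']}.pkl"
    return mapping
```

Because the pairing depends only on position, a restart that regenerates every storage unit ID still restores each file into the unit that will serve the same `global_idx` values.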

Test plan

  • pytest tests/e2e/test_checkpoint_e2e.py -v
    • save creates expected files and metadata structure
    • load restores controller partitions, key mappings, and per-sample tags
    • load restores storage data and round-trips tensors correctly across multiple partitions
    • include_storage=False saves only controller state
    • error cases: uninitialized system, missing directory/metadata, storage unit count mismatch
    • failed save leaves no partial directory on disk
    • non-tensor fields (NonTensorStack) and variable-length (jagged) tensor fields survive round-trip

Signed-off-by: yxstev <zhangyixiang9@huawei.com>
@ascend-robot

CLA Signature Pass

dodatboii, thanks for your pull request. All authors of the commits have signed the CLA. 👍

@0oshowero0 0oshowero0 requested a review from Copilot June 15, 2026 03:47

Copilot AI left a comment

Contributor

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

Comment threads:
  • transfer_queue/interface.py: 12 threads (9 outdated)
  • transfer_queue/storage/simple_storage.py: 1 thread (outdated)
  • transfer_queue/controller.py: 2 threads (both outdated)
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
Controller and storage unit checkpoint operations are now triggered via ZMQ messages (CHECKPOINT_DUMP/RESTORE) rather than direct ray.get() calls. StorageManager base class exposes dump_checkpoint/restore_checkpoint, with AsyncSimpleStorageManager implementing them via the storage unit ZMQ channel. interface.py no longer accesses _TQ_STORAGE or _TQ_CONTROLLER ray handles for checkpoint; all calls go through TransferQueueClient.
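
A rough sketch of the request/response dispatch this commit message describes is shown below. It uses plain dicts in place of real ZMQ messages, and the message shape (`type`, `body`, `path`, `success`, `error`), the function name, and the `dump_fn` / `restore_fn` callbacks are all assumptions made for illustration; only the four message type names come from this PR.

```python
def handle_checkpoint_request(request, dump_fn, restore_fn):
    """Dispatch a checkpoint request and build the matching response.

    request: hypothetical dict of the form {"type": str, "body": {"path": str}}.
    dump_fn / restore_fn: hypothetical callbacks that take a path and return a bool.
    """
    handlers = {
        "CHECKPOINT_DUMP": ("CHECKPOINT_DUMP_RESPONSE", dump_fn),
        "CHECKPOINT_RESTORE": ("CHECKPOINT_RESTORE_RESPONSE", restore_fn),
    }
    if request["type"] not in handlers:
        raise ValueError(f"unsupported request type: {request['type']}")
    response_type, fn = handlers[request["type"]]
    try:
        ok = bool(fn(request["body"]["path"]))
        body = {"success": ok}
    except Exception as exc:
        # Report the failure in the normal response instead of a separate error type.
        body = {"success": False, "error": str(exc)}
    # Only this small acknowledgement travels back; the state itself stays on disk.
    return {"type": response_type, "body": body}
```

In this shape the caller only ever receives a small acknowledgement, which matches the PR's goal of keeping large payloads out of the transport layer.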

Signed-off-by: yxstev <zhangyixiang9@huawei.com>
Signed-off-by: yxstev <zhangyixiang9@huawei.com>
Signed-off-by: yxstev <zhangyixiang9@huawei.com>

Signed-off-by: yxstev <zhangyixiang9@huawei.com>

CHECKPOINT_DUMP_RESPONSE = "CHECKPOINT_DUMP_RESPONSE"
CHECKPOINT_RESTORE = "CHECKPOINT_RESTORE"
CHECKPOINT_RESTORE_RESPONSE = "CHECKPOINT_RESTORE_RESPONSE"
SAVE_LOAD_CKPT_ERROR = "SAVE_LOAD_CKPT_ERROR"

Collaborator

No need

Comment on lines +105 to +108
CHECKPOINT_DUMP = "CHECKPOINT_DUMP"
CHECKPOINT_DUMP_RESPONSE = "CHECKPOINT_DUMP_RESPONSE"
CHECKPOINT_RESTORE = "CHECKPOINT_RESTORE"
CHECKPOINT_RESTORE_RESPONSE = "CHECKPOINT_RESTORE_RESPONSE"

Collaborator

Distinguish between controller zmq event & simple storage zmq event

Comment on lines +355 to +366
@property
def storage_checkpoint_required(self) -> bool:
    """Whether storage contents must be checkpointed for correct restore.

    Returns True for in-memory backends (e.g. SimpleStorage) where data
    is lost on restart and must be serialized. Returns False for persistent
    KV backends (e.g. MooncakeStore, Yuanrong) where data survives restarts
    and only controller metadata needs to be saved.

    Subclasses should override this to reflect their actual persistence model.
    """
    return False

Collaborator

This property is strange

Comment on lines +368 to +385
async def dump_checkpoint(self, output_dir: str) -> list[dict]:
    """Dump all storage units to files in output_dir.

    Returns:
        List of dicts, each with keys: position, storage_unit_id.

    Raises:
        NotImplementedError: If this storage backend does not support checkpoint.
    """
    raise NotImplementedError(f"{self.__class__.__name__} does not support checkpoint")

async def restore_checkpoint(self, checkpoint_dir: str, su_info_list: list[dict]) -> None:
    """Restore all storage units from files in checkpoint_dir.

    Args:
        checkpoint_dir: Path to the checkpoint directory containing storage unit files.
        su_info_list: Ordered list of storage unit info dicts from metadata.json.

Collaborator

These interfaces are not universal to other backends

logger.error(f"[{self.controller_id}]: dump_to_file failed: {e}")
return False

def restore_from_file(self, path: str) -> bool:

Collaborator

Unify the interface name
