Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ blocking = "1"
chrono = "0.4"
futures = "0.3.32"
futures-concurrency = "7.6.3"
jsonrpcmsg = "0.1.2"
open = "5"
rustc-hash = "2.1.1"
shell-words = "1.1"
Expand Down
51 changes: 27 additions & 24 deletions md/transport-architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,24 @@ This separation enables:
- **Testability**: Mock transports for unit testing
- **Clarity**: Clear boundaries between protocol and I/O concerns

### The `jsonrpcmsg::Message` Boundary
### The `RawJsonRpcMessage` Boundary

The key insight is that `jsonrpcmsg::Message` provides a natural, transport-neutral boundary:
The key insight is that `agent_client_protocol::RawJsonRpcMessage` provides a natural,
transport-neutral boundary backed by the JSON-RPC envelope types from
`agent-client-protocol-schema`:

```rust
enum jsonrpcmsg::Message {
Request { method, params, id },
Response { result, error, id },
enum RawJsonRpcMessage {
Request(Request<RawJsonRpcParams>),
Notification(Notification<RawJsonRpcParams>),
Response(Response<serde_json::Value>),
}
```

This type sits between the protocol and transport layers:

- **Above**: Protocol layer works with application types (`OutgoingMessage`, `UntypedMessage`)
- **Below**: Transport layer works with `jsonrpcmsg::Message`
- **Below**: Transport layer works with `RawJsonRpcMessage`
- **Boundary**: Clean, well-defined interface

## Actor Architecture
Expand All @@ -59,27 +62,27 @@ These actors live in `JrConnection` and understand JSON-RPC semantics:

```
Input: mpsc::UnboundedReceiver<OutgoingMessage>
Output: mpsc::UnboundedSender<jsonrpcmsg::Message>
Output: mpsc::UnboundedSender<RawJsonRpcMessage>
```

Responsibilities:

- Assign unique IDs to outgoing requests
- Subscribe to reply_actor for response correlation
- Convert application-level `OutgoingMessage` to protocol-level `jsonrpcmsg::Message`
- Convert application-level `OutgoingMessage` to protocol-level `RawJsonRpcMessage`

#### Incoming Protocol Actor

```
Input: mpsc::UnboundedReceiver<jsonrpcmsg::Message>
Input: mpsc::UnboundedReceiver<RawJsonRpcMessage>
Output: Routes to reply_actor or registered handlers
```

Responsibilities:

- Route responses to reply_actor (matches by ID)
- Route requests/notifications to registered handlers
- Convert `jsonrpcmsg::Request` to `UntypedMessage` for handlers
- Convert schema request/notification envelopes to `UntypedMessage` for handlers

#### Reply Actor

Expand All @@ -100,35 +103,35 @@ These actors are spawned by `IntoJrConnectionTransport` implementations and have
#### Transport Outgoing Actor

```
Input: mpsc::UnboundedReceiver<jsonrpcmsg::Message>
Input: mpsc::UnboundedReceiver<RawJsonRpcMessage>
Output: Writes to I/O (byte stream, channel, socket, etc.)
```

For byte streams:

- Serialize `jsonrpcmsg::Message` to JSON
- Serialize `RawJsonRpcMessage` to JSON
- Write newline-delimited JSON to stream

For in-process channels:

- Directly forward `jsonrpcmsg::Message` to channel
- Directly forward `RawJsonRpcMessage` to channel

#### Transport Incoming Actor

```
Input: Reads from I/O (byte stream, channel, socket, etc.)
Output: mpsc::UnboundedSender<jsonrpcmsg::Message>
Output: mpsc::UnboundedSender<RawJsonRpcMessage>
```

For byte streams:

- Read newline-delimited JSON from stream
- Parse to `jsonrpcmsg::Message`
- Parse to `RawJsonRpcMessage`
- Send to incoming protocol actor

For in-process channels:

- Directly forward `jsonrpcmsg::Message` from channel
- Directly forward `RawJsonRpcMessage` from channel

## Message Flow

Expand All @@ -142,9 +145,9 @@ User Handler
Outgoing Protocol Actor
| - Assign ID (for requests)
| - Subscribe to replies
| - Convert to jsonrpcmsg::Message
| - Convert to RawJsonRpcMessage
v
| jsonrpcmsg::Message
| RawJsonRpcMessage
|
Transport Outgoing Actor
| - Serialize (byte streams)
Expand All @@ -162,7 +165,7 @@ Transport Incoming Actor
| - Parse (byte streams)
| - Or forward directly (channels)
v
| jsonrpcmsg::Message
| RawJsonRpcMessage
|
Incoming Protocol Actor
| - Route responses → reply_actor
Expand All @@ -188,8 +191,8 @@ pub trait IntoJrConnectionTransport {
fn setup_transport(
self,
cx: &JrConnectionCx,
outgoing_rx: mpsc::UnboundedReceiver<jsonrpcmsg::Message>,
incoming_tx: mpsc::UnboundedSender<jsonrpcmsg::Message>,
outgoing_rx: mpsc::UnboundedReceiver<RawJsonRpcMessage>,
incoming_tx: mpsc::UnboundedSender<RawJsonRpcMessage>,
) -> Result<(), Error>;
}
```
Expand All @@ -216,7 +219,7 @@ impl<OB: AsyncWrite, IB: AsyncRead> IntoJrConnectionTransport for (OB, IB) {
cx.spawn(async move {
let mut lines = BufReader::new(incoming_bytes).lines();
while let Some(line) = lines.next().await {
let message: jsonrpcmsg::Message = serde_json::from_str(&line?)?;
let message: RawJsonRpcMessage = serde_json::from_str(&line?)?;
incoming_tx.unbounded_send(message)?;
}
Ok(())
Expand Down Expand Up @@ -250,8 +253,8 @@ For components in the same process, skip serialization entirely:

```rust
pub struct ChannelTransport {
outgoing: mpsc::UnboundedSender<jsonrpcmsg::Message>,
incoming: mpsc::UnboundedReceiver<jsonrpcmsg::Message>,
outgoing: mpsc::UnboundedSender<RawJsonRpcMessage>,
incoming: mpsc::UnboundedReceiver<RawJsonRpcMessage>,
}

impl IntoJrConnectionTransport for ChannelTransport {
Expand Down
10 changes: 5 additions & 5 deletions src/agent-client-protocol-conductor/src/snoop.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
use agent_client_protocol::{Channel, ConnectTo, DynConnectTo, Role, jsonrpcmsg};
use agent_client_protocol::{Channel, ConnectTo, DynConnectTo, RawJsonRpcMessage, Role};
use futures::StreamExt;
use futures_concurrency::future::TryJoin;

pub struct SnooperComponent<R: Role> {
base_component: DynConnectTo<R>,
incoming_message: Box<
dyn FnMut(&jsonrpcmsg::Message) -> Result<(), agent_client_protocol::Error> + Send + Sync,
dyn FnMut(&RawJsonRpcMessage) -> Result<(), agent_client_protocol::Error> + Send + Sync,
>,
outgoing_message: Box<
dyn FnMut(&jsonrpcmsg::Message) -> Result<(), agent_client_protocol::Error> + Send + Sync,
dyn FnMut(&RawJsonRpcMessage) -> Result<(), agent_client_protocol::Error> + Send + Sync,
>,
}

impl<R: Role> SnooperComponent<R> {
pub fn new(
base_component: impl ConnectTo<R>,
incoming_message: impl FnMut(&jsonrpcmsg::Message) -> Result<(), agent_client_protocol::Error>
incoming_message: impl FnMut(&RawJsonRpcMessage) -> Result<(), agent_client_protocol::Error>
+ Send
+ Sync
+ 'static,
outgoing_message: impl FnMut(&jsonrpcmsg::Message) -> Result<(), agent_client_protocol::Error>
outgoing_message: impl FnMut(&RawJsonRpcMessage) -> Result<(), agent_client_protocol::Error>
+ Send
+ Sync
+ 'static,
Expand Down
Loading
Loading