ctoolbox/workspace/ipc/
protocol.rs

1//! IPC protocol types and message envelopes for workspace communication.
2//!
3//! This module defines transport-level envelopes and streaming controls. It
4//! intentionally avoids embedding business logic; services should delegate to
5//! external modules for actual work.
6
7use crate::workspace::ipc::auth::capability::{CapabilitySet, CapabilityToken};
8use crate::workspace::ipc::types::{BlobToken, RequestId, StreamId};
9use serde::{Deserialize, Serialize};
10
/// Top-level message envelope for framed transport.
///
/// Each variant wraps the payload for one category of traffic: handshake
/// (`Hello`, `HelloOk`, `HelloErr`), RPC (`Request`, `Response`), server
/// notifications (`Event`), stream control (`Stream`), and cancellation
/// (`Cancel`).
///
/// NOTE(review): the tests in this file encode `Message` with postcard, which
/// presumably identifies variants by declaration index. If so, reordering or
/// removing variants changes the wire format; confirm before editing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
    /// Initial handshake from client to server (child -> workspace or workspace -> service).
    Hello(Hello),
    /// Successful response to `Hello`, carrying the bound capabilities.
    HelloOk(HelloOk),
    /// Failed response to `Hello`, carrying an error message.
    HelloErr(HelloErr),

    /// RPC request with correlation id.
    Request(Request),
    /// RPC response associated with a request id.
    Response(Response),

    /// Event (server-initiated notification).
    Event(Event),

    /// Stream control messages for high-volume data.
    Stream(StreamControl),

    /// Cancel a pending request id.
    Cancel(Cancel),
}
34
/// Authentication handshake payload.
///
/// Carried by `Message::Hello`; the answer is either `Message::HelloOk` or
/// `Message::HelloErr`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Hello {
    /// Capability token presented by the client.
    pub token: CapabilityToken,
    /// Optional client info (version, process kind).
    pub client_info: Option<ClientInfo>,
}
42
/// Descriptive information a client may attach to its `Hello`.
///
/// All three fields are plain strings; this type does not constrain their
/// contents.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientInfo {
    /// Client name.
    pub name: String,
    /// Client version string.
    pub version: String,
    /// Kind of process the client is.
    /// NOTE(review): the set of expected values is not defined in this file.
    pub process_kind: String,
}
49
/// Successful handshake response, carried by `Message::HelloOk`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloOk {
    /// Capabilities bound as a result of the handshake.
    pub bound_capabilities: CapabilitySet,
}
54
/// Failed handshake response, carried by `Message::HelloErr`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloErr {
    /// Reason the handshake failed.
    pub message: String,
}
59
/// Request envelope. The `method` is a typed identifier, and `args` is a postcard-encoded payload for the method.
///
/// The envelope itself never interprets `args`; it is carried as opaque bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
    /// Correlation id; `Response` and `Cancel` refer to a request by this id.
    pub id: RequestId,
    /// Service and method this request targets.
    pub method: MethodId,
    /// Postcard-encoded args for the method.
    pub args: Vec<u8>,
}
68
/// Response envelope. Either ok with a postcard-encoded result or error with a message/code.
///
/// `ok`, `result` and `error` are independent fields: nothing in the type
/// ties the flag to which of the two options is populated, so producers must
/// keep them consistent themselves.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
    /// Id of the request this response answers.
    pub id: RequestId,
    /// Whether the request succeeded.
    pub ok: bool,
    /// If ok, postcard-encoded result.
    pub result: Option<Vec<u8>>,
    /// If error, a short machine-readable code and message.
    pub error: Option<RpcError>,
}
79
/// Error payload of a failed `Response`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcError {
    /// Short machine-readable code. It is a free-form string at the type
    /// level; the standard values are the `RPC_ERROR_CODE_*` constants.
    pub code: String,
    /// Message accompanying the code.
    pub message: String,
}
85
/// Events are server-initiated notifications (e.g., `child_exit`,
/// `permission_changed`).
///
/// An event carries no request id, so it is not correlated with any
/// `Request`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Topic that determines how `payload` should be decoded.
    pub topic: EventTopic,
    /// Postcard-encoded payload per topic.
    pub payload: Vec<u8>,
}
94
95impl Event {
96    /// Construct an empty heartbeat event payload.
97    pub fn heartbeat() -> Self {
98        Self {
99            topic: EventTopic::Heartbeat,
100            payload: Vec::new(),
101        }
102    }
103}
104
/// Topic of an `Event`; the payload encoding is defined per topic.
///
/// NOTE(review): the tests in this file encode this enum with postcard, which
/// presumably identifies variants by declaration index. If so, append new
/// variants rather than reordering; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventTopic {
    /// Topic used by `Event::heartbeat`, which sends it with an empty payload.
    Heartbeat,
    /// A child exited. NOTE(review): payload shape is not defined in this file.
    ChildExited,
    /// A permission changed. NOTE(review): payload shape is not defined in this file.
    PermissionChanged,
    /// Storage was compacted. NOTE(review): payload shape is not defined in this file.
    StorageCompacted,
    /// Topic identified by an arbitrary string.
    Custom(String),
}
113
/// Stream controls for large flows.
///
/// Every variant carries the `StreamId` it refers to (see `stream_id`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamControl {
    /// Announces a stream: its id, its kind, and optionally a blob token.
    Start {
        /// Id of the stream being started.
        id: StreamId,
        /// Kind of data the stream carries.
        kind: StreamKind,
        /// Optionally pass a blob token for data plane.
        blob: Option<BlobToken>,
    },
    /// Next step of a stream, optionally carrying a chunk of bytes.
    Next {
        /// Id of the stream this chunk belongs to.
        id: StreamId,
        /// Raw chunk if using control-plane streaming, else omitted when using blob.
        chunk: Option<Vec<u8>>,
    },
    /// End of a stream, with a success flag and optional error text.
    End {
        /// Id of the stream being ended.
        id: StreamId,
        /// Whether the stream completed successfully.
        ok: bool,
        /// Error text, if any. The type does not tie this to `ok`.
        error: Option<String>,
    },
}
134
/// Kind of data carried by a stream, declared in `StreamControl::Start`.
///
/// NOTE(review): the tests in this file encode this enum with postcard, which
/// presumably identifies variants by declaration index. If so, append new
/// variants rather than reordering; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamKind {
    /// Video frames.
    VideoFrames,
    /// Image bytes.
    ImageBytes,
    /// File read.
    FileRead,
    /// File write.
    FileWrite,

    /// `IoNetwork`: stream bytes for `read_file`.
    IoNetworkReadFile,

    /// `IoNetwork`: stream bytes for `fetch` response body.
    IoNetworkFetchBody,

    /// Kind identified by an arbitrary string.
    Custom(String),
}
150
/// A compact method identifier (service + method).
///
/// Derives `PartialEq`, `Eq` and `Hash`, so it can be compared and used as a
/// hash-map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MethodId {
    /// Service name.
    /// NOTE(review): presumably matches `RpcService::SERVICE_NAME` of the
    /// target service; confirm against the router.
    pub service: String,
    /// Method name within the service.
    pub method: String,
}
157
/// Typed service API marker to allow compile-time mapping between Rust methods and `MethodId`.
///
/// A service exposes its routing name as an associated constant and a single
/// `dispatch` entry point that works on already-encoded bytes.
pub trait RpcService {
    /// The name of the service for routing and authorization, e.g., "storage".
    const SERVICE_NAME: &'static str;

    /// Handle an incoming postcard-encoded request for a method, producing a postcard-encoded response or error.
    ///
    /// Implementations should:
    /// - decode args to the appropriate request type
    /// - execute the method
    /// - encode the result to postcard
    /// - return `Ok(Some(Vec<u8>))` for non-void results, `Ok(None)` for void, or `Err(RpcError)`
    ///
    /// # Parameters
    /// - `method`: the method name as a plain string, not a full `MethodId`.
    ///   NOTE(review): presumably the `method` field of the request's
    ///   `MethodId`; confirm against the caller.
    /// - `args`: the postcard-encoded arguments for that method.
    ///
    /// # Errors
    /// Returns `Err(RpcError)` when the implementation reports a failure; the
    /// conditions are defined by each implementation.
    fn dispatch(
        &self,
        method: &str,
        args: &[u8],
    ) -> Result<Option<Vec<u8>>, RpcError>;
}
176
/// Request cancellation for a given id.
///
/// Carried by `Message::Cancel`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Cancel {
    /// Id of the pending request to cancel.
    pub id: RequestId,
}
182
183impl Hello {
184    /// Construct a Hello message payload.
185    pub fn new(
186        token: CapabilityToken,
187        client_info: Option<ClientInfo>,
188    ) -> Self {
189        Self { token, client_info }
190    }
191}
192
193impl HelloOk {
194    /// Construct a successful handshake response.
195    pub fn new(bound_capabilities: CapabilitySet) -> Self {
196        Self { bound_capabilities }
197    }
198}
199
200impl HelloErr {
201    /// Construct a failed handshake response with a reason.
202    pub fn new<S: Into<String>>(message: S) -> Self {
203        Self {
204            message: message.into(),
205        }
206    }
207}
208
209/// Compact helper for building a `MethodId`.
210impl MethodId {
211    pub fn new<S: Into<String>>(service: S, method: S) -> Self {
212        Self {
213            service: service.into(),
214            method: method.into(),
215        }
216    }
217}
218
219impl RpcError {
220    /// Create an "unauthorized" error.
221    pub fn unauthorized<S: Into<String>>(message: S) -> Self {
222        Self {
223            code: RPC_ERROR_CODE_UNAUTHORIZED.into(),
224            message: message.into(),
225        }
226    }
227
228    /// Create a "`capability_denied`" error (e.g., quota/rate limit exceeded).
229    pub fn capability_denied<S: Into<String>>(message: S) -> Self {
230        Self {
231            code: RPC_ERROR_CODE_CAPABILITY_DENIED.into(),
232            message: message.into(),
233        }
234    }
235
236    /// Create a "cancelled" error.
237    pub fn cancelled<S: Into<String>>(message: S) -> Self {
238        Self {
239            code: RPC_ERROR_CODE_CANCELLED.into(),
240            message: message.into(),
241        }
242    }
243
244    /// Create an "internal" error.
245    pub fn internal<S: Into<String>>(message: S) -> Self {
246        Self {
247            code: RPC_ERROR_CODE_INTERNAL.into(),
248            message: message.into(),
249        }
250    }
251}
252
253impl Cancel {
254    /// Construct a Cancel payload.
255    pub fn new(id: RequestId) -> Self {
256        Self { id }
257    }
258}
259
// Standard RPC error codes used across the IPC boundary.
// These are the values placed in `RpcError::code`.

/// Standard RPC error code: `"unauthorized"`.
pub const RPC_ERROR_CODE_UNAUTHORIZED: &str = "unauthorized";
/// Standard RPC error code: `"capability_denied"`.
pub const RPC_ERROR_CODE_CAPABILITY_DENIED: &str = "capability_denied";
/// Standard RPC error code: `"rate_limited"`.
pub const RPC_ERROR_CODE_RATE_LIMITED: &str = "rate_limited";
/// Standard RPC error code: `"cancelled"`.
pub const RPC_ERROR_CODE_CANCELLED: &str = "cancelled";
/// Standard RPC error code: `"internal"`.
pub const RPC_ERROR_CODE_INTERNAL: &str = "internal";
/// Standard RPC error code: `"not_implemented"`.
pub const RPC_ERROR_CODE_NOT_IMPLEMENTED: &str = "not_implemented";
267
/// When payloads are larger than this threshold, prefer blob-backed flows.
///
/// This is a protocol-level constant so clients and servers agree on the
/// heuristic, but callers may still override it (e.g., for testing).
///
/// The value is 256 KiB (262,144 bytes).
pub const IO_NETWORK_BLOB_THRESHOLD_BYTES: u64 = 256 * 1024;

/// Default chunk size used for control-plane streaming via `StreamControl::Next`.
///
/// The value is 16 KiB (16,384 bytes).
pub const STREAM_CONTROL_PLANE_CHUNK_BYTES: usize = 16 * 1024;
276
277impl Request {
278    /// Create a `tracing` span for handling this request.
279    ///
280    /// The span carries `request_id`, `service`, and `method` so downstream logs
281    /// can be correlated without manually repeating those fields.
282    #[must_use]
283    pub fn span(&self) -> tracing::Span {
284        tracing::info_span!(
285            "ipc.request",
286            request_id = self.id,
287            service = %self.method.service,
288            method = %self.method.method
289        )
290    }
291}
292
293impl StreamControl {
294    /// Return the stream id for all `StreamControl` variants.
295    #[must_use]
296    pub fn stream_id(&self) -> StreamId {
297        match self {
298            StreamControl::Start { id, .. } => *id,
299            StreamControl::Next { id, .. } => *id,
300            StreamControl::End { id, .. } => *id,
301        }
302    }
303
304    /// Create a `tracing` span for handling this stream control message.
305    ///
306    /// Carries `stream_id`, `variant`, `kind` (when present), and blob metadata
307    /// (when present) to correlate logs across the stream lifecycle.
308    #[must_use]
309    pub fn span(&self) -> tracing::Span {
310        match self {
311            StreamControl::Start { id, kind, blob } => tracing::info_span!(
312                "ipc.stream",
313                stream_id = *id,
314                variant = "start",
315                stream_kind = ?kind,
316                blob_id = blob.as_ref().map(|b| b.id).map(tracing::field::display),
317                blob_size = blob.as_ref().map(|b| b.size),
318                blob_lease_ms = blob.as_ref().and_then(|b| b.lease_ms),
319            ),
320            StreamControl::Next { id, chunk } => tracing::info_span!(
321                "ipc.stream",
322                stream_id = *id,
323                variant = "next",
324                chunk_len = chunk.as_ref().map(std::vec::Vec::len),
325            ),
326            StreamControl::End { id, ok, error } => tracing::info_span!(
327                "ipc.stream",
328                stream_id = *id,
329                variant = "end",
330                ok = *ok,
331                error = error.as_deref(),
332            ),
333        }
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340    use anyhow::{Result, ensure};
341
342    use std::collections::HashMap;
343    use std::sync::Mutex;
344    use tracing::field::{Field, Visit};
345    use tracing_subscriber::layer::{Context as LayerContext, Layer};
346    use tracing_subscriber::prelude::*;
347    use tracing_subscriber::registry::LookupSpan;
348
349    #[derive(Default)]
350    struct FieldCaptureVisitor {
351        fields: HashMap<String, String>,
352    }
353
354    impl Visit for FieldCaptureVisitor {
355        fn record_str(&mut self, field: &Field, value: &str) {
356            let _ = self
357                .fields
358                .insert(field.name().to_string(), value.to_string());
359        }
360
361        fn record_u64(&mut self, field: &Field, value: u64) {
362            let _ = self
363                .fields
364                .insert(field.name().to_string(), value.to_string());
365        }
366
367        fn record_i64(&mut self, field: &Field, value: i64) {
368            let _ = self
369                .fields
370                .insert(field.name().to_string(), value.to_string());
371        }
372
373        fn record_bool(&mut self, field: &Field, value: bool) {
374            let _ = self
375                .fields
376                .insert(field.name().to_string(), value.to_string());
377        }
378
379        fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
380            let _ = self
381                .fields
382                .insert(field.name().to_string(), format!("{value:?}"));
383        }
384    }
385
386    #[derive(Default)]
387    struct CaptureLayer {
388        spans: Mutex<HashMap<tracing::span::Id, HashMap<String, String>>>,
389        events: Mutex<Vec<HashMap<String, String>>>,
390    }
391
392    impl CaptureLayer {
393        fn drain_events(&self) -> Vec<HashMap<String, String>> {
394            if let Ok(mut guard) = self.events.lock() {
395                std::mem::take(&mut *guard)
396            } else {
397                Vec::new()
398            }
399        }
400    }
401
402    impl<S> Layer<S> for CaptureLayer
403    where
404        S: tracing::Subscriber + for<'a> LookupSpan<'a>,
405    {
406        fn on_new_span(
407            &self,
408            attrs: &tracing::span::Attributes<'_>,
409            id: &tracing::span::Id,
410            _ctx: LayerContext<'_, S>,
411        ) {
412            let mut v = FieldCaptureVisitor::default();
413            attrs.record(&mut v);
414
415            if let Ok(mut spans) = self.spans.lock() {
416                let _ = spans.insert(id.clone(), v.fields);
417            }
418        }
419
420        fn on_event(
421            &self,
422            event: &tracing::Event<'_>,
423            ctx: LayerContext<'_, S>,
424        ) {
425            let mut merged: HashMap<String, String> = HashMap::new();
426
427            if let Some(scope) = ctx.event_scope(event) {
428                if let Ok(spans) = self.spans.lock() {
429                    for span in scope.from_root() {
430                        if let Some(fields) = spans.get(&span.id()) {
431                            merged.extend(fields.clone());
432                        }
433                    }
434                }
435            }
436
437            let mut v = FieldCaptureVisitor::default();
438            event.record(&mut v);
439            merged.extend(v.fields);
440
441            if let Ok(mut events) = self.events.lock() {
442                events.push(merged);
443            }
444        }
445    }
446
447    #[crate::ctb_test]
448    fn spans_attach_request_and_stream_fields() -> Result<()> {
449        let req = Request {
450            id: 42,
451            method: MethodId::new("svc", "do_work"),
452            args: vec![],
453        };
454
455        let stream = StreamControl::Start {
456            id: 7,
457            kind: StreamKind::IoNetworkFetchBody,
458            blob: Some(BlobToken {
459                id: Default::default(),
460                size: 123,
461                lease_ms: Some(456),
462            }),
463        };
464
465        req.span().in_scope(|| {
466            tracing::info!("inside request span");
467        });
468
469        stream.span().in_scope(|| {
470            tracing::info!("inside stream span");
471        });
472
473        // The macro injects logs_contain() function
474        assert!(logs_contain("inside request span"));
475        assert!(logs_contain("inside stream span"));
476
477        Ok(())
478    }
479
480    /// Round-trip a value via postcard, ensuring serialized bytes are stable.
481    fn roundtrip_bytes<T>(value: &T) -> Result<()>
482    where
483        T: serde::Serialize + for<'de> serde::de::Deserialize<'de>,
484    {
485        let bytes = postcard::to_stdvec(value)?;
486        let decoded: T = postcard::from_bytes(&bytes)?;
487        let bytes2 = postcard::to_stdvec(&decoded)?;
488        ensure!(bytes == bytes2, "postcard roundtrip bytes mismatch");
489        Ok(())
490    }
491
492    /// Round-trip Message::Hello via postcard.
493    #[crate::ctb_test]
494    fn message_hello_roundtrip() -> Result<()> {
495        let msg = Message::Hello(Hello {
496            token: Default::default(),
497            client_info: Some(ClientInfo {
498                name: "client".into(),
499                version: "1.0.0".into(),
500                process_kind: "test".into(),
501            }),
502        });
503        roundtrip_bytes(&msg)
504    }
505
506    /// Round-trip Message::HelloOk via postcard.
507    #[crate::ctb_test]
508    fn message_hello_ok_roundtrip() -> Result<()> {
509        let msg = Message::HelloOk(HelloOk {
510            bound_capabilities: Default::default(),
511        });
512        roundtrip_bytes(&msg)
513    }
514
515    /// Round-trip Message::HelloErr via postcard.
516    #[crate::ctb_test]
517    fn message_hello_err_roundtrip() -> Result<()> {
518        let msg = Message::HelloErr(HelloErr {
519            message: "handshake failed".into(),
520        });
521        roundtrip_bytes(&msg)
522    }
523
524    /// Round-trip Message::Request via postcard.
525    #[crate::ctb_test]
526    fn message_request_roundtrip() -> Result<()> {
527        let msg = Message::Request(Request {
528            id: Default::default(),
529            method: MethodId {
530                service: "svc".into(),
531                method: "do_work".into(),
532            },
533            args: vec![0x01, 0x02, 0x03],
534        });
535        roundtrip_bytes(&msg)
536    }
537
538    /// Round-trip Message::Response(ok) via postcard.
539    #[crate::ctb_test]
540    fn message_response_ok_roundtrip() -> Result<()> {
541        let msg = Message::Response(Response {
542            id: Default::default(),
543            ok: true,
544            result: Some(vec![0xAA, 0xBB]),
545            error: None,
546        });
547        roundtrip_bytes(&msg)
548    }
549
550    /// Round-trip Message::Response(err) via postcard.
551    #[crate::ctb_test]
552    fn message_response_err_roundtrip() -> Result<()> {
553        let msg = Message::Response(Response {
554            id: Default::default(),
555            ok: false,
556            result: None,
557            error: Some(RpcError {
558                code: "bad_request".into(),
559                message: "invalid input".into(),
560            }),
561        });
562        roundtrip_bytes(&msg)
563    }
564
565    /// Round-trip Message::Event via postcard.
566    #[crate::ctb_test]
567    fn message_event_roundtrip() -> Result<()> {
568        let msg = Message::Event(Event {
569            topic: EventTopic::Custom("custom.topic".into()),
570            payload: vec![0x10, 0x20],
571        });
572        roundtrip_bytes(&msg)
573    }
574
575    /// Round-trip Message::Event(Heartbeat) via postcard.
576    #[crate::ctb_test]
577    fn message_event_heartbeat_roundtrip() -> Result<()> {
578        let msg = Message::Event(Event::heartbeat());
579        roundtrip_bytes(&msg)
580    }
581
582    /// Round-trip Message::Stream(Start) via postcard.
583    #[crate::ctb_test]
584    fn message_stream_start_roundtrip() -> Result<()> {
585        let msg = Message::Stream(StreamControl::Start {
586            id: Default::default(),
587            kind: StreamKind::Custom("bytes".into()),
588            blob: Some(Default::default()),
589        });
590        roundtrip_bytes(&msg)
591    }
592
593    /// Round-trip Message::Stream(Next) via postcard.
594    #[crate::ctb_test]
595    fn message_stream_next_roundtrip() -> Result<()> {
596        let msg = Message::Stream(StreamControl::Next {
597            id: Default::default(),
598            chunk: Some(vec![1, 2, 3, 4]),
599        });
600        roundtrip_bytes(&msg)
601    }
602
603    /// Round-trip Message::Stream(End) via postcard.
604    #[crate::ctb_test]
605    fn message_stream_end_roundtrip() -> Result<()> {
606        let msg = Message::Stream(StreamControl::End {
607            id: Default::default(),
608            ok: false,
609            error: Some("stream error".into()),
610        });
611        roundtrip_bytes(&msg)
612    }
613
614    /// Round-trip Message::Cancel via postcard.
615    #[crate::ctb_test]
616    fn message_cancel_roundtrip() -> Result<()> {
617        let msg = Message::Cancel(Cancel {
618            id: Default::default(),
619        });
620        roundtrip_bytes(&msg)
621    }
622}