1use crate::workspace::ipc::auth::capability::{CapabilitySet, CapabilityToken};
8use crate::workspace::ipc::types::{BlobToken, RequestId, StreamId};
9use serde::{Deserialize, Serialize};
10
/// Top-level message envelope; each variant wraps one of the payload types declared in this file.
///
/// NOTE(review): the tests in this file encode `Message` with postcard, so the variant order is
/// presumably part of the wire format — append new variants instead of reordering. TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
    /// Wraps a [`Hello`] (capability token plus optional client info).
    Hello(Hello),
    /// Wraps a [`HelloOk`] (the bound capability set).
    HelloOk(HelloOk),
    /// Wraps a [`HelloErr`] (a textual failure message).
    HelloErr(HelloErr),

    /// Wraps a [`Request`] (id, method and raw argument bytes).
    Request(Request),
    /// Wraps a [`Response`] (id, success flag, optional result bytes, optional error).
    Response(Response),

    /// Wraps an [`Event`] (topic plus raw payload bytes).
    Event(Event),

    /// Wraps a [`StreamControl`] (start / next / end of a stream).
    Stream(StreamControl),

    /// Wraps a [`Cancel`] (the id of a request).
    Cancel(Cancel),
}
34
/// Payload of [`Message::Hello`]; presumably the opening handshake sent by a client — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Hello {
    /// Capability token presented by the sender.
    pub token: CapabilityToken,
    /// Optional self-description of the sender.
    pub client_info: Option<ClientInfo>,
}
42
/// Free-form, string-typed description of a client, carried optionally inside [`Hello`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientInfo {
    /// Client name.
    pub name: String,
    /// Client version string (the tests use `"1.0.0"`; no format is enforced here).
    pub version: String,
    /// Kind of process the client runs as (free-form; the tests use `"test"`).
    pub process_kind: String,
}
49
/// Payload of [`Message::HelloOk`]; presumably the positive reply to a [`Hello`] — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloOk {
    /// Capability set reported back to the peer.
    pub bound_capabilities: CapabilitySet,
}
54
/// Payload of [`Message::HelloErr`]; presumably the negative reply to a [`Hello`] — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HelloErr {
    /// Human-readable description of the failure.
    pub message: String,
}
59
/// Payload of [`Message::Request`]: one RPC call addressed to a service method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Request {
    /// Identifier of this request; [`Response`] and [`Cancel`] carry an `id` of the same type.
    pub id: RequestId,
    /// Service and method being called.
    pub method: MethodId,
    /// Raw argument bytes; the encoding is not defined in this file.
    pub args: Vec<u8>,
}
68
/// Payload of [`Message::Response`].
///
/// NOTE(review): nothing in the type ties `ok`, `result` and `error` together; the tests use
/// `ok: true` with `result: Some(..)` and `ok: false` with `error: Some(..)`, so that pairing is
/// presumably the intended convention — TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
    /// Request identifier this response refers to.
    pub id: RequestId,
    /// Success flag.
    pub ok: bool,
    /// Raw result bytes, if any; the encoding is not defined in this file.
    pub result: Option<Vec<u8>>,
    /// Error details, if any.
    pub error: Option<RpcError>,
}
79
/// Error carried in [`Response::error`] and returned by [`RpcService::dispatch`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcError {
    /// Error code string; the constructors in this file fill it from the `RPC_ERROR_CODE_*`
    /// constants, but any string is accepted.
    pub code: String,
    /// Human-readable description of the error.
    pub message: String,
}
85
/// Payload of [`Message::Event`]: a topic plus opaque payload bytes. Unlike [`Request`], it
/// carries no identifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Topic the event belongs to.
    pub topic: EventTopic,
    /// Raw payload bytes; the encoding is not defined in this file.
    pub payload: Vec<u8>,
}
94
95impl Event {
96 pub fn heartbeat() -> Self {
98 Self {
99 topic: EventTopic::Heartbeat,
100 payload: Vec::new(),
101 }
102 }
103}
104
/// Topic of an [`Event`].
///
/// NOTE(review): the fixed variants carry no data and their exact meaning is not visible in this
/// file; the descriptions below restate the names only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventTopic {
    /// Heartbeat; produced by [`Event::heartbeat`] with an empty payload.
    Heartbeat,
    /// A child exited (presumably a child process — TODO confirm).
    ChildExited,
    /// A permission changed.
    PermissionChanged,
    /// Storage was compacted.
    StorageCompacted,
    /// Any other topic, identified by a free-form string.
    Custom(String),
}
113
/// Payload of [`Message::Stream`]: start, data and end messages of a stream, each tagged with
/// the stream's id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamControl {
    /// Announces a stream.
    Start {
        /// Identifier of the stream.
        id: StreamId,
        /// What kind of data the stream carries.
        kind: StreamKind,
        /// Optional blob token; presumably set when the data travels via a blob rather than
        /// inline chunks — TODO confirm.
        blob: Option<BlobToken>,
    },
    /// Next step of a stream.
    Next {
        /// Identifier of the stream.
        id: StreamId,
        /// Chunk bytes, if any; what `None` signifies is not visible in this file.
        chunk: Option<Vec<u8>>,
    },
    /// Terminates a stream.
    End {
        /// Identifier of the stream.
        id: StreamId,
        /// Success flag.
        ok: bool,
        /// Optional error text (the tests pair it with `ok: false`).
        error: Option<String>,
    },
}
134
/// Kind of data carried by a stream, declared in [`StreamControl::Start`].
///
/// NOTE(review): the precise semantics of each variant are not visible in this file; the
/// descriptions below restate the names only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamKind {
    /// Video frames.
    VideoFrames,
    /// Image bytes.
    ImageBytes,
    /// File read.
    FileRead,
    /// File write.
    FileWrite,

    /// File read belonging to the "io network" family (exact meaning not visible here).
    IoNetworkReadFile,

    /// Fetch body belonging to the "io network" family (exact meaning not visible here).
    IoNetworkFetchBody,

    /// Any other kind, identified by a free-form string.
    Custom(String),
}
150
/// Address of an RPC method: a service name plus a method name within it.
///
/// Derives `PartialEq`, `Eq` and `Hash`, so it can serve as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MethodId {
    /// Name of the service.
    pub service: String,
    /// Name of the method within the service.
    pub method: String,
}
157
/// A named service that handles RPC calls addressed by method name.
pub trait RpcService {
    /// Name identifying this service; presumably matched against `MethodId::service` by the
    /// dispatcher — TODO confirm.
    const SERVICE_NAME: &'static str;

    /// Handles one call.
    ///
    /// * `method` – name of the method being called.
    /// * `args` – raw argument bytes; the encoding is not defined in this file.
    ///
    /// Returns `Ok(Some(bytes))` or `Ok(None)` on success (presumably `None` means "no result
    /// payload" — TODO confirm).
    ///
    /// # Errors
    ///
    /// Returns an [`RpcError`] when the call fails.
    fn dispatch(
        &self,
        method: &str,
        args: &[u8],
    ) -> Result<Option<Vec<u8>>, RpcError>;
}
176
/// Payload of [`Message::Cancel`]; presumably asks the peer to cancel the request with this id —
/// TODO confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Cancel {
    /// Identifier of the request concerned.
    pub id: RequestId,
}
182
183impl Hello {
184 pub fn new(
186 token: CapabilityToken,
187 client_info: Option<ClientInfo>,
188 ) -> Self {
189 Self { token, client_info }
190 }
191}
192
193impl HelloOk {
194 pub fn new(bound_capabilities: CapabilitySet) -> Self {
196 Self { bound_capabilities }
197 }
198}
199
200impl HelloErr {
201 pub fn new<S: Into<String>>(message: S) -> Self {
203 Self {
204 message: message.into(),
205 }
206 }
207}
208
209impl MethodId {
211 pub fn new<S: Into<String>>(service: S, method: S) -> Self {
212 Self {
213 service: service.into(),
214 method: method.into(),
215 }
216 }
217}
218
219impl RpcError {
220 pub fn unauthorized<S: Into<String>>(message: S) -> Self {
222 Self {
223 code: RPC_ERROR_CODE_UNAUTHORIZED.into(),
224 message: message.into(),
225 }
226 }
227
228 pub fn capability_denied<S: Into<String>>(message: S) -> Self {
230 Self {
231 code: RPC_ERROR_CODE_CAPABILITY_DENIED.into(),
232 message: message.into(),
233 }
234 }
235
236 pub fn cancelled<S: Into<String>>(message: S) -> Self {
238 Self {
239 code: RPC_ERROR_CODE_CANCELLED.into(),
240 message: message.into(),
241 }
242 }
243
244 pub fn internal<S: Into<String>>(message: S) -> Self {
246 Self {
247 code: RPC_ERROR_CODE_INTERNAL.into(),
248 message: message.into(),
249 }
250 }
251}
252
253impl Cancel {
254 pub fn new(id: RequestId) -> Self {
256 Self { id }
257 }
258}
259
/// Value of [`RpcError::code`] set by [`RpcError::unauthorized`].
pub const RPC_ERROR_CODE_UNAUTHORIZED: &str = "unauthorized";
/// Value of [`RpcError::code`] set by [`RpcError::capability_denied`].
pub const RPC_ERROR_CODE_CAPABILITY_DENIED: &str = "capability_denied";
/// Error code string for rate limiting; no constructor in this file uses it.
pub const RPC_ERROR_CODE_RATE_LIMITED: &str = "rate_limited";
/// Value of [`RpcError::code`] set by [`RpcError::cancelled`].
pub const RPC_ERROR_CODE_CANCELLED: &str = "cancelled";
/// Value of [`RpcError::code`] set by [`RpcError::internal`].
pub const RPC_ERROR_CODE_INTERNAL: &str = "internal";
/// Error code string for unimplemented methods; no constructor in this file uses it.
pub const RPC_ERROR_CODE_NOT_IMPLEMENTED: &str = "not_implemented";

/// 256 KiB. NOTE(review): presumably the payload size above which io-network data is handed over
/// via a blob instead of inline bytes — not used in this file, TODO confirm.
pub const IO_NETWORK_BLOB_THRESHOLD_BYTES: u64 = 256 * 1024;

/// 16 KiB. NOTE(review): presumably the chunk size for stream data sent over the control plane —
/// not used in this file, TODO confirm.
pub const STREAM_CONTROL_PLANE_CHUNK_BYTES: usize = 16 * 1024;
276
277impl Request {
278 #[must_use]
283 pub fn span(&self) -> tracing::Span {
284 tracing::info_span!(
285 "ipc.request",
286 request_id = self.id,
287 service = %self.method.service,
288 method = %self.method.method
289 )
290 }
291}
292
293impl StreamControl {
294 #[must_use]
296 pub fn stream_id(&self) -> StreamId {
297 match self {
298 StreamControl::Start { id, .. } => *id,
299 StreamControl::Next { id, .. } => *id,
300 StreamControl::End { id, .. } => *id,
301 }
302 }
303
304 #[must_use]
309 pub fn span(&self) -> tracing::Span {
310 match self {
311 StreamControl::Start { id, kind, blob } => tracing::info_span!(
312 "ipc.stream",
313 stream_id = *id,
314 variant = "start",
315 stream_kind = ?kind,
316 blob_id = blob.as_ref().map(|b| b.id).map(tracing::field::display),
317 blob_size = blob.as_ref().map(|b| b.size),
318 blob_lease_ms = blob.as_ref().and_then(|b| b.lease_ms),
319 ),
320 StreamControl::Next { id, chunk } => tracing::info_span!(
321 "ipc.stream",
322 stream_id = *id,
323 variant = "next",
324 chunk_len = chunk.as_ref().map(std::vec::Vec::len),
325 ),
326 StreamControl::End { id, ok, error } => tracing::info_span!(
327 "ipc.stream",
328 stream_id = *id,
329 variant = "end",
330 ok = *ok,
331 error = error.as_deref(),
332 ),
333 }
334 }
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340 use anyhow::{Result, ensure};
341
342 use std::collections::HashMap;
343 use std::sync::Mutex;
344 use tracing::field::{Field, Visit};
345 use tracing_subscriber::layer::{Context as LayerContext, Layer};
346 use tracing_subscriber::prelude::*;
347 use tracing_subscriber::registry::LookupSpan;
348
349 #[derive(Default)]
350 struct FieldCaptureVisitor {
351 fields: HashMap<String, String>,
352 }
353
354 impl Visit for FieldCaptureVisitor {
355 fn record_str(&mut self, field: &Field, value: &str) {
356 let _ = self
357 .fields
358 .insert(field.name().to_string(), value.to_string());
359 }
360
361 fn record_u64(&mut self, field: &Field, value: u64) {
362 let _ = self
363 .fields
364 .insert(field.name().to_string(), value.to_string());
365 }
366
367 fn record_i64(&mut self, field: &Field, value: i64) {
368 let _ = self
369 .fields
370 .insert(field.name().to_string(), value.to_string());
371 }
372
373 fn record_bool(&mut self, field: &Field, value: bool) {
374 let _ = self
375 .fields
376 .insert(field.name().to_string(), value.to_string());
377 }
378
379 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
380 let _ = self
381 .fields
382 .insert(field.name().to_string(), format!("{value:?}"));
383 }
384 }
385
386 #[derive(Default)]
387 struct CaptureLayer {
388 spans: Mutex<HashMap<tracing::span::Id, HashMap<String, String>>>,
389 events: Mutex<Vec<HashMap<String, String>>>,
390 }
391
392 impl CaptureLayer {
393 fn drain_events(&self) -> Vec<HashMap<String, String>> {
394 if let Ok(mut guard) = self.events.lock() {
395 std::mem::take(&mut *guard)
396 } else {
397 Vec::new()
398 }
399 }
400 }
401
402 impl<S> Layer<S> for CaptureLayer
403 where
404 S: tracing::Subscriber + for<'a> LookupSpan<'a>,
405 {
406 fn on_new_span(
407 &self,
408 attrs: &tracing::span::Attributes<'_>,
409 id: &tracing::span::Id,
410 _ctx: LayerContext<'_, S>,
411 ) {
412 let mut v = FieldCaptureVisitor::default();
413 attrs.record(&mut v);
414
415 if let Ok(mut spans) = self.spans.lock() {
416 let _ = spans.insert(id.clone(), v.fields);
417 }
418 }
419
420 fn on_event(
421 &self,
422 event: &tracing::Event<'_>,
423 ctx: LayerContext<'_, S>,
424 ) {
425 let mut merged: HashMap<String, String> = HashMap::new();
426
427 if let Some(scope) = ctx.event_scope(event) {
428 if let Ok(spans) = self.spans.lock() {
429 for span in scope.from_root() {
430 if let Some(fields) = spans.get(&span.id()) {
431 merged.extend(fields.clone());
432 }
433 }
434 }
435 }
436
437 let mut v = FieldCaptureVisitor::default();
438 event.record(&mut v);
439 merged.extend(v.fields);
440
441 if let Ok(mut events) = self.events.lock() {
442 events.push(merged);
443 }
444 }
445 }
446
447 #[crate::ctb_test]
448 fn spans_attach_request_and_stream_fields() -> Result<()> {
449 let req = Request {
450 id: 42,
451 method: MethodId::new("svc", "do_work"),
452 args: vec![],
453 };
454
455 let stream = StreamControl::Start {
456 id: 7,
457 kind: StreamKind::IoNetworkFetchBody,
458 blob: Some(BlobToken {
459 id: Default::default(),
460 size: 123,
461 lease_ms: Some(456),
462 }),
463 };
464
465 req.span().in_scope(|| {
466 tracing::info!("inside request span");
467 });
468
469 stream.span().in_scope(|| {
470 tracing::info!("inside stream span");
471 });
472
473 assert!(logs_contain("inside request span"));
475 assert!(logs_contain("inside stream span"));
476
477 Ok(())
478 }
479
480 fn roundtrip_bytes<T>(value: &T) -> Result<()>
482 where
483 T: serde::Serialize + for<'de> serde::de::Deserialize<'de>,
484 {
485 let bytes = postcard::to_stdvec(value)?;
486 let decoded: T = postcard::from_bytes(&bytes)?;
487 let bytes2 = postcard::to_stdvec(&decoded)?;
488 ensure!(bytes == bytes2, "postcard roundtrip bytes mismatch");
489 Ok(())
490 }
491
492 #[crate::ctb_test]
494 fn message_hello_roundtrip() -> Result<()> {
495 let msg = Message::Hello(Hello {
496 token: Default::default(),
497 client_info: Some(ClientInfo {
498 name: "client".into(),
499 version: "1.0.0".into(),
500 process_kind: "test".into(),
501 }),
502 });
503 roundtrip_bytes(&msg)
504 }
505
506 #[crate::ctb_test]
508 fn message_hello_ok_roundtrip() -> Result<()> {
509 let msg = Message::HelloOk(HelloOk {
510 bound_capabilities: Default::default(),
511 });
512 roundtrip_bytes(&msg)
513 }
514
515 #[crate::ctb_test]
517 fn message_hello_err_roundtrip() -> Result<()> {
518 let msg = Message::HelloErr(HelloErr {
519 message: "handshake failed".into(),
520 });
521 roundtrip_bytes(&msg)
522 }
523
524 #[crate::ctb_test]
526 fn message_request_roundtrip() -> Result<()> {
527 let msg = Message::Request(Request {
528 id: Default::default(),
529 method: MethodId {
530 service: "svc".into(),
531 method: "do_work".into(),
532 },
533 args: vec![0x01, 0x02, 0x03],
534 });
535 roundtrip_bytes(&msg)
536 }
537
538 #[crate::ctb_test]
540 fn message_response_ok_roundtrip() -> Result<()> {
541 let msg = Message::Response(Response {
542 id: Default::default(),
543 ok: true,
544 result: Some(vec![0xAA, 0xBB]),
545 error: None,
546 });
547 roundtrip_bytes(&msg)
548 }
549
550 #[crate::ctb_test]
552 fn message_response_err_roundtrip() -> Result<()> {
553 let msg = Message::Response(Response {
554 id: Default::default(),
555 ok: false,
556 result: None,
557 error: Some(RpcError {
558 code: "bad_request".into(),
559 message: "invalid input".into(),
560 }),
561 });
562 roundtrip_bytes(&msg)
563 }
564
565 #[crate::ctb_test]
567 fn message_event_roundtrip() -> Result<()> {
568 let msg = Message::Event(Event {
569 topic: EventTopic::Custom("custom.topic".into()),
570 payload: vec![0x10, 0x20],
571 });
572 roundtrip_bytes(&msg)
573 }
574
575 #[crate::ctb_test]
577 fn message_event_heartbeat_roundtrip() -> Result<()> {
578 let msg = Message::Event(Event::heartbeat());
579 roundtrip_bytes(&msg)
580 }
581
582 #[crate::ctb_test]
584 fn message_stream_start_roundtrip() -> Result<()> {
585 let msg = Message::Stream(StreamControl::Start {
586 id: Default::default(),
587 kind: StreamKind::Custom("bytes".into()),
588 blob: Some(Default::default()),
589 });
590 roundtrip_bytes(&msg)
591 }
592
593 #[crate::ctb_test]
595 fn message_stream_next_roundtrip() -> Result<()> {
596 let msg = Message::Stream(StreamControl::Next {
597 id: Default::default(),
598 chunk: Some(vec![1, 2, 3, 4]),
599 });
600 roundtrip_bytes(&msg)
601 }
602
603 #[crate::ctb_test]
605 fn message_stream_end_roundtrip() -> Result<()> {
606 let msg = Message::Stream(StreamControl::End {
607 id: Default::default(),
608 ok: false,
609 error: Some("stream error".into()),
610 });
611 roundtrip_bytes(&msg)
612 }
613
614 #[crate::ctb_test]
616 fn message_cancel_roundtrip() -> Result<()> {
617 let msg = Message::Cancel(Cancel {
618 id: Default::default(),
619 });
620 roundtrip_bytes(&msg)
621 }
622}