ctoolbox/workspace/ipc/data_plane/
shared_memory.rs

use crate::workspace::ipc::error::Error;
use crate::workspace::ipc::types::{BlobId, BlobToken};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
// `std::os::fd` is not available on Windows targets; every use of
// `FromRawFd` in this file sits behind `#[cfg(unix)]`, so gate the import too.
#[cfg(unix)]
use std::os::fd::FromRawFd;
use std::path::PathBuf;
use uuid::Uuid;
8
9/// Platform-neutral description of a shared memory handle that can be sent via control plane metadata.
10#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
11pub enum SharedBlobDescriptor {
12    /// Unix file descriptor (sent via `SCM_RIGHTS` on Unix sockets).
13    #[cfg(unix)]
14    UnixFd(i32),
15    /// Windows HANDLE (duplicated to the target process).
16    #[cfg(windows)]
17    WindowsHandle(u64),
18    /// Cross-platform fallback via temporary file path.
19    FilePath(PathBuf),
20    /// Opaque handle by name (e.g., named shared memory).
21    Named(String),
22}
23
24/// A producer-created blob that can be shared with other processes.
25pub struct ProducerBlob {
26    pub id: BlobId,
27    pub size: u64,
28    pub descriptor: SharedBlobDescriptor,
29    pub token: BlobToken,
30}
31
32impl ProducerBlob {
33    /// Best-effort helper for writing blob contents in tests and simple
34    /// producer workflows.
35    #[allow(unsafe_code)]
36    pub fn write_all(&self, data: &[u8]) -> Result<(), Error> {
37        if (u64::try_from(data.len())?) > self.size {
38            return Err(Error::Internal(
39                "blob write exceeds allocated size".to_string(),
40            ));
41        }
42
43        match &self.descriptor {
44            #[cfg(unix)]
45            SharedBlobDescriptor::UnixFd(fd) => {
46                let dup = crate::workspace::ipc::platform::unix::dup_fd(*fd)?;
47                let mut file = unsafe { std::fs::File::from_raw_fd(dup) };
48                use std::io::{Seek, SeekFrom, Write};
49                file.seek(SeekFrom::Start(0))?;
50                file.write_all(data)?;
51                file.flush()?;
52                Ok(())
53            }
54            #[cfg(windows)]
55            SharedBlobDescriptor::WindowsHandle(handle) => {
56                crate::workspace::ipc::platform::windows::write_mapping(
57                    *handle, data,
58                )
59            }
60            SharedBlobDescriptor::FilePath(path) => {
61                use std::io::{Seek, SeekFrom, Write};
62                let mut file = std::fs::OpenOptions::new()
63                    .read(true)
64                    .write(true)
65                    .create(false)
66                    .open(path)?;
67                file.seek(SeekFrom::Start(0))?;
68                file.write_all(data)?;
69                file.flush()?;
70                Ok(())
71            }
72            SharedBlobDescriptor::Named(_name) => Err(Error::Unsupported(
73                "Named shared blobs are not implemented for writing"
74                    .to_string(),
75            )),
76        }
77    }
78}
79
80/// A mapped read-only view of a blob.
81pub struct MappedRead<'a> {
82    /// Platform-backed mapping pointer and length are implementation details.
83    pub len: usize,
84    ptr: *const u8,
85    backing: MappedReadBacking,
86    /// Lifetime-bound marker to prevent use-after-free.
87    pub _marker: std::marker::PhantomData<&'a ()>,
88}
89
90impl MappedRead<'_> {
91    #[allow(unsafe_code)]
92    pub fn as_slice(&self) -> &[u8] {
93        // Safety: `ptr` and `len` are valid for the lifetime of `backing`.
94        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
95    }
96}
97
98enum MappedReadBacking {
99    Memmap(memmap2::Mmap),
100    #[cfg(windows)]
101    Windows(crate::workspace::ipc::platform::windows::MappingView),
102    #[allow(dead_code)]
103    Empty,
104}
105
106/// Blob allocator for creating and managing shared blobs.
107#[async_trait]
108pub trait BlobAllocator: Send + Sync {
109    /// Create a new blob of the given size and return a producer handle with a lifecycle token.
110    async fn create(&self, size: u64) -> Result<ProducerBlob, Error>;
111
112    /// Cleanup a blob proactively using its token (server-side GC).
113    async fn cleanup(&self, token: &BlobToken) -> Result<(), Error>;
114}
115
116/// Reader side to map an incoming blob by token/descriptor.
117#[async_trait]
118pub trait BlobReader: Send + Sync {
119    /// Map a blob for reading using its token and descriptor metadata.
120    async fn map_read<'a>(
121        &'a self,
122        token: &BlobToken,
123        desc: &SharedBlobDescriptor,
124    ) -> Result<MappedRead<'a>, Error>;
125}
126
/// Selects the storage mechanism used for newly created blobs.
#[derive(Debug, Clone, Copy)]
pub enum BlobBackend {
    /// Use the platform's preferred shared-memory mechanism.
    PlatformDefault,
    /// Always use the temp-file + `memmap2` fallback (portable, testable).
    TempFileFallback,
}
134
135#[derive(Debug)]
136struct BlobRecord {
137    token: BlobToken,
138    size: u64,
139    descriptor: SharedBlobDescriptor,
140}
141
142#[derive(Debug)]
143pub struct SharedMemoryBlobs {
144    backend: BlobBackend,
145    records: std::sync::Mutex<Vec<BlobRecord>>,
146    seq: std::sync::atomic::AtomicU64,
147}
148
149impl SharedMemoryBlobs {
150    pub fn new(backend: BlobBackend) -> Self {
151        Self {
152            backend,
153            records: std::sync::Mutex::new(Vec::new()),
154            seq: std::sync::atomic::AtomicU64::new(0),
155        }
156    }
157
158    fn new_token(&self) -> BlobToken {
159        let _seq = self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
160        BlobToken {
161            id: BlobId(Uuid::new_v4()),
162            size: 0,
163            lease_ms: None,
164        }
165    }
166
167    fn find_record(&self, token: &BlobToken) -> Option<BlobRecord> {
168        let records = self.records.lock().ok()?;
169        records
170            .iter()
171            .find(|r| &r.token == token)
172            .map(|r| BlobRecord {
173                token: r.token.clone(),
174                size: r.size,
175                descriptor: r.descriptor.clone(),
176            })
177    }
178
179    fn remove_record(&self, token: &BlobToken) -> Option<BlobRecord> {
180        let mut records = self.records.lock().ok()?;
181        let idx = records.iter().position(|r| &r.token == token)?;
182        Some(records.remove(idx))
183    }
184
185    fn create_tempfile_blob(
186        &self,
187        size: u64,
188        token: &BlobToken,
189    ) -> Result<SharedBlobDescriptor, Error> {
190        let mut path = std::env::temp_dir();
191        path.push(format!("{}.bin", token.id.0));
192
193        let file = std::fs::OpenOptions::new()
194            .read(true)
195            .write(true)
196            .create_new(true)
197            .open(&path)?;
198        file.set_len(size)?;
199        Ok(SharedBlobDescriptor::FilePath(path))
200    }
201}
202
203#[async_trait]
204impl BlobAllocator for SharedMemoryBlobs {
205    async fn create(&self, size: u64) -> Result<ProducerBlob, Error> {
206        let token = self.new_token();
207
208        let descriptor = match self.backend {
209            BlobBackend::TempFileFallback => {
210                self.create_tempfile_blob(size, &token)?
211            }
212            BlobBackend::PlatformDefault => {
213                #[cfg(unix)]
214                {
215                    // Linux memfd; non-Linux Unix will error out and fall back.
216                    if let Ok(fd) =
217                        crate::workspace::ipc::platform::unix::create_memfd(
218                            size,
219                        )
220                    {
221                        SharedBlobDescriptor::UnixFd(fd)
222                    } else {
223                        self.create_tempfile_blob(size, &token)?
224                    }
225                }
226                #[cfg(windows)]
227                {
228                    let handle =
229                        crate::workspace::ipc::platform::windows::create_file_mapping(size)?;
230                    SharedBlobDescriptor::WindowsHandle(handle)
231                }
232                #[cfg(not(any(unix, windows)))]
233                {
234                    self.create_tempfile_blob(size, &token)?
235                }
236            }
237        };
238
239        {
240            let mut records = self.records.lock().map_err(|_| {
241                Error::Internal("blob registry mutex poisoned".to_string())
242            })?;
243            records.push(BlobRecord {
244                token: token.clone(),
245                size,
246                descriptor: descriptor.clone(),
247            });
248        }
249
250        Ok(ProducerBlob {
251            id: BlobId::default(),
252            size,
253            descriptor,
254            token,
255        })
256    }
257
258    async fn cleanup(&self, token: &BlobToken) -> Result<(), Error> {
259        let Some(record) = self.remove_record(token) else {
260            // Cleanup is idempotent.
261            return Ok(());
262        };
263
264        match record.descriptor {
265            #[cfg(unix)]
266            SharedBlobDescriptor::UnixFd(fd) => {
267                crate::workspace::ipc::platform::unix::close_fd(fd)
268            }
269            #[cfg(windows)]
270            SharedBlobDescriptor::WindowsHandle(handle) => {
271                crate::workspace::ipc::platform::windows::close_handle(handle)
272            }
273            SharedBlobDescriptor::FilePath(path) => {
274                let _ = std::fs::remove_file(path);
275                Ok(())
276            }
277            SharedBlobDescriptor::Named(_name) => Ok(()),
278        }
279    }
280}
281
282#[allow(unsafe_code)]
283#[async_trait]
284impl BlobReader for SharedMemoryBlobs {
285    async fn map_read<'a>(
286        &'a self,
287        token: &BlobToken,
288        desc: &SharedBlobDescriptor,
289    ) -> Result<MappedRead<'a>, Error> {
290        let record = self.find_record(token).ok_or_else(|| Error::NotFound)?;
291
292        // Ensure descriptor matches what we created/tracked for this token.
293        if &record.descriptor != desc {
294            return Err(Error::Internal(
295                "descriptor does not match tracked blob token".to_string(),
296            ));
297        }
298
299        let len: usize = record.size.try_into().map_err(|_| {
300            Error::Internal(
301                "blob too large to map on this platform".to_string(),
302            )
303        })?;
304
305        match desc {
306            #[cfg(unix)]
307            SharedBlobDescriptor::UnixFd(fd) => {
308                let dup = crate::workspace::ipc::platform::unix::dup_fd(*fd)?;
309                let file = unsafe { std::fs::File::from_raw_fd(dup) };
310                let mmap = unsafe { memmap2::Mmap::map(&file)? };
311                let ptr = mmap.as_ptr();
312                Ok(MappedRead {
313                    len,
314                    ptr,
315                    backing: MappedReadBacking::Memmap(mmap),
316                    _marker: std::marker::PhantomData,
317                })
318            }
319            #[cfg(windows)]
320            SharedBlobDescriptor::WindowsHandle(handle) => {
321                let view =
322                    crate::workspace::ipc::platform::windows::map_view_read(
323                        *handle, len,
324                    )?;
325                let ptr = view.as_ptr();
326                Ok(MappedRead {
327                    len,
328                    ptr,
329                    backing: MappedReadBacking::Windows(view),
330                    _marker: std::marker::PhantomData,
331                })
332            }
333            SharedBlobDescriptor::FilePath(path) => {
334                let file = std::fs::File::open(path)?;
335                let mmap = unsafe { memmap2::Mmap::map(&file)? };
336                let ptr = mmap.as_ptr();
337                Ok(MappedRead {
338                    len,
339                    ptr,
340                    backing: MappedReadBacking::Memmap(mmap),
341                    _marker: std::marker::PhantomData,
342                })
343            }
344            SharedBlobDescriptor::Named(_name) => Err(Error::Unsupported(
345                "Named shared blobs are not implemented for reading"
346                    .to_string(),
347            )),
348        }
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;

    /// Round-trips a payload through the temp-file backend:
    /// create, write, map, compare, clean up.
    #[crate::ctb_test(tokio::test)]
    async fn blob_round_trip_tempfile_fallback() -> Result<()> {
        let blobs = SharedMemoryBlobs::new(BlobBackend::TempFileFallback);
        let data = b"hello-blob-fallback".to_vec();

        let blob = blobs.create(u64::try_from(data.len())?).await?;
        blob.write_all(&data)?;

        let mapped = blobs.map_read(&blob.token, &blob.descriptor).await?;
        assert_eq!(mapped.as_slice(), &data[..]);

        // Release the mapping before cleanup so the temp file is no longer
        // in use when it is removed (cleanup ignores removal failures).
        drop(mapped);

        blobs.cleanup(&blob.token).await?;
        Ok(())
    }

    /// Round-trips a payload through the platform-default backend on Unix.
    #[cfg(unix)]
    #[crate::ctb_test(tokio::test)]
    async fn blob_round_trip_unix_platform_default() -> Result<()> {
        let blobs = SharedMemoryBlobs::new(BlobBackend::PlatformDefault);
        let data = b"hello-blob-unix".to_vec();

        let blob = blobs.create(u64::try_from(data.len())?).await?;
        blob.write_all(&data)?;

        let mapped = blobs.map_read(&blob.token, &blob.descriptor).await?;
        assert_eq!(mapped.as_slice(), &data[..]);

        // Release the mapping before tearing down its backing storage.
        drop(mapped);

        blobs.cleanup(&blob.token).await?;
        Ok(())
    }

    /// Round-trips a payload through the platform-default backend on Windows.
    #[cfg(windows)]
    #[crate::ctb_test(tokio::test)]
    async fn blob_round_trip_windows_platform_default() -> Result<()> {
        let blobs = SharedMemoryBlobs::new(BlobBackend::PlatformDefault);
        let data = b"hello-blob-windows".to_vec();

        // Checked conversion, consistent with the other round-trip tests.
        let blob = blobs.create(u64::try_from(data.len())?).await?;
        blob.write_all(&data)?;

        let mapped = blobs.map_read(&blob.token, &blob.descriptor).await?;
        assert_eq!(mapped.as_slice(), &data[..]);

        // Release the mapping before tearing down its backing storage.
        drop(mapped);

        blobs.cleanup(&blob.token).await?;
        Ok(())
    }
}