// ctoolbox/workspace/ipc/data_plane/shared_memory.rs
use crate::workspace::ipc::error::Error;
2use crate::workspace::ipc::types::{BlobId, BlobToken};
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use std::os::fd::FromRawFd;
6use std::path::PathBuf;
7use uuid::Uuid;
8
/// Transport-agnostic handle describing where a shared blob's bytes live.
///
/// Serialized and sent across the IPC boundary so the consumer can re-open
/// the same backing storage the producer allocated.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SharedBlobDescriptor {
    /// Raw file descriptor (e.g. a memfd). Users of this variant dup() the
    /// fd before wrapping it in a `File`, so the original stays open until
    /// cleanup. Unix only.
    #[cfg(unix)]
    UnixFd(i32),
    /// Raw file-mapping handle value. Windows only.
    #[cfg(windows)]
    WindowsHandle(u64),
    /// Plain file on disk (temp-file fallback backend).
    FilePath(PathBuf),
    /// Named shared-memory object; reading and writing are not implemented.
    Named(String),
}
23
24pub struct ProducerBlob {
26 pub id: BlobId,
27 pub size: u64,
28 pub descriptor: SharedBlobDescriptor,
29 pub token: BlobToken,
30}
31
impl ProducerBlob {
    /// Copies `data` into the shared blob, starting at offset 0.
    ///
    /// # Errors
    /// - `Error::Internal` if `data` is larger than the allocated `size`.
    /// - `Error::Unsupported` for `Named` descriptors (not implemented).
    /// - Any I/O error from opening, seeking, or writing the backing storage.
    #[allow(unsafe_code)]
    pub fn write_all(&self, data: &[u8]) -> Result<(), Error> {
        // Reject oversize writes up front so the backing storage never grows
        // past what the consumer side expects to map.
        if (u64::try_from(data.len())?) > self.size {
            return Err(Error::Internal(
                "blob write exceeds allocated size".to_string(),
            ));
        }

        match &self.descriptor {
            #[cfg(unix)]
            SharedBlobDescriptor::UnixFd(fd) => {
                // Duplicate the fd so the File below can own (and close on
                // drop) its copy without invalidating the descriptor we
                // continue to hand out.
                let dup = crate::workspace::ipc::platform::unix::dup_fd(*fd)?;
                // SAFETY: `dup` was just created by dup_fd, is a valid open
                // fd, and nothing else owns it; File takes sole ownership.
                let mut file = unsafe { std::fs::File::from_raw_fd(dup) };
                use std::io::{Seek, SeekFrom, Write};
                file.seek(SeekFrom::Start(0))?;
                file.write_all(data)?;
                file.flush()?;
                Ok(())
            }
            #[cfg(windows)]
            SharedBlobDescriptor::WindowsHandle(handle) => {
                crate::workspace::ipc::platform::windows::write_mapping(
                    *handle, data,
                )
            }
            SharedBlobDescriptor::FilePath(path) => {
                use std::io::{Seek, SeekFrom, Write};
                // The file must already exist (the allocator created it);
                // `create(false)` avoids silently inventing stray files.
                let mut file = std::fs::OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(false)
                    .open(path)?;
                file.seek(SeekFrom::Start(0))?;
                file.write_all(data)?;
                file.flush()?;
                Ok(())
            }
            SharedBlobDescriptor::Named(_name) => Err(Error::Unsupported(
                "Named shared blobs are not implemented for writing"
                    .to_string(),
            )),
        }
    }
}
79
/// Read-only view over the bytes of a mapped shared blob.
///
/// `ptr`/`len` describe the readable range; `backing` owns the OS mapping
/// and keeps it alive for as long as this view exists (dropping the backing
/// would invalidate `ptr`).
pub struct MappedRead<'a> {
    /// Number of readable bytes (the blob's recorded size).
    pub len: usize,
    /// Start of the mapped region; valid while `backing` is alive.
    ptr: *const u8,
    /// Owns the underlying mapping so `ptr` stays valid.
    backing: MappedReadBacking,
    /// Ties the view's lifetime to the reader that produced it.
    pub _marker: std::marker::PhantomData<&'a ()>,
}
89
impl MappedRead<'_> {
    /// Returns the mapped bytes as a slice borrowed from this view.
    #[allow(unsafe_code)]
    pub fn as_slice(&self) -> &[u8] {
        // SAFETY: `ptr` points into the mapping owned by `self.backing`,
        // which lives at least as long as `self`. NOTE(review): soundness
        // also requires that the mapping actually covers `len` bytes; the
        // code constructing MappedRead is responsible for ensuring that.
        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
    }
}
97
/// Owner of the OS-level mapping behind a `MappedRead`.
/// Held only to keep the mapping alive; never read directly.
enum MappedReadBacking {
    /// mmap-based view (unix fds and plain files).
    Memmap(memmap2::Mmap),
    /// File-mapping view. Windows only.
    #[cfg(windows)]
    Windows(crate::workspace::ipc::platform::windows::MappingView),
    /// Placeholder variant for unbacked views.
    #[allow(dead_code)]
    Empty,
}
105
/// Producer-side allocator of shared blobs.
#[async_trait]
pub trait BlobAllocator: Send + Sync {
    /// Allocates a blob of `size` bytes and returns the producer's view.
    async fn create(&self, size: u64) -> Result<ProducerBlob, Error>;

    /// Releases the resources behind `token`. Implementations may treat an
    /// unknown token as a no-op.
    async fn cleanup(&self, token: &BlobToken) -> Result<(), Error>;
}
115
/// Consumer-side mapper of shared blobs.
#[async_trait]
pub trait BlobReader: Send + Sync {
    /// Maps the blob identified by `token`/`desc` read-only; the returned
    /// view borrows from the reader for `'a`.
    async fn map_read<'a>(
        &'a self,
        token: &BlobToken,
        desc: &SharedBlobDescriptor,
    ) -> Result<MappedRead<'a>, Error>;
}
126
/// Strategy for allocating blob backing storage.
#[derive(Debug, Clone, Copy)]
pub enum BlobBackend {
    /// Use the platform's native shared-memory primitive (memfd on unix,
    /// a file mapping on windows), falling back to temp files on unix if
    /// memfd creation fails.
    PlatformDefault,
    /// Always back blobs with files in the system temp directory.
    TempFileFallback,
}
134
/// Registry entry tracking one live blob.
#[derive(Debug)]
struct BlobRecord {
    /// Token handed to the producer; used as the lookup key.
    token: BlobToken,
    /// Allocated size in bytes.
    size: u64,
    /// Backing-storage handle for the blob.
    descriptor: SharedBlobDescriptor,
}
141
/// Shared-memory blob allocator/reader backed by an in-process registry.
#[derive(Debug)]
pub struct SharedMemoryBlobs {
    /// Allocation strategy used by `create`.
    backend: BlobBackend,
    /// All live blobs; looked up by linear scan (assumes a small set).
    records: std::sync::Mutex<Vec<BlobRecord>>,
    /// Monotonic allocation counter; advanced per token but its value is
    /// currently unused.
    seq: std::sync::atomic::AtomicU64,
}
148
149impl SharedMemoryBlobs {
150 pub fn new(backend: BlobBackend) -> Self {
151 Self {
152 backend,
153 records: std::sync::Mutex::new(Vec::new()),
154 seq: std::sync::atomic::AtomicU64::new(0),
155 }
156 }
157
158 fn new_token(&self) -> BlobToken {
159 let _seq = self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
160 BlobToken {
161 id: BlobId(Uuid::new_v4()),
162 size: 0,
163 lease_ms: None,
164 }
165 }
166
167 fn find_record(&self, token: &BlobToken) -> Option<BlobRecord> {
168 let records = self.records.lock().ok()?;
169 records
170 .iter()
171 .find(|r| &r.token == token)
172 .map(|r| BlobRecord {
173 token: r.token.clone(),
174 size: r.size,
175 descriptor: r.descriptor.clone(),
176 })
177 }
178
179 fn remove_record(&self, token: &BlobToken) -> Option<BlobRecord> {
180 let mut records = self.records.lock().ok()?;
181 let idx = records.iter().position(|r| &r.token == token)?;
182 Some(records.remove(idx))
183 }
184
185 fn create_tempfile_blob(
186 &self,
187 size: u64,
188 token: &BlobToken,
189 ) -> Result<SharedBlobDescriptor, Error> {
190 let mut path = std::env::temp_dir();
191 path.push(format!("{}.bin", token.id.0));
192
193 let file = std::fs::OpenOptions::new()
194 .read(true)
195 .write(true)
196 .create_new(true)
197 .open(&path)?;
198 file.set_len(size)?;
199 Ok(SharedBlobDescriptor::FilePath(path))
200 }
201}
202
203#[async_trait]
204impl BlobAllocator for SharedMemoryBlobs {
205 async fn create(&self, size: u64) -> Result<ProducerBlob, Error> {
206 let token = self.new_token();
207
208 let descriptor = match self.backend {
209 BlobBackend::TempFileFallback => {
210 self.create_tempfile_blob(size, &token)?
211 }
212 BlobBackend::PlatformDefault => {
213 #[cfg(unix)]
214 {
215 if let Ok(fd) =
217 crate::workspace::ipc::platform::unix::create_memfd(
218 size,
219 )
220 {
221 SharedBlobDescriptor::UnixFd(fd)
222 } else {
223 self.create_tempfile_blob(size, &token)?
224 }
225 }
226 #[cfg(windows)]
227 {
228 let handle =
229 crate::workspace::ipc::platform::windows::create_file_mapping(size)?;
230 SharedBlobDescriptor::WindowsHandle(handle)
231 }
232 #[cfg(not(any(unix, windows)))]
233 {
234 self.create_tempfile_blob(size, &token)?
235 }
236 }
237 };
238
239 {
240 let mut records = self.records.lock().map_err(|_| {
241 Error::Internal("blob registry mutex poisoned".to_string())
242 })?;
243 records.push(BlobRecord {
244 token: token.clone(),
245 size,
246 descriptor: descriptor.clone(),
247 });
248 }
249
250 Ok(ProducerBlob {
251 id: BlobId::default(),
252 size,
253 descriptor,
254 token,
255 })
256 }
257
258 async fn cleanup(&self, token: &BlobToken) -> Result<(), Error> {
259 let Some(record) = self.remove_record(token) else {
260 return Ok(());
262 };
263
264 match record.descriptor {
265 #[cfg(unix)]
266 SharedBlobDescriptor::UnixFd(fd) => {
267 crate::workspace::ipc::platform::unix::close_fd(fd)
268 }
269 #[cfg(windows)]
270 SharedBlobDescriptor::WindowsHandle(handle) => {
271 crate::workspace::ipc::platform::windows::close_handle(handle)
272 }
273 SharedBlobDescriptor::FilePath(path) => {
274 let _ = std::fs::remove_file(path);
275 Ok(())
276 }
277 SharedBlobDescriptor::Named(_name) => Ok(()),
278 }
279 }
280}
281
282#[allow(unsafe_code)]
283#[async_trait]
284impl BlobReader for SharedMemoryBlobs {
285 async fn map_read<'a>(
286 &'a self,
287 token: &BlobToken,
288 desc: &SharedBlobDescriptor,
289 ) -> Result<MappedRead<'a>, Error> {
290 let record = self.find_record(token).ok_or_else(|| Error::NotFound)?;
291
292 if &record.descriptor != desc {
294 return Err(Error::Internal(
295 "descriptor does not match tracked blob token".to_string(),
296 ));
297 }
298
299 let len: usize = record.size.try_into().map_err(|_| {
300 Error::Internal(
301 "blob too large to map on this platform".to_string(),
302 )
303 })?;
304
305 match desc {
306 #[cfg(unix)]
307 SharedBlobDescriptor::UnixFd(fd) => {
308 let dup = crate::workspace::ipc::platform::unix::dup_fd(*fd)?;
309 let file = unsafe { std::fs::File::from_raw_fd(dup) };
310 let mmap = unsafe { memmap2::Mmap::map(&file)? };
311 let ptr = mmap.as_ptr();
312 Ok(MappedRead {
313 len,
314 ptr,
315 backing: MappedReadBacking::Memmap(mmap),
316 _marker: std::marker::PhantomData,
317 })
318 }
319 #[cfg(windows)]
320 SharedBlobDescriptor::WindowsHandle(handle) => {
321 let view =
322 crate::workspace::ipc::platform::windows::map_view_read(
323 *handle, len,
324 )?;
325 let ptr = view.as_ptr();
326 Ok(MappedRead {
327 len,
328 ptr,
329 backing: MappedReadBacking::Windows(view),
330 _marker: std::marker::PhantomData,
331 })
332 }
333 SharedBlobDescriptor::FilePath(path) => {
334 let file = std::fs::File::open(path)?;
335 let mmap = unsafe { memmap2::Mmap::map(&file)? };
336 let ptr = mmap.as_ptr();
337 Ok(MappedRead {
338 len,
339 ptr,
340 backing: MappedReadBacking::Memmap(mmap),
341 _marker: std::marker::PhantomData,
342 })
343 }
344 SharedBlobDescriptor::Named(_name) => Err(Error::Unsupported(
345 "Named shared blobs are not implemented for reading"
346 .to_string(),
347 )),
348 }
349 }
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;

    /// Round trip: allocate, write, map, compare, cleanup — temp-file backend.
    #[crate::ctb_test(tokio::test)]
    async fn blob_round_trip_tempfile_fallback() -> Result<()> {
        let blobs = SharedMemoryBlobs::new(BlobBackend::TempFileFallback);
        let data = b"hello-blob-fallback".to_vec();

        let blob = blobs.create(u64::try_from(data.len())?).await?;
        blob.write_all(&data)?;

        let mapped = blobs.map_read(&blob.token, &blob.descriptor).await?;
        assert_eq!(mapped.as_slice(), &data[..]);

        blobs.cleanup(&blob.token).await?;
        Ok(())
    }

    /// Round trip via the platform default backend (memfd or temp fallback).
    #[cfg(unix)]
    #[crate::ctb_test(tokio::test)]
    async fn blob_round_trip_unix_platform_default() -> Result<()> {
        let blobs = SharedMemoryBlobs::new(BlobBackend::PlatformDefault);
        let data = b"hello-blob-unix".to_vec();

        let blob = blobs.create(u64::try_from(data.len())?).await?;
        blob.write_all(&data)?;

        let mapped = blobs.map_read(&blob.token, &blob.descriptor).await?;
        assert_eq!(mapped.as_slice(), &data[..]);

        blobs.cleanup(&blob.token).await?;
        Ok(())
    }

    /// Round trip via the platform default backend (file mapping).
    #[cfg(windows)]
    #[crate::ctb_test(tokio::test)]
    async fn blob_round_trip_windows_platform_default() -> Result<()> {
        let blobs = SharedMemoryBlobs::new(BlobBackend::PlatformDefault);
        let data = b"hello-blob-windows".to_vec();

        // Fix: use checked conversion like the other tests instead of a
        // silently truncating `as u64` cast.
        let blob = blobs.create(u64::try_from(data.len())?).await?;
        blob.write_all(&data)?;

        let mapped = blobs.map_read(&blob.token, &blob.descriptor).await?;
        assert_eq!(mapped.as_slice(), &data[..]);

        blobs.cleanup(&blob.token).await?;
        Ok(())
    }
}