// ctoolbox/workspace/ipc/process_manager/unix.rs
#![cfg(unix)]
2
3use std::collections::HashMap;
15use std::sync::Arc;
16
17use anyhow::{Context, Result, anyhow};
18use async_trait::async_trait;
19use nix::errno::Errno;
20use nix::libc;
21use nix::sys::signal::{Signal, kill};
22use nix::unistd::Pid;
23use process_wrap::tokio::{CommandWrap, ProcessGroup};
24use tokio::sync::Mutex;
25use tokio::sync::oneshot;
26use tokio::time::{Duration, sleep};
27
28use crate::warn;
29use crate::workspace::ipc::error::Error;
30use crate::workspace::ipc::types::{ConnectionId, ProcessId};
31
32use super::{ChildHandle, ProcessManager, SpawnParams};
33
/// Fieldless Unix process-supervisor type.
///
/// NOTE(review): it carries no state and nothing visible in this file uses
/// it — confirm its intended role.
pub struct UnixProcessSupervisor;
39
/// Fieldless Unix shared-memory type.
///
/// NOTE(review): it carries no state and nothing visible in this file uses
/// it — confirm its intended role.
pub struct UnixSharedMemory;
42
/// Bookkeeping kept for every child spawned by [`TokioProcessManager`].
#[derive(Debug)]
struct ChildEntry {
    /// Handle handed back to callers; `attach_connection` fills in its
    /// `connection` field.
    handle: ChildHandle,
    /// Negated pid of the child (`-pid`). Because the child is spawned as a
    /// process-group leader, passing this value to `kill(2)` signals the
    /// whole group.
    pgid: libc::pid_t,
    /// Resolves once the reaper task has seen the child's `wait()` complete.
    reaped: oneshot::Receiver<()>,
}
52
/// State shared (behind a mutex) between the manager and its reaper tasks.
#[derive(Debug)]
struct Inner {
    /// Tracked children keyed by their IPC-level [`ProcessId`]. An entry is
    /// removed either by `terminate_tree` or by the child's reaper task once
    /// the child has exited.
    children: HashMap<ProcessId, ChildEntry>,
}
57
/// Unix [`ProcessManager`] built on tokio and `process_wrap`.
///
/// Every child is spawned as the leader of its own process group, so
/// `terminate_tree` can signal the child together with any descendants that
/// remain in that group.
#[derive(Debug)]
pub struct TokioProcessManager {
    /// Child registry, shared with the per-child reaper tasks.
    inner: Arc<Mutex<Inner>>,
}
63
64impl TokioProcessManager {
65 pub fn new() -> Arc<Self> {
67 Arc::new(Self {
68 inner: Arc::new(Mutex::new(Inner {
69 children: HashMap::new(),
70 })),
71 })
72 }
73
74 fn to_err(e: anyhow::Error) -> Error {
75 Error::from(e)
77 }
78
79 fn signal_pgid(
81 pgid: libc::pid_t,
82 sig: libc::c_int,
83 ) -> Result<(), anyhow::Error> {
84 let res = kill(Pid::from_raw(pgid), Signal::try_from(sig)?);
87 match res {
88 Ok(()) => Ok(()),
89 Err(Errno::EPERM) => Ok(()), Err(e) => {
91 Err(anyhow::anyhow!("failed to signal process group: {e}"))
92 }
93 }
94 }
95
96 fn pid_exists(pid: libc::pid_t) -> bool {
99 let e = kill(Pid::from_raw(pid), None);
101 match e {
102 Ok(()) => true,
103 Err(Errno::EPERM) => true, Err(Errno::ESRCH) => false, Err(_) => false,
106 }
107 }
108
109 async fn wait_pid_gone(pid: libc::pid_t, timeout: Duration) -> bool {
111 let step = Duration::from_millis(50);
112 let mut waited = Duration::from_millis(0);
113 while waited < timeout {
114 if !Self::pid_exists(pid) {
115 return true;
116 }
117 sleep(step).await;
118 waited += step;
119 }
120 !Self::pid_exists(pid)
121 }
122}
123
124#[async_trait]
125impl ProcessManager for TokioProcessManager {
126 async fn spawn_child(
127 &self,
128 params: SpawnParams,
129 ) -> Result<ChildHandle, Error> {
130 let program = if let Some(p) = params.program {
132 p
133 } else {
134 if params.kind == crate::workspace::ipc::types::ChildKind::External
135 {
136 return Err(Error::from(anyhow!(
137 "No program specified for child process"
138 )));
139 }
140 std::env::current_exe()?.into_os_string().into_string()?
141 };
142 let mut cmd = CommandWrap::with_new(&program, |command| {
143 command.args(¶ms.args);
144 for (k, v) in ¶ms.env {
145 command.env(k, v);
146 }
147 if let Some(cwd) = ¶ms.cwd {
148 command.current_dir(cwd);
149 }
150 });
151 cmd.wrap(ProcessGroup::leader());
153
154 let mut child = cmd.spawn().context("failed to spawn child")?;
156 let sys_pid = child.id().ok_or_else(|| anyhow!("child has no pid"))?;
157 let pid = ProcessId::new();
158 let handle = ChildHandle {
159 pid,
160 kind: params.kind,
161 connection: None,
162 };
163 let (tx, rx) = oneshot::channel::<()>();
165
166 let pgid = -i32::try_from(sys_pid)
168 .map_err(|e| Self::to_err(anyhow!("pid conversion error: {e}")))?;
169
170 let inner_for_reaper = Arc::clone(&self.inner);
172 let pid_for_reaper = pid;
173 tokio::spawn(async move {
174 let _ = child.wait().await;
176 let _ = tx.send(());
177 let mut inner = inner_for_reaper.lock().await;
179 let removed = inner.children.remove(&pid_for_reaper).is_some();
180 drop(inner);
181 });
182
183 let entry = ChildEntry {
184 handle: handle.clone(),
185 pgid,
186 reaped: rx,
187 };
188 let mut inner = self.inner.lock().await;
189 inner.children.insert(pid, entry);
190 Ok(handle)
191 }
192
193 async fn attach_connection(
194 &self,
195 pid: ProcessId,
196 conn: ConnectionId,
197 ) -> Result<(), Error> {
198 let mut inner = self.inner.lock().await;
199 let entry = inner.children.get_mut(&pid).ok_or_else(|| {
200 Self::to_err(anyhow!("child with pid {pid:?} not found"))
201 })?;
202 entry.handle.connection = Some(conn);
203 Ok(())
204 }
205
206 async fn list_children(&self) -> Result<Vec<ChildHandle>, Error> {
207 let inner = self.inner.lock().await;
208 Ok(inner.children.values().map(|e| e.handle.clone()).collect())
209 }
210
211 async fn terminate_tree(
212 &self,
213 pid: ProcessId,
214 force: bool,
215 ) -> Result<(), Error> {
216 let entry = {
218 let mut inner = self.inner.lock().await;
219 inner.children.remove(&pid).ok_or_else(|| {
220 Self::to_err(anyhow!("child with pid {pid:?} not found"))
221 })?
222 };
223 let pgid = entry.pgid;
224
225 if force {
226 Self::signal_pgid(pgid, libc::SIGKILL).map_err(Self::to_err)?;
228 let _ = tokio::time::timeout(Duration::from_secs(5), entry.reaped)
230 .await;
231 return Ok(());
232 }
233
234 Self::signal_pgid(pgid, libc::SIGTERM).map_err(Self::to_err)?;
236
237 let graceful_timeout = Duration::from_secs(5);
238 let reaped = entry.reaped; if let Ok(()) = tokio::time::timeout(graceful_timeout, async {
240 let _ = reaped.await;
242 })
243 .await
244 {
245 } else {
247 Self::signal_pgid(pgid, libc::SIGKILL).map_err(Self::to_err)?;
249 warn!(
250 "Child pid {:?} did not exit gracefully after {:?}, sent SIGKILL",
251 pid, graceful_timeout
252 );
253 }
259
260 Ok(())
261 }
262}
263
#[cfg(test)]
mod tests {
    use tokio::time::timeout;

    use crate::workspace::ipc::auth::capability::CapabilityBundle;
    use crate::workspace::ipc::types::ChildKind;

    use super::*;

    /// Builds spawn parameters for an external `program` with `args`, no
    /// extra environment, no working directory and default capabilities.
    fn external(program: &str, args: &[&str]) -> SpawnParams {
        SpawnParams {
            kind: ChildKind::External,
            program: Some(program.to_string()),
            args: args.iter().map(|a| (*a).to_owned()).collect(),
            env: vec![],
            cwd: None,
            capabilities: CapabilityBundle::default(),
        }
    }

    /// Returns whether `manager` currently tracks a child with `pid`.
    async fn is_listed(manager: &TokioProcessManager, pid: ProcessId) -> bool {
        manager
            .list_children()
            .await
            .unwrap()
            .iter()
            .any(|c| c.pid == pid)
    }

    #[crate::ctb_test(tokio::test)]
    async fn test_spawn_and_terminate_graceful() {
        let manager = TokioProcessManager::new();
        // Long-lived enough that the child cannot exit on its own before it
        // is signalled, even on a slow machine.
        let handle = manager
            .spawn_child(external("sleep", &["10"]))
            .await
            .unwrap();
        let result = timeout(
            Duration::from_secs(3),
            manager.terminate_tree(handle.pid, false),
        )
        .await
        .expect("graceful termination should complete within timeout");
        assert!(result.is_ok(), "graceful termination should succeed");
        assert!(
            !is_listed(&manager, handle.pid).await,
            "terminated child should no longer be tracked"
        );
    }

    #[crate::ctb_test(tokio::test)]
    async fn test_spawn_and_terminate_force() {
        let manager = TokioProcessManager::new();
        let handle = manager
            .spawn_child(external("sleep", &["10"]))
            .await
            .unwrap();
        let result = manager.terminate_tree(handle.pid, true).await;
        assert!(result.is_ok(), "force termination should succeed");
        assert!(
            !is_listed(&manager, handle.pid).await,
            "terminated child should no longer be tracked"
        );
    }

    #[crate::ctb_test(tokio::test)]
    async fn test_terminate_unknown_child_fails() {
        let manager = TokioProcessManager::new();
        let result = manager.terminate_tree(ProcessId::new(), true).await;
        assert!(result.is_err(), "terminating an unknown child should fail");
    }

    #[crate::ctb_test(tokio::test)]
    async fn test_exited_child_is_unregistered() {
        let manager = TokioProcessManager::new();
        let handle = manager.spawn_child(external("true", &[])).await.unwrap();
        let unregistered = timeout(Duration::from_secs(5), async {
            while is_listed(&manager, handle.pid).await {
                sleep(Duration::from_millis(10)).await;
            }
        })
        .await;
        assert!(
            unregistered.is_ok(),
            "reaper should unregister a child that exits on its own"
        );
    }
}
314
315