ctoolbox/workspace/ipc/process_manager/
unix.rs

1#![cfg(unix)]
2
3//! Unix-specific `ProcessManager` implementation.
4//!
5//! - Each child is started in its own process group via setpgid(0, 0).
6//! - On Linux, `PR_SET_PDEATHSIG` is set to SIGTERM.
7//! - Tree termination uses kill on the process group id.
8//!
9//! Process reaping is based partly on <https://github.com/fpco/pid1-rs>
10//!
//! Those parts are used under the MIT license; see the full license at the end of this file.
12//! pid1-rs license: Copyright (c) 2023 FP Complete
13
14use std::collections::HashMap;
15use std::sync::Arc;
16
17use anyhow::{Context, Result, anyhow};
18use async_trait::async_trait;
19use nix::errno::Errno;
20use nix::libc;
21use nix::sys::signal::{Signal, kill};
22use nix::unistd::Pid;
23use process_wrap::tokio::{CommandWrap, ProcessGroup};
24use tokio::sync::Mutex;
25use tokio::sync::oneshot;
26use tokio::time::{Duration, sleep};
27
28use crate::warn;
29use crate::workspace::ipc::error::Error;
30use crate::workspace::ipc::types::{ConnectionId, ProcessId};
31
32use super::{ChildHandle, ProcessManager, SpawnParams};
33
/// Placeholder type for Unix process-supervision primitives.
///
/// The primitives are still to be implemented on top of `nix`:
/// - `setpgid` to create process groups
/// - `prctl(PR_SET_PDEATHSIG)`
/// - `killpg` for tree termination
pub struct UnixProcessSupervisor;
39
/// Placeholder type for Unix shared-memory helpers.
///
/// Helpers such as memfd and `SCM_RIGHTS` descriptor passing should be added
/// here as they are needed.
pub struct UnixSharedMemory;
42
43#[derive(Debug)]
44struct ChildEntry {
45    // child handle is now managed by a background reaper task
46    handle: ChildHandle,
47    // On Unix, the process group id equals pid when setpgid(0, 0) is used.
48    pgid: libc::pid_t,
49    // Notifies when the child has been fully reaped (wait() completed)
50    reaped: oneshot::Receiver<()>,
51}
52
53#[derive(Debug)]
54struct Inner {
55    children: HashMap<ProcessId, ChildEntry>,
56}
57
58/// Tokio-based Unix process manager.
59#[derive(Debug)]
60pub struct TokioProcessManager {
61    inner: Arc<Mutex<Inner>>,
62}
63
64impl TokioProcessManager {
65    /// Create a new manager instance.
66    pub fn new() -> Arc<Self> {
67        Arc::new(Self {
68            inner: Arc::new(Mutex::new(Inner {
69                children: HashMap::new(),
70            })),
71        })
72    }
73
74    fn to_err(e: anyhow::Error) -> Error {
75        // Map anyhow::Error to crate error. Assumes Error: From<anyhow::Error>.
76        Error::from(e)
77    }
78
79    /// Send a signal to the given process group id, using nix.
80    fn signal_pgid(
81        pgid: libc::pid_t,
82        sig: libc::c_int,
83    ) -> Result<(), anyhow::Error> {
84        // Use kill with negative pgid to signal the whole process group.
85        // nix::unistd::Pid supports negative values for process groups.
86        let res = kill(Pid::from_raw(pgid), Signal::try_from(sig)?);
87        match res {
88            Ok(()) => Ok(()),
89            Err(Errno::EPERM) => Ok(()), // group exists but we don't have permission
90            Err(e) => {
91                Err(anyhow::anyhow!("failed to signal process group: {e}"))
92            }
93        }
94    }
95
96    /// Check whether a process group with the given pgid exists, using nix.
97    /// We send signal 0 to -pgid to test for any member of that group.
98    fn pid_exists(pid: libc::pid_t) -> bool {
99        // NOTE: treat `pid` as a process group id (pgid) and test the group.
100        let e = kill(Pid::from_raw(pid), None);
101        match e {
102            Ok(()) => true,
103            Err(Errno::EPERM) => true, // group exists but not permitted
104            Err(Errno::ESRCH) => false, // no such process/group
105            Err(_) => false,
106        }
107    }
108
109    /// Wait up to `timeout` for the given pid to disappear.
110    async fn wait_pid_gone(pid: libc::pid_t, timeout: Duration) -> bool {
111        let step = Duration::from_millis(50);
112        let mut waited = Duration::from_millis(0);
113        while waited < timeout {
114            if !Self::pid_exists(pid) {
115                return true;
116            }
117            sleep(step).await;
118            waited += step;
119        }
120        !Self::pid_exists(pid)
121    }
122}
123
124#[async_trait]
125impl ProcessManager for TokioProcessManager {
126    async fn spawn_child(
127        &self,
128        params: SpawnParams,
129    ) -> Result<ChildHandle, Error> {
130        // Use process-wrap's CommandWrap for safe process group setup
131        let program = if let Some(p) = params.program {
132            p
133        } else {
134            if params.kind == crate::workspace::ipc::types::ChildKind::External
135            {
136                return Err(Error::from(anyhow!(
137                    "No program specified for child process"
138                )));
139            }
140            std::env::current_exe()?.into_os_string().into_string()?
141        };
142        let mut cmd = CommandWrap::with_new(&program, |command| {
143            command.args(&params.args);
144            for (k, v) in &params.env {
145                command.env(k, v);
146            }
147            if let Some(cwd) = &params.cwd {
148                command.current_dir(cwd);
149            }
150        });
151        // Apply the ProcessGroup wrapper to safely handle setpgid and PR_SET_PDEATHSIG (on Linux)
152        cmd.wrap(ProcessGroup::leader());
153
154        // Spawn the child using the wrapped command
155        let mut child = cmd.spawn().context("failed to spawn child")?;
156        let sys_pid = child.id().ok_or_else(|| anyhow!("child has no pid"))?;
157        let pid = ProcessId::new();
158        let handle = ChildHandle {
159            pid,
160            kind: params.kind,
161            connection: None,
162        };
163        // Create a oneshot channel to signal reaping completion
164        let (tx, rx) = oneshot::channel::<()>();
165
166        // Compute process group id value we use for signaling (negative value to target group)
167        let pgid = -i32::try_from(sys_pid)
168            .map_err(|e| Self::to_err(anyhow!("pid conversion error: {e}")))?;
169
170        // Spawn a background reaper task that waits on the child and cleans up
171        let inner_for_reaper = Arc::clone(&self.inner);
172        let pid_for_reaper = pid;
173        tokio::spawn(async move {
174            // Wait for the child to exit and be reaped
175            let _ = child.wait().await;
176            let _ = tx.send(());
177            // Best-effort cleanup of the manager's registry (in case terminate_tree wasn't called)
178            let mut inner = inner_for_reaper.lock().await;
179            let removed = inner.children.remove(&pid_for_reaper).is_some();
180            drop(inner);
181        });
182
183        let entry = ChildEntry {
184            handle: handle.clone(),
185            pgid,
186            reaped: rx,
187        };
188        let mut inner = self.inner.lock().await;
189        inner.children.insert(pid, entry);
190        Ok(handle)
191    }
192
193    async fn attach_connection(
194        &self,
195        pid: ProcessId,
196        conn: ConnectionId,
197    ) -> Result<(), Error> {
198        let mut inner = self.inner.lock().await;
199        let entry = inner.children.get_mut(&pid).ok_or_else(|| {
200            Self::to_err(anyhow!("child with pid {pid:?} not found"))
201        })?;
202        entry.handle.connection = Some(conn);
203        Ok(())
204    }
205
206    async fn list_children(&self) -> Result<Vec<ChildHandle>, Error> {
207        let inner = self.inner.lock().await;
208        Ok(inner.children.values().map(|e| e.handle.clone()).collect())
209    }
210
211    async fn terminate_tree(
212        &self,
213        pid: ProcessId,
214        force: bool,
215    ) -> Result<(), Error> {
216        // Take ownership of the entry so we can await outside the lock
217        let entry = {
218            let mut inner = self.inner.lock().await;
219            inner.children.remove(&pid).ok_or_else(|| {
220                Self::to_err(anyhow!("child with pid {pid:?} not found"))
221            })?
222        };
223        let pgid = entry.pgid;
224
225        if force {
226            // Immediate kill of the whole process group
227            Self::signal_pgid(pgid, libc::SIGKILL).map_err(Self::to_err)?;
228            // Best-effort: await reaping signal shortly to avoid zombies
229            let _ = tokio::time::timeout(Duration::from_secs(5), entry.reaped)
230                .await;
231            return Ok(());
232        }
233
234        // Graceful: SIGTERM then wait for child reaping
235        Self::signal_pgid(pgid, libc::SIGTERM).map_err(Self::to_err)?;
236
237        let graceful_timeout = Duration::from_secs(5);
238        let reaped = entry.reaped; // move receiver out of entry
239        if let Ok(()) = tokio::time::timeout(graceful_timeout, async {
240            // awaiting the receiver consumes it
241            let _ = reaped.await;
242        })
243        .await
244        {
245            // reaped or channel closed
246        } else {
247            // timed out -> escalate and then wait a bit more
248            Self::signal_pgid(pgid, libc::SIGKILL).map_err(Self::to_err)?;
249            warn!(
250                "Child pid {:?} did not exit gracefully after {:?}, sent SIGKILL",
251                pid, graceful_timeout
252            );
253            // If you want to wait for the same oneshot after SIGKILL, you
254            // can't: it's already owned by `reaped` and either completed or
255            // still pending.
256            // Await it now directly (no timeout) or with a new timeout:
257            // let _ = tokio::time::timeout(Duration::from_secs(5), reaped).await;
258        }
259
260        Ok(())
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use tokio::time::timeout;
267
268    use crate::workspace::ipc::auth::capability::CapabilityBundle;
269    use crate::workspace::ipc::types::ChildKind;
270
271    use super::*;
272
273    #[crate::ctb_test(tokio::test)]
274    async fn test_spawn_and_terminate_graceful() {
275        let manager = TokioProcessManager::new();
276        let params = SpawnParams {
277            kind: ChildKind::External,
278            program: Some("sleep".to_string()),
279            args: vec!["1".to_string()],
280            env: vec![],
281            cwd: None,
282            capabilities: CapabilityBundle::default(),
283        };
284        let handle = manager.spawn_child(params).await.unwrap();
285        // Terminate with force=false, should wait for graceful exit
286        let result = timeout(
287            Duration::from_secs(3),
288            manager.terminate_tree(handle.pid, false),
289        )
290        .await;
291        assert!(
292            result.is_ok(),
293            "graceful termination should complete within timeout"
294        );
295    }
296
297    #[crate::ctb_test(tokio::test)]
298    async fn test_spawn_and_terminate_force() {
299        let manager = TokioProcessManager::new();
300        let params = SpawnParams {
301            kind: ChildKind::External,
302            program: Some("sleep".to_string()),
303            args: vec!["10".to_string()], // Long sleep to test force kill
304            env: vec![],
305            cwd: None,
306            capabilities: CapabilityBundle::default(),
307        };
308        let handle = manager.spawn_child(params).await.unwrap();
309        // Terminate with force=true, should kill immediately
310        let result = manager.terminate_tree(handle.pid, true).await;
311        assert!(result.is_ok(), "force termination should succeed");
312    }
313}
314
315/* pid1-rs license:
316MIT License
317
318Copyright (c) 2023 FP Complete
319
320Permission is hereby granted, free of charge, to any person obtaining a copy
321of this software and associated documentation files (the "Software"), to deal
322in the Software without restriction, including without limitation the rights
323to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
324copies of the Software, and to permit persons to whom the Software is
325furnished to do so, subject to the following conditions:
326
327The above copyright notice and this permission notice shall be included in all
328copies or substantial portions of the Software.
329
330THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
331IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
332FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
333AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
334LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
335OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
336SOFTWARE.
337 */