ctoolbox/utilities/
process.rs

1//! Utilities for process management, including tracking and spawning subprocesses.
2
3use anyhow::{Result, anyhow};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::env;
7use std::io::Write;
8use std::process::{Command, Stdio};
9use std::sync::Arc;
10use std::sync::Mutex;
11
12use crate::json;
13use crate::utilities::ipc::{Channel, IPC_ARG};
14use crate::utilities::{generate_authentication_key, send_message};
15
/// Represents a managed process.
///
/// `start_process` builds one of these for every subprocess it spawns and
/// stores it in `ProcessManager::processes`, keyed by `pid`.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Process {
    /// Manager-assigned ID taken from the manager's monotonic counter.
    /// This is not the operating-system PID; see `system_pid` for that.
    pub pid: u64,
    /// Operating-system process ID reported for the spawned child.
    pub system_pid: u64,
    /// IPC channel created for this process. `start_process` passes its
    /// argument string to the child on the command line.
    pub channel: Channel,
    /// PID of the workspace or renderer process that started this process.
    pub parent_pid: u64,
}
25
/// Represents a group or job handle for managing process subtrees.
///
/// Groups are stored in `ProcessManager::process_groups` and populated via
/// `ProcessManager::add_process_to_group`.
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
pub struct ProcessGroup {
    /// Identifier of the group; used as the key in
    /// `ProcessManager::process_groups`.
    pub group_id: u64,
    /// IDs of the processes in this group, in the order they were added.
    /// Presumably manager-assigned `Process::pid` values rather than OS PIDs;
    /// TODO confirm against callers.
    pub members: Vec<u64>,
    // Add more fields as needed for job control.
}
33
/// Manages all workspace and subprocesses.
///
/// Shared as `Arc<Mutex<ProcessManager>>` by `start_process`, which allocates
/// IDs from `last_id` and records every spawned subprocess in `processes`.
#[derive(Clone, Serialize, Deserialize, Default, Debug)]
pub struct ProcessManager {
    /// Port copied into the channel of every subprocess that is started.
    pub port: u16,
    /// Channel that receives the `add_channel` message whenever a new
    /// subprocess channel is created.
    pub ipc_control_channel: Channel,
    /// All tracked processes, keyed by their manager-assigned `Process::pid`.
    /// ID 0 is looked up as the workspace process by `workspace_channel`.
    pub processes: HashMap<u64, Process>,
    /// Monotonic counter for process IDs.
    pub last_id: u64,
    /// Tracks process groups/job handles for subtree management.
    pub process_groups: HashMap<u64, ProcessGroup>,
}
45
46impl ProcessManager {
47    /// Returns the last assigned process ID.
48    pub fn last_index(&self) -> u64 {
49        self.last_id
50    }
51    /// Returns the next process ID to be assigned.
52    pub fn next_index(&self) -> u64 {
53        self.last_id + 1
54    }
55    /// Returns the last process added.
56    pub fn last_process(&self) -> Result<Process> {
57        self.processes
58            .get(&self.last_index())
59            .cloned()
60            .ok_or_else(|| anyhow!("Last process not found"))
61    }
62    /// Returns the workspace channel. I think this is confusingly named,
63    /// because each subprocess talks to the workspace, not to other
64    /// subprocesses. Only the workspace itself should need to know its own
65    /// channel.
66    pub fn workspace_channel(&self) -> Result<Channel> {
67        let ch = self
68            .processes
69            .get(&0)
70            .map(|p| p.channel.clone())
71            .ok_or_else(|| anyhow!("Workspace process not found"))?;
72        // if (ch.name != "ipc_control" && ch.name != "") {
73        //     return Err(anyhow!("Workspace channel name mismatch"));
74        // }
75        Ok(ch)
76    }
77    /// Adds a new process group.
78    pub fn add_process_group(&mut self, group: ProcessGroup) {
79        self.process_groups.insert(group.group_id, group);
80    }
81    /// Adds a process to a group.
82    pub fn add_process_to_group(&mut self, group_id: u64, pid: u64) {
83        if let Some(group) = self.process_groups.get_mut(&group_id) {
84            group.members.push(pid);
85        }
86    }
87}
88
89/// Only the workspace should call this.
90/// Spawns a new process and tracks it with a monotonic ID.
91pub async fn start_process(
92    manager: &Arc<Mutex<ProcessManager>>,
93    parent_pid: u64,
94    args: Vec<String>,
95) -> Result<Process> {
96    let manager = manager
97        .lock()
98        .map_err(|_| anyhow!("Failed to lock process manager"))?;
99    let mut manager = manager;
100    let next_id = manager.next_index();
101    let first_arg = args
102        .first()
103        .ok_or_else(|| anyhow!("No command specified to start_process"))?;
104    let subprocess_channel = Channel {
105        name: format!("com.ctoolbox.p{next_id}.{first_arg}"),
106        port: manager.port,
107        authentication_key: generate_authentication_key(),
108    };
109
110    let path = env::current_exe()
111        .map_err(|e| anyhow!("failed to get current executable path: {e}"))?;
112    let mut new_args = vec![
113        IPC_ARG.to_string(),
114        subprocess_channel.to_arg_string(),
115        next_id.to_string(),
116    ];
117    new_args.extend(args);
118    #[allow(clippy::zombie_processes)]
119    let mut _process = Command::new(path.clone())
120        .args(new_args)
121        .stdin(Stdio::piped())
122        .stdout(Stdio::inherit())
123        .stderr(Stdio::inherit())
124        .spawn()
125        .map_err(|e| anyhow!("failed to execute server process: {e}"))?;
126    let subprocess_stdin = _process
127        .stdin
128        .as_mut()
129        .ok_or_else(|| anyhow!("Failed to get subprocess stdin"))?;
130    subprocess_stdin
131        .write_all(subprocess_channel.authentication_key.as_bytes())
132        .map_err(|e| {
133            anyhow!("Failed to write authentication key to subprocess: {e}")
134        })?;
135    let system_pid = u64::from(_process.id());
136    send_message(
137        &manager.ipc_control_channel,
138        json!({"type": "add_channel", "channel": subprocess_channel})
139            .to_string()
140            .as_str(),
141    );
142    /*let ready = listen_for_message(&channel).await;
143    debug!("Subprocess reply: {} {}", jq(".type", ready.as_str()).unwrap(), ready.as_str());
144    if jq(".type", ready.as_str()).unwrap() == "ready" {
145        debug!("Subprocess ready: {} {}", channel.name, get_service_name());
146    }
147
148    sleep(10);
149    let message = "hello from parent".to_string();
150    log("Sending hello message to child");
151    send_message(channel.clone(), message.as_str());*/
152    let new_process = Process {
153        pid: next_id,
154        system_pid,
155        channel: subprocess_channel,
156        parent_pid,
157    };
158
159    manager.processes.insert(next_id, new_process.clone());
160    manager.last_id = next_id;
161
162    // Optionally, add to a process group here if needed.
163    // manager.add_process_to_group(group_id, next_id);
164
165    manager.last_process()
166}