ctoolbox/utilities/
process.rs1use anyhow::{Result, anyhow};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::env;
7use std::io::Write;
8use std::process::{Command, Stdio};
9use std::sync::Arc;
10use std::sync::Mutex;
11
12use crate::json;
13use crate::utilities::ipc::{Channel, IPC_ARG};
14use crate::utilities::{generate_authentication_key, send_message};
15
16#[derive(Clone, Serialize, Deserialize, Debug)]
18pub struct Process {
19 pub pid: u64,
20 pub system_pid: u64,
21 pub channel: Channel,
22 pub parent_pid: u64,
24}
25
26#[derive(Clone, Serialize, Deserialize, Default, Debug)]
28pub struct ProcessGroup {
29 pub group_id: u64,
30 pub members: Vec<u64>,
31 }
33
34#[derive(Clone, Serialize, Deserialize, Default, Debug)]
36pub struct ProcessManager {
37 pub port: u16,
38 pub ipc_control_channel: Channel,
39 pub processes: HashMap<u64, Process>,
40 pub last_id: u64,
42 pub process_groups: HashMap<u64, ProcessGroup>,
44}
45
46impl ProcessManager {
47 pub fn last_index(&self) -> u64 {
49 self.last_id
50 }
51 pub fn next_index(&self) -> u64 {
53 self.last_id + 1
54 }
55 pub fn last_process(&self) -> Result<Process> {
57 self.processes
58 .get(&self.last_index())
59 .cloned()
60 .ok_or_else(|| anyhow!("Last process not found"))
61 }
62 pub fn workspace_channel(&self) -> Result<Channel> {
67 let ch = self
68 .processes
69 .get(&0)
70 .map(|p| p.channel.clone())
71 .ok_or_else(|| anyhow!("Workspace process not found"))?;
72 Ok(ch)
76 }
77 pub fn add_process_group(&mut self, group: ProcessGroup) {
79 self.process_groups.insert(group.group_id, group);
80 }
81 pub fn add_process_to_group(&mut self, group_id: u64, pid: u64) {
83 if let Some(group) = self.process_groups.get_mut(&group_id) {
84 group.members.push(pid);
85 }
86 }
87}
88
89pub async fn start_process(
92 manager: &Arc<Mutex<ProcessManager>>,
93 parent_pid: u64,
94 args: Vec<String>,
95) -> Result<Process> {
96 let manager = manager
97 .lock()
98 .map_err(|_| anyhow!("Failed to lock process manager"))?;
99 let mut manager = manager;
100 let next_id = manager.next_index();
101 let first_arg = args
102 .first()
103 .ok_or_else(|| anyhow!("No command specified to start_process"))?;
104 let subprocess_channel = Channel {
105 name: format!("com.ctoolbox.p{next_id}.{first_arg}"),
106 port: manager.port,
107 authentication_key: generate_authentication_key(),
108 };
109
110 let path = env::current_exe()
111 .map_err(|e| anyhow!("failed to get current executable path: {e}"))?;
112 let mut new_args = vec![
113 IPC_ARG.to_string(),
114 subprocess_channel.to_arg_string(),
115 next_id.to_string(),
116 ];
117 new_args.extend(args);
118 #[allow(clippy::zombie_processes)]
119 let mut _process = Command::new(path.clone())
120 .args(new_args)
121 .stdin(Stdio::piped())
122 .stdout(Stdio::inherit())
123 .stderr(Stdio::inherit())
124 .spawn()
125 .map_err(|e| anyhow!("failed to execute server process: {e}"))?;
126 let subprocess_stdin = _process
127 .stdin
128 .as_mut()
129 .ok_or_else(|| anyhow!("Failed to get subprocess stdin"))?;
130 subprocess_stdin
131 .write_all(subprocess_channel.authentication_key.as_bytes())
132 .map_err(|e| {
133 anyhow!("Failed to write authentication key to subprocess: {e}")
134 })?;
135 let system_pid = u64::from(_process.id());
136 send_message(
137 &manager.ipc_control_channel,
138 json!({"type": "add_channel", "channel": subprocess_channel})
139 .to_string()
140 .as_str(),
141 );
142 let new_process = Process {
153 pid: next_id,
154 system_pid,
155 channel: subprocess_channel,
156 parent_pid,
157 };
158
159 manager.processes.insert(next_id, new_process.clone());
160 manager.last_id = next_id;
161
162 manager.last_process()
166}