1use crate::cli::{self, Cli};
2use crate::storage::get_storage_dir;
3use crate::utilities::ipc::{Channel, channel_from_args_and_key};
4use crate::utilities::logging::setup_logger;
5use crate::utilities::process::{Process, ProcessManager, start_process};
6use crate::utilities::resource_lock::check_filesystem_lock_support;
7use crate::utilities::*;
8use crate::workspace::ipc_old::server::start_ipc_server;
9use crate::workspace::ipc_old::subprocess_init;
10use portpicker::pick_unused_port;
11pub use serde_json::json as utilities_serde_json_json;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::Mutex;
15use text_io::read;
16
17pub mod ipc;
18pub mod ipc_old;
19use once_cell::sync::OnceCell;
20
21static PROCESS_MANAGER: OnceCell<Arc<Mutex<ProcessManager>>> = OnceCell::new();
33
34pub fn set_global_process_manager(
36 pm: Arc<Mutex<ProcessManager>>,
37) -> anyhow::Result<()> {
38 #[cfg(test)]
39 {
40 let _ = PROCESS_MANAGER.get();
42 if PROCESS_MANAGER.get().is_some() {
43 return Ok(());
44 }
45 }
46 PROCESS_MANAGER
47 .set(pm)
48 .map_err(|_| anyhow::anyhow!("ProcessManager already set"))
49}
50
51pub fn get_global_process_manager() -> anyhow::Result<Arc<Mutex<ProcessManager>>>
53{
54 PROCESS_MANAGER
55 .get()
56 .cloned()
57 .ok_or_else(|| anyhow::anyhow!("ProcessManager not initialized"))
58}
59
60pub struct WorkspaceState {
61 pub processes: Arc<Mutex<ProcessManager>>,
62}
63
64pub fn initialize_process_state(
69 invocation: &cli::Invocation,
70) -> anyhow::Result<WorkspaceState> {
71 let mut pm;
72 if let cli::Invocation::Subprocess(subproc) = &invocation {
73 let authentication_key: String = read!("{}\n");
74 let control_channel =
75 channel_from_args_and_key(&subproc.ipc, authentication_key);
76 pm = ProcessManager {
77 port: 0,
78 ipc_control_channel: control_channel.clone(),
79 processes: HashMap::new(),
80 ..Default::default()
81 };
82 pm.processes.insert(
83 0,
84 Process {
85 pid: 0,
86 system_pid: 0,
87 channel: control_channel,
88 parent_pid: 0,
89 },
90 );
91 } else {
92 pm = ProcessManager {
93 port: 0,
94 ipc_control_channel: Channel {
95 name: String::new(),
96 port: 0,
97 authentication_key: String::new(),
98 },
99 processes: HashMap::new(),
100 ..Default::default()
101 };
102 pm.processes.insert(
103 0,
104 Process {
105 pid: 0,
106 system_pid: 0,
107 channel: Channel {
108 name: String::new(),
109 port: 0,
110 authentication_key: String::new(),
111 },
112 parent_pid: 0,
113 },
114 );
115
116 setup_panic_hooks(&pm);
117 }
118
119 let pm_arc = Arc::new(Mutex::new(pm));
120 set_global_process_manager(pm_arc.clone())?;
121
122 Ok(WorkspaceState {
123 processes: get_global_process_manager()?,
124 })
125}
126
127pub async fn entry() -> anyhow::Result<()> {
128 check_filesystem_lock_support()?;
129
130 let invocation = cli::parse_invocation()?;
132
133 if let cli::Invocation::Subprocess(_) = &invocation {
135 setup_logger(&invocation)?;
136 subprocess_init(&invocation)?;
137 return Ok(());
138 }
139 let cli = invocation.expect_cli();
141
142 if let Some(code) = cli::maybe_run_lightweight(cli).await? {
144 std::process::exit(code);
147 }
148
149 setup_logger(&invocation)?;
151 let state = initialize_process_state(&invocation)?;
152 boot(state, cli).await?;
153 Ok(())
154}
155
156fn setup_ipc_server(
159 pm_arc: &Arc<Mutex<ProcessManager>>,
160 args: &Cli,
161) -> anyhow::Result<(Channel, String)> {
162 let port: u16 = if args.ctoolbox_ipc_port.is_none() {
163 pick_unused_port().expect("No ports free")
164 } else {
165 args.ctoolbox_ipc_port.unwrap()
166 };
167 let authentication_key: String = generate_authentication_key();
168 let ipc_control_channel: Channel = Channel {
169 name: "ipc_control".to_string(),
170 port,
171 authentication_key: authentication_key.clone(),
172 };
173
174 let process_manager = pm_arc.lock();
175 if process_manager.is_err() {
176 return Err(anyhow::anyhow!("Failed to lock process manager"));
177 }
178 let mut process_manager = process_manager.unwrap();
179 process_manager.port = port;
180 process_manager.ipc_control_channel = ipc_control_channel.clone();
181 drop(process_manager);
182
183 let authentication_key_for_server = authentication_key.clone();
184 log!("Waiting for IPC server to come up...");
185 let pm_for_server = pm_arc.clone();
186 std::thread::spawn(move || {
187 start_ipc_server(pm_for_server, port, authentication_key_for_server);
188 });
189 loop {
190 let response = maybe_send_message(
191 &ipc_control_channel,
192 json!({ "type": "ipc_ping" }).as_str(),
193 );
194 if response.is_ok() {
195 break;
196 }
197 sleep(1);
198 }
199 log!("IPC server OK.");
200
201 Ok((ipc_control_channel, authentication_key))
202}
203
204async fn boot(state: WorkspaceState, args: &Cli) -> Result<()> {
206 let pm_arc = get_global_process_manager()?;
207 let (ipc_control_channel, _authentication_key) =
208 setup_ipc_server(&pm_arc, args)?;
209
210 get_storage_dir().unwrap();
211
212 let renderer =
213 start_process(&pm_arc, 0, vec!["renderer".to_string()]).await?;
214
215 let localwebui = start_process(&pm_arc, 0, vec!["io".to_string()]).await?;
216
217 call_ipc(&localwebui.channel, "start_local_web_ui", "".into()).await?;
218 loop {
235 sleep(10);
236 }
237}
238
239pub fn workspace_restart(manager: &Arc<Mutex<ProcessManager>>) {
240 std::process::exit(0);
241}
242
243pub fn workspace_shutdown(manager: &Arc<Mutex<ProcessManager>>) {
244 let answer: String = read!("{}\n");
245 if answer.to_lowercase().starts_with('y') {
246 std::process::exit(0);
247 }
248}
249
250fn impl_boot_for_test() -> Result<Arc<Mutex<ProcessManager>>> {
252 setup_logger(&cli::Invocation::User(Cli {
253 ctoolbox_ipc_port: None,
254 command: None,
255 }))?;
256
257 let mut pm = ProcessManager {
259 port: 0,
260 ipc_control_channel: Channel {
261 name: String::new(),
262 port: 0,
263 authentication_key: String::new(),
264 },
265 processes: HashMap::new(),
266 ..Default::default()
267 };
268 pm.processes.insert(
269 0,
270 Process {
271 pid: 0,
272 system_pid: 0,
273 channel: Channel {
274 name: String::new(),
275 port: 0,
276 authentication_key: String::new(),
277 },
278 parent_pid: 0,
279 },
280 );
281 setup_panic_hooks(&pm);
282 let pm_arc = Arc::new(Mutex::new(pm));
283
284 set_global_process_manager(pm_arc.clone())?;
288
289 Ok(pm_arc)
290}
291
292pub struct TestHarness {
293 pub pm: Arc<Mutex<ProcessManager>>,
294}
295
296static HARNESS: OnceCell<TestHarness> = OnceCell::new();
297
298impl TestHarness {
299 #[tracing::instrument]
300 pub fn try_init() -> Result<&'static TestHarness> {
302 if let Some(harness) = HARNESS.get() {
304 return Ok(harness);
305 }
306
307 let pm = impl_boot_for_test();
308 match pm {
309 Ok(pm_arc) => {
310 Ok(HARNESS.get_or_init(|| TestHarness { pm: pm_arc }))
311 }
312 Err(e) => Err(e),
313 }
314 }
315}
316
317pub fn boot_for_test() -> Option<&'static TestHarness> {
326 if let Ok(harness) = TestHarness::try_init() {
327 Some(harness)
328 } else {
329 None
331 }
332}