1use crate::cli::Invocation;
2use crate::utilities::ipc::{Channel, channel_from_json_string};
3use crate::utilities::json::{jq, jqq};
4use crate::utilities::serde_value::get_as_json_string_from_json_string;
5use crate::utilities::*;
6use crate::workspace::ipc_old::dispatch::dispatch_call;
7use crate::workspace::{get_global_process_manager, initialize_process_state};
8use anyhow::{anyhow, bail, ensure};
9use clap::Parser;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::error::Error;
13use std::sync::LazyLock;
14
15pub mod dispatch;
16pub mod server;
17
18pub static IPC_API: LazyLock<HashMap<String, Vec<&str>>> =
19 LazyLock::new(|| {
20 HashMap::from([
21 ("formats".to_string(), vec!["convert_if_needed"]),
22 (
23 "io".to_string(),
24 vec!["print", "poll_hid", "start_local_web_ui"],
25 ),
26 ("network".to_string(), vec!["get_asset", "get_url"]),
27 (
28 "renderer".to_string(),
29 vec!["test_echo", "start", "get_frame"],
30 ),
31 ("storage".to_string(), vec!["set", "get"]),
32 (
33 "workspace".to_string(),
34 vec![
35 "pc_restart",
36 "pc_shutdown",
37 "workspace_restart",
38 "workspace_shutdown",
39 ],
40 ),
41 ])
42 });
43
44pub fn resolve_workspace_channel() -> Result<Channel> {
45 let pm = get_global_process_manager()?;
46 let pm = pm.lock();
47 if pm.is_err() {
48 bail!("Failed to lock process manager");
49 }
50 let pm = pm.unwrap();
51 let channel = pm.workspace_channel()?;
52 Ok(channel)
53}
54
55pub fn subprocess_init(invocation: &Invocation) -> Result<()> {
56 initialize_process_state(invocation)?;
59
60 let args = match invocation {
61 Invocation::Subprocess(args) => args,
62 _ => bail!("Not a subprocess invocation"),
63 };
64
65 let channel = resolve_workspace_channel()?;
66
67 let service = &args.service_name;
68
69 ensure!(IPC_API.contains_key(service), "Unknown service");
70
71 debug!(format!("Service started: {}", service));
72 loop {
73 let message = json!({ "type": "poll_for_task" });
74 let response = send_message(&channel, message.to_string().as_str());
75
76 let response_type = jq(".type", response.as_str()).unwrap();
77
78 if response_type == "new_task" {
79 let new_task_method = jq(".method", response.as_str()).unwrap();
80 let new_task_args =
81 get_as_json_string_from_json_string(response.as_str(), "args")?;
82 let msgid = jq(".msgid", response.as_str())
83 .unwrap()
84 .parse::<u64>()
85 .unwrap();
86 let channel_copy =
87 channel_from_json_string(json!(channel).as_str()).unwrap();
88 std::thread::spawn({
89 let service = service.clone();
90 let args = new_task_args.clone();
91 move || {
92 dispatch_call(
93 service.as_str(),
94 &channel_copy,
95 msgid,
96 new_task_method.as_str(),
97 if let Some(args) = args {
98 args.clone()
99 } else {
100 "null".to_string()
101 }
102 .as_str(),
103 );
104 }
105 });
106 } else if response_type == "no_new_tasks" {
107 usleep(100000);
108 } else {
109 log!(
110 format!("Received unexpected IPC response: {response}")
111 .as_str()
112 );
113 usleep(100000);
114 }
115 }
116}
117
118pub fn send_message(channel: &Channel, message: &str) -> String {
119 maybe_send_message(channel, message).expect("Failed to send IPC message")
120}
121
122pub fn maybe_send_message(
123 channel: &Channel,
124 message: &str,
125) -> Result<String, Box<dyn Error>> {
126 let message_type = jqq(".type", message).unwrap_or_default();
127 if message_type != "add_channel"
128 && message_type != "poll_for_task"
129 && message_type != "poll_for_result"
130 {
131 log!(
132 format!("Sending message to channel {}: {}", channel.name, message)
133 .as_str()
134 );
135 }
136
137 let response = ureq::post(
138 format!("http://127.0.0.1:{}/hc_ipc", channel.port).as_str(),
139 )
140 .header("X-CollectiveToolbox-IPCAuth", json!(channel))
141 .send(message.as_bytes())?
142 .body_mut()
143 .read_to_string()?;
144 Ok(response)
145}
146
147pub fn wait_for_message_sync(channel: &Channel, msgid: u64) -> String {
148 loop {
149 let message = json!({ "type": "poll_for_result", "msgid": msgid });
150 let response = send_message(channel, message.to_string().as_str());
151 let response_type = jq(".type", response.as_str()).unwrap();
152
153 if response_type == "result" {
154 return jq(".content", response.as_str()).unwrap();
155 } else if response_type == "pending" {
156 usleep(100000);
157 } else {
158 log!(
159 format!("Received unexpected IPC response: {response}")
160 .as_str()
161 );
162 usleep(100000);
163 }
164 }
165}
166
167pub fn listen_for_message(channel: &Channel, msgid: u64) -> String {
168 wait_for_message_sync(channel, msgid)
169}
170
171pub fn call_ipc_sync(
172 channel: &Channel,
173 method: &str,
174 args: Value,
175) -> Result<String> {
176 #[cfg(test)]
177 {
178 use crate::workspace::get_global_process_manager;
182 use crate::workspace::ipc_old::dispatch::ipc_call_method;
183 let pm = get_global_process_manager()?;
184 let result = ipc_call_method(method, &args, Some(pm));
185 return Ok(json!(result).to_string());
186 }
187
188 let message = json!({ "type": "call", "method": method, "args": args });
189 let response = send_message(channel, &message.to_string());
190 let response_type = jqq(".type", &response).unwrap_or_default();
191
192 if response_type != "call_pending" {
193 return Err(anyhow!("Unexpected IPC response: {response}"));
194 }
195
196 let msgid = jq(".msgid", &response)
197 .map_err(|e| {
198 anyhow!("Failed to get msgid from response: {response}, error: {e}")
199 })?
200 .parse::<u64>()?;
201
202 Ok(wait_for_message_sync(channel, msgid))
203}
204
205pub async fn wait_for_message(channel: &Channel, msgid: u64) -> String {
206 loop {
207 let message = json!({ "type": "poll_for_result", "msgid": msgid });
208 let response = send_message(channel, message.to_string().as_str());
209 let response_type = jq(".type", response.as_str()).unwrap();
210
211 if response_type == "result" {
212 return jq(".content", response.as_str()).unwrap();
213 } else if response_type == "pending" {
214 tokio::time::sleep(std::time::Duration::from_micros(100_000)).await;
215 } else {
216 log!(
217 format!("Received unexpected IPC response: {response}")
218 .as_str()
219 );
220 tokio::time::sleep(std::time::Duration::from_micros(100_000)).await;
221 }
222 }
223}
224
225pub async fn call_ipc(
226 channel: &Channel,
227 method: &str,
228 args: Value,
229) -> Result<String> {
230 #[cfg(test)]
231 {
232 use crate::workspace::get_global_process_manager;
233 use crate::workspace::ipc_old::dispatch::ipc_call_method;
234
235 let pm = get_global_process_manager()?;
236 let result = ipc_call_method(method, &args, Some(pm));
237 return Ok(json!(result).to_string());
238 }
239
240 let message = json!({ "type": "call", "method": method, "args": args });
241 let response = send_message(channel, &message.to_string());
242 let response_type = jqq(".type", &response).unwrap_or_default();
243
244 if response_type != "call_pending" {
245 return Err(anyhow!("Unexpected IPC response: {response}"));
246 }
247
248 let msgid = jq(".msgid", &response)
249 .map_err(|e| {
250 anyhow!("Failed to get msgid from response: {response}, error: {e}")
251 })?
252 .parse::<u64>()?;
253
254 Ok(wait_for_message(channel, msgid).await)
255}