ctoolbox/workspace/
ipc_old.rs

1use crate::cli::Invocation;
2use crate::utilities::ipc::{Channel, channel_from_json_string};
3use crate::utilities::json::{jq, jqq};
4use crate::utilities::serde_value::get_as_json_string_from_json_string;
5use crate::utilities::*;
6use crate::workspace::ipc_old::dispatch::dispatch_call;
7use crate::workspace::{get_global_process_manager, initialize_process_state};
8use anyhow::{anyhow, bail, ensure};
9use clap::Parser;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::error::Error;
13use std::sync::LazyLock;
14
15pub mod dispatch;
16pub mod server;
17
/// Table of known IPC services, keyed by service name.
///
/// `subprocess_init` rejects any service name that is not a key of this map.
/// The values look like the method names each service exposes; nothing in
/// this file reads them, so confirm against the dispatch code before relying
/// on that.
pub static IPC_API: LazyLock<HashMap<String, Vec<&str>>> = LazyLock::new(|| {
    let services = [
        ("formats", vec!["convert_if_needed"]),
        ("io", vec!["print", "poll_hid", "start_local_web_ui"]),
        ("network", vec!["get_asset", "get_url"]),
        ("renderer", vec!["test_echo", "start", "get_frame"]),
        ("storage", vec!["set", "get"]),
        (
            "workspace",
            vec![
                "pc_restart",
                "pc_shutdown",
                "workspace_restart",
                "workspace_shutdown",
            ],
        ),
    ];

    let mut api = HashMap::with_capacity(services.len());
    for (service, methods) in services {
        api.insert(service.to_string(), methods);
    }
    api
});
43
44pub fn resolve_workspace_channel() -> Result<Channel> {
45    let pm = get_global_process_manager()?;
46    let pm = pm.lock();
47    if pm.is_err() {
48        bail!("Failed to lock process manager");
49    }
50    let pm = pm.unwrap();
51    let channel = pm.workspace_channel()?;
52    Ok(channel)
53}
54
55pub fn subprocess_init(invocation: &Invocation) -> Result<()> {
56    // This is run in subprocesses only
57
58    initialize_process_state(invocation)?;
59
60    let args = match invocation {
61        Invocation::Subprocess(args) => args,
62        _ => bail!("Not a subprocess invocation"),
63    };
64
65    let channel = resolve_workspace_channel()?;
66
67    let service = &args.service_name;
68
69    ensure!(IPC_API.contains_key(service), "Unknown service");
70
71    debug!(format!("Service started: {}", service));
72    loop {
73        let message = json!({ "type": "poll_for_task" });
74        let response = send_message(&channel, message.to_string().as_str());
75
76        let response_type = jq(".type", response.as_str()).unwrap();
77
78        if response_type == "new_task" {
79            let new_task_method = jq(".method", response.as_str()).unwrap();
80            let new_task_args =
81                get_as_json_string_from_json_string(response.as_str(), "args")?;
82            let msgid = jq(".msgid", response.as_str())
83                .unwrap()
84                .parse::<u64>()
85                .unwrap();
86            let channel_copy =
87                channel_from_json_string(json!(channel).as_str()).unwrap();
88            std::thread::spawn({
89                let service = service.clone();
90                let args = new_task_args.clone();
91                move || {
92                    dispatch_call(
93                        service.as_str(),
94                        &channel_copy,
95                        msgid,
96                        new_task_method.as_str(),
97                        if let Some(args) = args {
98                            args.clone()
99                        } else {
100                            "null".to_string()
101                        }
102                        .as_str(),
103                    );
104                }
105            });
106        } else if response_type == "no_new_tasks" {
107            usleep(100000);
108        } else {
109            log!(
110                format!("Received unexpected IPC response: {response}")
111                    .as_str()
112            );
113            usleep(100000);
114        }
115    }
116}
117
118pub fn send_message(channel: &Channel, message: &str) -> String {
119    maybe_send_message(channel, message).expect("Failed to send IPC message")
120}
121
122pub fn maybe_send_message(
123    channel: &Channel,
124    message: &str,
125) -> Result<String, Box<dyn Error>> {
126    let message_type = jqq(".type", message).unwrap_or_default();
127    if message_type != "add_channel"
128        && message_type != "poll_for_task"
129        && message_type != "poll_for_result"
130    {
131        log!(
132            format!("Sending message to channel {}: {}", channel.name, message)
133                .as_str()
134        );
135    }
136
137    let response = ureq::post(
138        format!("http://127.0.0.1:{}/hc_ipc", channel.port).as_str(),
139    )
140    .header("X-CollectiveToolbox-IPCAuth", json!(channel))
141    .send(message.as_bytes())?
142    .body_mut()
143    .read_to_string()?;
144    Ok(response)
145}
146
147pub fn wait_for_message_sync(channel: &Channel, msgid: u64) -> String {
148    loop {
149        let message = json!({ "type": "poll_for_result", "msgid": msgid });
150        let response = send_message(channel, message.to_string().as_str());
151        let response_type = jq(".type", response.as_str()).unwrap();
152
153        if response_type == "result" {
154            return jq(".content", response.as_str()).unwrap();
155        } else if response_type == "pending" {
156            usleep(100000);
157        } else {
158            log!(
159                format!("Received unexpected IPC response: {response}")
160                    .as_str()
161            );
162            usleep(100000);
163        }
164    }
165}
166
167pub fn listen_for_message(channel: &Channel, msgid: u64) -> String {
168    wait_for_message_sync(channel, msgid)
169}
170
171pub fn call_ipc_sync(
172    channel: &Channel,
173    method: &str,
174    args: Value,
175) -> Result<String> {
176    #[cfg(test)]
177    {
178        // In test mode, run IPC calls in-process.
179        // You may want to mock or stub the result for test mode.
180
181        use crate::workspace::get_global_process_manager;
182        use crate::workspace::ipc_old::dispatch::ipc_call_method;
183        let pm = get_global_process_manager()?;
184        let result = ipc_call_method(method, &args, Some(pm));
185        return Ok(json!(result).to_string());
186    }
187
188    let message = json!({ "type": "call", "method": method, "args": args });
189    let response = send_message(channel, &message.to_string());
190    let response_type = jqq(".type", &response).unwrap_or_default();
191
192    if response_type != "call_pending" {
193        return Err(anyhow!("Unexpected IPC response: {response}"));
194    }
195
196    let msgid = jq(".msgid", &response)
197        .map_err(|e| {
198            anyhow!("Failed to get msgid from response: {response}, error: {e}")
199        })?
200        .parse::<u64>()?;
201
202    Ok(wait_for_message_sync(channel, msgid))
203}
204
205pub async fn wait_for_message(channel: &Channel, msgid: u64) -> String {
206    loop {
207        let message = json!({ "type": "poll_for_result", "msgid": msgid });
208        let response = send_message(channel, message.to_string().as_str());
209        let response_type = jq(".type", response.as_str()).unwrap();
210
211        if response_type == "result" {
212            return jq(".content", response.as_str()).unwrap();
213        } else if response_type == "pending" {
214            tokio::time::sleep(std::time::Duration::from_micros(100_000)).await;
215        } else {
216            log!(
217                format!("Received unexpected IPC response: {response}")
218                    .as_str()
219            );
220            tokio::time::sleep(std::time::Duration::from_micros(100_000)).await;
221        }
222    }
223}
224
225pub async fn call_ipc(
226    channel: &Channel,
227    method: &str,
228    args: Value,
229) -> Result<String> {
230    #[cfg(test)]
231    {
232        use crate::workspace::get_global_process_manager;
233        use crate::workspace::ipc_old::dispatch::ipc_call_method;
234
235        let pm = get_global_process_manager()?;
236        let result = ipc_call_method(method, &args, Some(pm));
237        return Ok(json!(result).to_string());
238    }
239
240    let message = json!({ "type": "call", "method": method, "args": args });
241    let response = send_message(channel, &message.to_string());
242    let response_type = jqq(".type", &response).unwrap_or_default();
243
244    if response_type != "call_pending" {
245        return Err(anyhow!("Unexpected IPC response: {response}"));
246    }
247
248    let msgid = jq(".msgid", &response)
249        .map_err(|e| {
250            anyhow!("Failed to get msgid from response: {response}, error: {e}")
251        })?
252        .parse::<u64>()?;
253
254    Ok(wait_for_message(channel, msgid).await)
255}