1use crate::{debug, debug_fmt, get_storage_dir};
44use anyhow::{Context, Result, bail};
45use fs2::FileExt; use log::{error, warn};
47use std::collections::HashMap;
48#[cfg(unix)]
49use std::os::unix::fs::MetadataExt;
50#[cfg(windows)]
51use std::os::windows::fs::MetadataExt;
52use std::path::Path;
53use std::sync::{Arc, Condvar, Mutex, OnceLock};
54use std::thread::ThreadId;
55use std::time::{Duration, Instant};
56use std::{fs, fs::File, thread};
57
/// In-process bookkeeping for one lockable resource.
///
/// One entry is stored per key in the global lock table and shared (via
/// `Arc`) by every `ResourceLock` handle and every thread waiting for that
/// key.
#[derive(Debug)]
struct ResourceLockEntry {
    /// Ownership and lock-file state; every read or write goes through this
    /// mutex.
    state: Mutex<LockState>,
    /// Contending threads wait on this in `ResourceLock::acquire`; it is
    /// notified from `Drop` when the owning thread fully releases the lock.
    cvar: Condvar,
}
63
/// Mutable state of a single resource lock, guarded by
/// `ResourceLockEntry::state`.
#[derive(Debug)]
struct LockState {
    /// Thread that currently owns the lock, or `None` when it is free.
    owner: Option<ThreadId>,
    /// Number of nested acquisitions held by `owner`; the lock is handed
    /// back when this drops to zero.
    recursion: usize,
    /// Number of live `ResourceLock` handles for this entry; when it drops
    /// to zero the lock file is closed and removed.
    refcount: usize,
    /// Open handle of the on-disk lock file (locked with `lock_exclusive`)
    /// while the resource is held.
    file: Option<File>,
    /// Path of the on-disk lock file, remembered so `Drop` can delete it.
    lock_path: Option<std::path::PathBuf>,
}
72
73impl ResourceLockEntry {
74 fn new() -> Self {
75 Self {
76 state: Mutex::new(LockState {
77 owner: None,
78 recursion: 0,
79 refcount: 0,
80 file: None,
81 lock_path: None,
82 }),
83 cvar: Condvar::new(),
84 }
85 }
86}
87
/// Key of the lock table: `(resource_type, id)`, both already converted to
/// owned strings.
type LockKey = (String, String);

/// Process-wide table mapping each resource key to its shared lock entry.
/// Initialised lazily; access it through `lock_table()`.
static MODEL_LOCK_TABLE: OnceLock<
    Mutex<HashMap<LockKey, Arc<ResourceLockEntry>>>,
> = OnceLock::new();
93
94fn lock_table() -> &'static Mutex<HashMap<LockKey, Arc<ResourceLockEntry>>> {
95 MODEL_LOCK_TABLE.get_or_init(|| Mutex::new(HashMap::new()))
96}
97
/// Marker trait for lock guard types. It declares no methods; in this file
/// it is implemented by `ResourceLock`.
pub trait Lock {}
99
/// Guard returned by `ResourceLock::acquire`.
///
/// The resource stays locked for as long as the guard is alive; dropping it
/// releases one level of the hold.
#[derive(Debug)]
pub struct ResourceLock {
    /// First half of the lock-table key; also used in log messages.
    resource_type: String,
    /// Second half of the lock-table key (the stringified resource id).
    id: String,
    /// Shared per-resource entry this guard holds a reference to.
    entry: Arc<ResourceLockEntry>,
}
111
// `Lock` has no methods, so this impl only tags `ResourceLock` as a lock guard.
impl Lock for ResourceLock {}
113
114impl ResourceLock {
115 pub fn acquire<T: ToString>(resource_type: &str, id: &T) -> Result<Self> {
117 #[cfg(not(any(unix, windows)))]
119 warn!(
120 "Unsupported platform: resource locking is only validated on Unix-like OSes and Windows"
121 );
122
123 let key = (resource_type.to_string(), id.to_string());
124 let entry = {
126 let Ok(mut table) = lock_table().lock() else {
127 bail!("Failed to lock global table");
128 };
129 table
130 .entry(key.clone())
131 .or_insert_with(|| Arc::new(ResourceLockEntry::new()))
132 .clone()
133 };
134
135 let current = thread::current().id();
136
137 let Ok(mut state) = entry.state.lock() else {
139 bail!("Failed to lock entry state");
140 };
141 let start = Instant::now();
142 loop {
143 match state.owner {
144 None => {
145 state.owner = Some(current);
147 state.recursion = 1;
148 state.refcount += 1;
149
150 if state.file.is_none() {
151 let root =
153 get_storage_dir().context("No storage dir")?;
154 let locks_dir = root.join("locks").join(resource_type);
155 fs::create_dir_all(&locks_dir)?;
156 let lock_path = locks_dir.join(id.to_string());
157
158 let file =
159 ResourceLock::create_and_lock_file(&lock_path)?;
160
161 state.file = Some(file);
162 state.lock_path = Some(lock_path); }
164
165 break;
166 }
167 Some(owner) if owner == current => {
168 state.recursion += 1;
170 state.refcount += 1;
171 break;
172 }
173 Some(_) => {
174 let timeout = Duration::from_secs(5);
177 let Ok((new_state, result)) =
178 entry.cvar.wait_timeout(state, timeout)
179 else {
180 bail!("Failed to wait on condition variable")
181 };
182
183 state = new_state;
184 if result.timed_out() {
185 warn!(
186 "Thread {:?} has been waiting {:?} for lock on {:?}",
187 current,
188 start.elapsed(),
189 key
190 );
191 }
193 if start.elapsed() > (2 * timeout) {
194 bail!(
195 "Thread {:?} has been waiting {:?} for lock on {:?}",
196 current,
197 start.elapsed(),
198 key
199 );
200 }
201 }
202 }
203 }
204 drop(state);
205
206 Ok(ResourceLock {
207 resource_type: resource_type.to_string(),
208 id: id.to_string(),
209 entry,
210 })
211 }
212
213 fn create_and_lock_file(lock_path: &std::path::Path) -> Result<File> {
215 loop {
216 let file = std::fs::OpenOptions::new()
217 .create(true)
218 .truncate(true)
219 .read(true)
220 .write(true)
221 .open(lock_path)
222 .with_context(|| {
223 format!("Failed to open lock file {}", lock_path.display())
224 })?;
225
226 file.lock_exclusive().with_context(|| {
227 format!("Failed to lock resource file {}", lock_path.display())
228 })?;
229 debug!("Checking lock file {}", lock_path.display());
230
231 let match_result = path_and_descriptor_match(lock_path, &file);
233 if let Ok(path_and_descriptor_match) = match_result {
234 if path_and_descriptor_match {
235 return Ok(file);
236 }
237 }
238
239 debug!(
241 "Inode mismatch on lock file {}, retrying",
242 lock_path.display()
243 );
244 drop(file);
245 }
246 }
247}
248
249impl Drop for ResourceLock {
250 fn drop(&mut self) {
251 let mut remove_entry = false;
253 let mut lock_path: Option<std::path::PathBuf> = None;
254 {
255 let state_result = self.entry.state.lock();
256 match state_result {
257 Ok(mut state) => {
258 if state.recursion > 0 {
260 state.recursion -= 1;
261 } else {
262 error!(
263 "ResourceLock recursion underflow for {:?}/{}",
264 self.resource_type, self.id
265 );
266 }
267 if state.refcount > 0 {
268 state.refcount -= 1;
269 } else {
270 error!(
271 "ResourceLock refcount underflow for {:?}/{}",
272 self.resource_type, self.id
273 );
274 }
275
276 if state.recursion == 0 {
277 state.owner = None;
279 self.entry.cvar.notify_all();
281 }
282
283 if state.refcount == 0 {
284 state.file.take();
286 lock_path = state.lock_path.take(); remove_entry = true;
288 }
289 }
290 Err(e) => {
291 error!("Failed to lock entry state in drop: {e}");
292 }
295 }
296 }
297
298 if remove_entry {
299 if let Some(path) = lock_path {
301 debug_fmt!("Removing {}", path.display());
302 if let Err(e) = std::fs::remove_file(&path) {
303 if e.kind() != std::io::ErrorKind::NotFound {
305 error!(
306 "Failed to remove lock file {}: {:?}",
307 path.display(),
308 e
309 );
310 }
311 }
312 }
313
314 let key = (self.resource_type.clone(), self.id.clone());
315 let table_result = lock_table().lock();
316 match table_result {
317 Ok(mut table) => {
318 if let Some(current) = table.get(&key) {
319 if Arc::ptr_eq(current, &self.entry) {
320 table.remove(&key);
321 }
322 }
323 }
324 Err(e) => {
325 error!("Failed to lock global table in drop: {e}");
326 }
327 }
328 }
329 }
330}
331
332pub fn check_filesystem_lock_support() -> Result<()> {
333 #[cfg(not(any(unix, windows)))]
335 {
336 warn!(
337 "Warning: File locking may not be fully supported on this OS. Proceeding with caution."
338 );
339 }
340
341 let test_dir = get_storage_dir()?.join("locks").join("test");
343 fs::create_dir_all(&test_dir)?;
344 let test_path = test_dir.join("test_lock");
345
346 let file = fs::OpenOptions::new()
348 .create(true)
349 .truncate(false)
350 .read(true)
351 .write(true)
352 .open(&test_path)?;
353 file.lock_exclusive()?;
354
355 if !path_and_descriptor_match(&test_path, &file)? {
357 error!(
358 "Filesystem does not support reliable inode checks for locking. Bailing out."
359 );
360 bail!("Incompatible filesystem");
361 }
362
363 debug_fmt!("Removing {}", test_path.display());
365 fs::remove_file(&test_path)?;
366 debug_fmt!("Removing dir {}", test_dir.display());
367 fs::remove_dir(&test_dir)?;
368 drop(file); Ok(())
370}
371
372pub fn get_inode_or_file_index(path: &Path) -> Result<u64> {
373 let path_meta = fs::metadata(path)?;
374 #[cfg(unix)]
375 let path_inode = path_meta.ino();
376 #[cfg(windows)]
377 let path_inode = path_meta.file_index();
378
379 Ok(path_inode)
380}
381
382pub fn get_inode_or_file_index_from_descriptor(file: &File) -> Result<u64> {
383 let file_meta = file.metadata()?;
384 #[cfg(unix)]
385 let file_inode = file_meta.ino();
386 #[cfg(windows)]
387 let file_inode = file_meta.file_index();
388
389 Ok(file_inode)
390}
391
392pub fn path_and_descriptor_match(path: &Path, file: &File) -> Result<bool> {
393 let file_ino = get_inode_or_file_index_from_descriptor(file)?;
394 let path_ino = get_inode_or_file_index(path)?;
395
396 Ok(file_ino != 0 && path_ino != 0 && file_ino == path_ino)
398}
399
// Tests for `ResourceLock` and `check_filesystem_lock_support`. They touch
// the real storage directory returned by `get_storage_dir()`.
#[cfg(test)]
#[allow(clippy::unwrap_in_result, clippy::panic_in_result_fn)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
    use std::sync::mpsc;
    use std::time::Duration;

    /// Builds an id of the form `<prefix>-pid<process id>-<counter>` so that
    /// every call within this process yields a different lock key.
    fn unique_id(prefix: &str) -> String {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, SeqCst);
        format!("{}-pid{}-{}", prefix, std::process::id(), n)
    }

    /// The same thread can acquire the same resource twice without blocking.
    #[test]
    fn reentrant_acquire_same_thread() {
        let id = unique_id("reentrant");
        let _g1 =
            ResourceLock::acquire("test_resource", &id).expect("first acquire");
        let _g2 = ResourceLock::acquire("test_resource", &id)
            .expect("second (re-entrant) acquire");
    }

    /// A second thread blocks while the first holds the lock and obtains it
    /// once the first guard is dropped.
    #[test]
    fn contention_blocks_and_transfers_ownership() {
        let id = unique_id("contend");
        let g1 =
            ResourceLock::acquire("test_resource", &id).expect("first acquire");
        let (tx, rx) = mpsc::channel();
        let id2 = id.clone();

        let start = Instant::now();
        let th = thread::spawn(move || {
            let _g2 = ResourceLock::acquire("test_resource", &id2)
                .expect("second acquire after blocking");
            // Report the moment the lock was obtained.
            tx.send(Instant::now()).unwrap();
            thread::sleep(Duration::from_millis(50));
        });

        // Keep holding the lock for 200 ms before releasing it.
        thread::sleep(Duration::from_millis(200));
        drop(g1);

        let acquired_at = rx
            .recv_timeout(Duration::from_secs(2))
            .expect("thread did not acquire in time");
        let waited_ms =
            u64::try_from(acquired_at.duration_since(start).as_millis())
                .unwrap();
        // The threshold sits below the 200 ms hold time; presumably the gap
        // is slack for timer granularity.
        assert!(
            waited_ms >= 180,
            "Second thread acquired too early ({} ms), did it block?",
            waited_ms
        );

        th.join().unwrap();
    }

    /// Holding one resource must not delay acquiring a different one.
    #[test]
    fn independent_resources_do_not_block_each_other() {
        let id1 = unique_id("indep-a");
        let id2 = unique_id("indep-b");

        let g1 =
            ResourceLock::acquire("test_resource", &id1).expect("acquire id1");

        let start = Instant::now();
        let g2 =
            ResourceLock::acquire("test_resource", &id2).expect("acquire id2");
        let elapsed = start.elapsed();
        // NOTE(review): the 50 ms budget includes creating and locking the
        // lock file on disk; it may be tight on a slow or loaded machine.
        assert!(
            elapsed < Duration::from_millis(50),
            "Independent resource acquisition took too long: {:?}",
            elapsed
        );

        drop(g2);
        drop(g1);
    }

    /// The lock file exists while any guard is alive and is gone after the
    /// last guard is dropped.
    #[test]
    fn lock_file_lifecycle_created_then_removed() {
        let id = unique_id("file-lifecycle");
        let root = get_storage_dir().expect("storage dir");
        // Same location `ResourceLock::acquire` uses for its lock file.
        let lock_path = root.join("locks").join("test_resource").join(&id);

        {
            let _g =
                ResourceLock::acquire("test_resource", &id).expect("acquire");
            assert!(
                lock_path.exists(),
                "Lock file should exist while lock is held at {}",
                lock_path.display()
            );

            let _g2 = ResourceLock::acquire("test_resource", &id)
                .expect("re-entrant acquire");
            assert!(
                lock_path.exists(),
                "Lock file should still exist while re-entrant lock is held at {}",
                lock_path.display()
            );
        }

        // Poll for up to 10 x 20 ms before asserting.
        // NOTE(review): as far as this file shows, removal happens
        // synchronously in `Drop`, so the polling looks like defensive slack.
        for _ in 0..10 {
            if !lock_path.exists() {
                break;
            }
            thread::sleep(Duration::from_millis(20));
        }
        assert!(
            !lock_path.exists(),
            "Lock file should be removed after the last handle is dropped at {}",
            lock_path.display()
        );
    }

    /// Smoke test: the support probe succeeds on the test machine's storage
    /// directory.
    #[test]
    fn check_filesystem_lock_support_smoke() {
        check_filesystem_lock_support()
            .expect("filesystem lock support check failed");
    }
}