ctoolbox/utilities/
resource_lock.rs

1/*!
2 * Re-entrant lock manager that prevents multiple threads or processes from
3 * concurrently holding a lock on the same resource. I'm using it to mimic
4 * transactions in a database, but it could be used for other things too. It's
5 * LLM generated and I don't really understand all of it.
6 * Works on a best-effort basis; it's likely to be flaky on network filesystems,
7 * FUSE, non-POSIX OSes, etc.
8 *
9 * Guarantees, in principle:
10 * - Mutual exclusion across threads for a given (`resource_type`, id).
11 * - Re-entrant acquisition by the SAME thread won't deadlock (recursion
12 *   counter).
13 * - OS-level advisory file lock held exactly once while any logical
14 *   acquisitions exist.
15 * - Other threads attempting acquisition block until the owning thread fully
16 *   releases (recursion -> 0); they give up with an error after about 10 s.
17 *
18 * Design:
19 * - Global table: `(resource_type, id)` -> `Arc<ResourceLockEntry>`.
20 * - `ResourceLockEntry` contains `(Mutex<LockState>, Condvar)`.
21 * - `LockState`:
22 *   - owner: `Option<ThreadId>`
23 *   - recursion: usize (depth for current owner)
24 *   - refcount: usize (number of active `ResourceLock` handles – across all
25 *     threads)
26 *   - file: `Option<File>` (file whose lifetime == OS lock lifetime)
27 *
28 * Acquisition steps (`ResourceLock::acquire`):
29 * 1. Lookup/insert entry in global table.
30 * 2. Lock the entry's state.
31 * 3. If no owner: set owner = current thread, recursion = 1, refcount += 1.
32 *    - If file is None: open + lock file (OS lock).
33 * 4. Else if owner == current thread: recursion += 1; refcount += 1.
34 * 5. Else wait on Condvar until owner released (recursion == 0), then loop.
35 *
36 * Drop:
37 * - Lock state, decrement recursion and refcount.
38 * - If recursion == 0: owner = None; `notify_all()` to wake waiters.
39 * - If refcount == 0: drop file (releases OS lock) and remove entry from table.
40 *   Also remove the lock file on disk.
41 */
42
43use crate::{debug, debug_fmt, get_storage_dir};
44use anyhow::{Context, Result, bail};
45use fs2::FileExt; // Cross-platform advisory file locking
46use log::{error, warn};
47use std::collections::HashMap;
48#[cfg(unix)]
49use std::os::unix::fs::MetadataExt;
50#[cfg(windows)]
51use std::os::windows::fs::MetadataExt;
52use std::path::Path;
53use std::sync::{Arc, Condvar, Mutex, OnceLock};
54use std::thread::ThreadId;
55use std::time::{Duration, Instant};
56use std::{fs, fs::File, thread};
57
/// In-process bookkeeping for one lockable resource.
///
/// `state` holds the ownership data and `cvar` is the condition variable that
/// waiting threads block on until the owner releases the resource.
#[derive(Debug, Default)]
struct ResourceLockEntry {
    state: Mutex<LockState>,
    cvar: Condvar,
}

/// Mutable ownership data of a [`ResourceLockEntry`], guarded by its mutex.
///
/// The derived `Default` is the "nobody holds this resource" state: no owner,
/// zero counters, no open lock file and no remembered path.
#[derive(Debug, Default)]
struct LockState {
    /// Thread that currently owns the resource, if any.
    owner: Option<ThreadId>,
    /// Nesting depth of acquisitions made by the current owner.
    recursion: usize,
    /// Number of live `ResourceLock` handles referring to this entry.
    refcount: usize,
    /// Open lock file; the OS-level lock lives as long as this handle does.
    file: Option<File>,
    /// Path of the lock file, kept so it can be deleted on final release.
    lock_path: Option<std::path::PathBuf>,
}

impl ResourceLockEntry {
    /// Create an entry in the unowned state (see [`LockState`]'s `Default`).
    fn new() -> Self {
        Self::default()
    }
}
87
88type LockKey = (String, String); // (resource_type, id)
89
90static MODEL_LOCK_TABLE: OnceLock<
91    Mutex<HashMap<LockKey, Arc<ResourceLockEntry>>>,
92> = OnceLock::new();
93
94fn lock_table() -> &'static Mutex<HashMap<LockKey, Arc<ResourceLockEntry>>> {
95    MODEL_LOCK_TABLE.get_or_init(|| Mutex::new(HashMap::new()))
96}
97
98pub trait Lock {}
99
100///
101/// Acquire a lock for a given resource type and id.
102/// Guarantees mutual exclusion across threads for a given (`resource_type`, id).
103/// Re-entrant acquisition by the same thread is allowed.
104///
105#[derive(Debug)]
106pub struct ResourceLock {
107    resource_type: String,
108    id: String,
109    entry: Arc<ResourceLockEntry>,
110}
111
112impl Lock for ResourceLock {}
113
114impl ResourceLock {
115    /// Acquire a lock for the given resource type and id.
116    pub fn acquire<T: ToString>(resource_type: &str, id: &T) -> Result<Self> {
117        // Warn on unsupported platforms (non-Unix, non-Windows)
118        #[cfg(not(any(unix, windows)))]
119        warn!(
120            "Unsupported platform: resource locking is only validated on Unix-like OSes and Windows"
121        );
122
123        let key = (resource_type.to_string(), id.to_string());
124        // Fetch or create the entry.
125        let entry = {
126            let Ok(mut table) = lock_table().lock() else {
127                bail!("Failed to lock global table");
128            };
129            table
130                .entry(key.clone())
131                .or_insert_with(|| Arc::new(ResourceLockEntry::new()))
132                .clone()
133        };
134
135        let current = thread::current().id();
136
137        // Acquire logical lock (re-entrant if same thread).
138        let Ok(mut state) = entry.state.lock() else {
139            bail!("Failed to lock entry state");
140        };
141        let start = Instant::now();
142        loop {
143            match state.owner {
144                None => {
145                    // First logical owner.
146                    state.owner = Some(current);
147                    state.recursion = 1;
148                    state.refcount += 1;
149
150                    if state.file.is_none() {
151                        // Create & lock file just once for all nested acquisitions.
152                        let root =
153                            get_storage_dir().context("No storage dir")?;
154                        let locks_dir = root.join("locks").join(resource_type);
155                        fs::create_dir_all(&locks_dir)?;
156                        let lock_path = locks_dir.join(id.to_string());
157
158                        let file =
159                            ResourceLock::create_and_lock_file(&lock_path)?;
160
161                        state.file = Some(file);
162                        state.lock_path = Some(lock_path); // Save path for later deletion
163                    }
164
165                    break;
166                }
167                Some(owner) if owner == current => {
168                    // Re-entrant acquisition by same thread.
169                    state.recursion += 1;
170                    state.refcount += 1;
171                    break;
172                }
173                Some(_) => {
174                    // Wait with periodic timeout; log if we've been waiting too
175                    // long, and bail if we've been REALLY waiting too long.
176                    let timeout = Duration::from_secs(5);
177                    let Ok((new_state, result)) =
178                        entry.cvar.wait_timeout(state, timeout)
179                    else {
180                        bail!("Failed to wait on condition variable")
181                    };
182
183                    state = new_state;
184                    if result.timed_out() {
185                        warn!(
186                            "Thread {:?} has been waiting {:?} for lock on {:?}",
187                            current,
188                            start.elapsed(),
189                            key
190                        );
191                        // Continue waiting; loop will re-check the owner
192                    }
193                    if start.elapsed() > (2 * timeout) {
194                        bail!(
195                            "Thread {:?} has been waiting {:?} for lock on {:?}",
196                            current,
197                            start.elapsed(),
198                            key
199                        );
200                    }
201                }
202            }
203        }
204        drop(state);
205
206        Ok(ResourceLock {
207            resource_type: resource_type.to_string(),
208            id: id.to_string(),
209            entry,
210        })
211    }
212
213    /// Create and lock the file for the given path.
214    fn create_and_lock_file(lock_path: &std::path::Path) -> Result<File> {
215        loop {
216            let file = std::fs::OpenOptions::new()
217                .create(true)
218                .truncate(true)
219                .read(true)
220                .write(true)
221                .open(lock_path)
222                .with_context(|| {
223                    format!("Failed to open lock file {}", lock_path.display())
224                })?;
225
226            file.lock_exclusive().with_context(|| {
227                format!("Failed to lock resource file {}", lock_path.display())
228            })?;
229            debug!("Checking lock file {}", lock_path.display());
230
231            // Check inodes to prevent race condition
232            let match_result = path_and_descriptor_match(lock_path, &file);
233            if let Ok(path_and_descriptor_match) = match_result {
234                if path_and_descriptor_match {
235                    return Ok(file);
236                }
237            }
238
239            // Inode mismatch, close and retry
240            debug!(
241                "Inode mismatch on lock file {}, retrying",
242                lock_path.display()
243            );
244            drop(file);
245        }
246    }
247}
248
249impl Drop for ResourceLock {
250    fn drop(&mut self) {
251        // Release logical ownership.
252        let mut remove_entry = false;
253        let mut lock_path: Option<std::path::PathBuf> = None;
254        {
255            let state_result = self.entry.state.lock();
256            match state_result {
257                Ok(mut state) => {
258                    // Defensive: avoid underflow if somehow inconsistent (poisoned).
259                    if state.recursion > 0 {
260                        state.recursion -= 1;
261                    } else {
262                        error!(
263                            "ResourceLock recursion underflow for {:?}/{}",
264                            self.resource_type, self.id
265                        );
266                    }
267                    if state.refcount > 0 {
268                        state.refcount -= 1;
269                    } else {
270                        error!(
271                            "ResourceLock refcount underflow for {:?}/{}",
272                            self.resource_type, self.id
273                        );
274                    }
275
276                    if state.recursion == 0 {
277                        // Fully released by owning thread.
278                        state.owner = None;
279                        // Wake all waiters so one of them can take ownership.
280                        self.entry.cvar.notify_all();
281                    }
282
283                    if state.refcount == 0 {
284                        // Last logical handle: dropping file releases OS lock.
285                        state.file.take();
286                        lock_path = state.lock_path.take(); // Take path for deletion
287                        remove_entry = true;
288                    }
289                }
290                Err(e) => {
291                    error!("Failed to lock entry state in drop: {e}");
292                    // We can't safely mutate state; best effort: try to remove table entry anyway.
293                    // remove_entry = true;
294                }
295            }
296        }
297
298        if remove_entry {
299            // Remove lock file if needed
300            if let Some(path) = lock_path {
301                debug_fmt!("Removing {}", path.display());
302                if let Err(e) = std::fs::remove_file(&path) {
303                    // It's okay if file is already gone; just log otherwise.
304                    if e.kind() != std::io::ErrorKind::NotFound {
305                        error!(
306                            "Failed to remove lock file {}: {:?}",
307                            path.display(),
308                            e
309                        );
310                    }
311                }
312            }
313
314            let key = (self.resource_type.clone(), self.id.clone());
315            let table_result = lock_table().lock();
316            match table_result {
317                Ok(mut table) => {
318                    if let Some(current) = table.get(&key) {
319                        if Arc::ptr_eq(current, &self.entry) {
320                            table.remove(&key);
321                        }
322                    }
323                }
324                Err(e) => {
325                    error!("Failed to lock global table in drop: {e}");
326                }
327            }
328        }
329    }
330}
331
332pub fn check_filesystem_lock_support() -> Result<()> {
333    // Try to bail early on unsupported OS families
334    #[cfg(not(any(unix, windows)))]
335    {
336        warn!(
337            "Warning: File locking may not be fully supported on this OS. Proceeding with caution."
338        );
339    }
340
341    // Create a temp test file in your locks dir
342    let test_dir = get_storage_dir()?.join("locks").join("test");
343    fs::create_dir_all(&test_dir)?;
344    let test_path = test_dir.join("test_lock");
345
346    // Open and lock the file
347    let file = fs::OpenOptions::new()
348        .create(true)
349        .truncate(false)
350        .read(true)
351        .write(true)
352        .open(&test_path)?;
353    file.lock_exclusive()?;
354
355    // Check if inodes match
356    if !path_and_descriptor_match(&test_path, &file)? {
357        error!(
358            "Filesystem does not support reliable inode checks for locking. Bailing out."
359        );
360        bail!("Incompatible filesystem");
361    }
362
363    // Clean up
364    debug_fmt!("Removing {}", test_path.display());
365    fs::remove_file(&test_path)?;
366    debug_fmt!("Removing dir {}", test_dir.display());
367    fs::remove_dir(&test_dir)?;
368    drop(file); // Unlock
369    Ok(())
370}
371
372pub fn get_inode_or_file_index(path: &Path) -> Result<u64> {
373    let path_meta = fs::metadata(path)?;
374    #[cfg(unix)]
375    let path_inode = path_meta.ino();
376    #[cfg(windows)]
377    let path_inode = path_meta.file_index();
378
379    Ok(path_inode)
380}
381
382pub fn get_inode_or_file_index_from_descriptor(file: &File) -> Result<u64> {
383    let file_meta = file.metadata()?;
384    #[cfg(unix)]
385    let file_inode = file_meta.ino();
386    #[cfg(windows)]
387    let file_inode = file_meta.file_index();
388
389    Ok(file_inode)
390}
391
392pub fn path_and_descriptor_match(path: &Path, file: &File) -> Result<bool> {
393    let file_ino = get_inode_or_file_index_from_descriptor(file)?;
394    let path_ino = get_inode_or_file_index(path)?;
395
396    // Check if inodes match and are non-zero (defensive on platforms where zero could be returned)
397    Ok(file_ino != 0 && path_ino != 0 && file_ino == path_ino)
398}
399
#[cfg(test)]
#[allow(clippy::unwrap_in_result, clippy::panic_in_result_fn)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
    use std::sync::mpsc;
    use std::time::Duration;

    /// Build a resource id that is unique per call and per process, so tests
    /// never contend with each other on the same lock file.
    fn unique_id(prefix: &str) -> String {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, SeqCst);
        format!("{}-pid{}-{}", prefix, std::process::id(), n)
    }

    /// The same thread can acquire the same resource twice without failing.
    #[test]
    fn reentrant_acquire_same_thread() {
        let id = unique_id("reentrant");
        let _g1 =
            ResourceLock::acquire("test_resource", &id).expect("first acquire");
        let _g2 = ResourceLock::acquire("test_resource", &id)
            .expect("second (re-entrant) acquire");
        // Dropping _g2 then _g1 should not panic.
    }

    /// A second thread cannot acquire a held resource until it is released,
    /// and does acquire it soon afterwards.
    #[test]
    fn contention_blocks_and_transfers_ownership() {
        let id = unique_id("contend");
        let g1 =
            ResourceLock::acquire("test_resource", &id).expect("first acquire");
        let (tx, rx) = mpsc::channel();
        let id2 = id.clone();

        let start = Instant::now();
        let th = thread::spawn(move || {
            // Should block until g1 is dropped
            let _g2 = ResourceLock::acquire("test_resource", &id2)
                .expect("second acquire after blocking");
            tx.send(Instant::now()).unwrap();
            // Keep it alive briefly to ensure we truly acquired
            thread::sleep(Duration::from_millis(50));
        });

        // Hold the lock for a bit, then release
        thread::sleep(Duration::from_millis(200));
        drop(g1);

        // Now the second thread should acquire relatively soon after
        let acquired_at = rx
            .recv_timeout(Duration::from_secs(2))
            .expect("thread did not acquire in time");
        let waited_ms =
            u64::try_from(acquired_at.duration_since(start).as_millis())
                .unwrap();
        assert!(
            waited_ms >= 180, // allow some scheduling jitter
            "Second thread acquired too early ({} ms), did it block?",
            waited_ms
        );

        th.join().unwrap();
    }

    /// Holding one resource must not delay another thread acquiring a
    /// different resource.
    ///
    /// The second acquisition runs on its own thread: on the same thread it
    /// would be re-entrant and could never block, which would hide exactly the
    /// failure this test is looking for. Completion is detected through a
    /// channel with a generous timeout rather than a tight wall-clock bound on
    /// filesystem work.
    #[test]
    fn independent_resources_do_not_block_each_other() {
        let id1 = unique_id("indep-a");
        let id2 = unique_id("indep-b");

        let g1 =
            ResourceLock::acquire("test_resource", &id1).expect("acquire id1");

        let (tx, rx) = mpsc::channel();
        let th = thread::spawn(move || {
            let g2 = ResourceLock::acquire("test_resource", &id2)
                .expect("acquire id2");
            tx.send(()).unwrap();
            drop(g2);
        });

        // A blocked acquisition would still be waiting on id1's holder here.
        rx.recv_timeout(Duration::from_secs(2))
            .expect("Independent resource acquisition took too long");

        th.join().unwrap();
        drop(g1);
    }

    /// The lock file exists while any handle is alive and is gone after the
    /// last handle has been dropped.
    #[test]
    fn lock_file_lifecycle_created_then_removed() {
        let id = unique_id("file-lifecycle");
        let root = get_storage_dir().expect("storage dir");
        let lock_path = root.join("locks").join("test_resource").join(&id);

        {
            let _g =
                ResourceLock::acquire("test_resource", &id).expect("acquire");
            // While held, file should exist
            assert!(
                lock_path.exists(),
                "Lock file should exist while lock is held at {}",
                lock_path.display()
            );

            // Re-entrant acquire shouldn't create a second file or change path
            let _g2 = ResourceLock::acquire("test_resource", &id)
                .expect("re-entrant acquire");
            assert!(
                lock_path.exists(),
                "Lock file should still exist while re-entrant lock is held at {}",
                lock_path.display()
            );
        }

        // After both locks dropped, file should be removed
        // Give the filesystem a brief moment if needed
        for _ in 0..10 {
            if !lock_path.exists() {
                break;
            }
            thread::sleep(Duration::from_millis(20));
        }
        assert!(
            !lock_path.exists(),
            "Lock file should be removed after the last handle is dropped at {}",
            lock_path.display()
        );
    }

    /// The filesystem probe succeeds in the environment the tests run in.
    #[test]
    fn check_filesystem_lock_support_smoke() {
        // This should succeed on supported platforms and typical filesystems
        check_filesystem_lock_support()
            .expect("filesystem lock support check failed");
    }
}