diff --git a/Cargo.lock b/Cargo.lock index 577f7b3..d4de999 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -283,9 +283,9 @@ dependencies = [ [[package]] name = "core-foundation" -version = "0.9.4" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" dependencies = [ "core-foundation-sys", "libc", @@ -525,15 +525,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" -[[package]] -name = "encoding_rs" -version = "0.8.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" -dependencies = [ - "cfg-if", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -575,19 +566,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" [[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" +name = "foldhash" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" [[package]] name = "form_urlencoded" @@ -767,6 +749,7 @@ dependencies = [ "dirs", "fuser", "futures", + "hashlink", "http", "inquire", "libc", @@ -788,6 +771,7 @@ dependencies = [ "serde_json", "serde_path_to_error", "shellexpand", + "tempfile", "thiserror 2.0.18", "tokio", "toml", @@ -835,7 +819,7 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -843,6 +827,18 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", +] + +[[package]] +name = "hashlink" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0b22561a9c04a7cb1a302c013e0259cd3b4bb619f145b32f72b8b4bcbed230" +dependencies = [ + "hashbrown 0.16.1", +] [[package]] name = "heck" @@ -927,6 +923,7 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", "tokio-rustls", @@ -934,22 +931,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "hyper-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" -dependencies = [ - "bytes", - "http-body-util", - "hyper", - "hyper-util", - "native-tls", - "tokio", - "tokio-native-tls", - "tower-service", -] - [[package]] name = "hyper-util" version = "0.1.20" @@ -968,11 +949,9 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2", - "system-configuration", "tokio", "tower-service", "tracing", - "windows-registry", ] [[package]] @@ -1308,9 +1287,9 @@ checksum = 
"f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "mesa-dev" -version = "1.11.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0685415ca22ab4f72a6a3f0f720fcca0c69d295d2a95a40cd3ce92555103d3a1" +checksum = "92979030f29144d605a5f077fe21502b5397b30af92da92d2ca3d11051c8ff6c" dependencies = [ "async-stream", "futures-core", @@ -1325,9 +1304,9 @@ dependencies = [ [[package]] name = "mesa_dev_oapi" -version = "1.8.0" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "266585ec1d9950343ba2a307909003c05342149c4bd0cf5b224591b2621ccfd4" +checksum = "4456b424726e1dbf9e3ac4763438eda69939458a6cd50ca06dd65d7333b75ff1" dependencies = [ "reqwest", "reqwest-middleware", @@ -1367,23 +1346,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "native-tls" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" -dependencies = [ - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "nix" version = "0.29.0" @@ -1447,49 +1409,11 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" -[[package]] -name = "openssl" -version = "0.10.75" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" -dependencies = [ - "bitflags", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "openssl-probe" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" - -[[package]] -name = "openssl-sys" -version = "0.9.111" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "opentelemetry" @@ -1908,7 +1832,6 @@ checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64", "bytes", - "encoding_rs", "futures-channel", "futures-core", "futures-util", @@ -1918,24 +1841,21 @@ dependencies = [ "http-body-util", "hyper", "hyper-rustls", - "hyper-tls", "hyper-util", "js-sys", "log", - "mime", "mime_guess", - "native-tls", "percent-encoding", "pin-project-lite", "quinn", "rustls", + "rustls-native-certs", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-native-tls", "tokio-rustls", "tower", "tower-http", @@ -2018,6 +1938,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe", + 
"rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -2124,9 +2056,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.11.1" +version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +checksum = "d17b898a6d6948c3a8ee4372c17cb384f90d2e6e912ef00895b14fd7ab54ec38" dependencies = [ "bitflags", "core-foundation", @@ -2137,9 +2069,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.15.0" +version = "2.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +checksum = "321c8673b092a9a42605034a9879d73cb79101ed5fd117bc9a597b89b4e9e61a" dependencies = [ "core-foundation-sys", "libc", @@ -2424,27 +2356,6 @@ dependencies = [ "syn", ] -[[package]] -name = "system-configuration" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" -dependencies = [ - "bitflags", - "core-foundation", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "tempfile" version = "3.25.0" @@ -2593,16 +2504,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.4" @@ -2917,12 +2818,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "vergen" version = "9.1.0" @@ -3210,17 +3105,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" -[[package]] -name = "windows-registry" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" -dependencies = [ - "windows-link", - "windows-result", - "windows-strings", -] - [[package]] name = "windows-result" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index 9256d6e..12daa76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,12 +7,15 @@ license = "MIT" repository = "https://github.com/mesa-dot-dev/git-fs" authors = ["Marko Vejnovic "] +[lib] +path = "lib/lib.rs" + [dependencies] clap = { version = "4.5.54", features = ["derive", "env"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } fuser = { version = "0.16.0", features = ["libfuse"] } libc = "0.2" -mesa-dev = "1.11.0" +mesa-dev = "1.15.0" num-traits = "0.2" http = "1" reqwest = { version = "0.12", default-features = false } @@ -47,15 +50,20 @@ opentelemetry = { version = "0.29" } opentelemetry_sdk = { 
version = "0.29", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.29", default-features = false, features = ["http-proto", "trace", "reqwest-blocking-client"] } tracing-opentelemetry = { version = "0.30" } +hashlink = "0.11.0" [features] default = [] staging = [] +[dev-dependencies] +tempfile = "3" + [build-dependencies] vergen-gitcl = { version = "1", features = [] } [lints.rust] +unexpected_cfgs = "warn" unsafe_code = "allow" missing_docs = "warn" unreachable_pub = "allow" diff --git a/lib/cache/eviction/lru.rs b/lib/cache/eviction/lru.rs new file mode 100644 index 0000000..ff56b2c --- /dev/null +++ b/lib/cache/eviction/lru.rs @@ -0,0 +1,297 @@ +//! Implements the LRU eviction policy. + +use std::{future::Future, hash::Hash}; + +use std::sync::{ + Arc, + atomic::{self, AtomicI64}, +}; + +use hashlink::LinkedHashMap; +use tokio::sync::mpsc::{Receiver, error::TrySendError}; + +/// Types that carry a monotonic version for deduplication of out-of-order messages. +pub trait Versioned { + /// Returns the monotonic version of this value. + fn version(&self) -> u64; +} + +/// A trait for deleting keys from the cache. This is used by the LRU eviction tracker to delete +/// keys when they are evicted. +pub trait Deleter: Send + Clone + 'static { + /// Delete the given key from the cache. Deletions for different keys may be invoked + /// concurrently and in arbitrary order. Correctness is ensured by the caller through + /// a context-based guard (e.g. matching on a unique file ID), not by invocation order. + fn delete(&mut self, key: K, ctx: Ctx) -> impl Future + Send; +} + +/// Messages sent to the LRU eviction tracker worker. +#[derive(Debug, Clone, Copy)] +enum Message { + /// Notify the LRU eviction tracker that the given key was accessed. + Accessed(K), + /// Request an eviction set of the given size. + Evict(u32), + /// Notify the LRU eviction tracker that a key was inserted or overwritten. + Upserted(K, C), +} + +/// Tracks in-flight eviction batches and individual deletions using a single packed `AtomicI64`. +/// +/// Layout: `[upper 32 bits: pending batches | lower 32 bits: active deletions]` +/// +/// The lifecycle of an eviction is: +/// 1. Producer calls `submit_batch()` — increments upper half by 1. +/// 2. Producer enqueues the `Evict` message into the channel. +/// 3. Worker receives the message and calls `process_batch(n)` — decrements upper half by 1 and +/// increments lower half by `n` (the actual number of keys to delete). +/// 4. Each spawned deletion task calls `observe_deletion()` when done — decrements lower half by 1. +/// +/// The indicator reads zero only when no batches are pending and no deletions are in flight. +#[derive(Debug)] +struct DeletionIndicator { + underlying: AtomicI64, +} + +impl DeletionIndicator { + fn new() -> Self { + Self { + underlying: AtomicI64::new(0), + } + } + + /// Mark that a new eviction batch has been submitted by a producer. Called on the producer + /// side *before* the `Evict` message is sent into the channel. + fn submit_batch(&self) { + self.underlying + .fetch_add(1 << 32, atomic::Ordering::Relaxed); + } + + /// Undo a `submit_batch` call. Used when the channel `try_send` fails after we already + /// incremented the pending-batch counter. + fn undo_submit_batch(&self) { + self.underlying + .fetch_sub(1 << 32, atomic::Ordering::Relaxed); + } + + /// Called by the worker when it begins processing an eviction batch. 
Atomically decrements + /// the pending-batch counter (upper half) by 1 and increments the active-deletion counter + /// (lower half) by `count`. + fn process_batch(&self, count: u32) { + self.underlying + .fetch_add(i64::from(count) - (1 << 32), atomic::Ordering::Relaxed); + } + + /// Called by a spawned deletion task when it finishes deleting one key. + fn observe_deletion(&self) { + self.underlying.fetch_sub(1, atomic::Ordering::Relaxed); + } + + /// Returns `true` if there is any eviction work in progress — either batches waiting to be + /// processed by the worker, or individual deletions still in flight. + fn have_pending_work(&self) -> bool { + self.underlying.load(atomic::Ordering::Relaxed) != 0 + } +} + +#[derive(Debug)] +struct LruProcessingTask> { + receiver: Receiver>, + + /// The ordered set of keys, ordered according to the last-used policy. + ordered_key_map: LinkedHashMap, + + /// The deleter to call when we need to evict keys. + deleter: D, + + /// Pointer into the shared deletion tracker. + shared: Arc, +} + +impl> + LruProcessingTask +{ + fn new(deleter: D, receiver: Receiver>, shared: Arc) -> Self { + Self { + receiver, + ordered_key_map: LinkedHashMap::new(), + deleter, + shared, + } + } + + fn spawn_task(deleter: D, receiver: Receiver>, shared: Arc) { + // TODO(markovejnovic): This should have a best-effort drop. + tokio::spawn(async move { + let mut task = Self::new(deleter, receiver, shared); + task.work().await; + }); + } + + async fn work(&mut self) { + while let Some(msg) = self.receiver.recv().await + && self.service_message(msg) + {} + } + + /// Returns true if the task should continue working. + #[must_use] + fn service_message(&mut self, message: Message) -> bool { + match message { + Message::Accessed(k) => { + // The key may have been evicted between the access and this message arriving. + // If it was, the remove returns None and this is a no-op. + if let Some(entry) = self.ordered_key_map.remove(&k) { + self.ordered_key_map.insert(k, entry); + } + } + Message::Evict(max_count) => { + { + // These integer casts are safe, since max_count is guaranteed to fit + // within a u32, min(MAX_U32, MAX_USIZE) == MAX_U32. + #[expect(clippy::cast_possible_truncation)] + let take_count = self.ordered_key_map.len().min(max_count as usize) as u32; + + // Atomically transition this batch from "pending" to "active deletions". + // This decrements the upper half (pending batches) by 1 and increments the + // lower half (active deletions) by take_count. If take_count is 0, this + // still correctly clears the pending batch. + self.shared.active_deletions.process_batch(take_count); + + for _ in 0..take_count { + let Some((key, ctx)) = self.ordered_key_map.pop_front() else { + break; + }; + let mut deleter = self.deleter.clone(); + let shared_clone = Arc::clone(&self.shared); + tokio::spawn(async move { + // Drop guard ensures observe_deletion() runs even if the + // deletion task is cancelled or panics. + struct DeletionGuard(Arc); + impl Drop for DeletionGuard { + fn drop(&mut self) { + self.0.active_deletions.observe_deletion(); + } + } + let _guard = DeletionGuard(shared_clone); + deleter.delete(key, ctx).await; + }); + } + } + } + Message::Upserted(k, ctx) => { + if let Some(existing) = self.ordered_key_map.get(&k) + && ctx.version() < existing.version() + { + // Stale message from a previous incarnation; drop it. 
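+ // (Out-of-order delivery is possible here: `upsert` falls back to a spawned `send`
+ // when its `try_send` finds the channel full, so an Upserted message carrying an older
+ // version can arrive after a newer one has already been applied.)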
+ return true; + } + self.ordered_key_map.remove(&k); + self.ordered_key_map.insert(k, ctx); + } + } + + true + } +} + +#[derive(Debug)] +struct WorkerState { + /// Packed atomic tracking pending eviction batches (upper 32 bits) and active deletions + /// (lower 32 bits). See `DeletionIndicator` for the full protocol. + active_deletions: DeletionIndicator, +} + +/// An LRU eviction tracker. This is used to track the least recently used keys in the cache, and +/// to evict keys when necessary. +#[derive(Debug)] +pub struct LruEvictionTracker { + worker_message_sender: tokio::sync::mpsc::Sender>, + worker_state: Arc, +} + +impl + LruEvictionTracker +{ + /// Spawn a new LRU eviction tracker with the given deleter and channel size. + pub fn spawn>(deleter: D, channel_size: usize) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(channel_size); + + let worker_state = Arc::new(WorkerState { + active_deletions: DeletionIndicator::new(), + }); + // The worker task is intentionally detached: it exits when the channel closes + // (i.e. when all senders, including the one held by LruEvictionTracker, are dropped). + LruProcessingTask::spawn_task(deleter, rx, Arc::clone(&worker_state)); + + Self { + worker_message_sender: tx, + worker_state, + } + } + + /// Notify the LRU tracker that a key was inserted or overwritten. + /// + /// Non-cancellable: uses `try_send` (sync) with a `tokio::spawn` fallback so that the + /// notification cannot be lost to task cancellation. + pub fn upsert(&self, key: K, ctx: C) { + match self + .worker_message_sender + .try_send(Message::Upserted(key, ctx)) + { + Ok(()) | Err(TrySendError::Closed(_)) => {} + Err(TrySendError::Full(msg)) => { + let sender = self.worker_message_sender.clone(); + tokio::spawn(async move { + let _ = sender.send(msg).await; + }); + } + } + } + + /// Notify the LRU eviction tracker that the given key was accessed. + /// + /// Non-cancellable: uses `try_send` (sync) with a `tokio::spawn` fallback so that the + /// notification cannot be lost to task cancellation. + pub fn access(&self, key: K) { + match self.worker_message_sender.try_send(Message::Accessed(key)) { + Ok(()) | Err(TrySendError::Closed(_)) => {} + Err(TrySendError::Full(msg)) => { + let sender = self.worker_message_sender.clone(); + tokio::spawn(async move { + let _ = sender.send(msg).await; + }); + } + } + } + + /// Try to cull up to `max_count` of the least recently used keys via the deleter supplied + /// at spawn time. + /// + /// Returns `true` if the eviction message was successfully enqueued, or `false` if the + /// channel is full. In the latter case, the caller should yield and retry. + /// + /// The ordering here is critical for correctness: we `submit_batch()` *before* `try_send()` + /// so that the `DeletionIndicator` is always in a state where `have_pending_work()` returns + /// `true` by the time anyone observes the enqueued message. If the send fails, we roll back + /// with `undo_submit_batch()`, which is a net-zero delta on the packed atomic. + #[must_use] + pub fn try_cull(&self, max_count: u32) -> bool { + self.worker_state.active_deletions.submit_batch(); + if self + .worker_message_sender + .try_send(Message::Evict(max_count)) + .is_ok() + { + true + } else { + self.worker_state.active_deletions.undo_submit_batch(); + false + } + } + + /// Check whether there are culls that are already scheduled or actively in progress.
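+ ///
+ /// A minimal usage sketch, assuming a `tracker: LruEvictionTracker` binding (callers such as
+ /// `FileCache::insert` poll this and yield until in-flight culls settle before requesting
+ /// another batch):
+ ///
+ /// ```ignore
+ /// while tracker.have_pending_culls() {
+ ///     tokio::task::yield_now().await;
+ /// }
+ /// ```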
+ #[must_use] + pub fn have_pending_culls(&self) -> bool { + self.worker_state.active_deletions.have_pending_work() + } +} diff --git a/lib/cache/eviction/mod.rs b/lib/cache/eviction/mod.rs new file mode 100644 index 0000000..94ca450 --- /dev/null +++ b/lib/cache/eviction/mod.rs @@ -0,0 +1,2 @@ +/// Different types of eviction policies for any cache. +pub mod lru; diff --git a/lib/cache/fcache.rs b/lib/cache/fcache.rs new file mode 100644 index 0000000..2e445b7 --- /dev/null +++ b/lib/cache/fcache.rs @@ -0,0 +1,469 @@ +//! File-based MT-safe, async-safe data cache. + +use std::{ + fmt::Debug, + hash::Hash, + path::{Path, PathBuf}, +}; + +use std::sync::{ + Arc, + atomic::{self, AtomicU64, AtomicUsize}, +}; + +use tracing::error; + +use crate::{ + cache::{ + eviction::lru::{Deleter, LruEvictionTracker, Versioned}, + traits::{AsyncReadableCache, AsyncWritableCache}, + }, + io, +}; +use thiserror::Error; + +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + +/// Drop guard that deletes an unregistered cache file if the insert task is cancelled or fails +/// between file creation and map registration. Defused once the file is registered in the map. +struct FileGuard { + path: Option, +} + +impl Drop for FileGuard { + fn drop(&mut self) { + if let Some(path) = self.path.take() + && let Err(e) = std::fs::remove_file(&path) + && e.kind() != std::io::ErrorKind::NotFound + { + error!(error = ?e, path = ?path, "failed to clean up orphaned cache file"); + } + } +} + +struct FileCacheShared { + root_path: PathBuf, + map: scc::HashMap, + size_bytes: AtomicUsize, +} + +impl FileCacheShared { + fn size(&self) -> usize { + self.size_bytes.load(atomic::Ordering::Relaxed) + } + + fn path_for(&self, fid: usize) -> PathBuf { + self.root_path.join(fid.to_string()) + } + + /// Given an entry, remove it from the disk. Note this does not update the cache size tracker. + /// + /// Gracefully handles the case where the file doesn't exist, which can happen if the file was + /// deleted after we read the file ID from the map, but before we tried to delete + /// the file. + async fn delete_entry_from_disk_async(&self, entry: &CacheMapEntry) -> std::io::Result<()> { + match tokio::fs::remove_file(self.path_for(entry.fid)).await { + Ok(()) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } + } +} + +#[derive(Debug, Clone, Copy)] +struct DeleterCtx { + fid: usize, + version: u64, +} + +impl Versioned for DeleterCtx { + fn version(&self) -> u64 { + self.version + } +} + +#[derive(Clone)] +struct LruEntryDeleter { + shared: Arc>, +} + +impl LruEntryDeleter { + fn new(shared: Arc>) -> Self { + Self { shared } + } +} + +impl Deleter for LruEntryDeleter { + /// The LRU cache will call this method when it wants to evict keys. + async fn delete(&mut self, key: K, ctx: DeleterCtx) { + if let Some(entry) = self + .shared + .map + .remove_if_async(&key, |m| m.fid == ctx.fid) + .await + { + if let Err(e) = self.shared.delete_entry_from_disk_async(&entry.1).await { + error!(error = ?e, "failed to delete evicted cache entry from disk"); + } + + self.shared + .size_bytes + .fetch_sub(entry.1.size_bytes, atomic::Ordering::Relaxed); + } + } +} + +/// Error thrown during construction of a `FileCache` which describes why the given root path is +/// invalid. +#[derive(Debug, Error)] +pub enum InvalidRootPathError { + /// The root path exists but isn't a directory. 
+ #[error("Root path is not a directory: {0}")] + NotADirectory(PathBuf), + + /// The root path exists and is a directory, but it isn't empty and the data in it appears to + /// come from a different source than this application. + #[error("Root path appears to contain data stemming from sources different to this app: {0}")] + RootPathUnsafeCache(PathBuf), + + /// An IO error occurred while trying to access the root path. + #[error("IO error while accessing root path: {0}")] + Io(#[from] std::io::Error), +} + +/// Error thrown during insertion into the cache. +#[derive(Debug, Error)] +pub enum CacheWriteError { + /// An IO error occurred while trying to write the new value to disk. + #[error("IO error while inserting into cache: {0}")] + Io(#[from] std::io::Error), +} + +#[derive(Debug, Clone, Copy)] +struct CacheMapEntry { + /// A unique ID representing the name of the file on disk where the value for this cache entry + /// is stored. This is just an integer that is used to generate the file path for the cache + /// entry. + fid: usize, + + /// The size of the value for this cache entry in bytes. This is used to keep track of the + /// total size of the cache, and to evict entries when necessary. + size_bytes: usize, +} + +/// A general-purpose file-backed cache. This cache is designed to store arbitrary byte values on +/// disk, and to retrieve them later using a key. +/// +/// This cache is considered thread-safe and async-safe. +pub struct FileCache { + shared: Arc>, + + /// Generates unique file paths for new cache entries. This is just a counter that increments + /// for each new file, and the file ID is the value of the counter at the time of creation. + file_generator: AtomicUsize, + + /// The maximum size of the cache in bytes. This is used to determine when to evict entries + /// from the cache. This is a soft hint, rather than a hard limit, and the cache may + /// temporarily exceed this size during insertions, but it will try to evict entries as soon as + /// possible to get back under this limit. + max_size_bytes: usize, + + /// LRU eviction tracker. This is used to track the least recently used keys in the cache, and + /// to evict keys when necessary. + lru_tracker: LruEvictionTracker, + + /// Global monotonic counter for per-key versioning. Incremented under the scc bucket lock + /// to guarantee that versions are strictly monotonically increasing per key. + version_counter: AtomicU64, +} + +impl FileCache { + // How many cache entries to evict at most in a single batch. + // + // Not really sure how to determine this number. + const LRU_EVICTION_MAX_BATCH_SIZE: u32 = 8; + + // The maximum number of messages that can be buffered between this file cache and the LRU + // eviction tracker. If this is too small, then the file cache will be blocked waiting for the + // eviction tracker to catch up. + const MAX_LRU_TRACKER_CHANNEL_SIZE: usize = 256; + + // Dangerous: Changing this constant may cause the program to treat existing cache directories + // as invalid, and thus provide a worse user experience. Do not change this unless you have a + // very good reason to do so. Changing this will break backwards compatibility with existing + // cache directories. + const GITFS_MARKER_FILE: &'static str = ".gitfs_cache"; + + // The total maximum number of times we will try to read a "deleted" file before giving up and + // treating it as a hard error. + // + // See usage for reasoning on why this is necessary. 
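+ // (In short: `get` can observe a map entry whose backing file was concurrently replaced
+ // or evicted between the map lookup and the `File::open`; it retries the lookup a bounded
+ // number of times before giving up.)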
+ const MAX_READ_RETRY_COUNT: usize = 8; + + // The maximum number of eviction loop iterations before giving up and proceeding with the + // insert. Prevents livelock when the LRU worker's ordered map is empty (e.g., Upserted + // messages haven't been delivered yet) and try_cull succeeds but evicts nothing. + const MAX_EVICTION_ATTEMPTS: u32 = 4; + + /// Try to create a new file cache at the given path. + /// + /// + /// # Args + /// + /// * `file_path` - If the path exists, it must either be an empty directory, or a directory + /// which was previously used as a cache for this program. + /// * `max_size_bytes` - The maximum size of the cache in bytes. This is used to determine when + /// to evict entries from the cache. This is a soft hint, rather than a hard limit, and the + /// cache may temporarily exceed this size during insertions, but it will try to evict entries + /// as soon as possible to get back under this limit. + pub async fn new( + file_path: &Path, + max_size_bytes: usize, + ) -> Result { + let mut pbuf = match tokio::fs::canonicalize(file_path).await { + Ok(mut p) => { + if !tokio::fs::metadata(&p).await?.is_dir() { + return Err(InvalidRootPathError::NotADirectory(p)); + } + + let mut entries = tokio::fs::read_dir(&p).await?; + let is_empty = entries.next_entry().await?.is_none(); + + p.push(Self::GITFS_MARKER_FILE); + let marker_exists = tokio::fs::try_exists(&p).await?; + p.pop(); + + if !(is_empty || marker_exists) { + return Err(InvalidRootPathError::RootPathUnsafeCache(p)); + } + + io::remove_dir_contents(&p).await?; + + Ok(p) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + tokio::fs::create_dir_all(file_path).await?; + tokio::fs::canonicalize(file_path).await + } + Err(e) => return Err(e.into()), + }?; + + // Create marker file so that subsequent restarts of this application gracefully handle the + // existing cache directory. + pbuf.push(Self::GITFS_MARKER_FILE); + tokio::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&pbuf) + .await?; + pbuf.pop(); + + let shared = Arc::new(FileCacheShared { + root_path: pbuf, + map: scc::HashMap::new(), + size_bytes: AtomicUsize::new(0), + }); + + Ok(Self { + shared: Arc::clone(&shared), + file_generator: AtomicUsize::new(0), + max_size_bytes, + lru_tracker: LruEvictionTracker::spawn( + LruEntryDeleter::new(shared), + Self::MAX_LRU_TRACKER_CHANNEL_SIZE, + ), + version_counter: AtomicU64::new(0), + }) + } + + async fn create_file(&self, path: &Path) -> Result { + tokio::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(path) + .await + } +} + +impl Drop for FileCache { + fn drop(&mut self) { + // Surprisingly, it's safe to delete files here. + // + // Note that the LRU tracker is running at the time of Drop. It, consequently, may end up + // calling the deleter, but that's fine, since the deleter ignores this problem gracefully. + // + // It is 100% safe to do this here, since it's guaranteed no concurrent task is borrowing + // the FileCache at this point. + if let Err(e) = std::fs::remove_dir_all(&self.shared.root_path) { + error!(error = ?e, "failed to delete cache directory on drop"); + } + } +} + +impl AsyncReadableCache> + for FileCache +{ + async fn get(&self, key: &K) -> Option> { + for _ in 0..Self::MAX_READ_RETRY_COUNT { + // Grab the best-guess file ID for the given key. If this fails, then the key is not in + // the cache, and we can return `None`. 
+ let entry = *(self.shared.map.get_async(key).await?); + + // The next thing we need to do is try to open the file. This may fail for a plethora + // of reasons. + // + // TODO(markovejnovic): path_for allocates on heap, could be on stack. + let mut file = match tokio::fs::File::open(self.shared.path_for(entry.fid)).await { + Ok(file) => file, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // The file was not found. + // This may happen for two reasons: + // + // There is a bug in the code. + // + // The file was id was deleted after we read the file ID from the map on line: + // + // let fid: usize = *(self.map.get_async(key).await?); + // + // The file can get deleted as part of the insert procedure, or the deletion + // procedure, so we need to try to re-read the file. + continue; + } + Err(e) => { + error!(error = ?e, key = ?key, "IO error while opening file for cache key"); + + // This MIGHT be recoverable. The IO error may very well be transient, so we'll + // just say we don't have this value and then pray that the user will perform + // an .insert + return None; + } + }; + + let mut buf = Vec::new(); + file.read_to_end(&mut buf) + .await + .inspect_err(|e| { + error!(error = ?e, key = ?key, "IO error while reading file for cache key"); + }) + .ok()?; + self.lru_tracker.access(*key); + return Some(buf); + } + + error!(key = ?key, attempt_count = Self::MAX_READ_RETRY_COUNT, "could not find file."); + + // Again, might be recoverable if the user calls an .insert later on. + None + } + + async fn contains(&self, key: &K) -> bool { + self.shared.map.contains_async(key).await + } +} + +impl + AsyncWritableCache, CacheWriteError> for FileCache +{ + async fn insert(&self, key: &K, value: Vec) -> Result<(), CacheWriteError> { + let new_fid = self.file_generator.fetch_add(1, atomic::Ordering::Relaxed); + let new_size = value.len(); + + let mut eviction_attempts = 0u32; + while self.shared.size() + new_size > self.max_size_bytes { + if self.shared.size() == 0 { + // The new entry is larger than the entire cache size limit. Insert it anyway + // and let the cache temporarily exceed the limit. + break; + } + + if eviction_attempts >= Self::MAX_EVICTION_ATTEMPTS { + // We've tried enough times. The cache is over budget but max_size_bytes is a + // soft hint -- proceed with the insert and let future inserts drive further + // eviction. + break; + } + eviction_attempts += 1; + + if self.lru_tracker.have_pending_culls() { + tokio::task::yield_now().await; + continue; + } + + if !self.lru_tracker.try_cull(Self::LRU_EVICTION_MAX_BATCH_SIZE) { + tokio::task::yield_now().await; + } + } + + // Write file to disk. The guard is constructed *before* the `.await` so that if + // this future is cancelled while `create_file` is in flight, the drop guard still + // cleans up the (potentially created) file. + let path = self.shared.path_for(new_fid); + let mut guard = FileGuard { + path: Some(path.clone()), + }; + let mut new_file = self.create_file(&path).await?; + new_file.write_all(&value).await?; + new_file.flush().await?; + + // Use entry_async to lock the bucket, then allocate version under the lock. + // This ensures version monotonicity per key matches the actual mutation order. + // Size accounting is also done under the lock to prevent transient underflow + // when concurrent inserts to the same key interleave their deltas. 
+ let (old_entry, new_version) = match self.shared.map.entry_async(*key).await { + scc::hash_map::Entry::Occupied(mut o) => { + let old = *o.get(); + let v = self.version_counter.fetch_add(1, atomic::Ordering::Relaxed); + *o.get_mut() = CacheMapEntry { + fid: new_fid, + size_bytes: new_size, + }; + + let size_delta: isize = new_size.cast_signed() - old.size_bytes.cast_signed(); + if size_delta > 0 { + self.shared + .size_bytes + .fetch_add(size_delta.cast_unsigned(), atomic::Ordering::Relaxed); + } else if size_delta < 0 { + self.shared + .size_bytes + .fetch_sub(size_delta.unsigned_abs(), atomic::Ordering::Relaxed); + } + + (Some(old), v) + } + scc::hash_map::Entry::Vacant(vacant) => { + let v = self.version_counter.fetch_add(1, atomic::Ordering::Relaxed); + vacant.insert_entry(CacheMapEntry { + fid: new_fid, + size_bytes: new_size, + }); + + self.shared + .size_bytes + .fetch_add(new_size, atomic::Ordering::Relaxed); + + (None, v) + } + }; + // Bucket lock released here. + guard.path = None; + + // LRU notification (sync via try_send, non-cancellable). + self.lru_tracker.upsert( + *key, + DeleterCtx { + fid: new_fid, + version: new_version, + }, + ); + + // Deferrable: delete old file (safe to cancel — file is orphaned). + if let Some(old_entry) = old_entry { + drop(self.shared.delete_entry_from_disk_async(&old_entry).await); + } + + Ok(()) + } +} diff --git a/lib/cache/mod.rs b/lib/cache/mod.rs new file mode 100644 index 0000000..e0c1c97 --- /dev/null +++ b/lib/cache/mod.rs @@ -0,0 +1,6 @@ +/// Cache eviction policies. +pub mod eviction; +/// File-backed cache implementation. +pub mod fcache; +/// Cache traits for read and write operations. +pub mod traits; diff --git a/lib/cache/traits.rs b/lib/cache/traits.rs new file mode 100644 index 0000000..5a38464 --- /dev/null +++ b/lib/cache/traits.rs @@ -0,0 +1,32 @@ +use std::hash::Hash; + +/// A readable cache that can be read from asynchronously. +#[expect(async_fn_in_trait)] +pub trait AsyncReadableCache +where + K: Eq + Hash, + V: Clone, +{ + /// Fetch the value associated with the given key from the cache. If the key is not present in + /// the cache, return `None`. + async fn get(&self, key: &K) -> Option; + + /// Check if the cache contains the given key. + async fn contains(&self, key: &K) -> bool; +} + +/// A writable cache that can be written to asynchronously. +/// +/// Note that this trait does not require the cache to support manual eviction of entries. Many +/// caches only support automatic eviction (such as LRU eviction), and this trait is designed to +/// support those caches as well. +#[expect(async_fn_in_trait)] +pub trait AsyncWritableCache +where + K: Eq + Hash, + V: Clone, + E: std::error::Error, +{ + /// Insert the given value into the cache with the given key. + async fn insert(&self, key: &K, value: V) -> Result<(), E>; +} diff --git a/lib/io.rs b/lib/io.rs new file mode 100644 index 0000000..95a5d97 --- /dev/null +++ b/lib/io.rs @@ -0,0 +1,17 @@ +//! Random IO utilities + +use std::path::Path; + +/// Remove all files and directories in the given directory, but not the directory itself. +pub async fn remove_dir_contents(path: &Path) -> std::io::Result<()> { + let mut entries = tokio::fs::read_dir(path).await?; + while let Some(entry) = entries.next_entry().await? 
{ + let path = entry.path(); + if tokio::fs::metadata(&path).await?.is_dir() { + tokio::fs::remove_dir_all(path).await?; + } else { + tokio::fs::remove_file(path).await?; + } + } + Ok(()) +} diff --git a/lib/lib.rs b/lib/lib.rs new file mode 100644 index 0000000..f7388bd --- /dev/null +++ b/lib/lib.rs @@ -0,0 +1,5 @@ +//! git-fs shared library. + +/// Caching primitives for git-fs. +pub mod cache; +pub mod io; diff --git a/src/daemon.rs b/src/daemon.rs index 0e70229..dac2d05 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -43,7 +43,7 @@ mod managed_fuse { name: org_name.clone(), api_key: org.api_key.clone(), }); - let mesa_fs = MesaFS::new(orgs, (config.uid, config.gid)); + let mesa_fs = MesaFS::new(orgs, (config.uid, config.gid), &config.cache); let fuse_adapter = FuserAdapter::new(mesa_fs, handle); let mount_opts = [ fuser::MountOption::FSName("git-fs".to_owned()), diff --git a/src/fs/mescloud/mod.rs b/src/fs/mescloud/mod.rs index a3ce17d..1a3cce8 100644 --- a/src/fs/mescloud/mod.rs +++ b/src/fs/mescloud/mod.rs @@ -10,6 +10,7 @@ use secrecy::ExposeSecret as _; use tracing::{Instrument as _, instrument, trace, warn}; use tracing_opentelemetry::OpenTelemetrySpanExt as _; +use crate::app_config::CacheConfig; use crate::fs::icache::bridge::HashMapBridge; use crate::fs::icache::{AsyncICache, FileTable, IcbResolver}; use crate::fs::r#trait::{ @@ -154,7 +155,11 @@ impl MesaFS { /// Create a new `MesaFS` instance. #[must_use] - pub fn new(orgs: impl Iterator, fs_owner: (u32, u32)) -> Self { + pub fn new( + orgs: impl Iterator, + fs_owner: (u32, u32), + cache: &CacheConfig, + ) -> Self { let resolver = MesaResolver { fs_owner, block_size: Self::BLOCK_SIZE, @@ -174,7 +179,7 @@ impl MesaFS { slots: orgs .map(|org_conf| { let client = build_mesa_client(org_conf.api_key.expose_secret()); - let org = OrgFs::new(org_conf.name, client, fs_owner); + let org = OrgFs::new(org_conf.name, client, fs_owner, cache.clone()); ChildSlot { inner: org, bridge: HashMapBridge::new(), diff --git a/src/fs/mescloud/org.rs b/src/fs/mescloud/org.rs index 968c748..1f3b8b5 100644 --- a/src/fs/mescloud/org.rs +++ b/src/fs/mescloud/org.rs @@ -17,6 +17,7 @@ use super::composite::{ChildSlot, CompositeFs}; use super::icache as mescloud_icache; use super::icache::MescloudICache; use super::repo::RepoFs; +use crate::app_config::CacheConfig; use crate::fs::icache::bridge::HashMapBridge; use crate::fs::icache::{AsyncICache, FileTable, IcbResolver}; use crate::fs::r#trait::{ @@ -94,6 +95,7 @@ pub struct OrgFs { composite: CompositeFs, /// Maps org-level owner-dir inodes to owner name (github only). owner_inodes: HashMap, + cache_config: CacheConfig, } impl OrgFs { @@ -186,7 +188,12 @@ impl OrgFs { } #[must_use] - pub fn new(name: String, client: MesaClient, fs_owner: (u32, u32)) -> Self { + pub fn new( + name: String, + client: MesaClient, + fs_owner: (u32, u32), + cache_config: CacheConfig, + ) -> Self { let resolver = OrgResolver { fs_owner, block_size: Self::BLOCK_SIZE, @@ -203,6 +210,7 @@ impl OrgFs { slots: Vec::new(), }, owner_inodes: HashMap::new(), + cache_config, } } @@ -296,7 +304,11 @@ impl OrgFs { repo_name.to_owned(), default_branch.to_owned(), self.composite.icache.fs_owner(), - ); + // TODO(markovejnovic): Unnecessary clone. Refactoring for clearer ownership semantics + // would be ideal. 
+ self.cache_config.clone(), + ) + .await; let mut bridge = HashMapBridge::new(); bridge.insert_inode(ino, RepoFs::ROOT_INO); diff --git a/src/fs/mescloud/repo.rs b/src/fs/mescloud/repo.rs index 0d22196..11b334a 100644 --- a/src/fs/mescloud/repo.rs +++ b/src/fs/mescloud/repo.rs @@ -12,6 +12,10 @@ use mesa_dev::low_level::content::{Content, DirEntry as MesaDirEntry}; use num_traits::cast::ToPrimitive as _; use tracing::{Instrument as _, instrument, trace, warn}; +use git_fs::cache::fcache::FileCache; +use git_fs::cache::traits::{AsyncReadableCache as _, AsyncWritableCache as _}; + +use crate::app_config::CacheConfig; use crate::fs::icache::{AsyncICache, FileTable, IcbResolver}; use crate::fs::r#trait::{ DirEntry, DirEntryType, FileAttr, FileHandle, FileOpenOptions, FilesystemStats, Fs, Inode, @@ -183,6 +187,7 @@ pub struct RepoFs { file_table: FileTable, readdir_buf: Vec, open_files: HashMap, + file_cache: Option>, } impl RepoFs { @@ -190,12 +195,13 @@ impl RepoFs { const BLOCK_SIZE: u32 = 4096; /// Create a new `RepoFs` for a specific org and repo. - pub fn new( + pub async fn new( client: MesaClient, org_name: String, repo_name: String, ref_: String, fs_owner: (u32, u32), + cache_config: CacheConfig, ) -> Self { let resolver = RepoResolver { client: client.clone(), @@ -205,6 +211,23 @@ impl RepoFs { fs_owner, block_size: Self::BLOCK_SIZE, }; + + let file_cache = match cache_config.max_size { + Some(max_size) if max_size.as_u64() > 0 => { + let cache_dir = cache_config.path.join(&org_name).join(&repo_name); + let max_bytes = max_size.as_u64().try_into().unwrap_or(usize::MAX); + match FileCache::new(&cache_dir, max_bytes).await { + Ok(cache) => Some(cache), + Err(e) => { + warn!(error = ?e, org = %org_name, repo = %repo_name, + "failed to create file cache, continuing without caching",); + None + } + } + } + _ => None, + }; + Self { client, org_name, @@ -214,6 +237,7 @@ impl RepoFs { file_table: FileTable::new(), readdir_buf: Vec::new(), open_files: HashMap::new(), + file_cache, } } @@ -419,9 +443,29 @@ impl Fs for RepoFs { "read: inode {ino} has non-file cached attr" ); + // Try the file cache first. + if let Some(cache) = &self.file_cache + && let Some(data) = cache.get(&ino).await + { + let start = usize::try_from(offset) + .unwrap_or(data.len()) + .min(data.len()); + let end = start.saturating_add(size as usize).min(data.len()); + trace!( + ino, + fh, + cached = true, + decoded_len = data.len(), + start, + end, + "read content" + ); + return Ok(Bytes::copy_from_slice(&data[start..end])); + } + + // Cache miss — fetch from the Mesa API. let file_path = self.path_of_inode(ino).await; - // Non-root inodes must have a resolvable path. if ino != Self::ROOT_INO && file_path.is_none() { warn!(ino, "read: path_of_inode returned None for non-root inode"); return Err(ReadError::InodeNotFound); @@ -451,8 +495,17 @@ impl Fs for RepoFs { .unwrap_or(decoded.len()) .min(decoded.len()); let end = start.saturating_add(size as usize).min(decoded.len()); - trace!(ino, fh, path = ?file_path, decoded_len = decoded.len(), start, end, "read content"); - Ok(Bytes::copy_from_slice(&decoded[start..end])) + let result = Bytes::copy_from_slice(&decoded[start..end]); + trace!(ino, fh, cached = false, path = ?file_path, decoded_len = decoded.len(), start, end, "read content"); + + // Store the decoded content in the cache for future reads. 
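+ // Caching is best-effort: an insert failure is only logged and the read still returns
+ // the freshly fetched bytes.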
+ if let Some(cache) = &self.file_cache + && let Err(e) = cache.insert(&ino, decoded).await + { + warn!(error = ?e, ino, "failed to cache file content"); + } + + Ok(result) } #[instrument(name = "RepoFs::release", skip(self), fields(repo = %self.repo_name))] diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 0000000..101f929 --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,59 @@ +#![allow(missing_docs, clippy::unwrap_used)] + +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use git_fs::cache::eviction::lru::{Deleter, LruEvictionTracker, Versioned}; + +/// Minimal versioned context for LRU tests. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct MockCtx { + pub version: u64, +} + +impl Versioned for MockCtx { + fn version(&self) -> u64 { + self.version + } +} + +/// A mock deleter that records every (key, ctx) pair it receives. +#[derive(Clone)] +pub struct MockDeleter { + pub deleted: Arc>>, +} + +impl MockDeleter { + pub fn new() -> Self { + Self { + deleted: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Returns just the keys that were deleted, in deletion order. + pub fn deleted_keys(&self) -> Vec { + self.deleted + .lock() + .unwrap() + .iter() + .map(|(k, _)| *k) + .collect() + } +} + +impl Deleter for MockDeleter { + async fn delete(&mut self, key: u64, ctx: MockCtx) { + self.deleted.lock().unwrap().push((key, ctx)); + } +} + +/// Poll `have_pending_culls()` until it returns false, or panic after timeout. +pub async fn wait_for_culls(tracker: &LruEvictionTracker) { + for _ in 0..200 { + if !tracker.have_pending_culls() { + return; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + panic!("culls did not complete within 1 second"); +} diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index c4f6c61..9821acc 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -9,6 +9,7 @@ RUN apt-get update && apt-get install -y \ WORKDIR /src COPY Cargo.toml Cargo.lock build.rs ./ COPY .git/ .git/ +COPY lib/ lib/ COPY src/ src/ RUN cargo build --release diff --git a/tests/fcache_concurrent.rs b/tests/fcache_concurrent.rs new file mode 100644 index 0000000..79426f1 --- /dev/null +++ b/tests/fcache_concurrent.rs @@ -0,0 +1,153 @@ +#![allow(clippy::unwrap_used, clippy::expect_used, missing_docs)] + +use std::sync::Arc; + +use git_fs::cache::fcache::FileCache; +use git_fs::cache::traits::{AsyncReadableCache as _, AsyncWritableCache as _}; +use tokio::task::JoinSet; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_inserts_different_keys() { + let tmp = tempfile::tempdir().unwrap(); + let cache = Arc::new( + FileCache::::new(tmp.path(), 1024 * 1024) + .await + .unwrap(), + ); + + let mut set = JoinSet::new(); + for i in 0u64..100 { + let cache = Arc::clone(&cache); + set.spawn(async move { + cache + .insert(&i, format!("value-{i}").into_bytes()) + .await + .unwrap(); + }); + } + while set.join_next().await.is_some() {} + + // Every key should be present with the correct value. 
+ for i in 0u64..100 { + let val = cache.get(&i).await; + assert_eq!( + val.as_deref(), + Some(format!("value-{i}").as_bytes()), + "key {i} missing or has wrong value" + ); + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_inserts_same_key() { + let tmp = tempfile::tempdir().unwrap(); + let cache = Arc::new( + FileCache::::new(tmp.path(), 1024 * 1024) + .await + .unwrap(), + ); + + let mut set = JoinSet::new(); + for i in 0u64..50 { + let cache = Arc::clone(&cache); + set.spawn(async move { + cache + .insert(&0, format!("value-{i}").into_bytes()) + .await + .unwrap(); + }); + } + while set.join_next().await.is_some() {} + + // The final value must be one of the inserted values (last writer wins). + let val = cache.get(&0).await; + assert!(val.is_some(), "key 0 should still be present"); + + let val_str = String::from_utf8(val.unwrap()).unwrap(); + assert!( + val_str.starts_with("value-"), + "value should be one of the inserted values, got: {val_str}" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_reads_during_writes() { + let tmp = tempfile::tempdir().unwrap(); + let cache = Arc::new( + FileCache::::new(tmp.path(), 1024 * 1024) + .await + .unwrap(), + ); + + // Pre-populate with initial value. + cache.insert(&0, b"initial".to_vec()).await.unwrap(); + + let mut set = JoinSet::new(); + + // Spawn 20 writers overwriting key 0. + for i in 0u64..20 { + let cache = Arc::clone(&cache); + set.spawn(async move { + cache + .insert(&0, format!("write-{i}").into_bytes()) + .await + .unwrap(); + }); + } + + // Spawn 20 readers -- each get() should return None or valid UTF-8. + for _ in 0..20 { + let cache = Arc::clone(&cache); + set.spawn(async move { + if let Some(val) = cache.get(&0).await { + String::from_utf8(val) + .expect("concurrent read returned corrupted (non-UTF-8) data"); + } + }); + } + + while set.join_next().await.is_some() {} +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn concurrent_inserts_with_eviction() { + // Small cache: 200 bytes. Sequential warmup fills the LRU worker's map, + // then concurrent inserts trigger eviction. + let tmp = tempfile::tempdir().unwrap(); + let cache = Arc::new(FileCache::::new(tmp.path(), 200).await.unwrap()); + + // Warmup: insert 10 entries sequentially so the LRU worker registers them + // in its ordered map. This ensures eviction has candidates to evict. + for i in 0u64..10 { + cache.insert(&i, vec![b'x'; 20]).await.unwrap(); + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Concurrent phase: 40 more inserts push the cache well over the 200-byte + // limit and trigger eviction against the warmed-up LRU map. + let mut set = JoinSet::new(); + for i in 10u64..50 { + let cache = Arc::clone(&cache); + set.spawn(async move { + cache.insert(&i, vec![b'x'; 20]).await.unwrap(); + }); + } + while set.join_next().await.is_some() {} + + // Give eviction time to settle. + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + // Not all keys can fit (50 x 20 = 1000 > 200). Some must have been evicted. 
+ let mut present = 0usize; + for i in 0u64..50 { + if cache.get(&i).await.is_some() { + present += 1; + } + } + + assert!( + present < 50, + "expected some evictions under pressure, but all 50 entries survived" + ); + assert!(present > 0, "at least some entries should still be present"); +} diff --git a/tests/fcache_correctness.rs b/tests/fcache_correctness.rs new file mode 100644 index 0000000..4731258 --- /dev/null +++ b/tests/fcache_correctness.rs @@ -0,0 +1,195 @@ +#![allow(clippy::unwrap_used, missing_docs)] + +use git_fs::cache::fcache::{FileCache, InvalidRootPathError}; +use git_fs::cache::traits::{AsyncReadableCache as _, AsyncWritableCache as _}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn new_creates_directory_and_marker() { + let tmp = tempfile::tempdir().unwrap(); + let cache_path = tmp.path().join("cache"); + + let _cache = FileCache::::new(&cache_path, 4096).await.unwrap(); + + assert!(cache_path.join(".gitfs_cache").exists()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn new_with_existing_empty_directory() { + let tmp = tempfile::tempdir().unwrap(); + // tmp.path() is an existing empty directory + let _cache = FileCache::::new(tmp.path(), 4096).await.unwrap(); + + assert!(tmp.path().join(".gitfs_cache").exists()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn new_rejects_file_path() { + let tmp = tempfile::tempdir().unwrap(); + let file_path = tmp.path().join("not_a_dir"); + std::fs::write(&file_path, b"hello").unwrap(); + + let result = FileCache::::new(&file_path, 4096).await; + + assert!( + matches!(result, Err(InvalidRootPathError::NotADirectory(_))), + "expected NotADirectory, got {:?}", + result + .as_ref() + .map(|_| "Ok(...)") + .map_err(|e| format!("{e:?}")) + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn new_rejects_non_empty_unmarked_directory() { + let tmp = tempfile::tempdir().unwrap(); + // Place a foreign file so the directory is non-empty and has no marker + std::fs::write(tmp.path().join("foreign.txt"), b"data").unwrap(); + + let result = FileCache::::new(tmp.path(), 4096).await; + + assert!( + matches!(result, Err(InvalidRootPathError::RootPathUnsafeCache(_))), + "expected RootPathUnsafeCache, got {:?}", + result + .as_ref() + .map(|_| "Ok(...)") + .map_err(|e| format!("{e:?}")) + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn new_cleans_previously_used_directory() { + let tmp = tempfile::tempdir().unwrap(); + let cache_path = tmp.path().join("cache"); + + // First use: create cache and insert a value + { + let cache = FileCache::::new(&cache_path, 4096).await.unwrap(); + cache.insert(&1, b"value-1".to_vec()).await.unwrap(); + // FileCache::drop removes the directory + } + + // Recreate the directory with a marker to simulate a restart + std::fs::create_dir_all(&cache_path).unwrap(); + std::fs::write(cache_path.join(".gitfs_cache"), b"").unwrap(); + std::fs::write(cache_path.join("leftover"), b"stale").unwrap(); + + // Second use: should clean old contents + let cache = FileCache::::new(&cache_path, 4096).await.unwrap(); + + // Old data should not be accessible + assert!(cache.get(&1).await.is_none()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn insert_and_get_returns_value() { + let tmp = tempfile::tempdir().unwrap(); + let cache = FileCache::::new(tmp.path(), 4096).await.unwrap(); + + cache.insert(&42, b"hello world".to_vec()).await.unwrap(); + + let val = 
cache.get(&42).await; + assert_eq!(val.as_deref(), Some(b"hello world".as_slice())); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_missing_key_returns_none() { + let tmp = tempfile::tempdir().unwrap(); + let cache = FileCache::::new(tmp.path(), 4096).await.unwrap(); + + assert!(cache.get(&999).await.is_none()); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn contains_reflects_presence() { + let tmp = tempfile::tempdir().unwrap(); + let cache = FileCache::::new(tmp.path(), 4096).await.unwrap(); + + assert!(!cache.contains(&1).await); + + cache.insert(&1, b"data".to_vec()).await.unwrap(); + + assert!(cache.contains(&1).await); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn insert_overwrites_previous_value() { + let tmp = tempfile::tempdir().unwrap(); + let cache = FileCache::::new(tmp.path(), 4096).await.unwrap(); + + cache.insert(&1, b"first".to_vec()).await.unwrap(); + cache.insert(&1, b"second".to_vec()).await.unwrap(); + + let val = cache.get(&1).await; + assert_eq!(val.as_deref(), Some(b"second".as_slice())); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn insert_empty_value() { + let tmp = tempfile::tempdir().unwrap(); + let cache = FileCache::::new(tmp.path(), 4096).await.unwrap(); + + cache.insert(&1, Vec::new()).await.unwrap(); + + let val = cache.get(&1).await; + assert_eq!(val.as_deref(), Some(b"".as_slice())); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn eviction_removes_oldest_entries() { + // max_size = 100 bytes. Insert 5 × 30-byte entries (150 bytes total). + // After the last insert, eviction should have removed at least one early entry. + let tmp = tempfile::tempdir().unwrap(); + let cache = FileCache::::new(tmp.path(), 100).await.unwrap(); + + for i in 0u64..5 { + let value = vec![b'x'; 30]; + cache.insert(&i, value).await.unwrap(); + } + + // Give the async eviction worker time to complete deletions. + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + // At least one of the earliest keys should have been evicted. + let mut present_count = 0; + for i in 0u64..5 { + if cache.get(&i).await.is_some() { + present_count += 1; + } + } + + assert!( + present_count < 5, + "expected at least one eviction, but all 5 entries are still present" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn entry_larger_than_max_size_still_inserted() { + let tmp = tempfile::tempdir().unwrap(); + let cache = FileCache::::new(tmp.path(), 10).await.unwrap(); + + // Value is 50 bytes, far exceeding max_size of 10. + let big_value = vec![b'A'; 50]; + cache.insert(&1, big_value.clone()).await.unwrap(); + + let val = cache.get(&1).await; + assert_eq!(val, Some(big_value)); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn drop_removes_cache_directory() { + let tmp = tempfile::tempdir().unwrap(); + let cache_path = tmp.path().join("cache"); + + { + let cache = FileCache::::new(&cache_path, 4096).await.unwrap(); + cache.insert(&1, b"data".to_vec()).await.unwrap(); + } + // After drop, the cache directory should be gone. 
+    assert!(
+        !cache_path.exists(),
+        "cache directory should have been removed on drop"
+    );
+}
diff --git a/tests/lru_concurrent.rs b/tests/lru_concurrent.rs
new file mode 100644
index 0000000..247a863
--- /dev/null
+++ b/tests/lru_concurrent.rs
@@ -0,0 +1,140 @@
+#![allow(clippy::unwrap_used, clippy::similar_names, missing_docs)]
+
+mod common;
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use common::{MockCtx, MockDeleter, wait_for_culls};
+use git_fs::cache::eviction::lru::LruEvictionTracker;
+use tokio::task::JoinSet;
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn concurrent_upserts() {
+    let deleter = MockDeleter::new();
+    let tracker = Arc::new(LruEvictionTracker::spawn(deleter.clone(), 256));
+
+    let mut set = JoinSet::new();
+    for i in 0u64..100 {
+        let tracker = Arc::clone(&tracker);
+        set.spawn(async move {
+            tracker.upsert(i, MockCtx { version: i });
+        });
+    }
+    while set.join_next().await.is_some() {}
+
+    // Let the worker process all messages.
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Evict all 100 — every key should appear in the deletion list exactly once.
+    assert!(tracker.try_cull(100));
+    wait_for_culls(&tracker).await;
+
+    let mut deleted = deleter.deleted_keys();
+    deleted.sort_unstable();
+    let expected: Vec<u64> = (0..100).collect();
+    assert_eq!(deleted, expected, "all 100 keys should have been evicted");
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn concurrent_access_notifications() {
+    let deleter = MockDeleter::new();
+    let tracker = Arc::new(LruEvictionTracker::spawn(deleter.clone(), 256));
+
+    // Insert keys 1..=10
+    for i in 1u64..=10 {
+        tracker.upsert(i, MockCtx { version: i });
+    }
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Concurrently access keys 1..=5, making them "recently used."
+    let mut set = JoinSet::new();
+    for i in 1u64..=5 {
+        let tracker = Arc::clone(&tracker);
+        set.spawn(async move {
+            for _ in 0..10 {
+                tracker.access(i);
+            }
+        });
+    }
+    while set.join_next().await.is_some() {}
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Evict 5 keys — keys 6..=10 should be evicted (they were never accessed).
+    assert!(tracker.try_cull(5));
+    wait_for_culls(&tracker).await;
+
+    let mut deleted = deleter.deleted_keys();
+    deleted.sort_unstable();
+    assert_eq!(
+        deleted,
+        vec![6, 7, 8, 9, 10],
+        "non-accessed keys 6..=10 should be evicted first"
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn concurrent_cull_requests() {
+    let deleter = MockDeleter::new();
+    let tracker = Arc::new(LruEvictionTracker::spawn(deleter.clone(), 256));
+
+    for i in 0u64..20 {
+        tracker.upsert(i, MockCtx { version: i });
+    }
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Fire 10 concurrent cull requests, each for 2 keys.
+    let mut set = JoinSet::new();
+    for _ in 0..10 {
+        let tracker = Arc::clone(&tracker);
+        set.spawn(async move {
+            let _ = tracker.try_cull(2);
+        });
+    }
+    while set.join_next().await.is_some() {}
+
+    // Wait for all culls to settle.
+    wait_for_culls(&tracker).await;
+
+    // Some keys should have been evicted (exact count depends on channel capacity
+    // and scheduling, but at least some culls should succeed).
+    let deleted = deleter.deleted_keys();
+    assert!(
+        deleted.len() >= 2,
+        "at least one cull request (2 keys) should have succeeded"
+    );
+    assert!(deleted.len() <= 20, "cannot evict more keys than exist");
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn high_throughput_message_storm() {
+    let deleter = MockDeleter::new();
+    // Small channel to stress the try_send → spawn fallback path.
+    let tracker = Arc::new(LruEvictionTracker::spawn(deleter.clone(), 4));
+
+    let mut set = JoinSet::new();
+
+    // 50 tasks each upsert and access rapidly.
+    for i in 0u64..50 {
+        let tracker = Arc::clone(&tracker);
+        set.spawn(async move {
+            tracker.upsert(i, MockCtx { version: i });
+            tracker.access(i);
+            tracker.upsert(i, MockCtx { version: i + 100 });
+        });
+    }
+    while set.join_next().await.is_some() {}
+
+    // Let messages drain.
+    tokio::time::sleep(Duration::from_millis(200)).await;
+
+    // Evict everything.
+    assert!(tracker.try_cull(50));
+    wait_for_culls(&tracker).await;
+
+    // All 50 unique keys should have been evicted exactly once.
+    let mut deleted = deleter.deleted_keys();
+    deleted.sort_unstable();
+    deleted.dedup();
+    assert_eq!(deleted.len(), 50, "all 50 unique keys should be evicted");
+}
diff --git a/tests/lru_correctness.rs b/tests/lru_correctness.rs
new file mode 100644
index 0000000..e9a7bf1
--- /dev/null
+++ b/tests/lru_correctness.rs
@@ -0,0 +1,187 @@
+#![allow(clippy::unwrap_used, clippy::similar_names, missing_docs)]
+
+mod common;
+
+use common::{MockCtx, MockDeleter, wait_for_culls};
+use git_fs::cache::eviction::lru::LruEvictionTracker;
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn evicts_least_recently_inserted() {
+    let deleter = MockDeleter::new();
+    let tracker = LruEvictionTracker::spawn(deleter.clone(), 64);
+
+    // Insert keys 1, 2, 3 in order. Key 1 is the oldest.
+    tracker.upsert(1, MockCtx { version: 1 });
+    tracker.upsert(2, MockCtx { version: 2 });
+    tracker.upsert(3, MockCtx { version: 3 });
+
+    // Let the worker process the upsert messages.
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Evict 1 key — should be key 1 (least recently inserted).
+    assert!(tracker.try_cull(1), "try_cull should succeed");
+    wait_for_culls(&tracker).await;
+
+    let deleted = deleter.deleted_keys();
+    assert_eq!(deleted, vec![1], "key 1 should be evicted first");
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn access_moves_key_to_back() {
+    let deleter = MockDeleter::new();
+    let tracker = LruEvictionTracker::spawn(deleter.clone(), 64);
+
+    tracker.upsert(1, MockCtx { version: 1 });
+    tracker.upsert(2, MockCtx { version: 2 });
+    tracker.upsert(3, MockCtx { version: 3 });
+
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Access key 1, moving it to the back of the LRU queue.
+    tracker.access(1);
+
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Evict 1 key — should now be key 2 (key 1 was refreshed).
+    assert!(tracker.try_cull(1));
+    wait_for_culls(&tracker).await;
+
+    let deleted = deleter.deleted_keys();
+    assert_eq!(
+        deleted,
+        vec![2],
+        "key 2 should be evicted since key 1 was accessed"
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn try_cull_returns_true_on_success() {
+    let deleter = MockDeleter::new();
+    let tracker = LruEvictionTracker::spawn(deleter, 64);
+
+    tracker.upsert(1, MockCtx { version: 1 });
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let result = tracker.try_cull(1);
+    assert!(result, "try_cull should return true when channel has space");
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn have_pending_culls_reflects_state() {
+    let deleter = MockDeleter::new();
+    let tracker = LruEvictionTracker::spawn(deleter, 64);
+
+    // No culls requested yet.
+    assert!(!tracker.have_pending_culls());
+
+    tracker.upsert(1, MockCtx { version: 1 });
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Request a cull.
+    let _ = tracker.try_cull(1);
+
+    // Immediately after try_cull, pending work should be true
+    // (unless the worker already processed it, which is unlikely but possible).
+    // Wait for completion and verify it clears.
+    wait_for_culls(&tracker).await;
+    assert!(!tracker.have_pending_culls());
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn stale_version_ignored() {
+    let deleter = MockDeleter::new();
+    let tracker = LruEvictionTracker::spawn(deleter.clone(), 64);
+
+    // Upsert key 1 with version 5, then send a stale version 3.
+    tracker.upsert(1, MockCtx { version: 5 });
+    tracker.upsert(1, MockCtx { version: 3 });
+
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Evict — the context should carry version 5 (stale version 3 was dropped).
+    assert!(tracker.try_cull(1));
+    wait_for_culls(&tracker).await;
+
+    let deleted = deleter.deleted.lock().unwrap();
+    assert_eq!(deleted.len(), 1);
+    assert_eq!(
+        deleted[0].1.version, 5,
+        "should carry version 5, not stale version 3"
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn evict_more_than_available() {
+    let deleter = MockDeleter::new();
+    let tracker = LruEvictionTracker::spawn(deleter.clone(), 64);
+
+    tracker.upsert(1, MockCtx { version: 1 });
+    tracker.upsert(2, MockCtx { version: 2 });
+
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Request eviction of 100 keys but only 2 exist.
+    assert!(tracker.try_cull(100));
+    wait_for_culls(&tracker).await;
+
+    let deleted = deleter.deleted_keys();
+    assert_eq!(deleted.len(), 2, "should only evict the 2 available keys");
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn multiple_eviction_rounds() {
+    let deleter = MockDeleter::new();
+    let tracker = LruEvictionTracker::spawn(deleter.clone(), 64);
+
+    for i in 1u64..=6 {
+        tracker.upsert(i, MockCtx { version: i });
+    }
+
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Round 1: evict 2
+    assert!(tracker.try_cull(2));
+    wait_for_culls(&tracker).await;
+
+    // Round 2: evict 2 more
+    assert!(tracker.try_cull(2));
+    wait_for_culls(&tracker).await;
+
+    let deleted = deleter.deleted_keys();
+    assert_eq!(
+        deleted.len(),
+        4,
+        "should have evicted 4 total across 2 rounds"
+    );
+    // First round evicts keys 1, 2; second round evicts keys 3, 4.
+    // Deletion tasks within a round may complete in any order, so sort each batch.
+    let mut round1: Vec<_> = deleted[..2].to_vec();
+    round1.sort_unstable();
+    let mut round2: Vec<_> = deleted[2..].to_vec();
+    round2.sort_unstable();
+    assert_eq!(round1, vec![1, 2]);
+    assert_eq!(round2, vec![3, 4]);
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn try_cull_returns_false_when_channel_full() {
+    let deleter = MockDeleter::new();
+    // Channel size 1 — fills immediately.
+    let tracker = LruEvictionTracker::spawn(deleter, 1);
+
+    tracker.upsert(1, MockCtx { version: 1 });
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // First cull takes the only channel slot.
+    assert!(tracker.try_cull(1), "first try_cull should succeed");
+
+    // Second cull should fail — channel is full.
+    assert!(
+        !tracker.try_cull(1),
+        "second try_cull should fail when channel is full"
+    );
+
+    // After the worker drains, pending culls should eventually clear.
+    wait_for_culls(&tracker).await;
+    assert!(!tracker.have_pending_culls());
+}