From c396e32301135064b12b3ec01525aab16cb11ef4 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Tue, 17 Feb 2026 20:08:14 -0800 Subject: [PATCH 1/2] Start mega cleanup --- lib/fs/async_fs.rs | 203 +++++++++++++++++++++++++++++++++++++++++++++ lib/fs/dcache.rs | 63 ++++++++++++++ lib/fs/mod.rs | 113 +++++++++++++++++++++++++ lib/lib.rs | 2 + lib/tokex.rs | 186 +++++++++++++++++++++++++++++++++++++++++ src/fs/trait.rs | 34 +++++++- src/trc.rs | 150 +++++++++++++++------------------ 7 files changed, 662 insertions(+), 89 deletions(-) create mode 100644 lib/fs/async_fs.rs create mode 100644 lib/fs/dcache.rs create mode 100644 lib/fs/mod.rs create mode 100644 lib/tokex.rs diff --git a/lib/fs/async_fs.rs b/lib/fs/async_fs.rs new file mode 100644 index 0000000..494ca82 --- /dev/null +++ b/lib/fs/async_fs.rs @@ -0,0 +1,203 @@ +//! Async INode Table which supports concurrent access and modification. + +use std::{ffi::OsStr, sync::Arc}; + +use crate::fs::{FileHandle, INode, INodeType, InodeAddr, dcache::DCache}; + +use scc; +use tokio::sync::{Notify, OnceCell, watch}; + +/// Represents an INode that has been loaded by the kernel. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct LoadedAddr(pub InodeAddr); + +/// Represents an open file handle that has been loaded by the kernel. +pub struct OpenFile(pub FileHandle); + +/// A trait representing a data provider for the `AsyncFs`. This trait abstracts over the +/// underlying filesystem or storage mechanism that provides inode data to the `AsyncFs` on cache +/// misses. +pub trait FsDataProvider: Clone + Send + Sync + 'static { + type LookupFuture: Future>; + + async fn lookup(&self, parent: INode, name: &OsStr) -> Result; +} + +/// An asynchronous implementation of a table mapping `InodeAddr` to `InodeData`. +pub struct AsyncFs { + /// The underlying concurrent hash map storing the inode data. + inode_table: scc::HashMap, + + /// Cache for directory entries, mapping a parent `InodeAddr` + directory_cache: DCache, + + lookups_in_flight: scc::HashMap, + inodes_in_flight: scc::HashMap, + + /// The data provider used to fetch inode data on cache misses. + data_provider: DP, +} + +impl AsyncFs { + /// Get the total number of inodes currently stored in the table. + pub fn inode_count(&self) -> usize { + self.inode_table.len() + } + + /// Asynchronously look up an `Inode` by its `InodeAddr`. + /// + /// Args: + /// + /// - `parent`: The `LoadedAddr` of the parent directory containing the inode to look up. + /// - `name`: The name of the inode to look up within the parent directory. + pub async fn lookup(&self, parent: LoadedAddr, name: &OsStr) -> Result { + if cfg!(debug_assertions) { + let parent_ino = self.loaded_inode(parent).await?; + + debug_assert!( + matches!(parent_ino.itype, INodeType::Directory), + "parent inode should be a directory" + ); + } + + if let Some(dentry) = self.directory_cache.lookup(parent, name) { + // The directory cache has an entry for this name. Let's load it from the main table + // and return it. If the entry is in the dcache, then that necessarily means the inode + // is loaded in the main table. + return self.loaded_inode(dentry.ino).await; + } + + // The directory entry does NOT exist in the cache, so we need to perform a lookup. + let parent_ino = self.loaded_inode(parent).await?; + self.data_provider.lookup(parent_ino, name).await + } + + /// Asynchronously get the attributes of an inode by its `InodeAddr`. + /// + /// Args: + /// - `ino`: The `LoadedAddr` of the inode whose attributes to retrieve. + /// - `fh`: An optional `FileHandle` that may be used to optimize attribute retrieval for open + /// files. + pub async fn getattr( + &mut self, + ino: LoadedAddr, + fh: Option, + ) -> Result<(), std::io::Error> { + unimplemented!(); + } + + /// Attempt to load an `INode` from the table. + /// + /// # Notes + /// + /// - This expects the inode to be loaded and available in the table. + /// - Will handle inodes that are currently in-flight by awaiting their completion, but this is + /// not expected and will fail in debug mode. + async fn loaded_inode(&self, addr: LoadedAddr) -> Result { + debug_assert!( + self.inode_table.contains_sync(&addr.0) || self.inodes_in_flight.contains_sync(&addr.0) + ); + + match self.inode_table.get_async(&addr.0).await { + Some(v) => Ok(*v), + None => { + // We didn't find the inode in the main table, so it may be in-flight. + match self.inodes_in_flight.get_async(&addr.0).await { + Some(notifier) => { + // Let's await the notifier for this inode to be loaded. Once it's + // notified, it's safe for us to re-read the main table and expect the + // inode to be present. + notifier.notified().await; + + match self.inode_table.get_async(&addr.0).await { + Some(v) => Ok(*v), + None => { + debug_assert!( + false, + "inode should have been loaded after notification: {:?}", + addr.0 + ); + + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("inode not found after notification: {:?}", addr.0), + )) + } + } + } + None => { + // The inode was never requested for loading, which means we have a + // programming bug. + debug_assert!( + false, + "inode not found in main table or in-flight map: {:?}", + addr.0 + ); + + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!( + "inode not found in main table or in-flight map: {:?}", + addr.0 + ), + )) + } + } + } + } + } + + async fn inode(&self, addr: LoadedAddr) -> Result { + debug_assert!( + self.inode_table.contains_sync(&addr.0) || self.inodes_in_flight.contains_sync(&addr.0) + ); + + match self.inode_table.get_async(&addr.0).await { + Some(v) => Ok(*v), + None => { + // We didn't find the inode in the main table, so it may be in-flight. + match self.inodes_in_flight.get_async(&addr.0).await { + Some(notifier) => { + // Let's await the notifier for this inode to be loaded. Once it's + // notified, it's safe for us to re-read the main table and expect the + // inode to be present. + notifier.notified().await; + + match self.inode_table.get_async(&addr.0).await { + Some(v) => Ok(*v), + None => { + debug_assert!( + false, + "inode should have been loaded after notification: {:?}", + addr.0 + ); + + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!("inode not found after notification: {:?}", addr.0), + )) + } + } + } + None => { + // The inode was never requested for loading, which means we have a + // programming bug. + debug_assert!( + false, + "inode not found in main table or in-flight map: {:?}", + addr.0 + ); + + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + format!( + "inode not found in main table or in-flight map: {:?}", + addr.0 + ), + )) + } + } + } + } + } +} diff --git a/lib/fs/dcache.rs b/lib/fs/dcache.rs new file mode 100644 index 0000000..f642629 --- /dev/null +++ b/lib/fs/dcache.rs @@ -0,0 +1,63 @@ +use std::ffi::{OsStr, OsString}; +use std::ops::Bound; + +use scc::Guard; + +use crate::fs::async_fs::LoadedAddr; + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct DKey { + pub parent_ino: LoadedAddr, + pub name: OsString, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DValue { + pub ino: LoadedAddr, + pub is_dir: bool, +} + +#[derive(Default)] +pub struct DCache { + cache: scc::TreeIndex, +} + +impl DCache { + pub fn new() -> Self { + Self::default() + } + + pub fn lookup(&self, parent_ino: LoadedAddr, name: &OsStr) -> Option { + let key = DKey { + parent_ino, + name: name.to_os_string(), + }; + self.cache.peek_with(&key, |_, v| v.clone()) + } + + pub fn insert(&self, parent_ino: LoadedAddr, name: OsString, ino: LoadedAddr, is_dir: bool) { + // Overwrites are expected when refreshing cached entries. + let _ = self + .cache + .insert_sync(DKey { parent_ino, name }, DValue { ino, is_dir }); + } + + pub fn readdir(&self, parent_ino: LoadedAddr) -> Vec<(OsString, DValue)> { + let lower = Bound::Included(DKey { + parent_ino, + name: OsString::new(), + }); + let upper = match parent_ino.0.checked_add(1) { + Some(next) => Bound::Excluded(DKey { + parent_ino: LoadedAddr(next), + name: OsString::new(), + }), + None => Bound::Unbounded, + }; + let guard = Guard::new(); + self.cache + .range((lower, upper), &guard) + .map(|(key, value)| (key.name.clone(), value.clone())) + .collect() + } +} diff --git a/lib/fs/mod.rs b/lib/fs/mod.rs new file mode 100644 index 0000000..f6143e7 --- /dev/null +++ b/lib/fs/mod.rs @@ -0,0 +1,113 @@ +//! Useful filesystem generalizations. +pub mod async_fs; +pub mod dcache; + +use std::time::SystemTime; + +use bitflags::bitflags; + +/// Type representing an inode identifier. +pub type InodeAddr = u64; + +/// Type representing a file handle. +pub type FileHandle = u64; + +bitflags! { + /// Permission bits for an inode, similar to Unix file permissions. + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub struct InodePerms: u16 { + // Other + const OTHER_EXECUTE = 1 << 0; + const OTHER_WRITE = 1 << 1; + const OTHER_READ = 1 << 2; + + // Group + const GROUP_EXECUTE = 1 << 3; + const GROUP_WRITE = 1 << 4; + const GROUP_READ = 1 << 5; + + // Owner + const OWNER_EXECUTE = 1 << 6; + const OWNER_WRITE = 1 << 7; + const OWNER_READ = 1 << 8; + + // Special bits + const STICKY = 1 << 9; + const SETGID = 1 << 10; + const SETUID = 1 << 11; + + const OTHER_RWX = Self::OTHER_READ.bits() + | Self::OTHER_WRITE.bits() + | Self::OTHER_EXECUTE.bits(); + const GROUP_RWX = Self::GROUP_READ.bits() + | Self::GROUP_WRITE.bits() + | Self::GROUP_EXECUTE.bits(); + const OWNER_RWX = Self::OWNER_READ.bits() + | Self::OWNER_WRITE.bits() + | Self::OWNER_EXECUTE.bits(); + } +} + +bitflags! { + /// Flags for opening a file, similar to Unix open(2) flags. + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] + pub struct OpenFlags: i32 { + // Access modes (mutually exclusive) + const RDONLY = libc::O_RDONLY; + const WRONLY = libc::O_WRONLY; + const RDWR = libc::O_RDWR; + + // Creation/status flags + const APPEND = libc::O_APPEND; + const TRUNC = libc::O_TRUNC; + const CREAT = libc::O_CREAT; + const EXCL = libc::O_EXCL; + + // Behavior flags + const NONBLOCK = libc::O_NONBLOCK; + const SYNC = libc::O_SYNC; + const DSYNC = libc::O_DSYNC; + const NOFOLLOW = libc::O_NOFOLLOW; + const CLOEXEC = libc::O_CLOEXEC; + const DIRECTORY = libc::O_DIRECTORY; + + #[cfg(target_os = "linux")] + const NOATIME = libc::O_NOATIME; + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum INodeType { + File, + Directory, +} + +/// Representation of an inode. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct INode { + /// The address of this inode, which serves as its unique identifier. + pub addr: InodeAddr, + /// The permissions associated with this inode, represented as a bitfield. + pub permissions: InodePerms, + /// The user ID of the owner of this inode. + pub uid: u32, + /// The group ID of the owner of this inode. + pub gid: u32, + /// The time this inode was created at. + pub create_time: SystemTime, + /// The time this inode was last modified at. + pub last_modified_at: SystemTime, + /// The parent inode address, if any. This is `None` for the root inode. + pub parent: Option, + /// The size of the file represented by this inode, in bytes. + pub size: usize, + /// Additional information about the type of this inode (e.g., file vs directory). + pub itype: INodeType, +} + +impl INode { + /// Check if this inode is the root inode (i.e., has no parent). + pub fn is_root(&self) -> bool { + self.parent.is_none() + } +} diff --git a/lib/lib.rs b/lib/lib.rs index f7388bd..1b6985e 100644 --- a/lib/lib.rs +++ b/lib/lib.rs @@ -2,4 +2,6 @@ /// Caching primitives for git-fs. pub mod cache; +pub mod fs; pub mod io; +pub mod tokex; diff --git a/lib/tokex.rs b/lib/tokex.rs new file mode 100644 index 0000000..5ff8bd9 --- /dev/null +++ b/lib/tokex.rs @@ -0,0 +1,186 @@ +//! Random tokio extensions. + +use std::{fmt::Debug, hash::Hash, pin::Pin, sync::Arc}; + +use tokio::{ + sync::{Notify, oneshot}, + task::JoinHandle, +}; + +/// Pool of future which run in the background and can reference the parent structure. +pub struct BackgroundTaskPoolMap { + values: scc::HashMap, + notifications: scc::HashMap>, + handles: scc::HashMap>, +} + +impl BackgroundTaskPoolMap { + fn spawn<'a>(&'a self, key: K, fut: Pin + Send + 'a>>) + where + K: Eq + Hash + Debug + Send + Sync + Copy + 'static, + V: Send + Sync + 'static, + { + let (tx, rx) = oneshot::channel(); + + // We also need to insert a notify for this key so that waiters can await the completion of + // this task. + if let Err((key, _)) = self.notifications.insert_sync(key, Arc::new(Notify::new())) { + debug_assert!( + false, + "task with key {:?} already exists in notifications map", + key + ); + + return; + } + + // SAFETY: The Drop implementation here guarantees that the future will be aborted before + // the parent structure is dropped, so the future will never outlive the parent structure. + // The caller must also ensure that the future does not reference any data that may be + // modified or dropped while the future is running. + let cool_fut: Pin + Send + 'static>> = + unsafe { std::mem::transmute(fut) }; + + let cool_self: &'static Self = unsafe { std::mem::transmute(self) }; + + let handle = tokio::task::spawn(async move { + let new_val = cool_fut.await; + + // Cool beans, the future completed, let's populate the values map. + cool_self.values.upsert_async(key, new_val).await; + + // After the future completes, we notify any waiters that the task associated with the + // key has completed. + match cool_self.notifications.get_async(&key).await { + Some(notify) => notify.notify_waiters(), + None => { + debug_assert!( + false, + "task completed but no notify found for key: {:?}", + key + ); + } + } + + // Let's notify the waiters. + if let Some(notify) = cool_self.notifications.get_async(&key).await { + notify.notify_waiters(); + } else { + debug_assert!( + false, + "task completed but no notify found for key: {:?}", + key + ); + } + + rx.await.unwrap(); + + // Then, remove the handle from the handles map. + let res = cool_self.handles.remove_async(&tokio::task::id()).await; + debug_assert!( + res.is_some(), + "task completed but no handle found for task id: {:?}", + tokio::task::id() + ); + }); + + let handle_id = handle.id(); + if let Err((handle_id, handle)) = self.handles.insert_sync(handle_id, handle) { + debug_assert!( + false, + "task with id {:?} already exists in handles map", + handle_id + ); + + // This is a bug. + // + // Tokio guarantees that task IDs are unique, so this should never happen, but if it + // does we can try to recover anyways. + // + self.handles.get_sync(&handle_id).unwrap().abort(); + self.handles.insert_sync(handle_id, handle).unwrap(); + } + + // Notify the future that it can modify + tx.send(()).unwrap(); + } + + /// Get the value associated with the key, waiting for the background task to complete if + /// necessary. + /// + /// Spawns a background task if there is no task in-flight for the key, and returns the value + /// once the task completes. + pub async fn get( + &self, + key: &K, + factory: impl FnOnce() -> Pin + Send + 'static>>, + ) -> V + where + K: Eq + Hash + Debug + Send + Sync + Copy + 'static, + V: Send + Sync + 'static + Copy, + { + self.get_value_across_notification(key, factory()).await + } + + async fn get_value(&self, key: &K, mia_handler: impl Future) -> V + where + K: Eq + Hash + Debug + Send + Sync + Copy + 'static, + V: Copy, + { + match self.values.read_async(key, |_, v| *v).await { + Some(v) => v, + None => mia_handler.await, + } + } + + async fn get_value_across_notification( + &self, + key: &K, + mia_handler: impl Future, + ) -> V + where + K: Eq + Hash + Debug + Send + Sync + Copy + 'static, + V: Copy, + { + self.get_value(key, async { + match self + .notifications + .read_async(key, |_, v| Arc::clone(v)) + .await + { + Some(n) => { + n.notified().await; + + self.get_value(key, async { + panic!( + "value should have been present after notification for key: {:?}", + key + ); + }) + .await + } + None => mia_handler.await, + } + }) + .await + } +} + +impl Drop for BackgroundTaskPoolMap { + fn drop(&mut self) { + self.handles.iter_sync(|_k, v| { + v.abort(); + true + }); + } +} + +impl Default for BackgroundTaskPoolMap { + fn default() -> Self { + Self { + values: scc::HashMap::default(), + notifications: scc::HashMap::default(), + handles: scc::HashMap::default(), + } + } +} diff --git a/src/fs/trait.rs b/src/fs/trait.rs index f4d9852..aeb7383 100644 --- a/src/fs/trait.rs +++ b/src/fs/trait.rs @@ -319,6 +319,31 @@ pub struct FilesystemStats { pub max_filename_length: u32, } +/// Represents an item which can be forgotten by the kernel. +/// +/// For certain methods, the kernel may call `forget` on an inode when it is done with it. +/// +/// This trait must be implemented by any item which can be returned by the `lookup` or `getattr` +/// methods of the `Fs` trait. +pub trait Forgettable { + fn from_ino(ino: Inode) -> Self + where + Self: Sized; + + /// Called when the kernel is done with this item. + fn forget(&self, nlookups: u64); +} + +/// Represents an item which has an associated `FileAttr` and is forgettable. +/// +/// This is returned by methods which create new inodes (e.g. `lookup`). +pub trait CreatedFileAttr { + type Forgettable: Forgettable; + + /// Fetch the `FileAttr` associated with this item. + fn file_attr(&self) -> &FileAttr; +} + #[async_trait] pub trait Fs { type LookupError: std::error::Error; @@ -330,7 +355,11 @@ pub trait Fs { /// For each lookup call made by the kernel, it expects the icache to be updated with the /// returned `FileAttr`. - async fn lookup(&mut self, parent: Inode, name: &OsStr) -> Result; + async fn lookup( + &mut self, + parent: Inode, + name: &OsStr, + ) -> Result; /// Can be called in two contexts -- the file is not open (in which case `fh` is `None`), /// or the file is open (in which case `fh` is `Some`). @@ -367,9 +396,6 @@ pub trait Fs { flush: bool, ) -> Result<(), Self::ReleaseError>; - /// Called when the kernel is done with an inode. - async fn forget(&mut self, ino: Inode, nlookups: u64); - /// Get filesystem statistics. async fn statfs(&mut self) -> Result; } diff --git a/src/trc.rs b/src/trc.rs index 553fee5..a8abbc5 100644 --- a/src/trc.rs +++ b/src/trc.rs @@ -3,8 +3,9 @@ //! The tracing subscriber is built with a [`reload::Layer`] wrapping the fmt layer so that the //! output format can be switched at runtime (e.g. from pretty mode to ugly mode when daemonizing). +#[cfg(feature = "__otlp_export")] use opentelemetry::trace::TracerProvider as _; -use opentelemetry_otlp::WithExportConfig as _; +#[cfg(feature = "__otlp_export")] use opentelemetry_sdk::Resource; use tracing_indicatif::IndicatifLayer; use tracing_subscriber::{ @@ -15,7 +16,6 @@ use tracing_subscriber::{ util::{SubscriberInitExt as _, TryInitError}, }; -use crate::app_config::TelemetryConfig; use crate::term; /// The type-erased fmt layer that lives inside the reload handle. @@ -43,9 +43,11 @@ impl TrcMode { /// A handle that allows reconfiguring the tracing subscriber at runtime. pub struct TrcHandle { fmt_handle: FmtReloadHandle, + #[cfg(feature = "__otlp_export")] tracer_provider: Option, } +#[cfg(feature = "__otlp_export")] impl Drop for TrcHandle { fn drop(&mut self) { if let Some(provider) = self.tracer_provider.take() @@ -92,7 +94,6 @@ impl TrcHandle { pub struct Trc { mode: TrcMode, env_filter: EnvFilter, - otlp_endpoints: Vec, } impl Default for Trc { @@ -102,76 +103,30 @@ impl Default for Trc { EnvFilter::try_from_env("GIT_FS_LOG").or_else(|_| EnvFilter::try_from_default_env()); match maybe_env_filter { - Ok(env_filter) => { - // If the user provided an env filter, they probably know what they're doing - // and don't want any fancy formatting or spinners. Default to ugly mode. - Self { - mode: TrcMode::Ugly { use_ansi }, - env_filter, - otlp_endpoints: Vec::new(), - } - } - Err(_) => { - // No env filter provided — give the user a nice out-of-the-box experience - // with compact formatting and progress spinners. - Self { - mode: TrcMode::丑 { use_ansi }, - env_filter: EnvFilter::new("info"), - otlp_endpoints: Vec::new(), - } - } + Ok(env_filter) => Self { + // If the user provided an env_filter, they probably know what they're doing and + // don't want any fancy formatting, spinners or bullshit like that. So we default + // to the ugly mode. + mode: TrcMode::Ugly { use_ansi }, + env_filter, + }, + Err(_) => Self { + // If the user didn't provide an env_filter, we assume they just want a nice + // out-of-the-box experience, and default to 丑 mode with an info level filter. + mode: TrcMode::丑 { use_ansi }, + env_filter: EnvFilter::new("info"), + }, } } } impl Trc { - /// Configure OTLP telemetry endpoints from the application config. - #[must_use] - pub fn with_telemetry(mut self, telemetry: &TelemetryConfig) -> Self { - self.otlp_endpoints = telemetry.endpoints(); - self - } - - /// Build the OpenTelemetry tracer provider if any OTLP endpoints are configured. - fn build_otel_provider(&self) -> Option { - if self.otlp_endpoints.is_empty() { - return None; - } - - let resource = Resource::builder() - .with_service_name("git-fs") - .with_attribute(opentelemetry::KeyValue::new( - "service.version", - env!("CARGO_PKG_VERSION"), - )) - .build(); - let mut builder = - opentelemetry_sdk::trace::SdkTracerProvider::builder().with_resource(resource); - - let mut has_exporter = false; - for endpoint in &self.otlp_endpoints { - match opentelemetry_otlp::SpanExporter::builder() - .with_http() - .with_endpoint(endpoint) - .build() - { - Ok(exporter) => { - builder = builder.with_batch_exporter(exporter); - has_exporter = true; - } - Err(e) => { - eprintln!("Failed to create OTLP exporter for {endpoint}: {e}"); - } - } - } - - has_exporter.then(|| builder.build()) - } - /// Initialize the global tracing subscriber and return a handle for runtime reconfiguration. pub fn init(self) -> Result { let use_ansi = self.mode.use_ansi(); + // Start with a plain ugly-mode layer as a placeholder. In 丑 mode this gets swapped + // out before `try_init` is called so the subscriber never actually uses it. let initial_layer: BoxedFmtLayer = Box::new( tracing_subscriber::fmt::layer() .with_ansi(use_ansi) @@ -179,16 +134,12 @@ impl Trc { ); let (reload_layer, fmt_handle) = reload::Layer::new(initial_layer); - let provider = self.build_otel_provider(); - if provider.is_some() { - opentelemetry::global::set_text_map_propagator( - opentelemetry_sdk::propagation::TraceContextPropagator::new(), - ); - } + #[cfg(feature = "__otlp_export")] + let mut tracer_provider = None; match self.mode { TrcMode::丑 { .. } => { - let indicatif_layer = IndicatifLayer::new().with_max_progress_bars(24, None); + let indicatif_layer = IndicatifLayer::new().with_max_progress_bars(20, None); let pretty_with_indicatif: BoxedFmtLayer = Box::new( tracing_subscriber::fmt::layer() .with_ansi(use_ansi) @@ -198,37 +149,66 @@ impl Trc { .compact(), ); + // Replace the initial placeholder with the correct writer before init. if let Err(e) = fmt_handle.reload(pretty_with_indicatif) { eprintln!("Failed to configure 丑-mode writer: {e}"); } - let otel_layer = provider - .as_ref() - .map(|p| tracing_opentelemetry::layer().with_tracer(p.tracer("git-fs"))); - tracing_subscriber::registry() .with(reload_layer) - .with(otel_layer) .with(self.env_filter) .with(indicatif_layer) .try_init()?; } TrcMode::Ugly { .. } => { - let otel_layer = provider - .as_ref() - .map(|p| tracing_opentelemetry::layer().with_tracer(p.tracer("git-fs"))); + #[cfg(feature = "__otlp_export")] + { + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_http() + .build() + .ok(); + + if let Some(exporter) = exporter { + let provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .with_resource( + Resource::builder_empty() + .with_service_name("git-fs") + .build(), + ) + .build(); + let tracer = provider.tracer("git-fs"); + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + tracing_subscriber::registry() + .with(reload_layer) + .with(otel_layer) + .with(self.env_filter) + .try_init()?; + + tracer_provider = Some(provider); + } else { + tracing_subscriber::registry() + .with(reload_layer) + .with(self.env_filter) + .try_init()?; + } + } - tracing_subscriber::registry() - .with(reload_layer) - .with(otel_layer) - .with(self.env_filter) - .try_init()?; + #[cfg(not(feature = "__otlp_export"))] + { + tracing_subscriber::registry() + .with(reload_layer) + .with(self.env_filter) + .try_init()?; + } } } Ok(TrcHandle { fmt_handle, - tracer_provider: provider, + #[cfg(feature = "__otlp_export")] + tracer_provider, }) } } From b4fa0dcd8678c75db3960f47bd2ea260fdc58314 Mon Sep 17 00:00:00 2001 From: Marko Vejnovic Date: Tue, 17 Feb 2026 20:10:20 -0800 Subject: [PATCH 2/2] fix background task spawning --- lib/tokex.rs | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/lib/tokex.rs b/lib/tokex.rs index 5ff8bd9..b069bda 100644 --- a/lib/tokex.rs +++ b/lib/tokex.rs @@ -20,7 +20,7 @@ impl BackgroundTaskPoolMap { K: Eq + Hash + Debug + Send + Sync + Copy + 'static, V: Send + Sync + 'static, { - let (tx, rx) = oneshot::channel(); + let (trigger_task_tx, trigger_task_rx) = oneshot::channel(); // We also need to insert a notify for this key so that waiters can await the completion of // this task. @@ -62,18 +62,7 @@ impl BackgroundTaskPoolMap { } } - // Let's notify the waiters. - if let Some(notify) = cool_self.notifications.get_async(&key).await { - notify.notify_waiters(); - } else { - debug_assert!( - false, - "task completed but no notify found for key: {:?}", - key - ); - } - - rx.await.unwrap(); + trigger_task_rx.await.unwrap(); // Then, remove the handle from the handles map. let res = cool_self.handles.remove_async(&tokio::task::id()).await; @@ -102,7 +91,7 @@ impl BackgroundTaskPoolMap { } // Notify the future that it can modify - tx.send(()).unwrap(); + trigger_task_tx.send(()).unwrap(); } /// Get the value associated with the key, waiting for the background task to complete if @@ -119,7 +108,23 @@ impl BackgroundTaskPoolMap { K: Eq + Hash + Debug + Send + Sync + Copy + 'static, V: Send + Sync + 'static + Copy, { - self.get_value_across_notification(key, factory()).await + self.get_value_across_notification(key, async { + let fut = factory(); + + self.spawn(*key, fut); + + // After we spawn the task, we should wait for the notification, failing if the + // notification is not present since that means the task failed to spawn for some + // reason. + self.get_value_across_notification(key, async { + panic!( + "task should have been spawned and completed for key: {:?}", + key + ); + }) + .await + }) + .await } async fn get_value(&self, key: &K, mia_handler: impl Future) -> V