From 73666e9948d4a4df5762382d86ce26a68f571c18 Mon Sep 17 00:00:00 2001
From: clockwork-labs-bot
Date: Fri, 13 Feb 2026 10:08:58 -0500
Subject: [PATCH] Revert "Append commit instead of individual transactions to commitlog (#4140)"

This reverts commit c4c3bf78b359e1de248265b589b729ad3c301f50.
---
 crates/commitlog/src/commitlog.rs             | 272 ++++++++----------
 crates/commitlog/src/lib.rs                   | 105 ++++---
 crates/commitlog/src/repo/mod.rs              |   6 +-
 crates/commitlog/src/segment.rs               | 244 ++++++++++------
 crates/commitlog/src/tests/helpers.rs         |  25 +-
 crates/commitlog/src/tests/partial.rs         | 211 ++++++++------
 crates/commitlog/tests/random_payload/mod.rs  |  23 +-
 crates/commitlog/tests/streaming/mod.rs       |   7 +-
 crates/core/src/db/durability.rs              |  16 +-
 crates/core/src/db/relational_db.rs           |  13 +-
 .../subscription/module_subscription_actor.rs |  11 +-
 crates/durability/src/imp/local.rs            |  92 ++++--
 crates/durability/src/imp/mod.rs              |   4 +-
 crates/durability/src/lib.rs                  |  33 +--
 crates/durability/tests/io/fallocate.rs       |   7 +-
 15 files changed, 607 insertions(+), 462 deletions(-)

diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs
index 7ac339c5ae5..03c590e4950 100644
--- a/crates/commitlog/src/commitlog.rs
+++ b/crates/commitlog/src/commitlog.rs
@@ -8,7 +8,7 @@ use std::{
 };
 
 use itertools::Itertools;
-use log::{debug, error, info, trace, warn};
+use log::{debug, info, trace, warn};
 
 use crate::{
     commit::StoredCommit,
@@ -107,32 +107,81 @@ impl<R: Repo, T> Generic<R, T> {
     /// Update the current epoch.
     ///
+    /// Calls [`Self::commit`] to flush all data of the previous epoch, and
+    /// returns the result.
+    ///
     /// Does nothing if the given `epoch` is equal to the current epoch.
     ///
     /// # Errors
     ///
     /// If `epoch` is smaller than the current epoch, an error of kind
     /// [`io::ErrorKind::InvalidInput`] is returned.
-    pub fn set_epoch(&mut self, epoch: u64) -> io::Result<()> {
-        if epoch < self.head.epoch() {
-            return Err(io::Error::new(
+    ///
+    /// Also see [`Self::commit`].
+    pub fn set_epoch(&mut self, epoch: u64) -> io::Result<Option<Committed>> {
+        use std::cmp::Ordering::*;
+
+        match epoch.cmp(&self.head.epoch()) {
+            Less => Err(io::Error::new(
                 io::ErrorKind::InvalidInput,
                 "new epoch is smaller than current epoch",
-            ));
+            )),
+            Equal => Ok(None),
+            Greater => {
+                let res = self.commit()?;
+                self.head.set_epoch(epoch);
+                Ok(res)
+            }
         }
-        self.head.set_epoch(epoch);
-        Ok(())
+    }
+
+    /// Write the currently buffered data to storage and rotate segments as
+    /// necessary.
+    ///
+    /// Note that this does not imply that the data is durable, in particular
+    /// when a filesystem storage backend is used. Call [`Self::sync`] to flush
+    /// any OS buffers to stable storage.
+    ///
+    /// # Errors
+    ///
+    /// If an error occurs writing the data, the current [`Commit`] buffer is
+    /// retained, but a new segment is created. Retrying after an `Err` return
+    /// value will thus write the current data to that new segment.
+    ///
+    /// If this fails, however, the next attempt to create a new segment will
+    /// fail with [`io::ErrorKind::AlreadyExists`]. Encountering this error kind
+    /// means that something is seriously wrong with the underlying storage, and
+    /// the caller should stop writing to the log.
+    pub fn commit(&mut self) -> io::Result<Option<Committed>> {
+        self.panicked = true;
+        let writer = &mut self.head;
+        let sz = writer.commit.encoded_len();
+        // If the segment is empty, but the commit exceeds the max size,
+        // we got a huge commit which needs to be written even if that
+        // results in a huge segment.
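+        // (Illustrative numbers, assuming `max_segment_size = 1024`: a
+        // segment already holding 1000 bytes rotates before accepting a
+        // 100-byte commit, while an empty segment accepts even a 2000-byte
+        // commit as-is.)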
+        let should_rotate = !writer.is_empty() && writer.len() + sz as u64 > self.opts.max_segment_size;
+        let writer = if should_rotate {
+            self.sync();
+            self.start_new_segment()?
+        } else {
+            writer
+        };
+
+        let ret = writer.commit().or_else(|e| {
+            warn!("Commit failed: {e}");
+            // Nb.: Don't risk a panic by calling `self.sync()`.
+            // We already gave up on the last commit, and will retry it next time.
+            self.start_new_segment()?;
+            Err(e)
+        });
+        self.panicked = false;
+        ret
     }
 
     /// Force the currently active segment to be flushed to storage.
     ///
     /// Using a filesystem backend, this means to call `fsync(2)`.
     ///
-    /// **Note** that this does not flush the buffered data from calls to
-    /// [Self::commit], it only instructs the underlying storage to flush its
-    /// buffers. Call [Self::flush] prior to this method to ensure data from
-    /// all previous [Self::commit] calls is flushed to the underlying storage.
-    ///
     /// # Panics
     ///
    /// As an `fsync` failure leaves a file in a more or less undefined state,
@@ -146,22 +195,6 @@ impl<R: Repo, T> Generic<R, T> {
         self.panicked = false;
     }
 
-    /// Flush the buffered data from previous calls to [Self::commit] to the
-    /// underlying storage.
-    ///
-    /// Call [Self::sync] to instruct the underlying storage to flush its
-    /// buffers as well.
-    pub fn flush(&mut self) -> io::Result<()> {
-        self.head.flush()
-    }
-
-    /// Calls [Self::flush] and then [Self::sync].
-    fn flush_and_sync(&mut self) -> io::Result<()> {
-        self.flush()?;
-        self.sync();
-        Ok(())
-    }
-
     /// The last transaction offset written to disk, or `None` if nothing has
     /// been written yet.
     ///
@@ -270,63 +303,8 @@ impl<R: Repo, T> Generic<R, T> {
     }
 }
 
 impl<R: Repo, T: Encode> Generic<R, T> {
-    /// Write `transactions` to the log.
-    ///
-    /// This will store all `transactions` as a single [Commit]
-    /// (note that `transactions` must not yield more than [u16::MAX] elements).
-    ///
-    /// Data is buffered by the underlying segment [Writer].
-    /// Call [Self::flush] to force flushing to the underlying storage.
-    ///
-    /// If, after writing the transactions, the writer's total written bytes
-    /// exceed [Options::max_segment_size], the current segment is flushed,
-    /// `fsync`ed and closed, and a new segment is created.
-    ///
-    /// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed],
-    /// which contains the offset range and checksum of the commit.
-    ///
-    /// Note that supplying empty `transactions` may cause the current segment
-    /// to be rotated.
-    ///
-    /// # Errors
-    ///
-    /// An `Err` value is returned in the following cases:
-    ///
-    /// - if the transaction sequence is invalid, e.g. because the transaction
-    ///   offsets are not contiguous.
-    ///
-    ///   In this case, **none** of the `transactions` will be written.
-    ///
-    /// - if creating the new segment fails due to an I/O error.
-    ///
-    /// # Panics
-    ///
-    /// The method panics if:
-    ///
-    /// - `transactions` exceeds [u16::MAX] elements
-    ///
-    /// - [Self::flush] or writing to the underlying [Writer] fails
-    ///
-    ///   This is likely caused by some storage issue. As we cannot tell with
-    ///   certainty how much data (if any) has been written, the internal state
-    ///   becomes invalid and thus a panic is raised.
-    ///
-    /// - [Self::sync] panics (called when rotating segments)
-    pub fn commit<Tx: Into<Transaction<T>>>(
-        &mut self,
-        transactions: impl IntoIterator<Item = Tx>,
-    ) -> io::Result<Option<Committed>> {
-        self.panicked = true;
-        let writer = &mut self.head;
-        let committed = writer.commit(transactions)?;
-        if writer.len() >= self.opts.max_segment_size {
-            self.flush().expect("failed to flush segment upon rotation");
-            self.sync();
-            self.start_new_segment()?;
-        }
-        self.panicked = false;
-
-        Ok(committed)
+    pub fn append(&mut self, record: T) -> Result<(), T> {
+        self.head.append(record)
     }
 
     pub fn transactions_from<'a, D>(
@@ -370,8 +348,8 @@ impl<R: Repo, T: Encode> Generic<R, T> {
 impl<R: Repo, T> Drop for Generic<R, T> {
     fn drop(&mut self) {
         if !self.panicked {
-            if let Err(e) = self.flush_and_sync() {
-                error!("failed to flush on drop: {e:#}");
+            if let Err(e) = self.head.commit() {
+                warn!("failed to commit on drop: {e}");
             }
         }
     }
@@ -942,7 +920,7 @@ fn range_is_empty(range: &impl RangeBounds<u64>) -> bool {
 
 #[cfg(test)]
 mod tests {
-    use std::{cell::Cell, iter::repeat};
+    use std::{cell::Cell, iter::repeat, num::NonZeroU16};
 
     use pretty_assertions::assert_matches;
 
@@ -955,31 +933,30 @@ mod tests {
     #[test]
     fn rotate_segments_simple() {
         let mut log = mem_log::<[u8; 32]>(128);
-        for i in 0..4 {
-            log.commit([(i, [0; 32])]).unwrap();
+        for _ in 0..3 {
+            log.append([0; 32]).unwrap();
+            log.commit().unwrap();
         }
-        log.flush_and_sync().unwrap();
 
         let offsets = log.repo.existing_offsets().unwrap();
         assert_eq!(&offsets[..offsets.len() - 1], &log.tail);
-        // TODO: We overshoot the max segment size.
-        assert_eq!(&offsets, &[0, 3]);
+        assert_eq!(offsets[offsets.len() - 1], 2);
     }
 
     #[test]
     fn huge_commit() {
         let mut log = mem_log::<[u8; 32]>(32);
-        log.commit([(0, [0; 32]), (1, [1; 32])]).unwrap();
-        log.flush_and_sync().unwrap();
-        // First segment got rotated out.
-        assert_eq!(&log.tail, &[0]);
+        log.append([0; 32]).unwrap();
+        log.append([1; 32]).unwrap();
+        log.commit().unwrap();
+
         assert!(log.head.len() > log.opts.max_segment_size);
 
-        log.commit([(2, [2; 32])]).unwrap();
-        log.flush_and_sync().unwrap();
+        log.append([2; 32]).unwrap();
+        log.commit().unwrap();
 
-        // Second segment got rotated out and segment 3 is created.
-        assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2, 3]);
+        assert_eq!(&log.tail, &[0]);
+        assert_eq!(&log.repo.existing_offsets().unwrap(), &[0, 2]);
     }
 
     #[test]
@@ -1075,31 +1052,14 @@ mod tests {
     fn traverse_commits_ignores_duplicates() {
         let mut log = mem_log::<[u8; 32]>(1024);
 
-        let tx1 = [42u8; 32];
-        let tx2 = [43u8; 32];
-
-        log.commit([(0, tx1)]).unwrap();
-        let commit1 = Commit {
-            min_tx_offset: 0,
-            n: 1,
-            records: tx1.to_vec(),
-            ..log.head.commit.clone()
-        };
-
-        // Reset the commit offset, so we can write the same commit twice.
-        log.head.commit.min_tx_offset = 0;
-        log.commit([(0, tx1)]).unwrap();
-
-        // Write another one.
-        log.commit([(1, tx2)]).unwrap();
-        let commit2 = Commit {
-            min_tx_offset: 1,
-            n: 1,
-            records: tx2.to_vec(),
-            ..log.head.commit.clone()
-        };
-
-        log.flush_and_sync().unwrap();
+        log.append([42; 32]).unwrap();
+        let commit1 = log.head.commit.clone();
+        log.commit().unwrap();
+        log.head.commit = commit1.clone();
+        log.commit().unwrap();
+        log.append([43; 32]).unwrap();
+        let commit2 = log.head.commit.clone();
+        log.commit().unwrap();
 
         assert_eq!(
             [commit1, commit2].as_slice(),
@@ -1114,14 +1074,15 @@ mod tests {
     fn traverse_commits_errors_when_forked() {
         let mut log = mem_log::<[u8; 32]>(1024);
 
-        log.commit([(0, [42; 32])]).unwrap();
-        // Reset the commit offset,
-        // and write a different commit at the same offset.
- // This is considered a fork. - log.head.commit.min_tx_offset = 0; - log.commit([(0, [43; 32])]).unwrap(); - - log.flush_and_sync().unwrap(); + log.append([42; 32]).unwrap(); + log.commit().unwrap(); + log.head.commit = Commit { + min_tx_offset: 0, + n: 1, + records: [43; 32].to_vec(), + epoch: 0, + }; + log.commit().unwrap(); let res = log.commits_from(0).collect::, _>>(); assert!( @@ -1134,11 +1095,11 @@ mod tests { fn traverse_commits_errors_when_offset_not_contiguous() { let mut log = mem_log::<[u8; 32]>(1024); - log.commit([(0, [42; 32])]).unwrap(); + log.append([42; 32]).unwrap(); + log.commit().unwrap(); log.head.commit.min_tx_offset = 18; - log.commit([(18, [42; 32])]).unwrap(); - - log.flush_and_sync().unwrap(); + log.append([42; 32]).unwrap(); + log.commit().unwrap(); let res = log.commits_from(0).collect::, _>>(); assert!( @@ -1150,7 +1111,7 @@ mod tests { prev_error: None }) ), - "expected out-of-order error: {res:?}" + "expected fork error: {res:?}" ) } @@ -1260,7 +1221,7 @@ mod tests { #[test] fn reopen() { let mut log = mem_log::<[u8; 32]>(1024); - let total_txs = fill_log(&mut log, 100, (1..=10).cycle()); + let mut total_txs = fill_log(&mut log, 100, (1..=10).cycle()); assert_eq!( total_txs, log.transactions_from(0, &ArrayDecoder).map(Result::unwrap).count() @@ -1270,11 +1231,12 @@ mod tests { log.repo.clone(), Options { max_segment_size: 1024, + max_records_in_commit: NonZeroU16::new(10).unwrap(), ..Options::default() }, ) .unwrap(); - let total_txs = fill_log(&mut log, 100, (1..=10).cycle()); + total_txs += fill_log(&mut log, 100, (1..=10).cycle()); assert_eq!( total_txs, @@ -1283,22 +1245,24 @@ mod tests { } #[test] - fn set_new_epoch() { + fn set_same_epoch_does_nothing() { let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); - log.commit([(0, [12; 32])]).unwrap(); - log.set_epoch(42).unwrap(); - assert_eq!(log.epoch(), 42); - log.commit([(1, [13; 32])]).unwrap(); - - log.flush_and_sync().unwrap(); + let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap(); + assert_eq!(committed, None); + } - let epochs = log - .commits_from(0) - .map(Result::unwrap) - .map(|commit| commit.epoch) - .collect::>(); - assert_eq!(&[Commit::DEFAULT_EPOCH, 42], epochs.as_slice()); + #[test] + fn set_new_epoch_commits() { + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); + assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); + log.append(<_>::default()).unwrap(); + let committed = log + .set_epoch(42) + .unwrap() + .expect("should have committed the pending transaction"); + assert_eq!(log.epoch(), 42); + assert_eq!(committed.tx_range.start, 0); } #[test] diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index efdbb145e41..1e7a8c0047e 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -1,6 +1,6 @@ use std::{ io, - num::NonZeroU64, + num::{NonZeroU16, NonZeroU64}, ops::RangeBounds, sync::{Arc, RwLock}, }; @@ -17,11 +17,10 @@ pub mod segment; mod varchar; mod varint; -use crate::segment::Committed; pub use crate::{ commit::{Commit, StoredCommit}, payload::{Decoder, Encode}, - repo::{fs::SizeOnDisk, TxOffset}, + repo::fs::SizeOnDisk, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, varchar::Varchar, }; @@ -58,6 +57,14 @@ pub struct Options { /// Default: 1GiB #[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))] pub max_segment_size: u64, + /// The maximum number of records in 
a commit. + /// + /// If this number is exceeded, the commit is flushed to disk even without + /// explicitly calling [`Commitlog::flush`]. + /// + /// Default: 1 + #[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))] + pub max_records_in_commit: NonZeroU16, /// Whenever at least this many bytes have been written to the currently /// active segment, an entry is added to its offset index. /// @@ -89,12 +96,6 @@ pub struct Options { /// Has no effect if the `fallocate` feature is not enabled. #[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))] pub preallocate_segments: bool, - /// Size in bytes of the memory buffer holding commit data before flushing - /// to storage. - /// - /// Default: 4KiB - #[cfg_attr(feature = "serde", serde(default = "Options::default_write_buffer_size"))] - pub write_buffer_size: usize, } impl Default for Options { @@ -105,18 +106,18 @@ impl Default for Options { impl Options { pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024; + pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; - pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 4 * 1024; pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, max_segment_size: Self::default_max_segment_size(), + max_records_in_commit: Self::default_max_records_in_commit(), offset_index_interval_bytes: Self::default_offset_index_interval_bytes(), offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(), preallocate_segments: Self::default_preallocate_segments(), - write_buffer_size: Self::default_write_buffer_size(), }; pub const fn default_log_format_version() -> u8 { @@ -127,6 +128,10 @@ impl Options { Self::DEFAULT_MAX_SEGMENT_SIZE } + pub const fn default_max_records_in_commit() -> NonZeroU16 { + Self::DEFAULT_MAX_RECORDS_IN_COMMIT + } + pub const fn default_offset_index_interval_bytes() -> NonZeroU64 { Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES } @@ -139,10 +144,6 @@ impl Options { Self::DEFAULT_PREALLOCATE_SEGMENTS } - pub const fn default_write_buffer_size() -> usize { - Self::DEFAULT_WRITE_BUFFER_SIZE - } - /// Compute the length in bytes of an offset index based on the settings in /// `self`. pub fn offset_index_len(&self) -> u64 { @@ -261,7 +262,7 @@ impl Commitlog { pub fn flush(&self) -> io::Result> { let mut inner = self.inner.write().unwrap(); trace!("flush commitlog"); - inner.flush()?; + inner.commit()?; Ok(inner.max_committed_offset()) } @@ -281,7 +282,7 @@ impl Commitlog { pub fn flush_and_sync(&self) -> io::Result> { let mut inner = self.inner.write().unwrap(); trace!("flush and sync commitlog"); - inner.flush()?; + inner.commit()?; inner.sync(); Ok(inner.max_committed_offset()) @@ -382,47 +383,57 @@ impl Commitlog { } impl Commitlog { - /// Write `transactions` to the log. - /// - /// This will store all `transactions` as a single [Commit] - /// (note that `transactions` must not yield more than [u16::MAX] elements). - /// - /// Data is buffered internally, call [Self::flush] to force flushing to - /// the underlying storage. - /// - /// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed], - /// which contains the offset range and checksum of the commit. 
-    ///
-    /// # Errors
     ///
-    /// An `Err` value is returned in the following cases:
+    /// Append the record `txdata` to the log.
     ///
-    /// - if the transaction sequence is invalid, e.g. because the transaction
-    ///   offsets are not contiguous.
+    /// If the internal buffer would exceed [`Options::max_records_in_commit`],
+    /// the argument is returned in an `Err`. The caller should [`Self::flush`]
+    /// the log and try again.
     ///
-    ///   In this case, **none** of the `transactions` will be written.
+    /// In case the log is appended to from multiple threads, this may result in
+    /// a busy loop trying to acquire a slot in the buffer. In such scenarios,
+    /// [`Self::append_maybe_flush`] is preferable.
+    pub fn append(&self, txdata: T) -> Result<(), T> {
+        let mut inner = self.inner.write().unwrap();
+        inner.append(txdata)
+    }
+
+    /// Append the record `txdata` to the log.
     ///
-    /// - if creating the new segment fails due to an I/O error.
+    /// The `txdata` payload is buffered in memory until either:
     ///
+    /// - [`Self::flush`] is called explicitly, or
+    /// - [`Options::max_records_in_commit`] is exceeded
     ///
-    /// # Panics
+    /// In the latter case, this method flushes implicitly, _before_
+    /// appending the `txdata` argument.
     ///
-    /// The method panics if:
+    /// I.e. the argument is not guaranteed to be flushed after the method
+    /// returns. If that is desired, [`Self::flush`] must be called explicitly.
     ///
-    /// - `transactions` exceeds [u16::MAX] elements
+    /// If writing `txdata` to the commitlog results in a new segment file being
+    /// opened, we will send a message down `on_new_segment`.
+    /// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`.
     ///
-    /// - [Self::flush] or writing to the underlying buffered writer fails
+    /// # Errors
     ///
-    ///   This is likely caused by some storage issue. As we cannot tell with
-    ///   certainty how much data (if any) has been written, the internal state
-    ///   becomes invalid and thus a panic is raised.
+    /// If the log needs to be flushed, but an I/O error occurs, ownership of
+    /// `txdata` is returned back to the caller alongside the [`io::Error`].
     ///
-    /// - [Self::sync] panics (called when rotating segments)
-    pub fn commit<Tx: Into<Transaction<T>>>(
-        &self,
-        transactions: impl IntoIterator<Item = Tx>,
-    ) -> io::Result<Option<Committed>> {
+    /// The value can then be used to retry appending.
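+    ///
+    /// # Example
+    ///
+    /// A sketch of a caller-side retry loop (the `[u8; 32]` payload type and
+    /// the bare retry without backoff are illustrative assumptions):
+    ///
+    /// ```ignore
+    /// fn write_through(clog: &Commitlog<[u8; 32]>, mut record: [u8; 32]) {
+    ///     loop {
+    ///         match clog.append_maybe_flush(record) {
+    ///             Ok(()) => break,
+    ///             // On I/O failure, ownership comes back via `error::Append`.
+    ///             Err(e) => record = e.txdata,
+    ///         }
+    ///     }
+    /// }
+    /// ```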
+    pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
         let mut inner = self.inner.write().unwrap();
-        inner.commit(transactions)
+
+        if let Err(txdata) = inner.append(txdata) {
+            if let Err(source) = inner.commit() {
+                return Err(error::Append { txdata, source });
+            }
+
+            // `inner.commit.n` must be zero at this point
+            let res = inner.append(txdata);
+            debug_assert!(res.is_ok(), "failed to append while holding write lock");
+        }
+
+        Ok(())
     }
 
     /// Obtain an iterator which traverses the log from the start, yielding
diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs
index 1805e788703..5be633bf5c7 100644
--- a/crates/commitlog/src/repo/mod.rs
+++ b/crates/commitlog/src/repo/mod.rs
@@ -214,11 +214,13 @@ pub fn create_segment_writer<R: Repo>(
             records: Vec::new(),
             epoch,
         },
-        inner: io::BufWriter::with_capacity(opts.write_buffer_size, storage),
+        inner: io::BufWriter::new(storage),
 
         min_tx_offset: offset,
         bytes_written: Header::LEN as u64,
 
+        max_records_in_commit: opts.max_records_in_commit,
+
         offset_index_head: create_offset_index_writer(repo, offset, opts),
     })
 }
@@ -291,6 +293,8 @@ pub fn resume_segment_writer<R: Repo>(
         min_tx_offset: tx_range.start,
         bytes_written: size_in_bytes,
 
+        max_records_in_commit: opts.max_records_in_commit,
+
         offset_index_head: create_offset_index_writer(repo, offset, opts),
     }))
 }
diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs
index 8de47e5a05a..7e8c054467b 100644
--- a/crates/commitlog/src/segment.rs
+++ b/crates/commitlog/src/segment.rs
@@ -1,7 +1,7 @@
 use std::{
     fs::File,
     io::{self, BufWriter, ErrorKind, SeekFrom, Write as _},
-    num::NonZeroU64,
+    num::{NonZeroU16, NonZeroU64},
     ops::Range,
 };
 
@@ -100,52 +100,58 @@ pub struct Writer<W: io::Write> {
     pub(crate) min_tx_offset: u64,
     pub(crate) bytes_written: u64,
 
+    pub(crate) max_records_in_commit: NonZeroU16,
+
     pub(crate) offset_index_head: Option<OffsetIndexWriter>,
 }
 
 impl<W: io::Write> Writer<W> {
-    pub fn commit<T: Into<Transaction<U>>, U: Encode>(
-        &mut self,
-        transactions: impl IntoIterator<Item = T>,
-    ) -> io::Result<Option<Committed>> {
-        for tx in transactions {
-            let tx = tx.into();
-            let expected_offset = self.commit.min_tx_offset + self.commit.n as u64;
-            if tx.offset != expected_offset {
-                self.commit.n = 0;
-                self.commit.records.clear();
-
-                return Err(io::Error::new(
-                    io::ErrorKind::InvalidInput,
-                    format!("invalid transaction offset {}, expected {}", tx.offset, expected_offset),
-                ));
-            }
-            assert!(
-                self.commit.n < u16::MAX,
-                "maximum number of transactions in a single commit exceeded"
-            );
+    /// Append the record (aka transaction) `T` to the segment.
+    ///
+    /// If the number of currently buffered records would exceed `max_records_in_commit`
+    /// after the method returns, the argument is returned in an `Err` and not
+    /// appended to this writer's buffer.
+    ///
+    /// Otherwise, the `record` is encoded and stored in the buffer.
+    ///
+    /// An `Err` result indicates that [`Self::commit`] should be called in
+    /// order to flush the buffered records to persistent storage.
+    pub fn append<T: Encode>(&mut self, record: T) -> Result<(), T> {
+        if self.commit.n == u16::MAX || self.commit.n + 1 > self.max_records_in_commit.get() {
+            Err(record)
+        } else {
             self.commit.n += 1;
-            tx.txdata.encode_record(&mut self.commit.records);
+            record.encode_record(&mut self.commit.records);
+            Ok(())
         }
+    }
 
+    /// Write the current [`Commit`] to the underlying [`io::Write`].
+    ///
+    /// Will do nothing if the current commit is empty (i.e. `Commit::n` is zero).
+    /// In this case, `None` is returned.
+ /// + /// Otherwise `Some` [`Committed`] is returned, providing some metadata about + /// the commit. + pub fn commit(&mut self) -> io::Result> { if self.commit.n == 0 { return Ok(None); } + let checksum = self.commit.write(&mut self.inner)?; + self.inner.flush()?; - let checksum = self - .commit - .write(&mut self.inner) - // Panic here as we don't know how much of the commit has been - // written (if anything). Further commits would leave corrupted data - // in the log. - .unwrap_or_else(|e| panic!("failed to write commit {}: {:#}", self.commit.min_tx_offset, e)); let commit_len = self.commit.encoded_len() as u64; - - if let Some(index) = self.offset_index_head.as_mut() { - let _ = index + self.offset_index_head.as_mut().map(|index| { + debug!( + "append_after commit min_tx_offset={} bytes_written={} commit_len={}", + self.commit.min_tx_offset, self.bytes_written, commit_len + ); + index .append_after_commit(self.commit.min_tx_offset, self.bytes_written, commit_len) - .inspect_err(|e| debug!("failed to append to offset index: {e}")); - } + .map_err(|e| { + debug!("failed to append to offset index: {e:?}"); + }) + }); let tx_range_start = self.commit.min_tx_offset; @@ -160,10 +166,6 @@ impl Writer { })) } - pub fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } - /// Get the current epoch. pub fn epoch(&self) -> u64 { self.commit.epoch @@ -535,12 +537,6 @@ pub struct Transaction { pub txdata: T, } -impl From<(u64, T)> for Transaction { - fn from((offset, txdata): (u64, T)) -> Self { - Self { offset, txdata } - } -} - pub struct Commits { pub header: Header, reader: R, @@ -748,14 +744,16 @@ impl Metadata { #[cfg(test)] mod tests { + use std::num::NonZeroU16; + + use super::*; + use crate::{payload::ArrayDecoder, repo, Options}; use itertools::Itertools; use pretty_assertions::assert_matches; + use proptest::prelude::*; use spacetimedb_paths::server::CommitLogDir; use tempfile::tempdir; - use super::*; - use crate::{payload::ArrayDecoder, repo, Options}; - #[test] fn header_roundtrip() { let hdr = Header { @@ -774,9 +772,20 @@ mod tests { fn write_read_roundtrip() { let repo = repo::Memory::unlimited(); - let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); - writer.commit([(0, [0; 32]), (1, [1; 32]), (2, [2; 32])]).unwrap(); - writer.flush().unwrap(); + let mut writer = repo::create_segment_writer( + &repo, + Options { + max_records_in_commit: NonZeroU16::new(4).unwrap(), + ..<_>::default() + }, + Commit::DEFAULT_EPOCH, + 0, + ) + .unwrap(); + writer.append([0; 32]).unwrap(); + writer.append([1; 32]).unwrap(); + writer.append([2; 32]).unwrap(); + writer.commit().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let header = reader.header; @@ -801,15 +810,27 @@ mod tests { fn metadata() { let repo = repo::Memory::unlimited(); - let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); + let mut writer = repo::create_segment_writer( + &repo, + Options { + max_records_in_commit: NonZeroU16::new(3).unwrap(), + ..<_>::default() + }, + Commit::DEFAULT_EPOCH, + 0, + ) + .unwrap(); // Commit 0..2 - writer.commit([(0, [0; 32]), (1, [0; 32])]).unwrap(); + writer.append([0; 32]).unwrap(); + writer.append([0; 32]).unwrap(); + writer.commit().unwrap(); // Commit 2..3 - writer.commit([(2, [1; 32])]).unwrap(); + writer.append([1; 32]).unwrap(); + writer.commit().unwrap(); // Commit 3..5 - writer.commit([(3, [2; 32]), (4, [2; 32])]).unwrap(); - - 
writer.flush().unwrap(); + writer.append([2; 32]).unwrap(); + writer.append([2; 32]).unwrap(); + writer.commit().unwrap(); let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let metadata = reader.metadata().unwrap(); @@ -830,32 +851,37 @@ mod tests { #[test] fn commits() { let repo = repo::Memory::unlimited(); - let commits = vec![ - vec![(0, [1; 32]), (1, [2; 32])], - vec![(2, [3; 32])], - vec![(3, [4; 32]), (4, [5; 32])], - ]; + let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; - let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); + let mut writer = repo::create_segment_writer( + &repo, + Options { + max_records_in_commit: NonZeroU16::new(3).unwrap(), + ..<_>::default() + }, + Commit::DEFAULT_EPOCH, + 0, + ) + .unwrap(); for commit in &commits { - writer.commit(commit.clone()).unwrap(); + for tx in commit { + writer.append(*tx).unwrap(); + } + writer.commit().unwrap(); } - writer.flush().unwrap(); - let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let mut commits1 = Vec::with_capacity(commits.len()); let mut min_tx_offset = 0; for txs in commits { - let n = txs.len(); commits1.push(Commit { min_tx_offset, - n: n as u16, - records: itertools::concat(txs.into_iter().map(|(_, payload)| payload.to_vec())), + n: txs.len() as u16, + records: txs.concat(), epoch: 0, }); - min_tx_offset += n as u64; + min_tx_offset += txs.len() as u64; } let commits2 = reader .commits() @@ -868,25 +894,73 @@ mod tests { #[test] fn transactions() { let repo = repo::Memory::unlimited(); - let commits = vec![ - vec![(0, [1; 32]), (1, [2; 32])], - vec![(2, [3; 32])], - vec![(3, [4; 32]), (4, [5; 32])], - ]; + let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; - let mut writer = repo::create_segment_writer(&repo, <_>::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); + let mut writer = repo::create_segment_writer( + &repo, + Options { + max_records_in_commit: NonZeroU16::new(3).unwrap(), + ..<_>::default() + }, + Commit::DEFAULT_EPOCH, + 0, + ) + .unwrap(); for commit in &commits { - writer.commit(commit.clone()).unwrap(); + for tx in commit { + writer.append(*tx).unwrap(); + } + writer.commit().unwrap(); } - writer.flush().unwrap(); - let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap(); let txs = reader .transactions(&ArrayDecoder) .collect::, _>>() .unwrap(); - assert_eq!(txs, commits.into_iter().flatten().map(Into::into).collect::>()); + assert_eq!( + txs, + commits + .into_iter() + .flatten() + .enumerate() + .map(|(offset, txdata)| Transaction { + offset: offset as u64, + txdata + }) + .collect::>() + ); + } + + proptest! 
{ + #[test] + fn max_records_in_commit(max_records_in_commit in any::()) { + let mut writer = Writer { + commit: Commit::default(), + inner: BufWriter::new(Vec::new()), + + min_tx_offset: 0, + bytes_written: 0, + + max_records_in_commit, + + offset_index_head: None, + }; + + for i in 0..max_records_in_commit.get() { + assert!( + writer.append([0; 16]).is_ok(), + "less than {} records written: {}", + max_records_in_commit.get(), + i + ); + } + assert!( + writer.append([0; 16]).is_err(), + "more than {} records written", + max_records_in_commit.get() + ); + } } #[test] @@ -898,14 +972,20 @@ mod tests { min_tx_offset: 0, bytes_written: 0, + max_records_in_commit: NonZeroU16::MAX, offset_index_head: None, }; assert_eq!(0, writer.next_tx_offset()); - writer.commit([(0, [0; 16])]).unwrap(); + writer.append([0; 16]).unwrap(); + assert_eq!(0, writer.next_tx_offset()); + writer.commit().unwrap(); + assert_eq!(1, writer.next_tx_offset()); + writer.commit().unwrap(); assert_eq!(1, writer.next_tx_offset()); - writer.commit([(1, [1; 16])]).unwrap(); - writer.commit([(2, [1; 16])]).unwrap(); + writer.append([1; 16]).unwrap(); + writer.append([1; 16]).unwrap(); + writer.commit().unwrap(); assert_eq!(3, writer.next_tx_offset()); } diff --git a/crates/commitlog/src/tests/helpers.rs b/crates/commitlog/src/tests/helpers.rs index b7740b92e5b..c645b2cdc65 100644 --- a/crates/commitlog/src/tests/helpers.rs +++ b/crates/commitlog/src/tests/helpers.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::{fmt::Debug, num::NonZeroU16}; use env_logger::Env; @@ -13,6 +13,7 @@ pub fn mem_log(max_segment_size: u64) -> commitlog::Generic(ShortMem::new(800)); - for i in 0..20 { - info!("commit {i}"); - log.commit([(i, [b'z'; 32])]).expect("unexpected `Err` result"); - } -} - -fn fill_log(mut log: commitlog::Generic, range: Range) { - debug!("writing range {range:?}"); - - let end = range.end; - for i in range { - info!("commit {i}"); - log.commit([(i, [b'z'; 32])]).unwrap(); - } - log.flush().unwrap(); - - // Try to write one more, which should fail. - log.commit([(end, [b'x'; 32])]).unwrap(); - assert_matches!( - log.flush(), - Err(e) if e.kind() == io::ErrorKind::StorageFull + let total_commits = 100; + let total_txs = fill_log_enospc(&mut log, total_commits, (1..=10).cycle()); + + assert_eq!( + total_txs, + log.transactions_from(0, &payload::ArrayDecoder) + .map(Result::unwrap) + .count() ); + assert_eq!(total_commits, log.commits_from(0).map(Result::unwrap).count()); } -/// Tests that, when a partial write occurs, we can read all flushed commits -/// up until the faulty one. + +// Note: Write errors cause the in-flight commit to be written to a fresh +// segment. So as long as we write through the public API, partial writes +// never surface (i.e. the log is contiguous). 
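+//
+// A commit that was corrupted *after* it reached the disk is a different
+// story: traversal surfaces it as a checksum error, which `overwrite_reopen`
+// below exercises.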
#[test] -fn read_log_up_to_partial_write() { +fn reopen() { enable_logging(); - const MAX_SEGMENT_SIZE: usize = 800; - const TXDATA_SIZE: usize = 32; - const COMMIT_SIZE: usize = Commit::FRAMING_LEN + TXDATA_SIZE; - const TOTAL_TXS: usize = MAX_SEGMENT_SIZE / COMMIT_SIZE; + let repo = ShortMem::new(800); + let num_commits = 10; - let repo = ShortMem::new(MAX_SEGMENT_SIZE as u64); - fill_log(open_log::<[u8; TXDATA_SIZE]>(repo.clone()), 0..(TOTAL_TXS as u64)); + let mut total_txs = 0; + for i in 0..2 { + let mut log = open_log::<[u8; 32]>(repo.clone()); + total_txs += fill_log_enospc(&mut log, num_commits, (1..=10).cycle()); - let txs = commitlog::transactions_from( - repo, - DEFAULT_LOG_FORMAT_VERSION, - 0, - &payload::ArrayDecoder::, - ) - .unwrap() - .map(Result::unwrap) - .count(); + debug!("fill {} done", i + 1); + } + + assert_eq!( + total_txs, + open_log::<[u8; 32]>(repo.clone()) + .transactions_from(0, &payload::ArrayDecoder) + .map(Result::unwrap) + .count() + ); - assert_eq!(txs, TOTAL_TXS,); + // Let's see if we hit a funny case in any of the segments. + for offset in repo.existing_offsets().unwrap().into_iter().rev() { + let meta = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, offset) + .unwrap() + .metadata() + .unwrap(); + debug!("dropping segment: segment::{meta:?}"); + repo.remove_segment(offset).unwrap(); + assert_eq!( + meta.tx_range.start, + open_log::<[u8; 32]>(repo.clone()) + .transactions_from(0, &payload::ArrayDecoder) + .map(Result::unwrap) + .count() as u64 + ); + } } -/// Tests: -/// -/// - fill log until a partial write occurs -/// - corrupt the last successfully written commit -/// - fill log until a partial write occurs -/// -/// The log should detect the corrupt commit, create a fresh segment, and write -/// the second batch until ENOSPC. Traversal should work. #[test] -fn reopen_with_corrupt_last_commit() { +fn overwrite_reopen() { enable_logging(); - const MAX_SEGMENT_SIZE: usize = 800; - const TXDATA_SIZE: usize = 32; - const COMMIT_SIZE: usize = Commit::FRAMING_LEN + TXDATA_SIZE; - const TXS_PER_SEGMENT: u64 = (MAX_SEGMENT_SIZE / COMMIT_SIZE) as u64; - const TOTAL_TXS: u64 = (TXS_PER_SEGMENT * 2) - 1; - - let repo = ShortMem::new(MAX_SEGMENT_SIZE as u64); + let repo = ShortMem::new(800); + let num_commits = 10; + let txs_per_commit = 5; - // Fill with as many txs as possible until ENOSPC. - fill_log(open_log::<[u8; TXDATA_SIZE]>(repo.clone()), 0..TXS_PER_SEGMENT); + let mut log = open_log::<[u8; 32]>(repo.clone()); + let mut total_txs = fill_log_enospc(&mut log, num_commits, repeat(txs_per_commit)); - // Invalidate the checksum of the last commit. let last_segment_offset = repo.existing_offsets().unwrap().last().copied().unwrap(); let last_commit: Commit = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, last_segment_offset) .unwrap() @@ -105,29 +97,47 @@ fn reopen_with_corrupt_last_commit() { .last() .unwrap() .into(); + debug!("last commit: {last_commit:?}"); + { let mut last_segment = repo.open_segment_writer(last_segment_offset).unwrap(); let pos = last_segment.len() - last_commit.encoded_len() + 1; last_segment.modify_byte_at(pos, |_| 255); } - // Write a second batch, starting with the offset of the corrupt commit. 
- fill_log( - open_log::<[u8; TXDATA_SIZE]>(repo.clone()), - last_commit.min_tx_offset..TOTAL_TXS, - ); - - let txs = commitlog::transactions_from( - repo, - DEFAULT_LOG_FORMAT_VERSION, - 0, - &payload::ArrayDecoder::, - ) - .unwrap() - .map(Result::unwrap) - .count(); + let mut log = open_log::<[u8; 32]>(repo.clone()); + for (i, commit) in log.commits_from(0).enumerate() { + if i < num_commits - 1 { + commit.expect("all but last commit should be good"); + } else { + let last_good_offset = txs_per_commit * (num_commits - 1); + assert!( + matches!( + commit, + Err(error::Traversal::Checksum { offset, .. }) if offset == last_good_offset as u64, + ), + "expected checksum error with offset={last_good_offset}: {commit:?}" + ); + } + } - assert_eq!(txs as u64, TOTAL_TXS); + // Write some more data. + total_txs += fill_log_enospc(&mut log, num_commits, repeat(txs_per_commit)); + // Log should be contiguous, but missing one corrupted commit. + assert_eq!( + total_txs - txs_per_commit, + log.transactions_from(0, &payload::ArrayDecoder) + .map(Result::unwrap) + .count() + ); + // Check that this is true if we reopen the log. + assert_eq!( + total_txs - txs_per_commit, + open_log::<[u8; 32]>(repo) + .transactions_from(0, &payload::ArrayDecoder) + .map(Result::unwrap) + .count() + ); } /// Edge case surfaced in production: @@ -144,6 +154,7 @@ fn first_commit_in_last_segment_corrupt() { let repo = repo::Memory::unlimited(); let options = Options { max_segment_size: 512, + max_records_in_commit: NonZeroU16::new(1).unwrap(), ..<_>::default() }; { @@ -169,6 +180,7 @@ fn open_log(repo: ShortMem) -> commitlog::Generic { repo, Options { max_segment_size: 1024, + max_records_in_commit: NonZeroU16::new(10).unwrap(), ..Options::default() }, ) @@ -302,3 +314,38 @@ impl Repo for ShortMem { self.inner.existing_offsets() } } + +/// Like [`crate::tests::helpers::fill_log`], but expect that ENOSPC happens at +/// least once. 
+fn fill_log_enospc( + log: &mut commitlog::Generic, + num_commits: usize, + txs_per_commit: impl Iterator, +) -> usize +where + T: Debug + Default + Encode, +{ + let mut seen_enospc = false; + + let mut total_txs = 0; + for (_, n) in (0..num_commits).zip(txs_per_commit) { + for _ in 0..n { + log.append(T::default()).unwrap(); + total_txs += 1; + } + let res = log.commit(); + if let Err(Some(os)) = res.as_ref().map_err(|e| e.raw_os_error()) { + if os == ENOSPC { + debug!("fill: ignoring ENOSPC"); + seen_enospc = true; + log.commit().unwrap(); + continue; + } + } + res.unwrap(); + } + + assert!(seen_enospc, "expected to see ENOSPC"); + + total_txs +} diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index 49e47ed42d1..85ab653480d 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -1,3 +1,5 @@ +use std::num::NonZeroU16; + use log::info; use spacetimedb_commitlog::tests::helpers::enable_logging; use spacetimedb_commitlog::{payload, Commitlog, Options}; @@ -16,6 +18,7 @@ fn smoke() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 8 * 1024, + max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -24,18 +27,18 @@ fn smoke() { let n_txs = 500; let payload = gen_payload(); - for i in 0..n_txs { - clog.commit([(i, payload)]).unwrap(); + for _ in 0..n_txs { + clog.append_maybe_flush(payload).unwrap(); } let committed_offset = clog.flush_and_sync().unwrap(); - assert_eq!(n_txs - 1, committed_offset.unwrap()); + assert_eq!(n_txs - 1, committed_offset.unwrap() as usize); assert_eq!( - n_txs as usize, + n_txs, clog.transactions(&payload::ArrayDecoder).map(Result::unwrap).count() ); // We set max_records_in_commit to 1, so n_commits == n_txs - assert_eq!(n_txs as usize, clog.commits().map(Result::unwrap).count()); + assert_eq!(n_txs, clog.commits().map(Result::unwrap).count()); } #[test] @@ -45,6 +48,7 @@ fn resets() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 512, + max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -52,8 +56,8 @@ fn resets() { .unwrap(); let payload = gen_payload(); - for i in 0..50 { - clog.commit([(i, payload)]).unwrap(); + for _ in 0..50 { + clog.append_maybe_flush(payload).unwrap(); } clog.flush_and_sync().unwrap(); @@ -84,6 +88,7 @@ fn compression() { CommitLogDir::from_path_unchecked(root.path()), Options { max_segment_size: 8 * 1024, + max_records_in_commit: NonZeroU16::MIN, ..Options::default() }, None, @@ -93,8 +98,8 @@ fn compression() { // try to generate commitlogs that will be amenable to compression - // random data doesn't compress well, so try and have there be repetition let payloads = (0..4).map(|_| gen_payload()).cycle().take(1024).collect::>(); - for (i, payload) in payloads.iter().enumerate() { - clog.commit([(i as u64, *payload)]).unwrap(); + for payload in &payloads { + clog.append_maybe_flush(*payload).unwrap(); } clog.flush_and_sync().unwrap(); diff --git a/crates/commitlog/tests/streaming/mod.rs b/crates/commitlog/tests/streaming/mod.rs index 3c2077bacaf..0474ae75b30 100644 --- a/crates/commitlog/tests/streaming/mod.rs +++ b/crates/commitlog/tests/streaming/mod.rs @@ -1,6 +1,6 @@ use std::{ io, - num::NonZeroU64, + num::{NonZeroU16, NonZeroU64}, ops::RangeBounds, path::{Path, PathBuf}, }; @@ -183,6 +183,7 @@ async fn assert_equal_dirs(src: &Path, dst: &Path) { fn default_options() -> Options { Options { max_segment_size: 8 * 1024, + 
max_records_in_commit: NonZeroU16::MIN, // Write an index entry for every commit. offset_index_interval_bytes: NonZeroU64::new(256).unwrap(), offset_index_require_segment_fsync: false, @@ -194,8 +195,8 @@ async fn fill_log(path: PathBuf) { spawn_blocking(move || { let clog = Commitlog::open(CommitLogDir::from_path_unchecked(path), default_options(), None).unwrap(); let payload = random_payload::gen_payload(); - for i in 0..100 { - clog.commit([(i, payload)]).unwrap(); + for _ in 0..100 { + clog.append_maybe_flush(payload).unwrap(); } clog.flush_and_sync().unwrap(); }) diff --git a/crates/core/src/db/durability.rs b/crates/core/src/db/durability.rs index c9dbf4383ae..f118759d3c6 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/core/src/db/durability.rs @@ -7,7 +7,7 @@ use spacetimedb_commitlog::payload::{ Txdata, }; use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData}; -use spacetimedb_durability::{DurableOffset, Transaction, TxOffset}; +use spacetimedb_durability::{DurableOffset, TxOffset}; use spacetimedb_lib::Identity; use tokio::{ runtime, @@ -196,14 +196,14 @@ impl DurabilityWorkerActor { } pub fn do_durability(durability: &Durability, reducer_context: Option, tx_data: &TxData) { - let Some(tx_offset) = tx_data.tx_offset() else { + if tx_data.tx_offset().is_none() { let name = reducer_context.as_ref().map(|rcx| &rcx.name); debug_assert!( !tx_data.has_rows_or_connect_disconnect(name), "tx_data has no rows but has connect/disconnect: `{name:?}`" ); return; - }; + } let mut inserts: Box<_> = tx_data .persistent_inserts() @@ -240,11 +240,9 @@ impl DurabilityWorkerActor { }), }; + // TODO: Should measure queuing time + actual write // This does not block, as per trait docs. - durability.append_tx(Transaction { - offset: tx_offset, - txdata, - }); + durability.append_tx(txdata); } } @@ -283,9 +281,9 @@ mod tests { impl spacetimedb_durability::Durability for CountingDurability { type TxData = Txdata; - fn append_tx(&self, tx: Transaction) { + fn append_tx(&self, _tx: Self::TxData) { self.appended.send_modify(|offset| { - offset.replace(tx.offset); + *offset = offset.map(|x| x + 1).or(Some(0)); }); } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 85f1b24e7a0..75fb75cbce7 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -3,7 +3,7 @@ use crate::db::MetricsRecorderQueue; use crate::error::{DBError, RestoreSnapshotError}; use crate::messages::control_db::HostType; use crate::subscription::ExecutionCounters; -use crate::util::asyncify; +use crate::util::{asyncify, spawn_rayon}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use enum_map::EnumMap; @@ -1741,6 +1741,7 @@ pub async fn local_durability( snapshot_worker: Option<&SnapshotWorker>, ) -> Result<(LocalDurability, DiskSizeFn), DBError> { let rt = tokio::runtime::Handle::current(); + // TODO: Should this better be spawn_blocking? 
let on_new_segment = snapshot_worker.map(|snapshot_worker| { let snapshot_worker = snapshot_worker.clone(); Arc::new(move || { @@ -1749,11 +1750,17 @@ pub async fn local_durability( snapshot_worker.request_snapshot_ignore_closed(); }) as Arc }); - let local = asyncify(move || { + let local = spawn_rayon(move || { durability::Local::open( replica_dir.clone(), rt, - <_>::default(), + durability::local::Options { + commitlog: commitlog::Options { + max_records_in_commit: 1.try_into().unwrap(), + ..Default::default() + }, + ..Default::default() + }, // Give the durability a handle to request a new snapshot run, // which it will send down whenever we rotate commitlog segments. on_new_segment, diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 5f0a76fb239..f9d8a4c3663 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1855,7 +1855,7 @@ mod tests { use spacetimedb_commitlog::{commitlog, repo}; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap}; use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; - use spacetimedb_durability::{Durability, EmptyHistory, Transaction, TxOffset}; + use spacetimedb_durability::{Durability, EmptyHistory, TxOffset}; use spacetimedb_execution::dml::MutDatastore; use spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; @@ -1940,10 +1940,13 @@ mod tests { impl Durability for ManualDurability { type TxData = Txdata; - fn append_tx(&self, tx: Transaction) { + fn append_tx(&self, tx: Self::TxData) { let mut commitlog = self.commitlog.write().unwrap(); - commitlog.commit([tx]).expect("commit failed"); - commitlog.flush().expect("error flushing commitlog"); + if let Err(tx) = commitlog.append(tx) { + commitlog.commit().expect("error flushing commitlog"); + commitlog.append(tx).expect("should be able to append after flush"); + } + commitlog.commit().expect("error flushing commitlog"); } fn durable_tx_offset(&self) -> spacetimedb_durability::DurableOffset { diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index c9397ca0fe3..e3a8afa34ac 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -1,5 +1,7 @@ use std::{ io, + num::NonZeroU16, + panic, path::PathBuf, sync::{ atomic::{AtomicU64, Ordering::Relaxed}, @@ -32,12 +34,8 @@ pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; pub struct Options { /// Periodically flush and sync the log this often. /// - /// Default: 50ms + /// Default: 500ms pub sync_interval: Duration, - /// If `true`, flush (but not sync) each transaction. - /// - /// Default: false - pub flush_each_tx: bool, /// [`Commitlog`] configuration. pub commitlog: spacetimedb_commitlog::Options, } @@ -45,8 +43,7 @@ pub struct Options { impl Default for Options { fn default() -> Self { Self { - sync_interval: Duration::from_millis(50), - flush_each_tx: false, + sync_interval: Duration::from_millis(500), commitlog: Default::default(), } } @@ -83,7 +80,7 @@ pub struct Local { /// [`PersisterTask`]. /// /// Note that this is unbounded! - queue: mpsc::UnboundedSender>>, + queue: mpsc::UnboundedSender>, /// How many transactions are sitting in the `queue`. 
/// /// This is mainly for observability purposes, and can thus be updated with @@ -135,7 +132,7 @@ impl Local { queue_depth: queue_depth.clone(), sync_interval: opts.sync_interval, - flush_each_tx: opts.flush_each_tx, + max_records_in_commit: opts.commitlog.max_records_in_commit, lock, } @@ -194,7 +191,7 @@ struct Actor { queue_depth: Arc, sync_interval: Duration, - flush_each_tx: bool, + max_records_in_commit: NonZeroU16, #[allow(unused)] lock: Lock, @@ -204,7 +201,7 @@ impl Actor { #[instrument(name = "durability::local::actor", skip_all)] async fn run( self, - mut transactions_rx: mpsc::UnboundedReceiver>>, + mut txdata_rx: mpsc::UnboundedReceiver>, mut shutdown_rx: mpsc::Receiver>, ) { info!("starting durability actor"); @@ -227,7 +224,7 @@ impl Actor { biased; Some(reply) = shutdown_rx.recv() => { - transactions_rx.close(); + txdata_rx.close(); let _ = reply.send(self.lock.notified()); }, @@ -238,24 +235,21 @@ impl Actor { } }, - tx = transactions_rx.recv() => { - let Some(tx) = tx else { + data = txdata_rx.recv() => { + let Some(txdata) = data else { break; }; self.queue_depth.fetch_sub(1, Relaxed); - let clog = self.clog.clone(); - let flush = self.flush_each_tx; - spawn_blocking(move || -> io::Result<()> { - clog.commit([tx])?; - if flush { - clog.flush()?; - } - - Ok(()) - }) - .await - .expect("commitlog write panicked") - .expect("commitlog write failed"); + // If we are writing one commit per tx, trying to buffer is + // fairly pointless. Immediately flush instead. + // + // Otherwise, try `Commitlog::append` as a fast-path + // that doesn't require `spawn_blocking`. + if self.max_records_in_commit.get() == 1 { + self.flush_append(txdata, true).await; + } else if let Err(retry) = self.clog.append(txdata) { + self.flush_append(retry, false).await + } }, } } @@ -267,6 +261,30 @@ impl Actor { info!("exiting durability actor"); } + #[instrument(skip_all)] + async fn flush_append(&self, txdata: Txdata, flush_after: bool) { + let clog = self.clog.clone(); + let span = Span::current(); + spawn_blocking(move || { + let _span = span.enter(); + let mut retry = Some(txdata); + while let Some(txdata) = retry.take() { + if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) { + flush_error("append-maybe-flush", &source); + retry = Some(txdata); + } + } + + if flush_after { + clog.flush() + .map(drop) + .unwrap_or_else(|e| flush_error("flush-after", &e)); + } + }) + .await + .expect("commitlog append blocking task panicked") + } + #[instrument(skip_all)] async fn flush_and_sync(&self) -> io::Result> { // Skip if nothing changed. @@ -284,7 +302,7 @@ impl Actor { }) .await .expect("commitlog flush-and-sync blocking task panicked") - .inspect_err(|e| warn!("error flushing commitlog: {e:#}")) + .inspect_err(|e| flush_error("flush-and-sync", e)) .inspect(|maybe_offset| { if let Some(new_offset) = maybe_offset { trace!("synced to offset {new_offset}"); @@ -324,11 +342,25 @@ impl Drop for Lock { } } +/// Handle an error flushing the commitlog. +/// +/// Panics if the error indicates that the log may be permanently unwritable. 
+#[inline] +#[track_caller] +fn flush_error(task: &str, e: &io::Error) { + warn!("error flushing commitlog ({task}): {e:?}"); + if matches!(e.kind(), io::ErrorKind::AlreadyExists | io::ErrorKind::StorageFull) { + panic!("{e}"); + } +} + impl Durability for Local { type TxData = Txdata; - fn append_tx(&self, tx: Transaction) { - self.queue.send(tx).expect("durability actor crashed"); + fn append_tx(&self, tx: Self::TxData) { + if self.queue.send(tx).is_err() { + panic!("durability actor crashed"); + } self.queue_depth.fetch_add(1, Relaxed); } diff --git a/crates/durability/src/imp/mod.rs b/crates/durability/src/imp/mod.rs index 1636e05cc51..6f83106d0b3 100644 --- a/crates/durability/src/imp/mod.rs +++ b/crates/durability/src/imp/mod.rs @@ -15,7 +15,7 @@ mod testing { use futures::FutureExt as _; use tokio::sync::watch; - use crate::{Close, Durability, DurableOffset, Transaction, TxOffset}; + use crate::{Close, Durability, DurableOffset, TxOffset}; /// A [`Durability`] impl that sends all transactions into the void. /// @@ -41,7 +41,7 @@ mod testing { impl Durability for NoDurability { type TxData = T; - fn append_tx(&self, _: Transaction) { + fn append_tx(&self, _: Self::TxData) { if self.closed.load(Ordering::Relaxed) { panic!("`close` was called on this `NoDurability` instance"); } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 7722d86914a..2038d90eeb9 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -95,35 +95,28 @@ pub type Close = BoxFuture<'static, Option>; /// /// NOTE: This is a preliminary definition, still under consideration. /// -/// A durability implementation accepts a [Transaction] to be made durable via -/// the [Durability::append_tx] method in a non-blocking fashion. -/// -/// Once a transaction becomes durable, the [DurableOffset] is updated. -/// What durable means depends on the implementation, informally it can be -/// thought of as "written to disk". +/// A durability implementation accepts a payload representing a single database +/// transaction via [`Durability::append_tx`] in a non-blocking fashion. The +/// payload _should_ become durable eventually. [`TxOffset`]s reported by +/// [`Durability::durable_tx_offset`] shall be considered durable to the +/// extent the implementation can guarantee. pub trait Durability: Send + Sync { /// The payload representing a single transaction. type TxData; - /// Submit a [Transaction] to be made durable. + /// Submit the transaction payload to be made durable. /// /// This method must never block, and accept new transactions even if they /// cannot be made durable immediately. /// - /// Errors may be signalled by panicking. - // - // TODO: Support batches of txs, i.e. commits. - // - // The commitlog supports this, but allocation overhead in the durability - // API is too high given we don't make any use of it. - // - // We don't make any use of it because a commit is an atomic unit of storage - // (i.e. a torn write will corrupt all transactions contained in it), and it - // is very unclear when it is both correct and beneficial to bundle more - // than a single transaction into a commit. - fn append_tx(&self, tx: Transaction); + /// A permanent failure of the durable storage may be signalled by panicking. + fn append_tx(&self, tx: Self::TxData); - /// Obtain a handle to the [DurableOffset]. + /// The [`TxOffset`] considered durable. 
+ /// + /// A `None` return value indicates that the durable offset is not known, + /// either because nothing has been persisted yet, or because the status + /// cannot be retrieved. fn durable_tx_offset(&self) -> DurableOffset; /// Asynchronously request the durability to shut down, without dropping it. diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index be08d3f6a4d..5dc67dd13d3 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -98,8 +98,8 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { // Mark initial segment as seen. new_segment_rx.borrow_and_update(); // Write past available space. - for offset in 0..256 { - durability.append_tx((offset, txdata.clone()).into()); + for _ in 0..256 { + durability.append_tx(txdata.clone()); } // Ensure new segment is created. new_segment_rx.changed().await?; @@ -107,7 +107,7 @@ async fn local_durability_crashes_on_new_segment_if_not_enough_space() { sleep(Duration::from_millis(5)).await; // Durability actor should have crashed, so this should panic. info!("trying append on crashed durability"); - durability.append_tx((256, txdata.clone()).into()); + durability.append_tx(txdata.clone()); } Ok(()) @@ -168,6 +168,7 @@ async fn local_durability( spacetimedb_durability::local::Options { commitlog: spacetimedb_commitlog::Options { max_segment_size, + max_records_in_commit: 1.try_into().unwrap(), preallocate_segments: true, ..<_>::default() },