Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 154 additions & 118 deletions crates/commitlog/src/commitlog.rs

Large diffs are not rendered by default.

105 changes: 47 additions & 58 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
io,
num::{NonZeroU16, NonZeroU64},
num::NonZeroU64,
ops::RangeBounds,
sync::{Arc, RwLock},
};
Expand All @@ -17,10 +17,11 @@ pub mod segment;
mod varchar;
mod varint;

use crate::segment::Committed;
pub use crate::{
commit::{Commit, StoredCommit},
payload::{Decoder, Encode},
repo::fs::SizeOnDisk,
repo::{fs::SizeOnDisk, TxOffset},
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
varchar::Varchar,
};
Expand Down Expand Up @@ -57,14 +58,6 @@ pub struct Options {
/// Default: 1GiB
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))]
pub max_segment_size: u64,
/// The maximum number of records in a commit.
///
/// If this number is exceeded, the commit is flushed to disk even without
/// explicitly calling [`Commitlog::flush`].
///
/// Default: 1
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))]
pub max_records_in_commit: NonZeroU16,
/// Whenever at least this many bytes have been written to the currently
/// active segment, an entry is added to its offset index.
///
Expand Down Expand Up @@ -96,6 +89,12 @@ pub struct Options {
/// Has no effect if the `fallocate` feature is not enabled.
#[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))]
pub preallocate_segments: bool,
/// Size in bytes of the memory buffer holding commit data before flushing
/// to storage.
///
/// Default: 4KiB
#[cfg_attr(feature = "serde", serde(default = "Options::default_write_buffer_size"))]
pub write_buffer_size: usize,
}

impl Default for Options {
Expand All @@ -106,18 +105,18 @@ impl Default for Options {

impl Options {
pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed");
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 4 * 1024;

pub const DEFAULT: Self = Self {
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
max_segment_size: Self::default_max_segment_size(),
max_records_in_commit: Self::default_max_records_in_commit(),
offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
preallocate_segments: Self::default_preallocate_segments(),
write_buffer_size: Self::default_write_buffer_size(),
};

pub const fn default_log_format_version() -> u8 {
Expand All @@ -128,10 +127,6 @@ impl Options {
Self::DEFAULT_MAX_SEGMENT_SIZE
}

pub const fn default_max_records_in_commit() -> NonZeroU16 {
Self::DEFAULT_MAX_RECORDS_IN_COMMIT
}

pub const fn default_offset_index_interval_bytes() -> NonZeroU64 {
Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES
}
Expand All @@ -144,6 +139,10 @@ impl Options {
Self::DEFAULT_PREALLOCATE_SEGMENTS
}

pub const fn default_write_buffer_size() -> usize {
Self::DEFAULT_WRITE_BUFFER_SIZE
}

/// Compute the length in bytes of an offset index based on the settings in
/// `self`.
pub fn offset_index_len(&self) -> u64 {
Expand Down Expand Up @@ -262,7 +261,7 @@ impl<T> Commitlog<T> {
pub fn flush(&self) -> io::Result<Option<u64>> {
let mut inner = self.inner.write().unwrap();
trace!("flush commitlog");
inner.commit()?;
inner.flush()?;

Ok(inner.max_committed_offset())
}
Expand All @@ -282,7 +281,7 @@ impl<T> Commitlog<T> {
pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
let mut inner = self.inner.write().unwrap();
trace!("flush and sync commitlog");
inner.commit()?;
inner.flush()?;
inner.sync();

Ok(inner.max_committed_offset())
Expand Down Expand Up @@ -383,57 +382,47 @@ impl<T> Commitlog<T> {
}

impl<T: Encode> Commitlog<T> {
/// Append the record `txdata` to the log.
/// Write `transactions` to the log.
///
/// If the internal buffer exceeds [`Options::max_records_in_commit`], the
/// argument is returned in an `Err`. The caller should [`Self::flush`] the
/// log and try again.
/// This will store all `transactions` as a single [Commit]
/// (note that `transactions` must not yield more than [u16::MAX] elements).
///
/// In case the log is appended to from multiple threads, this may result in
/// a busy loop trying to acquire a slot in the buffer. In such scenarios,
/// [`Self::append_maybe_flush`] is preferable.
pub fn append(&self, txdata: T) -> Result<(), T> {
let mut inner = self.inner.write().unwrap();
inner.append(txdata)
}

/// Append the record `txdata` to the log.
/// Data is buffered internally, call [Self::flush] to force flushing to
/// the underlying storage.
///
/// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed],
/// which contains the offset range and checksum of the commit.
///
/// The `txdata` payload is buffered in memory until either:
/// # Errors
///
/// - [`Self::flush`] is called explicitly, or
/// - [`Options::max_records_in_commit`] is exceeded
/// An `Err` value is returned in the following cases:
///
/// In the latter case, [`Self::append`] flushes implicitly, _before_
/// appending the `txdata` argument.
/// - if the transaction sequence is invalid, e.g. because the transaction
/// offsets are not contiguous.
///
/// I.e. the argument is not guaranteed to be flushed after the method
/// returns. If that is desired, [`Self::flush`] must be called explicitly.
/// In this case, **none** of the `transactions` will be written.
///
/// If writing `txdata` to the commitlog results in a new segment file being opened,
/// we will send a message down `on_new_segment`.
/// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`.
/// - if creating the new segment fails due to an I/O error.
///
/// # Errors
/// # Panics
///
/// The method panics if:
///
/// - `transactions` exceeds [u16::MAX] elements
///
/// - [Self::flush] or writing to the underlying buffered writer fails
///
/// If the log needs to be flushed, but an I/O error occurs, ownership of
/// `txdata` is returned back to the caller alongside the [`io::Error`].
/// This is likely caused by some storage issue. As we cannot tell with
/// certainty how much data (if any) has been written, the internal state
/// becomes invalid and thus a panic is raised.
///
/// The value can then be used to retry appending.
pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
/// - [Self::sync] panics (called when rotating segments)
pub fn commit<U: Into<Transaction<T>>>(
&self,
transactions: impl IntoIterator<Item = U>,
) -> io::Result<Option<Committed>> {
let mut inner = self.inner.write().unwrap();

if let Err(txdata) = inner.append(txdata) {
if let Err(source) = inner.commit() {
return Err(error::Append { txdata, source });
}

// `inner.commit.n` must be zero at this point
let res = inner.append(txdata);
debug_assert!(res.is_ok(), "failed to append while holding write lock");
}

Ok(())
inner.commit(transactions)
}

/// Obtain an iterator which traverses the log from the start, yielding
Expand Down
6 changes: 1 addition & 5 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,11 @@ pub fn create_segment_writer<R: Repo>(
records: Vec::new(),
epoch,
},
inner: io::BufWriter::new(storage),
inner: io::BufWriter::with_capacity(opts.write_buffer_size, storage),

min_tx_offset: offset,
bytes_written: Header::LEN as u64,

max_records_in_commit: opts.max_records_in_commit,

offset_index_head: create_offset_index_writer(repo, offset, opts),
})
}
Expand Down Expand Up @@ -293,8 +291,6 @@ pub fn resume_segment_writer<R: Repo>(
min_tx_offset: tx_range.start,
bytes_written: size_in_bytes,

max_records_in_commit: opts.max_records_in_commit,

offset_index_head: create_offset_index_writer(repo, offset, opts),
}))
}
Expand Down
Loading
Loading