From ed8e054f00ae97e071302f0d837b7c89f0b9a435 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 22 Jan 2026 16:29:20 -0600 Subject: [PATCH 1/5] ADD: Add compression support to C++ live clients --- CHANGELOG.md | 10 +++ cmake/SourcesAndHeaders.cmake | 2 + examples/live/simple.cpp | 3 +- include/databento/detail/buffer.hpp | 3 + include/databento/detail/live_connection.hpp | 38 ++++++++ include/databento/detail/tcp_client.hpp | 18 +--- include/databento/detail/tcp_readable.hpp | 24 ++++++ include/databento/detail/zstd_stream.hpp | 8 +- include/databento/file_stream.hpp | 3 + include/databento/ireadable.hpp | 23 ++++- include/databento/live.hpp | 3 + include/databento/live_blocking.hpp | 16 ++-- include/databento/live_threaded.hpp | 7 +- src/detail/buffer.cpp | 7 ++ src/detail/live_connection.cpp | 45 ++++++++++ src/detail/tcp_client.cpp | 9 +- src/detail/tcp_readable.cpp | 17 ++++ src/detail/zstd_stream.cpp | 78 +++++++++++------ src/file_stream.cpp | 8 ++ src/live.cpp | 31 ++++--- src/live_blocking.cpp | 50 ++++++----- src/live_threaded.cpp | 18 ++-- tests/include/mock/mock_lsg_server.hpp | 23 ++++- tests/src/live_blocking_tests.cpp | 91 ++++++++++++++++++++ tests/src/live_threaded_tests.cpp | 83 ++++++++++++++++++ tests/src/log_tests.cpp | 1 - tests/src/mock_lsg_server.cpp | 80 +++++++++++++---- tests/src/tcp_client_tests.cpp | 12 +-- tests/src/zstd_stream_tests.cpp | 50 +++++++++++ 29 files changed, 639 insertions(+), 122 deletions(-) create mode 100644 include/databento/detail/live_connection.hpp create mode 100644 include/databento/detail/tcp_readable.hpp create mode 100644 src/detail/live_connection.cpp create mode 100644 src/detail/tcp_readable.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index d924e52..0b6a46d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 0.47.0 - TBD + +### Enhancements +- Added Zstd compression support to live clients which can be enabled with +`LiveBuilder::SetCompression()`. It's disabled by default +- Added `Compression()` getter to `LiveBlocking` and `LiveThreaded` + +### Breaking changes +- Added an overload to the `ReadSome` method on `IReadable` for timeout support + ## 0.46.1 - 2026-01-27 ### Enhancements diff --git a/cmake/SourcesAndHeaders.cmake b/cmake/SourcesAndHeaders.cmake index 48698d5..bbe24ca 100644 --- a/cmake/SourcesAndHeaders.cmake +++ b/cmake/SourcesAndHeaders.cmake @@ -54,9 +54,11 @@ set(sources src/detail/dbn_buffer_decoder.cpp src/detail/http_client.cpp src/detail/json_helpers.cpp + src/detail/live_connection.cpp src/detail/scoped_fd.cpp src/detail/sha256_hasher.cpp src/detail/tcp_client.cpp + src/detail/tcp_readable.cpp src/detail/zstd_stream.cpp src/enums.cpp src/exceptions.cpp diff --git a/examples/live/simple.cpp b/examples/live/simple.cpp index 85f0316..0d52c97 100644 --- a/examples/live/simple.cpp +++ b/examples/live/simple.cpp @@ -25,12 +25,13 @@ int main() { .SetSendTsOut(true) .SetKeyFromEnv() .SetDataset(db::Dataset::GlbxMdp3) + .SetCompression(db::Compression::Zstd) .BuildThreaded(); // Set up signal handler for Ctrl+C std::signal(SIGINT, [](int signal) { gSignal = signal; }); - std::vector symbols{"ESZ5", "ESZ5 C6200", "ESZ5 P5500"}; + std::vector symbols{"ESZ6", "ESZ6 C8200", "ESZ6 P7500"}; client.Subscribe(symbols, db::Schema::Definition, db::SType::RawSymbol); client.Subscribe(symbols, db::Schema::Mbo, db::SType::RawSymbol); diff --git a/include/databento/detail/buffer.hpp b/include/databento/detail/buffer.hpp index ff9d68e..f54b1ca 100644 --- a/include/databento/detail/buffer.hpp +++ b/include/databento/detail/buffer.hpp @@ -38,6 +38,9 @@ class Buffer : public IReadable, public IWritable { // Will throw if `length > ReadCapacity()`. void ReadExact(std::byte* buffer, std::size_t length) override; std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; + // timeout is ignored + IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length, + std::chrono::milliseconds timeout) override; std::byte* ReadBegin() { return read_pos_; } std::byte* ReadEnd() { return write_pos_; } diff --git a/include/databento/detail/live_connection.hpp b/include/databento/detail/live_connection.hpp new file mode 100644 index 0000000..3d74abe --- /dev/null +++ b/include/databento/detail/live_connection.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "databento/detail/tcp_client.hpp" +#include "databento/detail/zstd_stream.hpp" +#include "databento/enums.hpp" +#include "databento/ireadable.hpp" +#include "databento/iwritable.hpp" + +namespace databento::detail { +// Manages the TCP connection to the live gateway with optionally compressed reads for +// the DBN data. +class LiveConnection : IWritable { + public: + LiveConnection(const std::string& gateway, std::uint16_t port); + + void WriteAll(std::string_view str); + void WriteAll(const std::byte* buffer, std::size_t size); + void ReadExact(std::byte* buffer, std::size_t size); + IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size); + IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size, + std::chrono::milliseconds timeout); + // Closes the socket. + void Close(); + // Sets compression for subsequent reads. + void SetCompression(Compression compression); + + private: + TcpClient client_; + std::optional zstd_stream_; +}; +} // namespace databento::detail diff --git a/include/databento/detail/tcp_client.hpp b/include/databento/detail/tcp_client.hpp index 7aa800d..ea2b764 100644 --- a/include/databento/detail/tcp_client.hpp +++ b/include/databento/detail/tcp_client.hpp @@ -7,21 +7,11 @@ #include #include "databento/detail/scoped_fd.hpp" // ScopedFd +#include "databento/ireadable.hpp" namespace databento::detail { class TcpClient { public: - enum class Status : std::uint8_t { - Ok, - Timeout, - Closed, - }; - - struct Result { - std::size_t read_size; - Status status; - }; - struct RetryConf { std::uint32_t max_attempts{1}; std::chrono::seconds max_wait{std::chrono::minutes{1}}; @@ -33,11 +23,11 @@ class TcpClient { void WriteAll(std::string_view str); void WriteAll(const std::byte* buffer, std::size_t size); void ReadExact(std::byte* buffer, std::size_t size); - Result ReadSome(std::byte* buffer, std::size_t max_size); + IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size); // Passing a timeout of 0 will block until data is available of the socket is // closed, the same behavior as the Read overload without a timeout. - Result ReadSome(std::byte* buffer, std::size_t max_size, - std::chrono::milliseconds timeout); + IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size, + std::chrono::milliseconds timeout); // Closes the socket. void Close(); diff --git a/include/databento/detail/tcp_readable.hpp b/include/databento/detail/tcp_readable.hpp new file mode 100644 index 0000000..8f23fb4 --- /dev/null +++ b/include/databento/detail/tcp_readable.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include +#include + +#include "databento/detail/tcp_client.hpp" +#include "databento/ireadable.hpp" + +namespace databento::detail { +// Adapter wrapping TcpClient to implement IReadable interface and be passed +// as a non-owned pointer to ZstdDecodeStream. +class TcpReadable : public IReadable { + public: + explicit TcpReadable(TcpClient* client) : client_{client} {} + + void ReadExact(std::byte* buffer, std::size_t length) override; + std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; + IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length, + std::chrono::milliseconds timeout) override; + + private: + TcpClient* client_; +}; +} // namespace databento::detail diff --git a/include/databento/detail/zstd_stream.hpp b/include/databento/detail/zstd_stream.hpp index 0fff739..719e37c 100644 --- a/include/databento/detail/zstd_stream.hpp +++ b/include/databento/detail/zstd_stream.hpp @@ -2,6 +2,7 @@ #include +#include #include // size_t #include // unique_ptr #include @@ -19,9 +20,12 @@ class ZstdDecodeStream : public IReadable { // Read exactly `length` bytes into `buffer`. void ReadExact(std::byte* buffer, std::size_t length) override; - // Read at most `length` bytes. Returns the number of bytes read. Will only + // Read at most `max_length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; + // Read at most `max_length` bytes with timeout support. + IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length, + std::chrono::milliseconds timeout) override; IReadable* Input() const { return input_.get(); } @@ -44,6 +48,8 @@ class ZstdCompressStream : public IWritable { ~ZstdCompressStream() override; void WriteAll(const std::byte* buffer, std::size_t length) override; + // Flush any buffered data without ending the stream + void Flush(); private: ILogReceiver* log_receiver_; diff --git a/include/databento/file_stream.hpp b/include/databento/file_stream.hpp index 9f9e011..8b588fd 100644 --- a/include/databento/file_stream.hpp +++ b/include/databento/file_stream.hpp @@ -17,6 +17,9 @@ class InFileStream : public IReadable { // Read at most `length` bytes. Returns the number of bytes read. Will only // return 0 if the end of the stream is reached. std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; + // timeout is ignored + Result ReadSome(std::byte* buffer, std::size_t max_length, + std::chrono::milliseconds timeout) override; private: std::ifstream stream_; diff --git a/include/databento/ireadable.hpp b/include/databento/ireadable.hpp index 0c242a1..8e55531 100644 --- a/include/databento/ireadable.hpp +++ b/include/databento/ireadable.hpp @@ -1,18 +1,37 @@ #pragma once +#include // milliseconds #include // byte, size_t +#include // uint8_t namespace databento { // An abstract class for readable objects to allow for runtime polymorphism // around DBN decoding. class IReadable { public: + enum class Status : std::uint8_t { + Ok, // Data read successfully + Timeout, // Timeout reached before any data available + Closed, // Stream is closed/EOF + }; + + struct Result { + std::size_t read_size; // Number of bytes read + Status status; // Status of the read operation + }; + virtual ~IReadable() = default; // Read exactly `length` bytes into `buffer`. virtual void ReadExact(std::byte* buffer, std::size_t length) = 0; - // Read at most `length` bytes. Returns the number of bytes read. Will only - // return 0 if the end of the stream is reached. + // Read at most `length` bytes. Returns the number of bytes read. Will only return 0 + // if the end of the stream is reached. virtual std::size_t ReadSome(std::byte* buffer, std::size_t max_length) = 0; + // Read at most `max_length` bytes with timeout support. Returns Result with bytes + // read and status. Status will be Timeout if no data available within timeout period, + // Closed if stream is closed, or Ok if data was read. A timeout of 0 means wait + // indefinitely (same as the no-timeout overload). + virtual Result ReadSome(std::byte* buffer, std::size_t max_length, + std::chrono::milliseconds timeout) = 0; }; } // namespace databento diff --git a/include/databento/live.hpp b/include/databento/live.hpp index 000b75a..bd66e68 100644 --- a/include/databento/live.hpp +++ b/include/databento/live.hpp @@ -51,6 +51,8 @@ class LiveBuilder { LiveBuilder& SetBufferSize(std::size_t size); // Appends to the default user agent. LiveBuilder& ExtendUserAgent(std::string extension); + // Sets the compression mode for the read stream. + LiveBuilder& SetCompression(Compression compression); /* * Build a live client instance @@ -77,5 +79,6 @@ class LiveBuilder { std::optional heartbeat_interval_{}; std::size_t buffer_size_; std::string user_agent_ext_; + Compression compression_{Compression::None}; }; } // namespace databento diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index 5995c09..4f6d81d 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -12,8 +12,8 @@ #include "databento/datetime.hpp" // UnixNanos #include "databento/dbn.hpp" // Metadata #include "databento/detail/buffer.hpp" -#include "databento/detail/tcp_client.hpp" // TcpClient -#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy +#include "databento/detail/live_connection.hpp" // LiveConnection +#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy, Compression #include "databento/live_subscription.hpp" #include "databento/record.hpp" // Record, RecordHeader @@ -44,6 +44,7 @@ class LiveBlocking { std::optional HeartbeatInterval() const { return heartbeat_interval_; } + databento::Compression Compression() const { return compression_; } const std::vector& Subscriptions() const { return subscriptions_; } std::vector& Subscriptions() { return subscriptions_; } @@ -93,12 +94,14 @@ class LiveBlocking { LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext); + std::size_t buffer_size, std::string user_agent_ext, + databento::Compression compression); LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext); + std::size_t buffer_size, std::string user_agent_ext, + databento::Compression compression); std::string DetermineGateway() const; std::uint64_t Authenticate(); @@ -109,7 +112,7 @@ class LiveBlocking { void IncrementSubCounter(); void Subscribe(std::string_view sub_msg, const std::vector& symbols, bool use_snapshot); - detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout); + IReadable::Result FillBuffer(std::chrono::milliseconds timeout); RecordHeader* BufferRecordHeader(); static constexpr std::size_t kMaxStrLen = 24L * 1024; @@ -124,7 +127,8 @@ class LiveBlocking { std::uint8_t version_{}; const VersionUpgradePolicy upgrade_policy_; const std::optional heartbeat_interval_; - detail::TcpClient client_; + const databento::Compression compression_; + detail::LiveConnection connection_; std::uint32_t sub_counter_{}; std::vector subscriptions_; detail::Buffer buffer_; diff --git a/include/databento/live_threaded.hpp b/include/databento/live_threaded.hpp index fb44ee0..231790a 100644 --- a/include/databento/live_threaded.hpp +++ b/include/databento/live_threaded.hpp @@ -54,6 +54,7 @@ class LiveThreaded { bool SendTsOut() const; VersionUpgradePolicy UpgradePolicy() const; std::optional HeartbeatInterval() const; + databento::Compression Compression() const; const std::vector& Subscriptions() const; std::vector& Subscriptions(); @@ -107,12 +108,14 @@ class LiveThreaded { LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext); + std::size_t buffer_size, std::string user_agent_ext, + databento::Compression compression); LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext); + std::size_t buffer_size, std::string user_agent_ext, + databento::Compression compression); // unique_ptr to be movable std::unique_ptr impl_; diff --git a/src/detail/buffer.cpp b/src/detail/buffer.cpp index 4659411..22dec0d 100644 --- a/src/detail/buffer.cpp +++ b/src/detail/buffer.cpp @@ -7,6 +7,7 @@ #include "stream_op_helper.hpp" using databento::detail::Buffer; +using Status = databento::IReadable::Status; size_t Buffer::Write(const char* data, std::size_t length) { return Write(reinterpret_cast(data), length); @@ -51,6 +52,12 @@ std::size_t Buffer::ReadSome(std::byte* buffer, std::size_t max_length) { return read_size; } +databento::IReadable::Result Buffer::ReadSome(std::byte* buffer, std::size_t max_length, + std::chrono::milliseconds) { + const auto bytes_read = ReadSome(buffer, max_length); + return {bytes_read, Status::Ok}; +} + void Buffer::Reserve(std::size_t capacity) { if (capacity <= Capacity()) { return; diff --git a/src/detail/live_connection.cpp b/src/detail/live_connection.cpp new file mode 100644 index 0000000..c2b9d43 --- /dev/null +++ b/src/detail/live_connection.cpp @@ -0,0 +1,45 @@ +#include "databento/detail/live_connection.hpp" + +#include + +#include "databento/detail/tcp_readable.hpp" + +using databento::detail::LiveConnection; + +LiveConnection::LiveConnection(const std::string& gateway, std::uint16_t port) + : client_{gateway, port} {} + +void LiveConnection::WriteAll(std::string_view str) { client_.WriteAll(str); } + +void LiveConnection::WriteAll(const std::byte* buffer, std::size_t size) { + client_.WriteAll(buffer, size); +} + +void LiveConnection::ReadExact(std::byte* buffer, std::size_t size) { + if (zstd_stream_) { + zstd_stream_->ReadExact(buffer, size); + } else { + client_.ReadExact(buffer, size); + } +} + +databento::IReadable::Result LiveConnection::ReadSome(std::byte* buffer, + std::size_t max_size) { + return ReadSome(buffer, max_size, std::chrono::milliseconds{}); +} + +databento::IReadable::Result LiveConnection::ReadSome( + std::byte* buffer, std::size_t max_size, std::chrono::milliseconds timeout) { + if (zstd_stream_) { + return zstd_stream_->ReadSome(buffer, max_size, timeout); + } + return client_.ReadSome(buffer, max_size, timeout); +} + +void LiveConnection::Close() { client_.Close(); } + +void LiveConnection::SetCompression(Compression compression) { + if (compression == Compression::Zstd) { + zstd_stream_.emplace(std::make_unique(&client_)); + } +} diff --git a/src/detail/tcp_client.cpp b/src/detail/tcp_client.cpp index 2ad2900..9d39615 100644 --- a/src/detail/tcp_client.cpp +++ b/src/detail/tcp_client.cpp @@ -20,6 +20,7 @@ #include "databento/exceptions.hpp" // TcpError using databento::detail::TcpClient; +using Status = databento::IReadable::Status; namespace { int GetErrNo() { @@ -62,7 +63,8 @@ void TcpClient::ReadExact(std::byte* buffer, std::size_t size) { } } -TcpClient::Result TcpClient::ReadSome(std::byte* buffer, std::size_t max_size) { +databento::IReadable::Result TcpClient::ReadSome(std::byte* buffer, + std::size_t max_size) { const ::ssize_t res = ::recv(socket_.Get(), reinterpret_cast(buffer), max_size, {}); if (res < 0) { @@ -71,8 +73,9 @@ TcpClient::Result TcpClient::ReadSome(std::byte* buffer, std::size_t max_size) { return {static_cast(res), res == 0 ? Status::Closed : Status::Ok}; } -TcpClient::Result TcpClient::ReadSome(std::byte* buffer, std::size_t max_size, - std::chrono::milliseconds timeout) { +databento::IReadable::Result TcpClient::ReadSome(std::byte* buffer, + std::size_t max_size, + std::chrono::milliseconds timeout) { pollfd fds{socket_.Get(), POLLIN, {}}; // passing a timeout of -1 blocks indefinitely, which is the equivalent of // having no timeout diff --git a/src/detail/tcp_readable.cpp b/src/detail/tcp_readable.cpp new file mode 100644 index 0000000..4c4ce0f --- /dev/null +++ b/src/detail/tcp_readable.cpp @@ -0,0 +1,17 @@ +#include "databento/detail/tcp_readable.hpp" + +using databento::detail::TcpReadable; + +void TcpReadable::ReadExact(std::byte* buffer, std::size_t length) { + client_->ReadExact(buffer, length); +} + +std::size_t TcpReadable::ReadSome(std::byte* buffer, std::size_t max_length) { + return client_->ReadSome(buffer, max_length).read_size; +} + +databento::IReadable::Result TcpReadable::ReadSome(std::byte* buffer, + std::size_t max_length, + std::chrono::milliseconds timeout) { + return client_->ReadSome(buffer, max_length, timeout); +} diff --git a/src/detail/zstd_stream.cpp b/src/detail/zstd_stream.cpp index 45d5df1..88e283f 100644 --- a/src/detail/zstd_stream.cpp +++ b/src/detail/zstd_stream.cpp @@ -4,11 +4,10 @@ #include #include // move -#include "databento/detail/buffer.hpp" #include "databento/exceptions.hpp" -#include "databento/log.hpp" using databento::detail::ZstdDecodeStream; +using Status = databento::IReadable::Status; ZstdDecodeStream::ZstdDecodeStream(std::unique_ptr input) : input_{std::move(input)}, @@ -42,8 +41,14 @@ void ZstdDecodeStream::ReadExact(std::byte* buffer, std::size_t length) { } std::size_t ZstdDecodeStream::ReadSome(std::byte* buffer, std::size_t max_length) { + return ReadSome(buffer, max_length, std::chrono::milliseconds{}).read_size; +} + +databento::IReadable::Result ZstdDecodeStream::ReadSome( + std::byte* buffer, std::size_t max_length, std::chrono::milliseconds timeout) { ZSTD_outBuffer z_out_buffer{buffer, max_length, 0}; - std::size_t read_size = 0; + databento::IReadable::Result read_result{0, Status::Ok}; + do { const auto unread_input = z_in_buffer_.size - z_in_buffer_.pos; if (unread_input > 0) { @@ -59,18 +64,30 @@ std::size_t ZstdDecodeStream::ReadSome(std::byte* buffer, std::size_t max_length in_buffer_.resize(new_size); z_in_buffer_.src = in_buffer_.data(); } - read_size = input_->ReadSome(&in_buffer_[unread_input], read_suggestion_); - z_in_buffer_.size = unread_input + read_size; + + // Only apply timeout to inner reader for simplicity + read_result = + input_->ReadSome(&in_buffer_[unread_input], read_suggestion_, timeout); + z_in_buffer_.size = unread_input + read_result.read_size; z_in_buffer_.pos = 0; + // No data to decompress: timeout or closed. Calling `ZSTD_decompressStream` with an + // empty input and no buffered data will trigger an error + if (z_in_buffer_.size == 0) { + break; + } + read_suggestion_ = ::ZSTD_decompressStream(z_dstream_.get(), &z_out_buffer, &z_in_buffer_); if (::ZSTD_isError(read_suggestion_)) { throw DbnResponseError{std::string{"Zstd error decompressing: "} + ::ZSTD_getErrorName(read_suggestion_)}; } - } while (z_out_buffer.pos == 0 && read_size > 0); - return z_out_buffer.pos; + } while (z_out_buffer.pos == 0 && read_result.read_size > 0); + + const auto read_size = z_out_buffer.pos; + // Only return inner read status if there's no data + return {read_size, read_size > 0 ? Status::Ok : read_result.status}; } using databento::detail::ZstdCompressStream; @@ -91,27 +108,7 @@ ZstdCompressStream::ZstdCompressStream(ILogReceiver* log_receiver, IWritable* ou ::ZSTD_CCtx_setParameter(z_cstream_.get(), ZSTD_c_checksumFlag, 1); } -ZstdCompressStream::~ZstdCompressStream() { - ZSTD_outBuffer z_out_buffer{out_buffer_.data(), out_buffer_.size(), 0}; - while (true) { - const std::size_t remaining = ::ZSTD_compressStream2( - z_cstream_.get(), &z_out_buffer, &z_in_buffer_, ::ZSTD_e_end); - if (remaining == 0) { - break; - } - if (::ZSTD_isError(remaining) && log_receiver_) { - log_receiver_->Receive(LogLevel::Error, - std::string{"Zstd error compressing end of stream: "} + - ::ZSTD_getErrorName(remaining)); - break; - } - } - assert(z_in_buffer_.pos == z_in_buffer_.size); - // Forward compressed output - if (z_out_buffer.pos > 0) { - output_->WriteAll(out_buffer_.data(), z_out_buffer.pos); - } -} +ZstdCompressStream::~ZstdCompressStream() { Flush(); } void ZstdCompressStream::WriteAll(const std::byte* buffer, std::size_t length) { in_buffer_.insert(in_buffer_.end(), buffer, buffer + length); @@ -138,3 +135,28 @@ void ZstdCompressStream::WriteAll(const std::byte* buffer, std::size_t length) { } } } + +void ZstdCompressStream::Flush() { + ZSTD_outBuffer z_out_buffer{out_buffer_.data(), out_buffer_.size(), 0}; + while (true) { + const std::size_t remaining = ::ZSTD_compressStream2( + z_cstream_.get(), &z_out_buffer, &z_in_buffer_, ::ZSTD_e_end); + if (remaining == 0) { + break; + } + if (::ZSTD_isError(remaining) && log_receiver_) { + log_receiver_->Receive(LogLevel::Error, + std::string{"Zstd error compressing end of stream: "} + + ::ZSTD_getErrorName(remaining)); + break; + } + } + assert(z_in_buffer_.pos == z_in_buffer_.size); + // Forward compressed output + if (z_out_buffer.pos > 0) { + output_->WriteAll(out_buffer_.data(), z_out_buffer.pos); + } + // Clear the input buffer since it's all been flushed + in_buffer_.clear(); + z_in_buffer_ = {in_buffer_.data(), 0, 0}; +} diff --git a/src/file_stream.cpp b/src/file_stream.cpp index 9782dcb..b0b9a99 100644 --- a/src/file_stream.cpp +++ b/src/file_stream.cpp @@ -6,6 +6,7 @@ #include "databento/exceptions.hpp" using databento::InFileStream; +using Status = databento::IReadable::Status; InFileStream::InFileStream(const std::filesystem::path& file_path) : stream_{file_path, std::ios::binary} { @@ -30,6 +31,13 @@ std::size_t InFileStream::ReadSome(std::byte* buffer, std::size_t max_length) { return static_cast(stream_.gcount()); } +databento::IReadable::Result InFileStream::ReadSome(std::byte* buffer, + std::size_t max_length, + std::chrono::milliseconds) { + const auto bytes_read = ReadSome(buffer, max_length); + return {bytes_read, bytes_read > 0 ? Status::Ok : Status::Closed}; +} + using databento::OutFileStream; OutFileStream::OutFileStream(const std::filesystem::path& file_path) diff --git a/src/live.cpp b/src/live.cpp index 2ecd0bb..26e8d4d 100644 --- a/src/live.cpp +++ b/src/live.cpp @@ -79,18 +79,26 @@ LiveBuilder& LiveBuilder::ExtendUserAgent(std::string extension) { return *this; } +LiveBuilder& LiveBuilder::SetCompression(Compression compression) { + compression_ = compression; + return *this; +} + databento::LiveBlocking LiveBuilder::BuildBlocking() { Validate(); if (gateway_.empty()) { return databento::LiveBlocking{log_receiver_, key_, dataset_, send_ts_out_, upgrade_policy_, heartbeat_interval_, - buffer_size_, user_agent_ext_}; + buffer_size_, user_agent_ext_, + compression_}; } - return databento::LiveBlocking{ - log_receiver_, key_, dataset_, gateway_, - port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, - buffer_size_, user_agent_ext_}; + return databento::LiveBlocking{log_receiver_, key_, + dataset_, gateway_, + port_, send_ts_out_, + upgrade_policy_, heartbeat_interval_, + buffer_size_, user_agent_ext_, + compression_}; } databento::LiveThreaded LiveBuilder::BuildThreaded() { @@ -99,12 +107,15 @@ databento::LiveThreaded LiveBuilder::BuildThreaded() { return databento::LiveThreaded{log_receiver_, key_, dataset_, send_ts_out_, upgrade_policy_, heartbeat_interval_, - buffer_size_, user_agent_ext_}; + buffer_size_, user_agent_ext_, + compression_}; } - return databento::LiveThreaded{ - log_receiver_, key_, dataset_, gateway_, - port_, send_ts_out_, upgrade_policy_, heartbeat_interval_, - buffer_size_, user_agent_ext_}; + return databento::LiveThreaded{log_receiver_, key_, + dataset_, gateway_, + port_, send_ts_out_, + upgrade_policy_, heartbeat_interval_, + buffer_size_, user_agent_ext_, + compression_}; } void LiveBuilder::Validate() { diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 8f9e9cb..451932f 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -12,7 +12,6 @@ #include "databento/constants.hpp" // kApiKeyLength #include "databento/dbn_decoder.hpp" #include "databento/detail/sha256_hasher.hpp" -#include "databento/detail/tcp_client.hpp" #include "databento/exceptions.hpp" // LiveApiError #include "databento/live.hpp" // LiveBuilder #include "databento/log.hpp" // ILogReceiver @@ -21,6 +20,7 @@ #include "dbn_constants.hpp" // kMetadataPreludeSize using databento::LiveBlocking; +using Status = databento::IReadable::Status; namespace { constexpr std::size_t kBucketIdLength = 5; @@ -32,7 +32,8 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext) + std::size_t buffer_size, std::string user_agent_ext, + databento::Compression compression) : log_receiver_{log_receiver}, key_{std::move(key)}, @@ -43,7 +44,8 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, send_ts_out_{send_ts_out}, upgrade_policy_{upgrade_policy}, heartbeat_interval_{heartbeat_interval}, - client_{gateway_, port_}, + compression_{compression}, + connection_{gateway_, port_}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -51,7 +53,8 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext) + std::size_t buffer_size, std::string user_agent_ext, + databento::Compression compression) : log_receiver_{log_receiver}, key_{std::move(key)}, dataset_{std::move(dataset)}, @@ -61,7 +64,8 @@ LiveBlocking::LiveBlocking(ILogReceiver* log_receiver, std::string key, send_ts_out_{send_ts_out}, upgrade_policy_{upgrade_policy}, heartbeat_interval_{heartbeat_interval}, - client_{gateway_, port_}, + compression_{compression}, + connection_{gateway_, port_}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -141,7 +145,7 @@ void LiveBlocking::Subscribe(std::string_view sub_msg, << "] Sending subscription request: " << chunked_sub_msg.str(); log_receiver_->Receive(LogLevel::Debug, log_ss.str()); } - client_.WriteAll(chunked_sub_msg.str()); + connection_.WriteAll(chunked_sub_msg.str()); symbols_it += chunk_size; } @@ -150,14 +154,15 @@ void LiveBlocking::Subscribe(std::string_view sub_msg, databento::Metadata LiveBlocking::Start() { log_receiver_->Receive(LogLevel::Info, "[LiveBlocking::Start] Starting session"); - client_.WriteAll("start_session\n"); - client_.ReadExact(buffer_.WriteBegin(), kMetadataPreludeSize); + connection_.WriteAll("start_session\n"); + connection_.SetCompression(compression_); + connection_.ReadExact(buffer_.WriteBegin(), kMetadataPreludeSize); buffer_.Fill(kMetadataPreludeSize); const auto [version, size] = DbnDecoder::DecodeMetadataVersionAndSize( buffer_.ReadBegin(), kMetadataPreludeSize); buffer_.Consume(kMetadataPreludeSize); buffer_.Reserve(size); - client_.ReadExact(buffer_.WriteBegin(), size); + connection_.ReadExact(buffer_.WriteBegin(), size); buffer_.Fill(size); auto metadata = DbnDecoder::DecodeMetadataFields(version, buffer_.ReadBegin(), buffer_.ReadEnd()); @@ -177,20 +182,20 @@ const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds time const auto unread_bytes = buffer_.ReadCapacity(); if (unread_bytes == 0) { const auto read_res = FillBuffer(timeout); - if (read_res.status == detail::TcpClient::Status::Timeout) { + if (read_res.status == Status::Timeout) { return nullptr; } - if (read_res.status == detail::TcpClient::Status::Closed) { + if (read_res.status == Status::Closed) { throw DbnResponseError{"Gateway closed the session"}; } } // check length while (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) { const auto read_res = FillBuffer(timeout); - if (read_res.status == detail::TcpClient::Status::Timeout) { + if (read_res.status == Status::Timeout) { return nullptr; } - if (read_res.status == detail::TcpClient::Status::Closed) { + if (read_res.status == Status::Closed) { throw DbnResponseError{"Gateway closed the session"}; } } @@ -202,7 +207,7 @@ const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds time return ¤t_record_; } -void LiveBlocking::Stop() { client_.Close(); } +void LiveBlocking::Stop() { connection_.Close(); } void LiveBlocking::Reconnect() { if (log_receiver_->ShouldLog(LogLevel::Info)) { @@ -210,7 +215,7 @@ void LiveBlocking::Reconnect() { log_msg << "Reconnecting to " << gateway_ << ':' << port_; log_receiver_->Receive(LogLevel::Info, log_msg.str()); } - client_ = detail::TcpClient{gateway_, port_}; + connection_ = detail::LiveConnection{gateway_, port_}; buffer_.Clear(); sub_counter_ = 0; session_id_ = this->Authenticate(); @@ -235,7 +240,7 @@ void LiveBlocking::Resubscribe() { std::string LiveBlocking::DecodeChallenge() { static constexpr auto kMethodName = "LiveBlocking::DecodeChallenge"; const auto read_size = - client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size; + connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size; if (read_size == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } @@ -259,7 +264,7 @@ std::string LiveBlocking::DecodeChallenge() { while (next_nl_pos == std::string::npos) { // read more buffer_.Fill( - client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size); + connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size); if (buffer_.ReadCapacity() == 0) { throw LiveApiError{"Gateway closed socket during authentication"}; } @@ -305,7 +310,7 @@ std::uint64_t LiveBlocking::Authenticate() { log_ss << '[' << kMethodName << "] Sending CRAM reply: " << req; log_receiver_->Receive(LogLevel::Debug, log_ss.str()); } - client_.WriteAll(req); + connection_.WriteAll(req); const std::uint64_t session_id = DecodeAuthResp(); if (log_receiver_->ShouldLog(LogLevel::Info)) { @@ -327,7 +332,8 @@ std::string LiveBlocking::GenerateCramReply(std::string_view challenge_key) { std::string LiveBlocking::EncodeAuthReq(std::string_view auth) { std::ostringstream req_stream; req_stream << "auth=" << auth << "|dataset=" << dataset_ << "|encoding=dbn|" - << "ts_out=" << send_ts_out_ << "|client=" << kUserAgent; + << "ts_out=" << send_ts_out_ << "|compression=" << compression_ + << "|client=" << kUserAgent; if (!user_agent_ext_.empty()) { req_stream << ' ' << user_agent_ext_; } @@ -344,7 +350,7 @@ std::uint64_t LiveBlocking::DecodeAuthResp() { buffer_.Clear(); do { const auto read_size = - client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size; + connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity()).read_size; if (read_size == 0) { throw LiveApiError{ "Unexpected end of message received from server after replying to " @@ -417,11 +423,11 @@ void LiveBlocking::IncrementSubCounter() { } } -databento::detail::TcpClient::Result LiveBlocking::FillBuffer( +databento::IReadable::Result LiveBlocking::FillBuffer( std::chrono::milliseconds timeout) { buffer_.Shift(); const auto read_res = - client_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout); + connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout); buffer_.Fill(read_res.read_size); return read_res; } diff --git a/src/live_threaded.cpp b/src/live_threaded.cpp index 9dc5824..69d4681 100644 --- a/src/live_threaded.cpp +++ b/src/live_threaded.cpp @@ -61,20 +61,22 @@ LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext) - : impl_{std::make_unique(log_receiver, std::move(key), std::move(dataset), - send_ts_out, upgrade_policy, heartbeat_interval, - buffer_size, std::move(user_agent_ext))} {} + std::size_t buffer_size, std::string user_agent_ext, + databento::Compression compression) + : impl_{std::make_unique( + log_receiver, std::move(key), std::move(dataset), send_ts_out, upgrade_policy, + heartbeat_interval, buffer_size, std::move(user_agent_ext), compression)} {} LiveThreaded::LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset, std::string gateway, std::uint16_t port, bool send_ts_out, VersionUpgradePolicy upgrade_policy, std::optional heartbeat_interval, - std::size_t buffer_size, std::string user_agent_ext) + std::size_t buffer_size, std::string user_agent_ext, + databento::Compression compression) : impl_{std::make_unique(log_receiver, std::move(key), std::move(dataset), std::move(gateway), port, send_ts_out, upgrade_policy, heartbeat_interval, buffer_size, - std::move(user_agent_ext))} {} + std::move(user_agent_ext), compression)} {} const std::string& LiveThreaded::Key() const { return impl_->blocking.Key(); } @@ -94,6 +96,10 @@ std::optional LiveThreaded::HeartbeatInterval() const { return impl_->blocking.HeartbeatInterval(); } +databento::Compression LiveThreaded::Compression() const { + return impl_->blocking.Compression(); +} + const std::vector& LiveThreaded::Subscriptions() const { return impl_->blocking.Subscriptions(); } diff --git a/tests/include/mock/mock_lsg_server.hpp b/tests/include/mock/mock_lsg_server.hpp index aa17d9b..54aee82 100644 --- a/tests/include/mock/mock_lsg_server.hpp +++ b/tests/include/mock/mock_lsg_server.hpp @@ -19,9 +19,11 @@ using ssize_t = SSIZE_T; #include #include +#include "databento/dbn.hpp" // Metadata #include "databento/detail/scoped_fd.hpp" // ScopedFd #include "databento/detail/scoped_thread.hpp" // ScopedThread -#include "databento/enums.hpp" // Schema, SType +#include "databento/detail/zstd_stream.hpp" // ZstdCompressStream +#include "databento/enums.hpp" // Schema, SType, Compression #include "databento/iwritable.hpp" #include "databento/record.hpp" // RecordHeader @@ -45,6 +47,8 @@ class MockLsgServer { MockLsgServer(std::string dataset, bool ts_out, std::chrono::seconds heartbeat_interval, std::function serve_fn); + MockLsgServer(std::string dataset, bool ts_out, Compression compression, + std::function serve_fn); std::uint16_t Port() const { return port_; } @@ -84,10 +88,24 @@ class MockLsgServer { void Close(); + // Compression-aware Start and SendRecord methods + void StartCompressed(); + template + void SendCompressedRecord(const Rec& rec) { + if (!compressor_) { + throw std::runtime_error( + "Compression not initialized. Call StartCompressed first."); + } + compressor_->WriteAll(reinterpret_cast(&rec), sizeof(rec)); + compressor_->Flush(); // Ensure record is sent immediately for testing + } + void FlushCompression(); + private: detail::Socket InitSocketAndSetPort(); detail::Socket InitSocketAndSetPort(int port); std::string Receive(); + databento::Metadata DummyMetadata() const; template std::size_t SendBytes(T bytes) { @@ -100,9 +118,12 @@ class MockLsgServer { std::string dataset_; bool ts_out_; std::chrono::seconds heartbeat_interval_; + Compression compression_{Compression::None}; std::uint16_t port_{}; detail::ScopedFd socket_{}; detail::ScopedFd conn_fd_{}; detail::ScopedThread thread_; + std::unique_ptr socket_stream_; + std::unique_ptr compressor_; }; } // namespace databento::tests::mock diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index 7074a5d..07ab8ed 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -271,6 +271,35 @@ TEST_F(LiveBlockingTests, TestNextRecord) { } } +TEST_F(LiveBlockingTests, TestNextRecordWithZstdCompression) { + constexpr auto kTsOut = false; + const auto kRecCount = 12; + constexpr OhlcvMsg kRec{DummyHeader(RType::Ohlcv1M), 1, 2, 3, 4, 5}; + const mock::MockLsgServer mock_server{dataset::kXnasItch, kTsOut, Compression::Zstd, + [kRec, kRecCount](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.StartCompressed(); + for (size_t i = 0; i < kRecCount; ++i) { + self.SendCompressedRecord(kRec); + } + self.FlushCompression(); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetCompression(Compression::Zstd) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + const auto metadata = target.Start(); + EXPECT_EQ(metadata.dataset, dataset::kXnasItch); + for (size_t i = 0; i < kRecCount; ++i) { + const auto rec = target.NextRecord(); + ASSERT_TRUE(rec.Holds()) << "Failed on call " << i; + EXPECT_EQ(rec.Get(), kRec); + } +} + TEST_F(LiveBlockingTests, TestNextRecordTimeout) { constexpr std::chrono::milliseconds kTimeout{50}; constexpr auto kTsOut = false; @@ -338,6 +367,68 @@ TEST_F(LiveBlockingTests, TestNextRecordTimeout) { EXPECT_EQ(rec->Get(), kRec); } +TEST_F(LiveBlockingTests, TestNextRecordTimeoutWithZstdCompression) { + constexpr std::chrono::milliseconds kTimeout{50}; + constexpr auto kTsOut = false; + constexpr OhlcvMsg kRec{DummyHeader(RType::Ohlcv1M), 1, 2, 3, 4, 5}; + + bool sent_first_msg = false; + std::mutex send_mutex; + std::condition_variable send_cv; + bool received_first_msg = false; + std::mutex receive_mutex; + std::condition_variable receive_cv; + const mock::MockLsgServer mock_server{ + dataset::kXnasItch, kTsOut, Compression::Zstd, [&](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.StartCompressed(); + self.SendCompressedRecord(kRec); + { + // notify client the first record's been sent + const std::lock_guard lock{send_mutex}; + sent_first_msg = true; + send_cv.notify_one(); + } + { + // wait for client to read first record + std::unique_lock lock{receive_mutex}; + receive_cv.wait(lock, [&received_first_msg] { return received_first_msg; }); + } + self.SendCompressedRecord(kRec); + self.FlushCompression(); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetCompression(Compression::Zstd) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + const auto metadata = target.Start(); + EXPECT_EQ(metadata.dataset, dataset::kXnasItch); + { + // wait for server to send first record to avoid flaky timeouts + std::unique_lock lock{send_mutex}; + send_cv.wait(lock, [&sent_first_msg] { return sent_first_msg; }); + } + auto* rec = target.NextRecord(kTimeout); + ASSERT_NE(rec, nullptr); + EXPECT_TRUE(rec->Holds()); + EXPECT_EQ(rec->Get(), kRec); + rec = target.NextRecord(kTimeout); + EXPECT_EQ(rec, nullptr) << "Did not timeout with compression when expected"; + { + // notify server the timeout occurred + const std::lock_guard lock{receive_mutex}; + received_first_msg = true; + receive_cv.notify_one(); + } + rec = target.NextRecord(kTimeout); + ASSERT_NE(rec, nullptr); + EXPECT_TRUE(rec->Holds()); + EXPECT_EQ(rec->Get(), kRec); +} + TEST_F(LiveBlockingTests, TestNextRecordPartialRead) { constexpr auto kTsOut = false; constexpr MboMsg kRec{DummyHeader(RType::Mbo), diff --git a/tests/src/live_threaded_tests.cpp b/tests/src/live_threaded_tests.cpp index a335d91..a2df018 100644 --- a/tests/src/live_threaded_tests.cpp +++ b/tests/src/live_threaded_tests.cpp @@ -78,6 +78,43 @@ TEST_F(LiveThreadedTests, TestBasic) { target.BlockForStop(); } +TEST_F(LiveThreadedTests, TestWithZstdCompression) { + const MboMsg kRec{DummyHeader(RType::Mbo), + 1, + 2, + 3, + {}, + 4, + Action::Add, + Side::Bid, + UnixNanos{}, + TimeDeltaNanos{}, + 100}; + const mock::MockLsgServer mock_server{dataset::kGlbxMdp3, kTsOut, Compression::Zstd, + [&kRec](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.StartCompressed(); + self.SendCompressedRecord(kRec); + self.SendCompressedRecord(kRec); + self.FlushCompression(); + }}; + + LiveThreaded target = builder_.SetDataset(dataset::kGlbxMdp3) + .SetSendTsOut(kTsOut) + .SetCompression(Compression::Zstd) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildThreaded(); + std::uint32_t call_count{}; + target.Start([&call_count, &kRec](const Record& rec) { + ++call_count; + EXPECT_TRUE(rec.Holds()); + EXPECT_EQ(rec.Get(), kRec); + return call_count < 2 ? KeepGoing::Continue : KeepGoing::Stop; + }); + target.BlockForStop(); +} + TEST_F(LiveThreadedTests, TestTimeoutRecovery) { const MboMsg kRec{DummyHeader(RType::Mbo), 1, @@ -121,6 +158,52 @@ TEST_F(LiveThreadedTests, TestTimeoutRecovery) { } } +TEST_F(LiveThreadedTests, TestTimeoutRecoveryWithZstdCompression) { + const MboMsg kRec{DummyHeader(RType::Mbo), + 1, + 2, + 3, + {}, + 4, + Action::Add, + Side::Bid, + UnixNanos{}, + TimeDeltaNanos{}, + 100}; + std::atomic call_count{}; + const mock::MockLsgServer mock_server{ + dataset::kXnasItch, kTsOut, Compression::Zstd, + [&kRec, &call_count](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + self.StartCompressed(); + self.SendCompressedRecord(kRec); + while (call_count < 1) { + std::this_thread::yield(); + } + // 150% of live threaded timeout + std::this_thread::sleep_for(std::chrono::milliseconds{75}); + self.SendCompressedRecord(kRec); + self.FlushCompression(); + }}; + + LiveThreaded target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetCompression(Compression::Zstd) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildThreaded(); + target.Start([](Metadata&& metadata) { EXPECT_FALSE(metadata.schema.has_value()); }, + [&call_count, &kRec](const Record& rec) { + ++call_count; + EXPECT_TRUE(rec.Holds()); + EXPECT_EQ(rec.Get(), kRec); + return databento::KeepGoing::Continue; + }); + while (call_count < 2) { + std::this_thread::yield(); + } +} + TEST_F(LiveThreadedTests, TestStop) { const MboMsg kRec{DummyHeader(RType::Mbo), 1, diff --git a/tests/src/log_tests.cpp b/tests/src/log_tests.cpp index d16d509..453b017 100644 --- a/tests/src/log_tests.cpp +++ b/tests/src/log_tests.cpp @@ -6,7 +6,6 @@ #include "databento/log.hpp" #include "databento/system.hpp" #include "databento/version.hpp" -#include "gmock/gmock.h" #include "mock/mock_log_receiver.hpp" namespace databento::tests { diff --git a/tests/src/mock_lsg_server.cpp b/tests/src/mock_lsg_server.cpp index 67161a0..4ee08db 100644 --- a/tests/src/mock_lsg_server.cpp +++ b/tests/src/mock_lsg_server.cpp @@ -1,5 +1,7 @@ #include "mock/mock_lsg_server.hpp" +#include "databento/dbn.hpp" + #ifdef _WIN32 #include #else @@ -47,6 +49,15 @@ MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, socket_{InitSocketAndSetPort()}, thread_{std::move(serve_fn), std::ref(*this)} {} +MockLsgServer::MockLsgServer(std::string dataset, bool ts_out, Compression compression, + std::function serve_fn) + : dataset_{std::move(dataset)}, + ts_out_{ts_out}, + heartbeat_interval_{}, + compression_{compression}, + socket_{InitSocketAndSetPort()}, + thread_{std::move(serve_fn), std::ref(*this)} {} + void MockLsgServer::Accept() { sockaddr_in addr{}; auto addr_len = static_cast(sizeof(addr)); @@ -107,6 +118,9 @@ void MockLsgServer::Authenticate() { EXPECT_NE(received.find("dataset=" + dataset_), std::string::npos); EXPECT_NE(received.find("encoding=dbn"), std::string::npos); EXPECT_NE(received.find("ts_out=" + std::to_string(ts_out_)), std::string::npos); + if (compression_ == Compression::Zstd) { + EXPECT_NE(received.find("compression=zstd"), std::string::npos); + } if (heartbeat_interval_.count() > 0) { EXPECT_NE(received.find("heartbeat_interval_s=" + std::to_string(heartbeat_interval_.count())), @@ -165,27 +179,55 @@ void MockLsgServer::Start() { EXPECT_EQ(received, "start_session\n"); SocketStream writable{conn_fd_.Get()}; - Metadata metadata{1, - dataset_, - {}, - {}, - UnixNanos{std::chrono::nanoseconds{kUndefTimestamp}}, - 0, - {}, - SType::InstrumentId, - false, - kSymbolCstrLenV1, - {}, - {}, - {}, - {}}; - DbnEncoder::EncodeMetadata(metadata, &writable); + DbnEncoder::EncodeMetadata(DummyMetadata(), &writable); } -void MockLsgServer::Close() { conn_fd_.Close(); } +void MockLsgServer::Close() { + if (compressor_) { + compressor_.reset(); + } + conn_fd_.Close(); +} + +void MockLsgServer::StartCompressed() { + const auto received = Receive(); + EXPECT_EQ(received, "start_session\n"); + + socket_stream_ = std::make_unique(conn_fd_.Get()); + + if (compression_ == Compression::Zstd) { + compressor_ = std::make_unique(socket_stream_.get()); + } + + DbnEncoder::EncodeMetadata(DummyMetadata(), compressor_.get()); + compressor_->Flush(); +} + +void MockLsgServer::FlushCompression() { + if (compressor_) { + compressor_->Flush(); + } +} databento::detail::Socket MockLsgServer::InitSocketAndSetPort() { - const auto pair = MockTcpServer::InitSocket(); - port_ = pair.first; - return pair.second; + const auto [port, socket_fd] = MockTcpServer::InitSocket(); + port_ = port; + return socket_fd; +} + +databento::Metadata MockLsgServer::DummyMetadata() const { + return Metadata{1, + dataset_, + {}, + {}, + UnixNanos{std::chrono::nanoseconds{kUndefTimestamp}}, + 0, + {}, + SType::InstrumentId, + false, + kSymbolCstrLenV1, + {}, + {}, + {}, + {}}; } diff --git a/tests/src/tcp_client_tests.cpp b/tests/src/tcp_client_tests.cpp index ffbc993..e191b7b 100644 --- a/tests/src/tcp_client_tests.cpp +++ b/tests/src/tcp_client_tests.cpp @@ -56,7 +56,7 @@ TEST_F(TcpClientTests, TestFullReadSome) { const auto res = target_.ReadSome(buffer.data(), buffer.size() - 1); EXPECT_STREQ(reinterpret_cast(buffer.data()), kSendData.c_str()); - EXPECT_EQ(res.status, detail::TcpClient::Status::Ok); + EXPECT_EQ(res.status, IReadable::Status::Ok); EXPECT_EQ(res.read_size, kSendData.length()); EXPECT_EQ(res.read_size, buffer.size() - 1); } @@ -71,7 +71,7 @@ TEST_F(TcpClientTests, TestPartialReadSome) { const auto res = target_.ReadSome(buffer.data(), buffer.size()); EXPECT_STREQ(reinterpret_cast(buffer.data()), kSendData.c_str()); - EXPECT_EQ(res.status, detail::TcpClient::Status::Ok); + EXPECT_EQ(res.status, IReadable::Status::Ok); EXPECT_EQ(res.read_size, kSendData.length()); } @@ -81,7 +81,7 @@ TEST_F(TcpClientTests, TestReadSomeClose) { std::array buffer{}; const auto res = target_.ReadSome(buffer.data(), buffer.size()); - EXPECT_EQ(res.status, detail::TcpClient::Status::Closed); + EXPECT_EQ(res.status, IReadable::Status::Closed); EXPECT_EQ(res.read_size, 0); } @@ -114,7 +114,7 @@ TEST_F(TcpClientTests, TestReadSomeTimeout) { has_timed_out = true; has_timed_out_cv.notify_one(); } - EXPECT_EQ(res.status, detail::TcpClient::Status::Timeout); + EXPECT_EQ(res.status, IReadable::Status::Timeout); EXPECT_EQ(res.read_size, 0); } @@ -133,7 +133,7 @@ TEST_F(TcpClientTests, TestReadCloseNoTimeout) { // immediately, not wait for the timeout const auto res = target_.ReadSome(buffer.data(), buffer.size(), kTimeout); const auto end = std::chrono::steady_clock::now(); - EXPECT_EQ(res.status, detail::TcpClient::Status::Closed); + EXPECT_EQ(res.status, IReadable::Status::Closed); EXPECT_EQ(res.read_size, 0); EXPECT_LT(end - start, kTimeout); } @@ -146,7 +146,7 @@ TEST_F(TcpClientTests, ReadAfterClose) { std::array buffer{}; const auto res = target_.ReadSome(buffer.data(), buffer.size()); - EXPECT_EQ(res.status, detail::TcpClient::Status::Ok); + EXPECT_EQ(res.status, IReadable::Status::Ok); EXPECT_GT(res.read_size, 0); target_.Close(); ASSERT_THROW(target_.ReadSome(buffer.data(), buffer.size()), databento::TcpError); diff --git a/tests/src/zstd_stream_tests.cpp b/tests/src/zstd_stream_tests.cpp index 1d807e2..975af92 100644 --- a/tests/src/zstd_stream_tests.cpp +++ b/tests/src/zstd_stream_tests.cpp @@ -45,4 +45,54 @@ TEST(ZstdStreamTests, TestIdentity) { }; decode.ReadExact(res.data(), size); } + +TEST(ZstdStreamTests, TestFlush) { + // Test that Flush() makes data available for reading without ending the stream + const std::string kTestData = "DBN\x01\x00\x00\x00TestData123"; + detail::Buffer mock_io; + { + ZstdCompressStream compressor{&mock_io}; + // Write small data that would normally be buffered + compressor.WriteAll(reinterpret_cast(kTestData.data()), + kTestData.size()); + compressor.Flush(); + // At this point, data should be in mock_io even though compressor isn't destroyed + + // Write more data after flush + compressor.WriteAll(reinterpret_cast(kTestData.data()), + kTestData.size()); + compressor.Flush(); + } + + // Verify we can decode both chunks + std::vector res(kTestData.size() * 2); + ZstdDecodeStream decode{std::make_unique(std::move(mock_io))}; + decode.ReadExact(res.data(), kTestData.size() * 2); + + std::string result(reinterpret_cast(res.data()), res.size()); + EXPECT_EQ(result, kTestData + kTestData); +} + +// Mock IReadable that always returns a timeout +class TimeoutReader : public IReadable { + public: + void ReadExact(std::byte*, std::size_t) override { + throw std::runtime_error{"TimeoutReader does not support ReadExact"}; + } + std::size_t ReadSome(std::byte*, std::size_t) override { return 0; } + Result ReadSome(std::byte*, std::size_t, std::chrono::milliseconds) override { + return {0, Status::Timeout}; + } +}; + +TEST(ZstdStreamTests, TestDecodeTimeout) { + ZstdDecodeStream target{std::make_unique()}; + + std::vector buffer(100); + auto result = + target.ReadSome(buffer.data(), buffer.size(), std::chrono::milliseconds{100}); + + EXPECT_EQ(result.read_size, 0); + EXPECT_EQ(result.status, IReadable::Status::Timeout); +} } // namespace databento::detail::tests From a32b0858837f8abf81623dc2bf9e614ca26be5b5 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Thu, 29 Jan 2026 11:51:37 -0600 Subject: [PATCH 2/5] MOD: Update httplib version --- CHANGELOG.md | 1 + CMakeLists.txt | 9 +-------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b6a46d..226ad6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Added Zstd compression support to live clients which can be enabled with `LiveBuilder::SetCompression()`. It's disabled by default - Added `Compression()` getter to `LiveBlocking` and `LiveThreaded` +- Upgraded default `httplib` version to 0.30.1 ### Breaking changes - Added an overload to the `ReadSome` method on `IReadable` for timeout support diff --git a/CMakeLists.txt b/CMakeLists.txt index 88ddd42..9ca7653 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,10 +128,6 @@ endif() find_package(OpenSSL REQUIRED) find_package(zstd REQUIRED) -if (APPLE) - find_library(CORE_FOUNDATION_LIB CoreFoundation REQUIRED) - find_library(CFNETWORK_LIB CFNetwork REQUIRED) -endif() if(NOT TARGET zstd::libzstd) if(TARGET zstd::libzstd_shared) add_library(zstd::libzstd ALIAS zstd::libzstd_shared) @@ -182,7 +178,7 @@ if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_HTTPLIB) find_package(httplib REQUIRED) endif() else() - set(httplib_version 0.28.0) + set(httplib_version 0.30.1) FetchContent_Declare( httplib URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v${httplib_version}.tar.gz @@ -227,9 +223,6 @@ target_link_libraries( OpenSSL::SSL Threads::Threads zstd::libzstd - # macOS-specific libraries required by httplib - $<$:${CFNETWORK_LIB}> - $<$:${CORE_FRAMEWORK_LIB}> ) target_compile_definitions( From 5632d7be584943e827c27312a95fac301f0bd392 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 3 Feb 2026 11:18:22 -0600 Subject: [PATCH 3/5] FIX: Fix TSAN error in mock LSG test fixture --- tests/include/mock/mock_lsg_server.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/include/mock/mock_lsg_server.hpp b/tests/include/mock/mock_lsg_server.hpp index 54aee82..244babc 100644 --- a/tests/include/mock/mock_lsg_server.hpp +++ b/tests/include/mock/mock_lsg_server.hpp @@ -122,8 +122,9 @@ class MockLsgServer { std::uint16_t port_{}; detail::ScopedFd socket_{}; detail::ScopedFd conn_fd_{}; - detail::ScopedThread thread_; std::unique_ptr socket_stream_; std::unique_ptr compressor_; + // declared last to ensure it's destroyed first before the members it uses + detail::ScopedThread thread_; }; } // namespace databento::tests::mock From 34bdbb839354ba64d30f220e35d6b7e90019a357 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 3 Feb 2026 12:43:33 -0600 Subject: [PATCH 4/5] VER: Release 0.47.0 --- CHANGELOG.md | 2 +- CMakeLists.txt | 2 +- pkg/PKGBUILD | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 226ad6d..2595257 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.47.0 - TBD +## 0.47.0 - 2026-02-03 ### Enhancements - Added Zstd compression support to live clients which can be enabled with diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ca7653..f8035a4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.2) project( databento - VERSION 0.46.1 + VERSION 0.47.0 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index 26f6ddd..6a49a58 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.46.1 +pkgver=0.47.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any') From 69aaa4cf7d76b6747dd23094865f0231df35f6a2 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Wed, 4 Feb 2026 15:13:03 -0600 Subject: [PATCH 5/5] DOC: Fix release date --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2595257..c34ca11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.47.0 - 2026-02-03 +## 0.47.0 - 2026-02-04 ### Enhancements - Added Zstd compression support to live clients which can be enabled with