Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## 0.47.0 - 2026-02-04

### Enhancements
- Added Zstd compression support to live clients which can be enabled with
`LiveBuilder::SetCompression()`. It's disabled by default
- Added `Compression()` getter to `LiveBlocking` and `LiveThreaded`
- Upgraded default `httplib` version to 0.30.1

### Breaking changes
- Added an overload to the `ReadSome` method on `IReadable` for timeout support

## 0.46.1 - 2026-01-27

### Enhancements
Expand Down
11 changes: 2 additions & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.2)

project(
databento
VERSION 0.46.1
VERSION 0.47.0
LANGUAGES CXX
DESCRIPTION "Official Databento client library"
)
Expand Down Expand Up @@ -128,10 +128,6 @@ endif()

find_package(OpenSSL REQUIRED)
find_package(zstd REQUIRED)
if (APPLE)
find_library(CORE_FOUNDATION_LIB CoreFoundation REQUIRED)
find_library(CFNETWORK_LIB CFNetwork REQUIRED)
endif()
if(NOT TARGET zstd::libzstd)
if(TARGET zstd::libzstd_shared)
add_library(zstd::libzstd ALIAS zstd::libzstd_shared)
Expand Down Expand Up @@ -182,7 +178,7 @@ if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_HTTPLIB)
find_package(httplib REQUIRED)
endif()
else()
set(httplib_version 0.28.0)
set(httplib_version 0.30.1)
FetchContent_Declare(
httplib
URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v${httplib_version}.tar.gz
Expand Down Expand Up @@ -227,9 +223,6 @@ target_link_libraries(
OpenSSL::SSL
Threads::Threads
zstd::libzstd
# macOS-specific libraries required by httplib
$<$<PLATFORM_ID:Darwin>:${CFNETWORK_LIB}>
$<$<PLATFORM_ID:Darwin>:${CORE_FRAMEWORK_LIB}>
)

target_compile_definitions(
Expand Down
2 changes: 2 additions & 0 deletions cmake/SourcesAndHeaders.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ set(sources
src/detail/dbn_buffer_decoder.cpp
src/detail/http_client.cpp
src/detail/json_helpers.cpp
src/detail/live_connection.cpp
src/detail/scoped_fd.cpp
src/detail/sha256_hasher.cpp
src/detail/tcp_client.cpp
src/detail/tcp_readable.cpp
src/detail/zstd_stream.cpp
src/enums.cpp
src/exceptions.cpp
Expand Down
3 changes: 2 additions & 1 deletion examples/live/simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ int main() {
.SetSendTsOut(true)
.SetKeyFromEnv()
.SetDataset(db::Dataset::GlbxMdp3)
.SetCompression(db::Compression::Zstd)
.BuildThreaded();

// Set up signal handler for Ctrl+C
std::signal(SIGINT, [](int signal) { gSignal = signal; });

std::vector<std::string> symbols{"ESZ5", "ESZ5 C6200", "ESZ5 P5500"};
std::vector<std::string> symbols{"ESZ6", "ESZ6 C8200", "ESZ6 P7500"};
client.Subscribe(symbols, db::Schema::Definition, db::SType::RawSymbol);
client.Subscribe(symbols, db::Schema::Mbo, db::SType::RawSymbol);

Expand Down
3 changes: 3 additions & 0 deletions include/databento/detail/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class Buffer : public IReadable, public IWritable {
// Will throw if `length > ReadCapacity()`.
void ReadExact(std::byte* buffer, std::size_t length) override;
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
// timeout is ignored
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length,
std::chrono::milliseconds timeout) override;

std::byte* ReadBegin() { return read_pos_; }
std::byte* ReadEnd() { return write_pos_; }
Expand Down
38 changes: 38 additions & 0 deletions include/databento/detail/live_connection.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>

#include "databento/detail/tcp_client.hpp"
#include "databento/detail/zstd_stream.hpp"
#include "databento/enums.hpp"
#include "databento/ireadable.hpp"
#include "databento/iwritable.hpp"

namespace databento::detail {
// Manages the TCP connection to the live gateway with optionally compressed reads for
// the DBN data.
class LiveConnection : IWritable {
public:
LiveConnection(const std::string& gateway, std::uint16_t port);

void WriteAll(std::string_view str);
void WriteAll(const std::byte* buffer, std::size_t size);
void ReadExact(std::byte* buffer, std::size_t size);
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size);
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size,
std::chrono::milliseconds timeout);
// Closes the socket.
void Close();
// Sets compression for subsequent reads.
void SetCompression(Compression compression);

private:
TcpClient client_;
std::optional<ZstdDecodeStream> zstd_stream_;
};
} // namespace databento::detail
18 changes: 4 additions & 14 deletions include/databento/detail/tcp_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,11 @@
#include <string_view>

#include "databento/detail/scoped_fd.hpp" // ScopedFd
#include "databento/ireadable.hpp"

namespace databento::detail {
class TcpClient {
public:
enum class Status : std::uint8_t {
Ok,
Timeout,
Closed,
};

struct Result {
std::size_t read_size;
Status status;
};

struct RetryConf {
std::uint32_t max_attempts{1};
std::chrono::seconds max_wait{std::chrono::minutes{1}};
Expand All @@ -33,11 +23,11 @@ class TcpClient {
void WriteAll(std::string_view str);
void WriteAll(const std::byte* buffer, std::size_t size);
void ReadExact(std::byte* buffer, std::size_t size);
Result ReadSome(std::byte* buffer, std::size_t max_size);
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size);
// Passing a timeout of 0 will block until data is available of the socket is
// closed, the same behavior as the Read overload without a timeout.
Result ReadSome(std::byte* buffer, std::size_t max_size,
std::chrono::milliseconds timeout);
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_size,
std::chrono::milliseconds timeout);
// Closes the socket.
void Close();

Expand Down
24 changes: 24 additions & 0 deletions include/databento/detail/tcp_readable.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <chrono>
#include <cstddef>

#include "databento/detail/tcp_client.hpp"
#include "databento/ireadable.hpp"

namespace databento::detail {
// Adapter wrapping TcpClient to implement IReadable interface and be passed
// as a non-owned pointer to ZstdDecodeStream.
class TcpReadable : public IReadable {
public:
explicit TcpReadable(TcpClient* client) : client_{client} {}

void ReadExact(std::byte* buffer, std::size_t length) override;
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length,
std::chrono::milliseconds timeout) override;

private:
TcpClient* client_;
};
} // namespace databento::detail
8 changes: 7 additions & 1 deletion include/databento/detail/zstd_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <zstd.h>

#include <chrono>
#include <cstddef> // size_t
#include <memory> // unique_ptr
#include <vector>
Expand All @@ -19,9 +20,12 @@ class ZstdDecodeStream : public IReadable {

// Read exactly `length` bytes into `buffer`.
void ReadExact(std::byte* buffer, std::size_t length) override;
// Read at most `length` bytes. Returns the number of bytes read. Will only
// Read at most `max_length` bytes. Returns the number of bytes read. Will only
// return 0 if the end of the stream is reached.
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
// Read at most `max_length` bytes with timeout support.
IReadable::Result ReadSome(std::byte* buffer, std::size_t max_length,
std::chrono::milliseconds timeout) override;

IReadable* Input() const { return input_.get(); }

Expand All @@ -44,6 +48,8 @@ class ZstdCompressStream : public IWritable {
~ZstdCompressStream() override;

void WriteAll(const std::byte* buffer, std::size_t length) override;
// Flush any buffered data without ending the stream
void Flush();

private:
ILogReceiver* log_receiver_;
Expand Down
3 changes: 3 additions & 0 deletions include/databento/file_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class InFileStream : public IReadable {
// Read at most `length` bytes. Returns the number of bytes read. Will only
// return 0 if the end of the stream is reached.
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
// timeout is ignored
Result ReadSome(std::byte* buffer, std::size_t max_length,
std::chrono::milliseconds timeout) override;

private:
std::ifstream stream_;
Expand Down
23 changes: 21 additions & 2 deletions include/databento/ireadable.hpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
#pragma once

#include <chrono> // milliseconds
#include <cstddef> // byte, size_t
#include <cstdint> // uint8_t

namespace databento {
// An abstract class for readable objects to allow for runtime polymorphism
// around DBN decoding.
class IReadable {
public:
enum class Status : std::uint8_t {
Ok, // Data read successfully
Timeout, // Timeout reached before any data available
Closed, // Stream is closed/EOF
};

struct Result {
std::size_t read_size; // Number of bytes read
Status status; // Status of the read operation
};

virtual ~IReadable() = default;

// Read exactly `length` bytes into `buffer`.
virtual void ReadExact(std::byte* buffer, std::size_t length) = 0;
// Read at most `length` bytes. Returns the number of bytes read. Will only
// return 0 if the end of the stream is reached.
// Read at most `length` bytes. Returns the number of bytes read. Will only return 0
// if the end of the stream is reached.
virtual std::size_t ReadSome(std::byte* buffer, std::size_t max_length) = 0;
// Read at most `max_length` bytes with timeout support. Returns Result with bytes
// read and status. Status will be Timeout if no data available within timeout period,
// Closed if stream is closed, or Ok if data was read. A timeout of 0 means wait
// indefinitely (same as the no-timeout overload).
virtual Result ReadSome(std::byte* buffer, std::size_t max_length,
std::chrono::milliseconds timeout) = 0;
};
} // namespace databento
3 changes: 3 additions & 0 deletions include/databento/live.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class LiveBuilder {
LiveBuilder& SetBufferSize(std::size_t size);
// Appends to the default user agent.
LiveBuilder& ExtendUserAgent(std::string extension);
// Sets the compression mode for the read stream.
LiveBuilder& SetCompression(Compression compression);

/*
* Build a live client instance
Expand All @@ -77,5 +79,6 @@ class LiveBuilder {
std::optional<std::chrono::seconds> heartbeat_interval_{};
std::size_t buffer_size_;
std::string user_agent_ext_;
Compression compression_{Compression::None};
};
} // namespace databento
16 changes: 10 additions & 6 deletions include/databento/live_blocking.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
#include "databento/datetime.hpp" // UnixNanos
#include "databento/dbn.hpp" // Metadata
#include "databento/detail/buffer.hpp"
#include "databento/detail/tcp_client.hpp" // TcpClient
#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy
#include "databento/detail/live_connection.hpp" // LiveConnection
#include "databento/enums.hpp" // Schema, SType, VersionUpgradePolicy, Compression
#include "databento/live_subscription.hpp"
#include "databento/record.hpp" // Record, RecordHeader

Expand Down Expand Up @@ -44,6 +44,7 @@ class LiveBlocking {
std::optional<std::chrono::seconds> HeartbeatInterval() const {
return heartbeat_interval_;
}
databento::Compression Compression() const { return compression_; }
const std::vector<LiveSubscription>& Subscriptions() const { return subscriptions_; }
std::vector<LiveSubscription>& Subscriptions() { return subscriptions_; }

Expand Down Expand Up @@ -93,12 +94,14 @@ class LiveBlocking {
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext);
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression);
LiveBlocking(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext);
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression);

std::string DetermineGateway() const;
std::uint64_t Authenticate();
Expand All @@ -109,7 +112,7 @@ class LiveBlocking {
void IncrementSubCounter();
void Subscribe(std::string_view sub_msg, const std::vector<std::string>& symbols,
bool use_snapshot);
detail::TcpClient::Result FillBuffer(std::chrono::milliseconds timeout);
IReadable::Result FillBuffer(std::chrono::milliseconds timeout);
RecordHeader* BufferRecordHeader();

static constexpr std::size_t kMaxStrLen = 24L * 1024;
Expand All @@ -124,7 +127,8 @@ class LiveBlocking {
std::uint8_t version_{};
const VersionUpgradePolicy upgrade_policy_;
const std::optional<std::chrono::seconds> heartbeat_interval_;
detail::TcpClient client_;
const databento::Compression compression_;
detail::LiveConnection connection_;
std::uint32_t sub_counter_{};
std::vector<LiveSubscription> subscriptions_;
detail::Buffer buffer_;
Expand Down
7 changes: 5 additions & 2 deletions include/databento/live_threaded.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class LiveThreaded {
bool SendTsOut() const;
VersionUpgradePolicy UpgradePolicy() const;
std::optional<std::chrono::seconds> HeartbeatInterval() const;
databento::Compression Compression() const;
const std::vector<LiveSubscription>& Subscriptions() const;
std::vector<LiveSubscription>& Subscriptions();

Expand Down Expand Up @@ -107,12 +108,14 @@ class LiveThreaded {
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
bool send_ts_out, VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext);
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression);
LiveThreaded(ILogReceiver* log_receiver, std::string key, std::string dataset,
std::string gateway, std::uint16_t port, bool send_ts_out,
VersionUpgradePolicy upgrade_policy,
std::optional<std::chrono::seconds> heartbeat_interval,
std::size_t buffer_size, std::string user_agent_ext);
std::size_t buffer_size, std::string user_agent_ext,
databento::Compression compression);

// unique_ptr to be movable
std::unique_ptr<Impl> impl_;
Expand Down
2 changes: 1 addition & 1 deletion pkg/PKGBUILD
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Maintainer: Databento <support@databento.com>
_pkgname=databento-cpp
pkgname=databento-cpp-git
pkgver=0.46.1
pkgver=0.47.0
pkgrel=1
pkgdesc="Official C++ client for Databento"
arch=('any')
Expand Down
Loading
Loading