Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions olp-cpp-sdk-dataservice-read/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ add_library(${PROJECT_NAME}
${SRC}
${INC})

target_compile_definitions(${PROJECT_NAME}
PRIVATE
BOOST_ALL_NO_LIB
BOOST_JSON_NO_LIB)

target_include_directories(${PROJECT_NAME}
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
Expand Down
21 changes: 14 additions & 7 deletions olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023-2025 HERE Europe B.V.
* Copyright (C) 2023-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,19 @@ RapidJsonByteStream::Ch RapidJsonByteStream::Peek() {
return read_buffer_[count_];
}

boost::json::string_view RapidJsonByteStream::ReadView() {
if (ReadEmpty()) {
SwapBuffers();
}
auto begin = read_buffer_.begin() + count_;
auto terminator_it = std::find(begin, read_buffer_.end(), '\0');
boost::json::string_view::size_type size =
std::distance(begin, terminator_it);
count_ += size;
full_count_ += size;
return {&*begin, size};
}

RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
if (ReadEmpty()) {
SwapBuffers();
Expand All @@ -41,12 +54,6 @@ RapidJsonByteStream::Ch RapidJsonByteStream::Take() {

size_t RapidJsonByteStream::Tell() const { return full_count_; }

// Not implemented
char* RapidJsonByteStream::PutBegin() { return 0; }
void RapidJsonByteStream::Put(char) {}
void RapidJsonByteStream::Flush() {}
size_t RapidJsonByteStream::PutEnd(char*) { return 0; }

bool RapidJsonByteStream::ReadEmpty() const {
return count_ == read_buffer_.size();
}
Expand Down
12 changes: 5 additions & 7 deletions olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023-2025 HERE Europe B.V.
* Copyright (C) 2023-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@

#include <olp/core/client/ApiError.h>
#include <olp/core/porting/optional.h>
#include <boost/json/string_view.hpp>

namespace olp {
namespace dataservice {
Expand All @@ -43,15 +44,12 @@ class RapidJsonByteStream {
/// character.
Ch Take();

/// Return the view of current read buffer until the end of first \0 character
boost::json::string_view ReadView();

/// Get the current read cursor.
size_t Tell() const;

/// Not needed for reading.
char* PutBegin();
void Put(char);
void Flush();
size_t PutEnd(char*);

bool ReadEmpty() const;
bool WriteEmpty() const;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2024 HERE Europe B.V.
* Copyright (C) 2019-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
#include <olp/core/client/Condition.h>
#include <olp/core/logging/Log.h>
#include <boost/functional/hash.hpp>
#include <boost/json/basic_parser_impl.hpp>
#include "CatalogRepository.h"
#include "generated/api/MetadataApi.h"
#include "generated/api/QueryApi.h"
Expand Down Expand Up @@ -648,37 +649,44 @@ client::ApiNoResponse PartitionsRepository::ParsePartitionsStream(
const std::shared_ptr<AsyncJsonStream>& async_stream,
const PartitionsStreamCallback& partition_callback,
client::CancellationContext context) {
rapidjson::ParseResult parse_result;
auto parse_result =
boost::json::make_error_code(boost::json::error::incomplete);

// We must perform at least one attempt to parse.
do {
rapidjson::Reader reader;
auto partitions_handler =
std::make_shared<repository::PartitionsSaxHandler>(partition_callback);

auto reader_cancellation_token = client::CancellationToken([=]() {
partitions_handler->Abort();
async_stream->CloseStream(client::ApiError::Cancelled());
});

if (!context.ExecuteOrCancelled(
[=]() { return reader_cancellation_token; })) {
auto parser =
std::make_shared<boost::json::basic_parser<PartitionsSaxHandler>>(
boost::json::parse_options{}, partition_callback);

auto reader_cancellation_token =
client::CancellationToken([parser, &async_stream]() {
parser->handler().Abort();
async_stream->CloseStream(client::ApiError::Cancelled());
});

if (!context.ExecuteOrCancelled([reader_cancellation_token]() {
return reader_cancellation_token;
})) {
return client::ApiError::Cancelled();
}

auto json_stream = async_stream->GetCurrentStream();

parse_result = reader.Parse<rapidjson::kParseIterativeFlag>(
*json_stream, *partitions_handler);
while (json_stream->Peek() != '\0') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to have separate function for end detection?

auto view = json_stream->ReadView();
if (parser->write_some(true, view.data(), view.size(), parse_result)) {
parse_result = {};
}
}
// Retry to parse the stream until it's closed.
} while (!async_stream->IsClosed());

auto error = async_stream->GetError();

if (error) {
return {*error};
} else if (!parse_result) {
return client::ApiError(parse_result.Code(), "Parsing error");
} else if (parse_result.failed()) {
return client::ApiError(parse_result.value(), "Parsing error");
} else {
return client::ApiNoResult{};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023-2025 HERE Europe B.V.
* Copyright (C) 2023-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,46 +35,45 @@ constexpr unsigned long long int HashStringToInt(
PartitionsSaxHandler::PartitionsSaxHandler(PartitionCallback partition_callback)
: partition_callback_(std::move(partition_callback)) {}

bool PartitionsSaxHandler::StartObject() {
bool PartitionsSaxHandler::on_object_begin(boost::json::error_code& ec) {
if (state_ == State::kWaitForRootObject) {
state_ = State::kWaitForRootPartitions;
return continue_parsing_;
return CanContinue(ec);
}

if (state_ != State::kWaitForNextPartition) {
return false;
return NotSupported(ec);
}

state_ = State::kProcessingAttribute;

return continue_parsing_;
return CanContinue(ec);
}

bool PartitionsSaxHandler::String(const char* str, unsigned int length, bool) {
bool PartitionsSaxHandler::String(const std::string& str, error_code& ec) {
switch (state_) {
case State::kProcessingAttribute:
state_ = ProcessNextAttribute(str, length);
return continue_parsing_;
state_ = ProcessNextAttribute(str);
return CanContinue(ec);

case State::kWaitForRootPartitions:
if (HashStringToInt("partitions") == HashStringToInt(str)) {
case State::kWaitForRootPartitions: {
if (HashStringToInt("partitions") == HashStringToInt(str.c_str())) {
state_ = State::kWaitPartitionsArray;
return continue_parsing_;
} else {
return false;
return CanContinue(ec);
}
return NotSupported(ec);
}

case State::kParsingPartitionName:
partition_.SetPartition(std::string(str, length));
partition_.SetPartition(str);
break;
case State::kParsingDataHandle:
partition_.SetDataHandle(std::string(str, length));
partition_.SetDataHandle(str);
break;
case State::kParsingChecksum:
partition_.SetChecksum(std::string(str, length));
partition_.SetChecksum(str);
break;
case State::kParsingCrc:
partition_.SetCrc(std::string(str, length));
partition_.SetCrc(str);
break;
case State::kParsingIgnoreAttribute:
break;
Expand All @@ -92,72 +91,72 @@ bool PartitionsSaxHandler::String(const char* str, unsigned int length, bool) {

state_ = State::kProcessingAttribute;

return continue_parsing_;
return CanContinue(ec);
}

bool PartitionsSaxHandler::Uint(unsigned int value) {
bool PartitionsSaxHandler::on_int64(const int64_t value, string_view,
error_code& ec) {
if (state_ == State::kParsingVersion) {
partition_.SetVersion(value);
} else if (state_ == State::kParsingDataSize) {
partition_.SetDataSize(value);
} else if (state_ == State::kParsingCompressedDataSize) {
partition_.SetCompressedDataSize(value);
} else {
return false;
return NotSupported(ec);
}

state_ = State::kProcessingAttribute;
return continue_parsing_;
return CanContinue(ec);
}

bool PartitionsSaxHandler::EndObject(unsigned int) {
bool PartitionsSaxHandler::on_object_end(std::size_t, error_code& ec) {
if (state_ == State::kWaitForRootObjectEnd) {
state_ = State::kParsingComplete;
return true; // complete
return CanContinue(ec); // complete
}

if (state_ != State::kProcessingAttribute) {
return false;
return NotSupported(ec);
}

if (partition_.GetDataHandle().empty() || partition_.GetPartition().empty()) {
return false; // partition is not valid
return NotSupported(ec); // partition is not valid
}

partition_callback_(std::move(partition_));

state_ = State::kWaitForNextPartition;

return continue_parsing_;
return CanContinue(ec);
}

bool PartitionsSaxHandler::StartArray() {
bool PartitionsSaxHandler::on_array_begin(boost::json::error_code& ec) {
// We expect only a single array in whol response
if (state_ != State::kWaitPartitionsArray) {
return false;
return NotSupported(ec);
}

state_ = State::kWaitForNextPartition;

return continue_parsing_;
return CanContinue(ec);
}

bool PartitionsSaxHandler::EndArray(unsigned int) {
bool PartitionsSaxHandler::on_array_end(std::size_t,
boost::json::error_code& ec) {
if (state_ != State::kWaitForNextPartition) {
return false;
return NotSupported(ec);
}

state_ = State::kWaitForRootObjectEnd;
return continue_parsing_;
return CanContinue(ec);
}

bool PartitionsSaxHandler::Default() { return false; }

void PartitionsSaxHandler::Abort() { continue_parsing_.store(false); }

PartitionsSaxHandler::State PartitionsSaxHandler::ProcessNextAttribute(
const char* name, unsigned int /*length*/) {
switch (HashStringToInt(name)) {
const std::string& name) {
switch (HashStringToInt(name.c_str())) {
case HashStringToInt("dataHandle"):
return State::kParsingDataHandle;
case HashStringToInt("partition"):
Expand Down
Loading
Loading