diff --git a/doc/modules/ROOT/pages/7.examples/7a.hello-task.adoc b/doc/modules/ROOT/pages/7.examples/7a.hello-task.adoc index 22eaa170..f0d54416 100644 --- a/doc/modules/ROOT/pages/7.examples/7a.hello-task.adoc +++ b/doc/modules/ROOT/pages/7.examples/7a.hello-task.adoc @@ -20,9 +20,9 @@ The minimal Capy program: a task that prints a message. #include #include -using namespace boost::capy; +namespace capy = boost::capy; -task<> say_hello() +capy::task<> say_hello() { std::cout << "Hello from Capy!\n"; co_return; @@ -30,8 +30,8 @@ task<> say_hello() int main() { - thread_pool pool; - run_async(pool.get_executor())(say_hello()); + capy::thread_pool pool; + capy::run_async(pool.get_executor())(say_hello()); return 0; } ---- @@ -50,7 +50,7 @@ target_link_libraries(hello_task PRIVATE capy) [source,cpp] ---- -task<> say_hello() +capy::task<> say_hello() { std::cout << "Hello from Capy!\n"; co_return; @@ -65,7 +65,7 @@ Tasks are lazy: calling `say_hello()` creates a task object but does not execute [source,cpp] ---- -thread_pool pool; +capy::thread_pool pool; ---- `thread_pool` provides an execution context with worker threads. By default, it creates one thread per CPU core. @@ -76,7 +76,7 @@ The pool's destructor waits for all work to complete before returning. This ensu [source,cpp] ---- -run_async(pool.get_executor())(say_hello()); +capy::run_async(pool.get_executor())(say_hello()); ---- `run_async` bridges non-coroutine code (like `main`) to coroutine code. The two-call syntax: diff --git a/doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc b/doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc index 697b0f9a..c00cf685 100644 --- a/doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc +++ b/doc/modules/ROOT/pages/7.examples/7b.producer-consumer.adoc @@ -23,21 +23,21 @@ Two tasks communicating via an async event, with strand serialization. #include #include -using namespace boost::capy; +namespace capy = boost::capy; int main() { - thread_pool pool; // thread_pool - strand s{pool.get_executor()}; // strand - serializes execution - std::latch done(1); // std::latch - wait for completion + capy::thread_pool pool; + capy::strand s{pool.get_executor()}; + std::latch done(1); - auto on_complete = [&done](auto&&...) { done.count_down(); }; // lambda - auto on_error = [&done](std::exception_ptr) { done.count_down(); }; // lambda + auto on_complete = [&done](auto&&...) { done.count_down(); }; + auto on_error = [&done](std::exception_ptr) { done.count_down(); }; - async_event data_ready; // async_event - int shared_value = 0; // int + capy::async_event data_ready; + int shared_value = 0; - auto producer = [&]() -> task<> { + auto producer = [&]() -> capy::task<> { std::cout << "Producer: preparing data...\n"; shared_value = 42; std::cout << "Producer: data ready, signaling\n"; @@ -45,9 +45,10 @@ int main() co_return; }; - auto consumer = [&]() -> task<> { + auto consumer = [&]() -> capy::task<> { std::cout << "Consumer: waiting for data...\n"; - co_await data_ready.wait(); + auto [ec] = co_await data_ready.wait(); + (void)ec; std::cout << "Consumer: received value " << shared_value << "\n"; co_return; }; @@ -55,11 +56,11 @@ int main() // Run both tasks concurrently using when_all, through a strand. // The strand serializes execution, ensuring thread-safe access // to the shared async_event and shared_value. - auto run_both = [&]() -> task<> { - co_await when_all(producer(), consumer()); + auto run_both = [&]() -> capy::task<> { + co_await capy::when_all(producer(), consumer()); }; - run_async(s, on_complete, on_error)(run_both()); + capy::run_async(s, on_complete, on_error)(run_both()); done.wait(); // Block until tasks complete return 0; @@ -80,7 +81,7 @@ target_link_libraries(producer_consumer PRIVATE capy) [source,cpp] ---- -strand s{pool.get_executor()}; // strand - serializes execution +capy::strand s{pool.get_executor()}; ---- A `strand` is an executor adaptor that serializes execution. All coroutines dispatched through a strand are guaranteed not to run concurrently, making it safe to access shared state without explicit locking. Note that `async_event` is not thread-safe, so using a strand ensures safe access. @@ -89,7 +90,7 @@ A `strand` is an executor adaptor that serializes execution. All coroutines disp [source,cpp] ---- -async_event data_ready; // async_event +capy::async_event data_ready; ---- `async_event` is a one-shot signaling mechanism. One task can `set()` it; other tasks can `wait()` for it. When set, all waiting tasks resume. @@ -98,7 +99,7 @@ async_event data_ready; // async_event [source,cpp] ---- -auto producer = [&]() -> task<> { +auto producer = [&]() -> capy::task<> { std::cout << "Producer: preparing data...\n"; shared_value = 42; std::cout << "Producer: data ready, signaling\n"; @@ -113,9 +114,10 @@ The producer prepares data and signals completion by calling `set()`. [source,cpp] ---- -auto consumer = [&]() -> task<> { +auto consumer = [&]() -> capy::task<> { std::cout << "Consumer: waiting for data...\n"; - co_await data_ready.wait(); + auto [ec] = co_await data_ready.wait(); + (void)ec; std::cout << "Consumer: received value " << shared_value << "\n"; co_return; }; @@ -130,11 +132,11 @@ The consumer waits until the event is set. The `co_await data_ready.wait()` susp // Run both tasks concurrently using when_all, through a strand. // The strand serializes execution, ensuring thread-safe access // to the shared async_event and shared_value. -auto run_both = [&]() -> task<> { - co_await when_all(producer(), consumer()); +auto run_both = [&]() -> capy::task<> { + co_await capy::when_all(producer(), consumer()); }; -run_async(s, on_complete, on_error)(run_both()); +capy::run_async(s, on_complete, on_error)(run_both()); ---- `when_all` runs both tasks concurrently within the same parent coroutine context, but the strand ensures they don't run at the same time on different threads. The producer signals `data_ready` when the value is set, and the consumer waits for the signal before reading. diff --git a/doc/modules/ROOT/pages/7.examples/7c.buffer-composition.adoc b/doc/modules/ROOT/pages/7.examples/7c.buffer-composition.adoc index b8410c1c..9b9e992f 100644 --- a/doc/modules/ROOT/pages/7.examples/7c.buffer-composition.adoc +++ b/doc/modules/ROOT/pages/7.examples/7c.buffer-composition.adoc @@ -23,7 +23,7 @@ Composing buffer sequences without allocation for scatter/gather I/O. #include #include -using namespace boost::capy; +namespace capy = boost::capy; void demonstrate_single_buffers() { @@ -35,9 +35,9 @@ void demonstrate_single_buffers() std::vector vec = {'V', 'e', 'c', 't', 'o', 'r'}; // make_buffer creates buffer views (no copies) - auto str_buf = make_buffer(str); // mutable_buffer - auto arr_buf = make_buffer(arr, sizeof(arr) - 1); // mutable_buffer - Exclude null terminator - auto vec_buf = make_buffer(vec); // mutable_buffer + auto str_buf = capy::make_buffer(str); // mutable_buffer + auto arr_buf = capy::make_buffer(arr, sizeof(arr) - 1); // mutable_buffer - Exclude null terminator + auto vec_buf = capy::make_buffer(vec); // mutable_buffer std::cout << "String buffer: " << str_buf.size() << " bytes\n"; std::cout << "Array buffer: " << arr_buf.size() << " bytes\n"; @@ -48,19 +48,19 @@ void demonstrate_buffer_pair() { std::cout << "\n=== Buffer Pair (Scatter/Gather) ===\n\n"; - // const_buffer_pair is std::array + // capy::const_buffer_pair is std::array std::string header = "Content-Type: text/plain\r\n\r\n"; std::string body = "Hello, World!"; - const_buffer_pair message = {{ - make_buffer(header), - make_buffer(body) + capy::const_buffer_pair message = {{ + capy::make_buffer(header), + capy::make_buffer(body) }}; // Calculate total size - std::size_t total = buffer_size(message); + std::size_t total = capy::buffer_size(message); std::cout << "Total message size: " << total << " bytes\n"; - std::cout << "Buffer count: " << buffer_length(message) << "\n"; + std::cout << "Buffer count: " << capy::buffer_length(message) << "\n"; // Iterate through buffers std::cout << "\nBuffer contents:\n"; @@ -83,17 +83,17 @@ void demonstrate_buffer_array() std::string empty_line = "\r\n"; std::string body = R"({"status":"ok"})"; - std::array http_response = {{ - make_buffer(status), - make_buffer(content_type), - make_buffer(server), - make_buffer(empty_line), - make_buffer(body) + std::array http_response = {{ + capy::make_buffer(status), + capy::make_buffer(content_type), + capy::make_buffer(server), + capy::make_buffer(empty_line), + capy::make_buffer(body) }}; - std::size_t total = buffer_size(http_response); + std::size_t total = capy::buffer_size(http_response); std::cout << "HTTP response size: " << total << " bytes\n"; - std::cout << "Buffer count: " << buffer_length(http_response) << "\n"; + std::cout << "Buffer count: " << capy::buffer_length(http_response) << "\n"; // In real code with streams: // co_await write(stream, http_response); @@ -108,13 +108,13 @@ void demonstrate_mutable_buffers() char buf1[64]; char buf2[64]; - mutable_buffer_pair recv_buffers = {{ - mutable_buffer(buf1, sizeof(buf1)), - mutable_buffer(buf2, sizeof(buf2)) + capy::mutable_buffer_pair recv_buffers = {{ + capy::mutable_buffer(buf1, sizeof(buf1)), + capy::mutable_buffer(buf2, sizeof(buf2)) }}; - std::cout << "Prepared " << buffer_length(recv_buffers) - << " buffers with " << buffer_size(recv_buffers) + std::cout << "Prepared " << capy::buffer_length(recv_buffers) + << " buffers with " << capy::buffer_size(recv_buffers) << " bytes total capacity\n"; // In real code: @@ -146,8 +146,8 @@ target_link_libraries(buffer_composition PRIVATE capy) [source,cpp] ---- -auto str_buf = make_buffer(str); // mutable_buffer -auto arr_buf = make_buffer(arr, sizeof(arr) - 1); // mutable_buffer +auto str_buf = capy::make_buffer(str); // mutable_buffer +auto arr_buf = capy::make_buffer(arr, sizeof(arr) - 1); // mutable_buffer ---- `make_buffer` creates buffer views from various sources. No data is copied—the buffers reference the original storage. @@ -156,9 +156,9 @@ auto arr_buf = make_buffer(arr, sizeof(arr) - 1); // mutable_buffer [source,cpp] ---- -const_buffer_pair message = {{ - make_buffer(header), - make_buffer(body) +capy::const_buffer_pair message = {{ + capy::make_buffer(header), + capy::make_buffer(body) }}; ---- @@ -168,9 +168,9 @@ const_buffer_pair message = {{ [source,cpp] ---- -std::array http_response = {{ - make_buffer(status), - make_buffer(content_type), +std::array http_response = {{ + capy::make_buffer(status), + capy::make_buffer(content_type), // ... }}; ---- diff --git a/doc/modules/ROOT/pages/7.examples/7d.mock-stream-testing.adoc b/doc/modules/ROOT/pages/7.examples/7d.mock-stream-testing.adoc index 287d14f7..16c41d4e 100644 --- a/doc/modules/ROOT/pages/7.examples/7d.mock-stream-testing.adoc +++ b/doc/modules/ROOT/pages/7.examples/7d.mock-stream-testing.adoc @@ -26,24 +26,24 @@ Unit testing protocol code with mock streams and error injection. #include #include -using namespace boost::capy; +namespace capy = boost::capy; // A simple protocol: read until newline, echo back uppercase // Takes any_stream& so the function is transport-independent -task echo_line_uppercase(any_stream& stream) +capy::task echo_line_uppercase(capy::any_stream& stream) { std::string line; char c; - + // Read character by character until newline while (true) { // ec: std::error_code, n: std::size_t - auto [ec, n] = co_await stream.read_some(mutable_buffer(&c, 1)); - + auto [ec, n] = co_await stream.read_some(capy::mutable_buffer(&c, 1)); + if (ec) { - if (ec == cond::eof) + if (ec == capy::cond::eof) break; co_return false; } @@ -62,7 +62,7 @@ task echo_line_uppercase(any_stream& stream) { // wec: std::error_code, wn: std::size_t auto [wec, wn] = co_await stream.write_some( - const_buffer(line.data() + written, line.size() - written)); + capy::const_buffer(line.data() + written, line.size() - written)); if (wec) co_return false; @@ -77,19 +77,16 @@ void test_happy_path() { std::cout << "Test: happy path\n"; - // Use fuse in disarmed mode (no error injection) for happy path - test::fuse f; // test::fuse - test::stream mock(f); // test::stream - mock.provide("hello\n"); - - // Wrap mock in any_stream using pointer construction for reference semantics - any_stream stream{&mock}; // any_stream - + auto [a, b] = capy::test::make_stream_pair(); + b.provide("hello\n"); + + capy::any_stream stream{&a}; // any_stream + bool result = false; // bool - test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); + capy::test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); assert(result == true); - assert(mock.data() == "HELLO\n"); + assert(b.data() == "HELLO\n"); std::cout << " PASSED\n"; } @@ -98,20 +95,17 @@ void test_partial_reads() { std::cout << "Test: partial reads (1 byte at a time)\n"; - // Use fuse in disarmed mode (no error injection) - test::fuse f; // test::fuse - // Mock returns at most 1 byte per read_some - test::stream mock(f, 1); // test::stream, max_read_size = 1 - mock.provide("hi\n"); - - // Wrap mock in any_stream using pointer construction for reference semantics - any_stream stream{&mock}; // any_stream - + auto [a, b] = capy::test::make_stream_pair(); + a.set_max_read_size(1); + b.provide("hi\n"); + + capy::any_stream stream{&a}; // any_stream + bool result = false; // bool - test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); + capy::test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); assert(result == true); - assert(mock.data() == "HI\n"); + assert(b.data() == "HI\n"); std::cout << " PASSED\n"; } @@ -125,13 +119,12 @@ void test_with_error_injection() // fuse::armed runs the test repeatedly, failing at each // operation point until all paths are covered - test::fuse f; // test::fuse - auto r = f.armed([&](test::fuse&) -> task<> { // fuse::result - test::stream mock(f); // test::stream - mock.provide("test\n"); + capy::test::fuse f; // test::fuse + auto r = f.armed([&](capy::test::fuse&) -> capy::task<> { // fuse::result + auto [a, b] = capy::test::make_stream_pair(f); + b.provide("test\n"); - // Wrap mock in any_stream using pointer construction for reference semantics - any_stream stream{&mock}; // any_stream + capy::any_stream stream{&a}; // any_stream // Run the protocol - fuse will inject errors at each step bool result = co_await echo_line_uppercase(stream); // bool @@ -140,7 +133,7 @@ void test_with_error_injection() if (result) { ++success_count; - assert(mock.data() == "TEST\n"); + assert(b.data() == "TEST\n"); } else { @@ -185,8 +178,8 @@ target_link_libraries(mock_stream_testing PRIVATE capy) [source,cpp] ---- -test::fuse f; // test::fuse -test::stream mock(f); // test::stream +capy::test::fuse f; // test::fuse +capy::test::stream mock(f); // test::stream mock.provide("hello\n"); ---- @@ -202,7 +195,7 @@ mock.provide("hello\n"); [source,cpp] ---- // Wrap mock in any_stream using pointer construction for reference semantics -any_stream stream{&mock}; // any_stream +capy::any_stream stream{&mock}; // any_stream ---- Use pointer construction (`&mock`) so the `any_stream` wrapper references the mock without taking ownership. This allows inspecting `mock.data()` after operations. @@ -212,7 +205,7 @@ Use pointer construction (`&mock`) so the `any_stream` wrapper references the mo [source,cpp] ---- bool result = false; // bool -test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); +capy::test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); ---- `run_blocking` executes a coroutine synchronously, blocking until complete. Pass a handler to capture the result. @@ -221,9 +214,9 @@ test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); [source,cpp] ---- -test::fuse f; // test::fuse -auto r = f.armed([&](test::fuse&) -> task<> { - test::stream mock(f); // test::stream +capy::test::fuse f; // test::fuse +auto r = f.armed([&](capy::test::fuse&) -> capy::task<> { + capy::test::stream mock(f); // test::stream // ... run test ... }); ---- diff --git a/doc/modules/ROOT/pages/7.examples/7e.type-erased-echo.adoc b/doc/modules/ROOT/pages/7.examples/7e.type-erased-echo.adoc index afdd6857..a45eec18 100644 --- a/doc/modules/ROOT/pages/7.examples/7e.type-erased-echo.adoc +++ b/doc/modules/ROOT/pages/7.examples/7e.type-erased-echo.adoc @@ -47,28 +47,28 @@ boost::capy::task<> echo_session(boost::capy::any_stream& stream); namespace myapp { -using namespace boost::capy; +namespace capy = boost::capy; -task<> echo_session(any_stream& stream) +capy::task<> echo_session(capy::any_stream& stream) { char buffer[1024]; - + for (;;) { // Read some data // ec: std::error_code, n: std::size_t - auto [ec, n] = co_await stream.read_some(make_buffer(buffer)); - - if (ec == cond::eof) + auto [ec, n] = co_await stream.read_some(capy::make_buffer(buffer)); + + if (ec == capy::cond::eof) co_return; // Client closed connection - + if (ec) throw std::system_error(ec); - + // Echo it back // wec: std::error_code, wn: std::size_t - auto [wec, wn] = co_await write(stream, const_buffer(buffer, n)); - + auto [wec, wn] = co_await capy::write(stream, capy::const_buffer(buffer, n)); + if (wec) throw std::system_error(wec); } @@ -88,22 +88,21 @@ task<> echo_session(any_stream& stream) #include #include -using namespace boost::capy; +namespace capy = boost::capy; void test_with_mock() { - test::fuse f; - test::stream mock(f); - mock.provide("Hello, "); - mock.provide("World!\n"); - // Stream returns eof when no more data is available + auto [a, b] = capy::test::make_stream_pair(); + b.provide("Hello, "); + b.provide("World!\n"); + b.close(); - // Using pointer construction (&mock) for reference semantics - the - // wrapper does not take ownership, so mock must outlive stream. - any_stream stream{&mock}; // any_stream - test::run_blocking()(myapp::echo_session(stream)); + // Using pointer construction (&a) for reference semantics - the + // wrapper does not take ownership, so a must outlive stream. + capy::any_stream stream{&a}; // any_stream + capy::test::run_blocking()(myapp::echo_session(stream)); - std::cout << "Echo output: " << mock.data() << "\n"; + std::cout << "Echo output: " << b.data() << "\n"; } // With real sockets (using Corosio), you would write: @@ -140,7 +139,7 @@ target_link_libraries(echo_demo PRIVATE echo_lib) [source,cpp] ---- // echo.hpp -task<> echo_session(any_stream& stream); +boost::capy::task<> echo_session(boost::capy::any_stream& stream); ---- The header declares only the signature. It includes `any_stream` and `task`, but no concrete transport types. @@ -156,7 +155,7 @@ Clients of this header: [source,cpp] ---- // echo.cpp -task<> echo_session(any_stream& stream) +capy::task<> echo_session(capy::any_stream& stream) { // Full implementation here } diff --git a/doc/modules/ROOT/pages/7.examples/7f.timeout-cancellation.adoc b/doc/modules/ROOT/pages/7.examples/7f.timeout-cancellation.adoc index fa55a43b..835bfdd1 100644 --- a/doc/modules/ROOT/pages/7.examples/7f.timeout-cancellation.adoc +++ b/doc/modules/ROOT/pages/7.examples/7f.timeout-cancellation.adoc @@ -18,6 +18,7 @@ Using stop tokens to implement operation timeouts. [source,cpp] ---- #include +#include #include #include #include @@ -25,12 +26,12 @@ Using stop tokens to implement operation timeouts. #include #include -using namespace boost::capy; +namespace capy = boost::capy; // A slow operation that respects cancellation -task slow_fetch(int steps) +capy::task slow_fetch(int steps) { - auto token = co_await this_coro::stop_token; // std::stop_token + auto token = co_await capy::this_coro::stop_token; // std::stop_token std::string result; for (int i = 0; i < steps; ++i) @@ -50,6 +51,7 @@ task slow_fetch(int steps) std::cout << " Completed step " << i << std::endl; // Yield to allow stop request to be processed before next check + // Extra 5ms ensures print completes before main thread prints std::this_thread::sleep_for(std::chrono::milliseconds(15)); } @@ -57,9 +59,9 @@ task slow_fetch(int steps) } // Run with timeout (conceptual - real implementation needs timer) -task> fetch_with_timeout() +capy::task> fetch_with_timeout() { - auto token = co_await this_coro::stop_token; // std::stop_token + auto token = co_await capy::this_coro::stop_token; // std::stop_token try { @@ -78,11 +80,11 @@ void demo_normal_completion() { std::cout << "Demo: Normal completion\n"; - thread_pool pool; + capy::thread_pool pool; std::stop_source source; std::latch done(1); // std::latch - wait for 1 task - - run_async(pool.get_executor(), source.get_token(), + + capy::run_async(pool.get_executor(), source.get_token(), [&done](std::optional result) { if (result) std::cout << "Result: " << *result << "\n"; @@ -92,20 +94,20 @@ void demo_normal_completion() }, [&done](std::exception_ptr) { done.count_down(); } )(fetch_with_timeout()); - + done.wait(); // Block until task completes } void demo_cancellation() { std::cout << "\nDemo: Cancellation after 2 steps\n"; - - thread_pool pool; + + capy::thread_pool pool; std::stop_source source; std::latch done(1); // std::latch - wait for 1 task - + // Launch the task - run_async(pool.get_executor(), source.get_token(), + capy::run_async(pool.get_executor(), source.get_token(), [&done](std::optional result) { if (result) std::cout << "Result: " << *result << "\n"; @@ -118,6 +120,7 @@ void demo_cancellation() // Simulate timeout: cancel after 2 steps complete // Timing: each step is 10ms work + 15ms yield = 25ms total + // Step 1 prints at 35ms, step 2 check at 50ms // Stop at 42ms: after step 1 print, before step 2 check std::this_thread::sleep_for(std::chrono::milliseconds(42)); std::cout << " Requesting stop..." << std::endl; @@ -127,9 +130,9 @@ void demo_cancellation() } // Example: Manual stop token checking -task process_items(std::vector const& items) +capy::task process_items(std::vector const& items) { - auto token = co_await this_coro::stop_token; // std::stop_token + auto token = co_await capy::this_coro::stop_token; // std::stop_token int sum = 0; for (auto item : items) // int @@ -169,7 +172,7 @@ target_link_libraries(timeout_cancellation PRIVATE capy) [source,cpp] ---- -auto token = co_await this_coro::stop_token; // std::stop_token +auto token = co_await capy::this_coro::stop_token; // std::stop_token ---- Inside a task, `this_coro::stop_token` retrieves the stop token propagated from the caller. You can also access it through the full environment via `co_await this_coro::environment`. @@ -191,7 +194,7 @@ Check `stop_requested()` at appropriate points—typically before expensive oper [source,cpp] ---- std::stop_source source; -run_async(ex, source.get_token())(my_task()); +capy::run_async(ex, source.get_token())(my_task()); // Later: source.request_stop(); diff --git a/doc/modules/ROOT/pages/7.examples/7g.parallel-fetch.adoc b/doc/modules/ROOT/pages/7.examples/7g.parallel-fetch.adoc index db92a072..e36db4e5 100644 --- a/doc/modules/ROOT/pages/7.examples/7g.parallel-fetch.adoc +++ b/doc/modules/ROOT/pages/7.examples/7g.parallel-fetch.adoc @@ -22,36 +22,36 @@ Running multiple operations concurrently with `when_all`. #include #include -using namespace boost::capy; +namespace capy = boost::capy; // Simulated async operations -task fetch_user_id(std::string username) +capy::task fetch_user_id(std::string username) { std::cout << "Fetching user ID for: " << username << "\n"; // In real code: co_await http_get("/users/" + username); co_return static_cast(username.length()) * 100; // Fake ID } -task fetch_user_name(int id) +capy::task fetch_user_name(int id) { std::cout << "Fetching name for user ID: " << id << "\n"; co_return "User" + std::to_string(id); } -task fetch_order_count(int user_id) +capy::task fetch_order_count(int user_id) { std::cout << "Fetching order count for user: " << user_id << "\n"; co_return user_id / 10; // Fake count } -task fetch_account_balance(int user_id) +capy::task fetch_account_balance(int user_id) { std::cout << "Fetching balance for user: " << user_id << "\n"; co_return user_id * 1.5; // Fake balance } // Fetch all user data in parallel -task<> fetch_user_dashboard(std::string username) +capy::task<> fetch_user_dashboard(std::string username) { std::cout << "\n=== Fetching dashboard for: " << username << " ===\n"; @@ -62,7 +62,7 @@ task<> fetch_user_dashboard(std::string username) // Now fetch all user data in parallel std::cout << "Starting parallel fetches...\n"; // name: std::string, orders: int, balance: double - auto [name, orders, balance] = co_await when_all( + auto [name, orders, balance] = co_await capy::when_all( fetch_user_name(user_id), fetch_order_count(user_id), fetch_account_balance(user_id) @@ -75,24 +75,24 @@ task<> fetch_user_dashboard(std::string username) } // Example with void tasks -task<> log_access(std::string resource) +capy::task<> log_access(std::string resource) { std::cout << "Logging access to: " << resource << "\n"; co_return; } -task<> update_metrics(std::string metric) +capy::task<> update_metrics(std::string metric) { std::cout << "Updating metric: " << metric << "\n"; co_return; } -task fetch_with_side_effects() +capy::task fetch_with_side_effects() { std::cout << "\n=== Fetch with side effects ===\n"; // void tasks don't contribute to result tuple - std::tuple results = co_await when_all( + std::tuple results = co_await capy::when_all( log_access("api/data"), // void - no result update_metrics("api_calls"), // void - no result fetch_user_name(42) // returns string @@ -104,7 +104,7 @@ task fetch_with_side_effects() } // Error handling example -task might_fail(bool should_fail, std::string name) +capy::task might_fail(bool should_fail, std::string name) { std::cout << "Task " << name << " starting\n"; @@ -117,14 +117,14 @@ task might_fail(bool should_fail, std::string name) co_return 42; } -task<> demonstrate_error_handling() +capy::task<> demonstrate_error_handling() { std::cout << "\n=== Error handling ===\n"; try { // a: int, b: int, c: int - auto [a, b, c] = co_await when_all( + auto [a, b, c] = co_await capy::when_all( might_fail(false, "A"), might_fail(true, "B"), // This one fails might_fail(false, "C") @@ -141,18 +141,17 @@ task<> demonstrate_error_handling() int main() { - thread_pool pool; - + capy::thread_pool pool; std::latch done(3); // std::latch - wait for 3 tasks - + // Completion handlers signal the latch when each task finishes // Use generic lambda to accept any result type (or no result for task) auto on_complete = [&done](auto&&...) { done.count_down(); }; auto on_error = [&done](std::exception_ptr) { done.count_down(); }; - - run_async(pool.get_executor(), on_complete, on_error)(fetch_user_dashboard("alice")); - run_async(pool.get_executor(), on_complete, on_error)(fetch_with_side_effects()); - run_async(pool.get_executor(), on_complete, on_error)(demonstrate_error_handling()); + + capy::run_async(pool.get_executor(), on_complete, on_error)(fetch_user_dashboard("alice")); + capy::run_async(pool.get_executor(), on_complete, on_error)(fetch_with_side_effects()); + capy::run_async(pool.get_executor(), on_complete, on_error)(demonstrate_error_handling()); done.wait(); // Block until all tasks complete return 0; @@ -173,7 +172,7 @@ target_link_libraries(parallel_fetch PRIVATE capy) [source,cpp] ---- -auto [name, orders, balance] = co_await when_all( +auto [name, orders, balance] = co_await capy::when_all( fetch_user_name(user_id), fetch_order_count(user_id), fetch_account_balance(user_id) @@ -186,7 +185,7 @@ All three tasks run concurrently. `when_all` completes when all tasks finish. Re [source,cpp] ---- -std::tuple results = co_await when_all( +std::tuple results = co_await capy::when_all( log_access("api/data"), // void - filtered out update_metrics("api_calls"), // void - filtered out fetch_user_name(42) // string - in tuple @@ -202,7 +201,7 @@ Tasks returning `void` don't contribute to the result tuple. Only non-void resul ---- try { - auto results = co_await when_all(task_a(), task_b(), task_c()); + auto results = co_await capy::when_all(task_a(), task_b(), task_c()); } catch (...) { diff --git a/doc/modules/ROOT/pages/7.examples/7h.custom-dynamic-buffer.adoc b/doc/modules/ROOT/pages/7.examples/7h.custom-dynamic-buffer.adoc index 10bc0a0d..59cef7f5 100644 --- a/doc/modules/ROOT/pages/7.examples/7h.custom-dynamic-buffer.adoc +++ b/doc/modules/ROOT/pages/7.examples/7h.custom-dynamic-buffer.adoc @@ -27,7 +27,7 @@ Implementing the DynamicBuffer concept for a custom allocation strategy. #include #include -using namespace boost::capy; +namespace capy = boost::capy; // Custom dynamic buffer with statistics tracking class tracked_buffer @@ -52,9 +52,9 @@ public: // === DynamicBuffer interface === // Consumer: readable data - const_buffer data() const noexcept + capy::const_buffer data() const noexcept { - return const_buffer( + return capy::const_buffer( storage_.data() + read_pos_, write_pos_ - read_pos_); } @@ -76,7 +76,7 @@ public: } // Producer: prepare space for writing - mutable_buffer prepare(std::size_t n) + capy::mutable_buffer prepare(std::size_t n) { total_prepared_ += n; @@ -94,7 +94,7 @@ public: if (required > storage_.size()) storage_.resize(required); - return mutable_buffer( + return capy::mutable_buffer( storage_.data() + write_pos_, n); } @@ -147,7 +147,7 @@ private: }; // Demonstrate using the custom buffer -task<> read_into_tracked_buffer(test::stream& stream, tracked_buffer& buffer) +capy::task<> read_into_tracked_buffer(capy::test::stream& stream, tracked_buffer& buffer) { // Read data in chunks while (true) @@ -156,7 +156,7 @@ task<> read_into_tracked_buffer(test::stream& stream, tracked_buffer& buffer) // ec: std::error_code, n: std::size_t auto [ec, n] = co_await stream.read_some(space); - if (ec == cond::eof) + if (ec == capy::cond::eof) break; if (ec) @@ -173,17 +173,15 @@ void demo_tracked_buffer() { std::cout << "=== Tracked Buffer Demo ===\n\n"; - // Setup mock stream with test data - test::fuse f; - test::stream mock(f); - mock.provide("Hello, "); - mock.provide("World! "); - mock.provide("This is a test of the custom buffer.\n"); - // Stream returns eof when data is exhausted + auto [reader, writer] = capy::test::make_stream_pair(); + writer.provide("Hello, "); + writer.provide("World! "); + writer.provide("This is a test of the custom buffer.\n"); + writer.close(); tracked_buffer buffer; - test::run_blocking()(read_into_tracked_buffer(mock, buffer)); + capy::test::run_blocking()(read_into_tracked_buffer(reader, buffer)); std::cout << "\nFinal buffer contents: "; auto data = buffer.data(); // const_buffer diff --git a/doc/modules/ROOT/pages/7.examples/7i.echo-server-corosio.adoc b/doc/modules/ROOT/pages/7.examples/7i.echo-server-corosio.adoc index 836afd59..274a2997 100644 --- a/doc/modules/ROOT/pages/7.examples/7i.echo-server-corosio.adoc +++ b/doc/modules/ROOT/pages/7.examples/7i.echo-server-corosio.adoc @@ -23,22 +23,22 @@ A complete echo server using Corosio for real network I/O. #include namespace corosio = boost::corosio; -using namespace boost::capy; +namespace capy = boost::capy; -task<> echo_session(corosio::tcp_socket sock) +capy::task<> echo_session(corosio::tcp_socket sock) { char buf[1024]; for (;;) { auto [ec, n] = co_await sock.read_some( - mutable_buffer(buf, sizeof(buf))); + capy::mutable_buffer(buf, sizeof(buf))); if (ec) break; - auto [wec, wn] = co_await write( - sock, const_buffer(buf, n)); + auto [wec, wn] = co_await capy::write( + sock, capy::const_buffer(buf, n)); if (wec) break; @@ -47,7 +47,7 @@ task<> echo_session(corosio::tcp_socket sock) sock.close(); } -task<> accept_loop( +capy::task<> accept_loop( corosio::tcp_acceptor& acc, corosio::io_context& ioc) { @@ -73,7 +73,7 @@ task<> accept_loop( std::cout << remote.v6_address(); std::cout << ":" << remote.port() << "\n"; - run_async(ioc.get_executor())( + capy::run_async(ioc.get_executor())( echo_session(std::move(peer))); } } @@ -87,7 +87,7 @@ int main(int argc, char* argv[]) corosio::io_context ioc; corosio::tcp_acceptor acc(ioc, corosio::endpoint(port)); - run_async(ioc.get_executor())( + capy::run_async(ioc.get_executor())( accept_loop(acc, ioc)); ioc.run(); @@ -135,10 +135,10 @@ The accept loop runs forever, creating a new `tcp_socket` for each connection. ` [source,cpp] ---- auto [ec, n] = co_await sock.read_some( - mutable_buffer(buf, sizeof(buf))); + capy::mutable_buffer(buf, sizeof(buf))); // ... -auto [wec, wn] = co_await write( - sock, const_buffer(buf, n)); +auto [wec, wn] = co_await capy::write( + sock, capy::const_buffer(buf, n)); ---- Each session reads data with `read_some` and writes it back with `write`. When the client disconnects, `read_some` returns an error and the loop exits. @@ -147,7 +147,7 @@ Each session reads data with `read_some` and writes it back with `write`. When t [source,cpp] ---- -run_async(ioc.get_executor())( +capy::run_async(ioc.get_executor())( echo_session(std::move(peer))); ---- diff --git a/doc/modules/ROOT/pages/7.examples/7j.stream-pipeline.adoc b/doc/modules/ROOT/pages/7.examples/7j.stream-pipeline.adoc index 39468864..60010211 100644 --- a/doc/modules/ROOT/pages/7.examples/7j.stream-pipeline.adoc +++ b/doc/modules/ROOT/pages/7.examples/7j.stream-pipeline.adoc @@ -17,18 +17,6 @@ Data transformation through a pipeline of sources and sinks. [source,cpp] ---- -// -// Stream Pipeline Example -// -// This example demonstrates chaining buffer sources to create a data -// processing pipeline. Data flows through transform stages: -// -// input -> uppercase_transform -> line_numbering_transform -> output -// -// Each transform is a BufferSource that wraps an upstream any_buffer_source, -// enabling type-erased composition of arbitrary transform chains. -// - #include #include #include @@ -40,26 +28,36 @@ Data transformation through a pipeline of sources and sinks. #include #include -using namespace boost::capy; +namespace capy = boost::capy; + +//------------------------------------------------------------------------------ +// +// Transform: uppercase_transform +// +// A BufferSource that pulls from an upstream source and converts all +// characters to uppercase. Demonstrates a simple byte-by-byte transform. +// +//------------------------------------------------------------------------------ -// A transform stage that converts to uppercase class uppercase_transform { - any_buffer_source* source_; // any_buffer_source* - std::vector buffer_; // std::vector - std::size_t consumed_ = 0; // std::size_t - bool exhausted_ = false; // bool - + capy::any_buffer_source* source_; // any_buffer_source* + std::vector buffer_; // std::vector - transformed data + std::size_t consumed_ = 0; // std::size_t - bytes consumed by downstream + bool exhausted_ = false; // bool - upstream exhausted + public: - explicit uppercase_transform(any_buffer_source& source) + explicit uppercase_transform(capy::any_buffer_source& source) : source_(&source) { } // BufferSource::consume - advance past processed bytes - void consume(std::size_t n) noexcept + void + consume(std::size_t n) noexcept { consumed_ += n; + // Compact buffer when fully consumed if (consumed_ >= buffer_.size()) { buffer_.clear(); @@ -68,86 +66,97 @@ public: } // BufferSource::pull - returns task<> to enable co_await on upstream - io_task> - pull(std::span dest) + capy::io_task> + pull(std::span dest) { // Already have unconsumed data? if (consumed_ < buffer_.size()) { if (dest.empty()) - co_return {std::error_code{}, std::span{}}; - - dest[0] = const_buffer( + co_return {std::error_code{}, std::span{}}; + + dest[0] = capy::const_buffer( buffer_.data() + consumed_, buffer_.size() - consumed_); co_return {std::error_code{}, dest.first(1)}; } - + // Upstream exhausted? if (exhausted_) - co_return {std::error_code{}, std::span{}}; - + co_return {capy::error::eof, std::span{}}; + // Pull from upstream buffer_.clear(); consumed_ = 0; - - const_buffer upstream[8]; // const_buffer[8] + + capy::const_buffer upstream[8]; // const_buffer[8] // ec: std::error_code, bufs: std::span auto [ec, bufs] = co_await source_->pull(upstream); - - if (ec) - co_return {ec, std::span{}}; - - if (bufs.empty()) + + if (ec == capy::cond::eof) { exhausted_ = true; - co_return {std::error_code{}, std::span{}}; + co_return {capy::error::eof, std::span{}}; } - + + if (ec) + co_return {ec, std::span{}}; + // Transform: uppercase each byte for (auto const& buf : bufs) // const_buffer const& { auto const* data = static_cast(buf.data()); // char const* auto size = buf.size(); // std::size_t - + for (std::size_t i = 0; i < size; ++i) { buffer_.push_back(static_cast( std::toupper(static_cast(data[i])))); } } - + // Consume from upstream - source_->consume(buffer_size(bufs)); - + source_->consume(capy::buffer_size(bufs)); + // Return transformed data if (dest.empty() || buffer_.empty()) - co_return {std::error_code{}, std::span{}}; - - dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, std::span{}}; + + dest[0] = capy::const_buffer(buffer_.data(), buffer_.size()); co_return {std::error_code{}, dest.first(1)}; } }; -// A transform that adds line numbers +//------------------------------------------------------------------------------ +// +// Transform: line_numbering_transform +// +// A BufferSource that pulls from an upstream source and prepends line +// numbers to each line. Demonstrates a transform that changes data size. +// +//------------------------------------------------------------------------------ + class line_numbering_transform { - any_buffer_source* source_; // any_buffer_source* - std::string buffer_; // std::string - std::size_t consumed_ = 0; // std::size_t - std::size_t line_num_ = 1; // std::size_t - bool at_line_start_ = true; // bool - bool exhausted_ = false; // bool - + capy::any_buffer_source* source_; // any_buffer_source* + std::string buffer_; // std::string - transformed data + std::size_t consumed_ = 0; // std::size_t - bytes consumed by downstream + std::size_t line_num_ = 1; // std::size_t - current line number + bool at_line_start_ = true; // bool - are we at start of a line? + bool exhausted_ = false; // bool - upstream exhausted + public: - explicit line_numbering_transform(any_buffer_source& source) + explicit line_numbering_transform(capy::any_buffer_source& source) : source_(&source) { } - void consume(std::size_t n) noexcept + // BufferSource::consume - advance past processed bytes + void + consume(std::size_t n) noexcept { consumed_ += n; + // Compact buffer when fully consumed if (consumed_ >= buffer_.size()) { buffer_.clear(); @@ -155,45 +164,49 @@ public: } } - io_task> - pull(std::span dest) + // BufferSource::pull - returns task<> to enable co_await on upstream + capy::io_task> + pull(std::span dest) { + // Already have unconsumed data? if (consumed_ < buffer_.size()) { if (dest.empty()) - co_return {std::error_code{}, std::span{}}; - - dest[0] = const_buffer( + co_return {std::error_code{}, std::span{}}; + + dest[0] = capy::const_buffer( buffer_.data() + consumed_, buffer_.size() - consumed_); co_return {std::error_code{}, dest.first(1)}; } - + + // Upstream exhausted? if (exhausted_) - co_return {std::error_code{}, std::span{}}; - + co_return {capy::error::eof, std::span{}}; + + // Pull from upstream buffer_.clear(); consumed_ = 0; - - const_buffer upstream[8]; // const_buffer[8] + + capy::const_buffer upstream[8]; // const_buffer[8] // ec: std::error_code, bufs: std::span auto [ec, bufs] = co_await source_->pull(upstream); - - if (ec) - co_return {ec, std::span{}}; - - if (bufs.empty()) + + if (ec == capy::cond::eof) { exhausted_ = true; - co_return {std::error_code{}, std::span{}}; + co_return {capy::error::eof, std::span{}}; } - + + if (ec) + co_return {ec, std::span{}}; + // Transform: add line numbers for (auto const& buf : bufs) // const_buffer const& { auto const* data = static_cast(buf.data()); // char const* auto size = buf.size(); // std::size_t - + for (std::size_t i = 0; i < size; ++i) { if (at_line_start_) @@ -206,34 +219,42 @@ public: at_line_start_ = true; } } - - source_->consume(buffer_size(bufs)); - + + // Consume from upstream + source_->consume(capy::buffer_size(bufs)); + + // Return transformed data if (dest.empty() || buffer_.empty()) - co_return {std::error_code{}, std::span{}}; - - dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, std::span{}}; + + dest[0] = capy::const_buffer(buffer_.data(), buffer_.size()); co_return {std::error_code{}, dest.first(1)}; } }; -// Transfer from source to sink -task transfer(any_buffer_source& source, any_write_sink& sink) +//------------------------------------------------------------------------------ +// +// transfer: Pull from source and write to sink until exhausted +// +//------------------------------------------------------------------------------ + +capy::task transfer(capy::any_buffer_source& source, capy::any_write_sink& sink) { std::size_t total = 0; // std::size_t - const_buffer bufs[8]; // const_buffer[8] - + capy::const_buffer bufs[8]; // const_buffer[8] + for (;;) { // ec: std::error_code, spans: std::span auto [ec, spans] = co_await source.pull(bufs); - + + if (ec == capy::cond::eof) + break; + if (ec) throw std::system_error(ec); - - if (spans.empty()) - break; - + + // Write each buffer to sink for (auto const& buf : spans) // const_buffer const& { // wec: std::error_code, n: std::size_t @@ -242,52 +263,72 @@ task transfer(any_buffer_source& source, any_write_sink& sink) throw std::system_error(wec); total += n; } - - source.consume(buffer_size(spans)); + + // Consume what we read + source.consume(capy::buffer_size(spans)); } - - io_result<> eof_result = co_await sink.write_eof(); + + capy::io_result<> eof_result = co_await sink.write_eof(); if (eof_result.ec) throw std::system_error(eof_result.ec); - + co_return total; } +//------------------------------------------------------------------------------ +// +// demo_pipeline: Demonstrate chained transforms +// +//------------------------------------------------------------------------------ + void demo_pipeline() { std::cout << "=== Stream Pipeline Demo ===\n\n"; - // Input data + // Input data - three lines std::string input = "hello world\nthis is a test\nof the pipeline\n"; std::cout << "Input:\n" << input << "\n"; // Create mock source with input data - test::fuse f; // test::fuse - test::buffer_source source(f); // test::buffer_source + capy::test::fuse f; // test::fuse + capy::test::buffer_source source(f); // test::buffer_source source.provide(input); - // Build the pipeline using type-erased buffer sources. - // Using pointer construction (&source) for reference semantics - - // the wrapper does not take ownership, so source must outlive src. - any_buffer_source src{&source}; // any_buffer_source + // Build the pipeline using type-erased buffer sources: + // source -> [uppercase] -> [line_numbering] -> sink + + // Stage 1: Wrap raw source as any_buffer_source. + // Using pointer construction (&source) for reference semantics - the + // wrapper does not take ownership, so source must outlive src. + capy::any_buffer_source src{&source}; // any_buffer_source + // Stage 2: Uppercase transform wraps src. + // Again using pointer construction so upper_src references upper + // without taking ownership. uppercase_transform upper{src}; // uppercase_transform - any_buffer_source upper_src{&upper}; // any_buffer_source + capy::any_buffer_source upper_src{&upper}; // any_buffer_source + // Stage 3: Line numbering transform wraps upper_src. line_numbering_transform numbered{upper_src}; // line_numbering_transform - any_buffer_source numbered_src{&numbered}; // any_buffer_source + capy::any_buffer_source numbered_src{&numbered}; // any_buffer_source - // Create sink - pointer construction ensures sink outlives dst - test::write_sink sink(f); // test::write_sink - any_write_sink dst{&sink}; // any_write_sink + // Create sink to collect output. + // Pointer construction ensures sink outlives dst. + capy::test::write_sink sink(f); // test::write_sink + capy::any_write_sink dst{&sink}; // any_write_sink - // Run pipeline + // Run the pipeline std::size_t bytes = 0; // std::size_t - test::run_blocking([&](std::size_t n) { bytes = n; })( + capy::test::run_blocking([&](std::size_t n) { bytes = n; })( transfer(numbered_src, dst)); std::cout << "Output (" << bytes << " bytes):\n"; std::cout << sink.data() << "\n"; + + // Expected output: + // 1: HELLO WORLD + // 2: THIS IS A TEST + // 3: OF THE PIPELINE } int main() @@ -332,20 +373,20 @@ Data flows through the pipeline: [source,cpp] ---- -io_task> -pull(std::span dest) +capy::io_task> +pull(std::span dest) { // Pull from upstream // ec: std::error_code, bufs: std::span auto [ec, bufs] = co_await source_->pull(upstream); - + // Transform data... - + // Consume from upstream - source_->consume(buffer_size(bufs)); - + source_->consume(capy::buffer_size(bufs)); + // Return transformed buffer - dest[0] = const_buffer(buffer_.data(), buffer_.size()); + dest[0] = capy::const_buffer(buffer_.data(), buffer_.size()); co_return {std::error_code{}, dest.first(1)}; } ---- @@ -362,10 +403,10 @@ Each stage: [source,cpp] ---- // Using pointer construction (&source) for reference semantics -any_buffer_source src{&source}; // any_buffer_source +capy::any_buffer_source src{&source}; // any_buffer_source uppercase_transform upper{src}; // uppercase_transform -any_buffer_source upper_src{&upper}; // any_buffer_source +capy::any_buffer_source upper_src{&upper}; // any_buffer_source ---- `any_buffer_source` wraps each stage using pointer construction, allowing uniform composition while preserving the lifetime of the underlying objects. diff --git a/doc/modules/ROOT/pages/7.examples/7k.strand-serialization.adoc b/doc/modules/ROOT/pages/7.examples/7k.strand-serialization.adoc index 7b30a155..2b0374d3 100644 --- a/doc/modules/ROOT/pages/7.examples/7k.strand-serialization.adoc +++ b/doc/modules/ROOT/pages/7.examples/7k.strand-serialization.adoc @@ -21,15 +21,15 @@ Protecting shared state with a strand instead of a mutex. #include #include -using namespace boost::capy; +namespace capy = boost::capy; int main() { constexpr int num_coroutines = 10; constexpr int increments_per_coro = 1000; - thread_pool pool(4); - strand s{pool.get_executor()}; + capy::thread_pool pool(4); + capy::strand s{pool.get_executor()}; std::latch done(1); auto on_complete = [&done](auto&&...) { done.count_down(); }; @@ -48,7 +48,7 @@ int main() // Each coroutine increments the shared counter without locks. // The strand ensures only one coroutine runs at a time. - auto increment = [&](int id) -> task<> { + auto increment = [&](int id) -> capy::task<> { for (int i = 0; i < increments_per_coro; ++i) ++counter; std::cout << "Coroutine " << id @@ -56,15 +56,15 @@ int main() co_return; }; - auto run_all = [&]() -> task<> { - co_await when_all( + auto run_all = [&]() -> capy::task<> { + co_await capy::when_all( increment(0), increment(1), increment(2), increment(3), increment(4), increment(5), increment(6), increment(7), increment(8), increment(9)); }; - run_async(s, on_complete, on_error)(run_all()); + capy::run_async(s, on_complete, on_error)(run_all()); done.wait(); int expected = num_coroutines * increments_per_coro; @@ -89,7 +89,7 @@ target_link_libraries(strand_serialization PRIVATE Boost::capy) [source,cpp] ---- -strand s{pool.get_executor()}; +capy::strand s{pool.get_executor()}; ---- A `strand` wraps an executor and guarantees that handlers dispatched through it never run concurrently. This replaces the need for a mutex when protecting shared state accessed by coroutines. @@ -100,7 +100,7 @@ A `strand` wraps an executor and guarantees that handlers dispatched through it ---- int counter = 0; -auto increment = [&](int id) -> task<> { +auto increment = [&](int id) -> capy::task<> { for (int i = 0; i < increments_per_coro; ++i) ++counter; // ... @@ -113,7 +113,7 @@ Multiple coroutines increment the same `counter` without any locks. The strand s [source,cpp] ---- -run_async(s, on_complete, on_error)(run_all()); +capy::run_async(s, on_complete, on_error)(run_all()); ---- Passing the strand `s` to `run_async` ensures the entire coroutine tree executes through the strand. Even though the underlying `thread_pool` has 4 threads, the strand constrains execution to one coroutine at a time. diff --git a/doc/modules/ROOT/pages/7.examples/7l.async-mutex.adoc b/doc/modules/ROOT/pages/7.examples/7l.async-mutex.adoc index 033379b1..1ebc4a5c 100644 --- a/doc/modules/ROOT/pages/7.examples/7l.async-mutex.adoc +++ b/doc/modules/ROOT/pages/7.examples/7l.async-mutex.adoc @@ -22,14 +22,12 @@ Fair FIFO coroutine locking with `async_mutex`. #include #include -using namespace boost::capy; +namespace capy = boost::capy; int main() { - constexpr int num_workers = 6; - - thread_pool pool; - strand s{pool.get_executor()}; + capy::thread_pool pool; + capy::strand s{pool.get_executor()}; std::latch done(1); auto on_complete = [&done](auto&&...) { done.count_down(); }; @@ -44,11 +42,11 @@ int main() done.count_down(); }; - async_mutex mtx; + capy::async_mutex mtx; int acquisition_order = 0; std::vector order_log; - auto worker = [&](int id) -> task<> { + auto worker = [&](int id) -> capy::task<> { std::cout << "Worker " << id << " waiting for lock\n"; auto [ec, guard] = co_await mtx.scoped_lock(); if (ec) @@ -68,14 +66,14 @@ int main() co_return; }; - auto run_all = [&]() -> task<> { - co_await when_all( + auto run_all = [&]() -> capy::task<> { + co_await capy::when_all( worker(0), worker(1), worker(2), worker(3), worker(4), worker(5)); }; // Run on a strand so async_mutex operations are single-threaded - run_async(s, on_complete, on_error)(run_all()); + capy::run_async(s, on_complete, on_error)(run_all()); done.wait(); std::cout << "\nAcquisition order: "; @@ -105,7 +103,7 @@ target_link_libraries(async_mutex PRIVATE Boost::capy) [source,cpp] ---- -async_mutex mtx; +capy::async_mutex mtx; ---- `async_mutex` is a coroutine-aware mutex. Unlike `std::mutex`, it suspends the calling coroutine instead of blocking the thread, allowing other coroutines to run while waiting for the lock. diff --git a/doc/modules/ROOT/pages/7.examples/7m.parallel-tasks.adoc b/doc/modules/ROOT/pages/7.examples/7m.parallel-tasks.adoc index bc9f8a6e..5da55844 100644 --- a/doc/modules/ROOT/pages/7.examples/7m.parallel-tasks.adoc +++ b/doc/modules/ROOT/pages/7.examples/7m.parallel-tasks.adoc @@ -22,10 +22,10 @@ Distributing CPU-bound work across a thread pool and collecting results. #include #include -using namespace boost::capy; +namespace capy = boost::capy; // Sum integers in [lo, hi) -task partial_sum(int lo, int hi) +capy::task partial_sum(int lo, int hi) { std::ostringstream oss; oss << " range [" << lo << ", " << hi @@ -44,7 +44,7 @@ int main() constexpr int num_tasks = 4; constexpr int chunk = total / num_tasks; - thread_pool pool(num_tasks); + capy::thread_pool pool(num_tasks); std::latch done(1); auto on_complete = [&done](auto&&...) { done.count_down(); }; @@ -59,11 +59,11 @@ int main() done.count_down(); }; - auto compute = [&]() -> task<> { + auto compute = [&]() -> capy::task<> { std::cout << "Dispatching " << num_tasks << " parallel tasks...\n"; - auto [s0, s1, s2, s3] = co_await when_all( + auto [s0, s1, s2, s3] = co_await capy::when_all( partial_sum(0 * chunk, 1 * chunk), partial_sum(1 * chunk, 2 * chunk), partial_sum(2 * chunk, 3 * chunk), @@ -81,7 +81,7 @@ int main() << " (expected " << expected << ")\n"; }; - run_async(pool.get_executor(), on_complete, on_error)(compute()); + capy::run_async(pool.get_executor(), on_complete, on_error)(compute()); done.wait(); return 0; @@ -113,7 +113,7 @@ The range `[0, 10000)` is divided into 4 equal chunks, one per task. Each task c [source,cpp] ---- -auto [s0, s1, s2, s3] = co_await when_all( +auto [s0, s1, s2, s3] = co_await capy::when_all( partial_sum(0 * chunk, 1 * chunk), partial_sum(1 * chunk, 2 * chunk), partial_sum(2 * chunk, 3 * chunk), diff --git a/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc b/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc index 24b4b53e..8581934a 100644 --- a/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc +++ b/doc/modules/ROOT/pages/7.examples/7n.custom-executor.adoc @@ -21,12 +21,12 @@ Implementing the Executor concept with a single-threaded run loop. #include #include -using namespace boost::capy; +namespace capy = boost::capy; // A minimal single-threaded execution context. // Demonstrates how to satisfy the Executor concept // for any custom scheduling system. -class run_loop : public execution_context +class run_loop : public capy::execution_context { std::queue> queue_; std::thread::id owner_; @@ -86,7 +86,7 @@ class run_loop::executor_type public: executor_type() = default; - execution_context& context() const noexcept + capy::execution_context& context() const noexcept { return *loop_; } @@ -122,19 +122,19 @@ run_loop::get_executor() noexcept } // Verify the concept is satisfied -static_assert(Executor); +static_assert(capy::Executor); -task compute(int x) +capy::task compute(int x) { std::cout << " computing " << x << " * " << x << "\n"; co_return x * x; } -task<> run_tasks() +capy::task<> run_tasks() { std::cout << "Launching 3 tasks with when_all...\n"; - auto [a, b, c] = co_await when_all( + auto [a, b, c] = co_await capy::when_all( compute(3), compute(7), compute(11)); @@ -149,9 +149,9 @@ int main() run_loop loop; // Launch using run_async, just like with thread_pool - run_async(loop.get_executor())(run_tasks()); + capy::run_async(loop.get_executor())(run_tasks()); - // Drive the loop -- all coroutines execute here + // Drive the loop — all coroutines execute here std::cout << "Running event loop on main thread...\n"; loop.run(); @@ -174,7 +174,7 @@ target_link_libraries(custom_executor PRIVATE Boost::capy) [source,cpp] ---- -class run_loop : public execution_context +class run_loop : public capy::execution_context { // ... run_loop() @@ -198,7 +198,7 @@ The nested `executor_type` must provide: [source,cpp] ---- -static_assert(Executor); +static_assert(capy::Executor); ---- The `static_assert` verifies at compile time that all concept requirements are met. @@ -225,7 +225,7 @@ std::coroutine_handle<> dispatch( [source,cpp] ---- -run_async(loop.get_executor())(run_tasks()); +capy::run_async(loop.get_executor())(run_tasks()); loop.run(); ---- diff --git a/example/allocation/allocation.cpp b/example/allocation/allocation.cpp index b2bcfc36..20ad589c 100644 --- a/example/allocation/allocation.cpp +++ b/example/allocation/allocation.cpp @@ -38,7 +38,7 @@ # define CAPY_NOINLINE #endif -using namespace boost::capy; +namespace capy = boost::capy; std::atomic counter{0}; @@ -82,31 +82,31 @@ class mi_memory_resource // business logic awaiting an HTTP client, awaiting // a TLS stream, awaiting a tcp_socket -CAPY_NOINLINE task<> depth_4() +CAPY_NOINLINE capy::task<> depth_4() { counter.fetch_add(1, std::memory_order_relaxed); co_return; } -CAPY_NOINLINE task<> depth_3() +CAPY_NOINLINE capy::task<> depth_3() { for(int i = 0; i < 3; ++i) co_await depth_4(); } -CAPY_NOINLINE task<> depth_2() +CAPY_NOINLINE capy::task<> depth_2() { for(int i = 0; i < 3; ++i) co_await depth_3(); } -CAPY_NOINLINE task<> depth_1() +CAPY_NOINLINE capy::task<> depth_1() { for(int i = 0; i < 5; ++i) co_await depth_2(); } -CAPY_NOINLINE task<> bench_loop(std::size_t n) +CAPY_NOINLINE capy::task<> bench_loop(std::size_t n) { for(std::size_t i = 0; i < n; ++i) co_await depth_1(); @@ -120,9 +120,9 @@ int main() counter.store(0); auto t0 = std::chrono::steady_clock::now(); { - test::blocking_context ctx; - ctx.set_frame_allocator(get_recycling_memory_resource()); - run_async(ctx.get_executor(), + capy::test::blocking_context ctx; + ctx.set_frame_allocator(capy::get_recycling_memory_resource()); + capy::run_async(ctx.get_executor(), [&] { ctx.signal_done(); })( bench_loop(iterations)); ctx.run(); @@ -135,9 +135,9 @@ int main() mi_memory_resource mi_mr; auto t2 = std::chrono::steady_clock::now(); { - test::blocking_context ctx; + capy::test::blocking_context ctx; ctx.set_frame_allocator(&mi_mr); - run_async(ctx.get_executor(), + capy::run_async(ctx.get_executor(), [&] { ctx.signal_done(); })( bench_loop(iterations)); ctx.run(); @@ -149,8 +149,8 @@ int main() counter.store(0); auto t4 = std::chrono::steady_clock::now(); { - test::blocking_context ctx; - run_async(ctx.get_executor(), std::allocator{}, + capy::test::blocking_context ctx; + capy::run_async(ctx.get_executor(), std::allocator{}, [&] { ctx.signal_done(); })( bench_loop(iterations)); ctx.run(); diff --git a/example/async-mutex/async_mutex.cpp b/example/async-mutex/async_mutex.cpp index c86a3fbf..22963f80 100644 --- a/example/async-mutex/async_mutex.cpp +++ b/example/async-mutex/async_mutex.cpp @@ -20,12 +20,12 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; int main() { - thread_pool pool; - strand s{pool.get_executor()}; + capy::thread_pool pool; + capy::strand s{pool.get_executor()}; std::latch done(1); auto on_complete = [&done](auto&&...) { done.count_down(); }; @@ -40,11 +40,11 @@ int main() done.count_down(); }; - async_mutex mtx; + capy::async_mutex mtx; int acquisition_order = 0; std::vector order_log; - auto worker = [&](int id) -> task<> { + auto worker = [&](int id) -> capy::task<> { std::cout << "Worker " << id << " waiting for lock\n"; auto [ec, guard] = co_await mtx.scoped_lock(); if (ec) @@ -64,14 +64,14 @@ int main() co_return; }; - auto run_all = [&]() -> task<> { - co_await when_all( + auto run_all = [&]() -> capy::task<> { + co_await capy::when_all( worker(0), worker(1), worker(2), worker(3), worker(4), worker(5)); }; // Run on a strand so async_mutex operations are single-threaded - run_async(s, on_complete, on_error)(run_all()); + capy::run_async(s, on_complete, on_error)(run_all()); done.wait(); std::cout << "\nAcquisition order: "; diff --git a/example/buffer-composition/buffer_composition.cpp b/example/buffer-composition/buffer_composition.cpp index 8f39e6c9..ed1f17fc 100644 --- a/example/buffer-composition/buffer_composition.cpp +++ b/example/buffer-composition/buffer_composition.cpp @@ -13,7 +13,7 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; void demonstrate_single_buffers() { @@ -25,9 +25,9 @@ void demonstrate_single_buffers() std::vector vec = {'V', 'e', 'c', 't', 'o', 'r'}; // make_buffer creates buffer views (no copies) - auto str_buf = make_buffer(str); // mutable_buffer - auto arr_buf = make_buffer(arr, sizeof(arr) - 1); // mutable_buffer - Exclude null terminator - auto vec_buf = make_buffer(vec); // mutable_buffer + auto str_buf = capy::make_buffer(str); // mutable_buffer + auto arr_buf = capy::make_buffer(arr, sizeof(arr) - 1); // mutable_buffer - Exclude null terminator + auto vec_buf = capy::make_buffer(vec); // mutable_buffer std::cout << "String buffer: " << str_buf.size() << " bytes\n"; std::cout << "Array buffer: " << arr_buf.size() << " bytes\n"; @@ -38,19 +38,19 @@ void demonstrate_buffer_pair() { std::cout << "\n=== Buffer Pair (Scatter/Gather) ===\n\n"; - // const_buffer_pair is std::array + // capy::const_buffer_pair is std::array std::string header = "Content-Type: text/plain\r\n\r\n"; std::string body = "Hello, World!"; - const_buffer_pair message = {{ - make_buffer(header), - make_buffer(body) + capy::const_buffer_pair message = {{ + capy::make_buffer(header), + capy::make_buffer(body) }}; // Calculate total size - std::size_t total = buffer_size(message); + std::size_t total = capy::buffer_size(message); std::cout << "Total message size: " << total << " bytes\n"; - std::cout << "Buffer count: " << buffer_length(message) << "\n"; + std::cout << "Buffer count: " << capy::buffer_length(message) << "\n"; // Iterate through buffers std::cout << "\nBuffer contents:\n"; @@ -73,17 +73,17 @@ void demonstrate_buffer_array() std::string empty_line = "\r\n"; std::string body = R"({"status":"ok"})"; - std::array http_response = {{ - make_buffer(status), - make_buffer(content_type), - make_buffer(server), - make_buffer(empty_line), - make_buffer(body) + std::array http_response = {{ + capy::make_buffer(status), + capy::make_buffer(content_type), + capy::make_buffer(server), + capy::make_buffer(empty_line), + capy::make_buffer(body) }}; - std::size_t total = buffer_size(http_response); + std::size_t total = capy::buffer_size(http_response); std::cout << "HTTP response size: " << total << " bytes\n"; - std::cout << "Buffer count: " << buffer_length(http_response) << "\n"; + std::cout << "Buffer count: " << capy::buffer_length(http_response) << "\n"; // In real code with streams: // co_await write(stream, http_response); @@ -98,13 +98,13 @@ void demonstrate_mutable_buffers() char buf1[64]; char buf2[64]; - mutable_buffer_pair recv_buffers = {{ - mutable_buffer(buf1, sizeof(buf1)), - mutable_buffer(buf2, sizeof(buf2)) + capy::mutable_buffer_pair recv_buffers = {{ + capy::mutable_buffer(buf1, sizeof(buf1)), + capy::mutable_buffer(buf2, sizeof(buf2)) }}; - std::cout << "Prepared " << buffer_length(recv_buffers) - << " buffers with " << buffer_size(recv_buffers) + std::cout << "Prepared " << capy::buffer_length(recv_buffers) + << " buffers with " << capy::buffer_size(recv_buffers) << " bytes total capacity\n"; // In real code: diff --git a/example/custom-dynamic-buffer/custom_dynamic_buffer.cpp b/example/custom-dynamic-buffer/custom_dynamic_buffer.cpp index accf86ce..995ce7ae 100644 --- a/example/custom-dynamic-buffer/custom_dynamic_buffer.cpp +++ b/example/custom-dynamic-buffer/custom_dynamic_buffer.cpp @@ -17,7 +17,7 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; // Custom dynamic buffer with statistics tracking class tracked_buffer @@ -42,9 +42,9 @@ class tracked_buffer // === DynamicBuffer interface === // Consumer: readable data - const_buffer data() const noexcept + capy::const_buffer data() const noexcept { - return const_buffer( + return capy::const_buffer( storage_.data() + read_pos_, write_pos_ - read_pos_); } @@ -66,7 +66,7 @@ class tracked_buffer } // Producer: prepare space for writing - mutable_buffer prepare(std::size_t n) + capy::mutable_buffer prepare(std::size_t n) { total_prepared_ += n; @@ -84,7 +84,7 @@ class tracked_buffer if (required > storage_.size()) storage_.resize(required); - return mutable_buffer( + return capy::mutable_buffer( storage_.data() + write_pos_, n); } @@ -137,7 +137,7 @@ class tracked_buffer }; // Demonstrate using the custom buffer -task<> read_into_tracked_buffer(test::stream& stream, tracked_buffer& buffer) +capy::task<> read_into_tracked_buffer(capy::test::stream& stream, tracked_buffer& buffer) { // Read data in chunks while (true) @@ -146,7 +146,7 @@ task<> read_into_tracked_buffer(test::stream& stream, tracked_buffer& buffer) // ec: std::error_code, n: std::size_t auto [ec, n] = co_await stream.read_some(space); - if (ec == cond::eof) + if (ec == capy::cond::eof) break; if (ec) @@ -163,7 +163,7 @@ void demo_tracked_buffer() { std::cout << "=== Tracked Buffer Demo ===\n\n"; - auto [reader, writer] = test::make_stream_pair(); + auto [reader, writer] = capy::test::make_stream_pair(); writer.provide("Hello, "); writer.provide("World! "); writer.provide("This is a test of the custom buffer.\n"); @@ -171,7 +171,7 @@ void demo_tracked_buffer() tracked_buffer buffer; - test::run_blocking()(read_into_tracked_buffer(reader, buffer)); + capy::test::run_blocking()(read_into_tracked_buffer(reader, buffer)); std::cout << "\nFinal buffer contents: "; auto data = buffer.data(); // const_buffer diff --git a/example/custom-executor/custom_executor.cpp b/example/custom-executor/custom_executor.cpp index f5288277..0f6564fb 100644 --- a/example/custom-executor/custom_executor.cpp +++ b/example/custom-executor/custom_executor.cpp @@ -20,12 +20,12 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; // A minimal single-threaded execution context. // Demonstrates how to satisfy the Executor concept // for any custom scheduling system. -class run_loop : public execution_context +class run_loop : public capy::execution_context { std::queue> queue_; std::thread::id owner_; @@ -85,7 +85,7 @@ class run_loop::executor_type public: executor_type() = default; - execution_context& context() const noexcept + capy::execution_context& context() const noexcept { return *loop_; } @@ -121,19 +121,19 @@ run_loop::get_executor() noexcept } // Verify the concept is satisfied -static_assert(Executor); +static_assert(capy::Executor); -task compute(int x) +capy::task compute(int x) { std::cout << " computing " << x << " * " << x << "\n"; co_return x * x; } -task<> run_tasks() +capy::task<> run_tasks() { std::cout << "Launching 3 tasks with when_all...\n"; - auto [a, b, c] = co_await when_all( + auto [a, b, c] = co_await capy::when_all( compute(3), compute(7), compute(11)); @@ -148,7 +148,7 @@ int main() run_loop loop; // Launch using run_async, just like with thread_pool - run_async(loop.get_executor())(run_tasks()); + capy::run_async(loop.get_executor())(run_tasks()); // Drive the loop — all coroutines execute here std::cout << "Running event loop on main thread...\n"; diff --git a/example/echo-server-corosio/echo_server.cpp b/example/echo-server-corosio/echo_server.cpp index 0417cb8e..29addc25 100644 --- a/example/echo-server-corosio/echo_server.cpp +++ b/example/echo-server-corosio/echo_server.cpp @@ -19,22 +19,22 @@ #include namespace corosio = boost::corosio; -using namespace boost::capy; +namespace capy = boost::capy; -task<> echo_session(corosio::tcp_socket sock) +capy::task<> echo_session(corosio::tcp_socket sock) { char buf[1024]; for (;;) { auto [ec, n] = co_await sock.read_some( - mutable_buffer(buf, sizeof(buf))); + capy::mutable_buffer(buf, sizeof(buf))); if (ec) break; - auto [wec, wn] = co_await write( - sock, const_buffer(buf, n)); + auto [wec, wn] = co_await capy::write( + sock, capy::const_buffer(buf, n)); if (wec) break; @@ -43,7 +43,7 @@ task<> echo_session(corosio::tcp_socket sock) sock.close(); } -task<> accept_loop( +capy::task<> accept_loop( corosio::tcp_acceptor& acc, corosio::io_context& ioc) { @@ -69,7 +69,7 @@ task<> accept_loop( std::cout << remote.v6_address(); std::cout << ":" << remote.port() << "\n"; - run_async(ioc.get_executor())( + capy::run_async(ioc.get_executor())( echo_session(std::move(peer))); } } @@ -83,7 +83,7 @@ int main(int argc, char* argv[]) corosio::io_context ioc; corosio::tcp_acceptor acc(ioc, corosio::endpoint(port)); - run_async(ioc.get_executor())( + capy::run_async(ioc.get_executor())( accept_loop(acc, ioc)); ioc.run(); diff --git a/example/hello-task/hello_task.cpp b/example/hello-task/hello_task.cpp index e6aa44ce..843ca46c 100644 --- a/example/hello-task/hello_task.cpp +++ b/example/hello-task/hello_task.cpp @@ -10,9 +10,9 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; -task<> say_hello() +capy::task<> say_hello() { std::cout << "Hello from Capy!\n"; co_return; @@ -20,7 +20,7 @@ task<> say_hello() int main() { - thread_pool pool; - run_async(pool.get_executor())(say_hello()); + capy::thread_pool pool; + capy::run_async(pool.get_executor())(say_hello()); return 0; } diff --git a/example/mock-stream-testing/mock_stream_testing.cpp b/example/mock-stream-testing/mock_stream_testing.cpp index 89b4bb58..04e66e12 100644 --- a/example/mock-stream-testing/mock_stream_testing.cpp +++ b/example/mock-stream-testing/mock_stream_testing.cpp @@ -16,24 +16,24 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; // A simple protocol: read until newline, echo back uppercase // Takes any_stream& so the function is transport-independent -task echo_line_uppercase(any_stream& stream) +capy::task echo_line_uppercase(capy::any_stream& stream) { std::string line; char c; - + // Read character by character until newline while (true) { // ec: std::error_code, n: std::size_t - auto [ec, n] = co_await stream.read_some(mutable_buffer(&c, 1)); - + auto [ec, n] = co_await stream.read_some(capy::mutable_buffer(&c, 1)); + if (ec) { - if (ec == cond::eof) + if (ec == capy::cond::eof) break; co_return false; } @@ -52,7 +52,7 @@ task echo_line_uppercase(any_stream& stream) { // wec: std::error_code, wn: std::size_t auto [wec, wn] = co_await stream.write_some( - const_buffer(line.data() + written, line.size() - written)); + capy::const_buffer(line.data() + written, line.size() - written)); if (wec) co_return false; @@ -67,13 +67,13 @@ void test_happy_path() { std::cout << "Test: happy path\n"; - auto [a, b] = test::make_stream_pair(); + auto [a, b] = capy::test::make_stream_pair(); b.provide("hello\n"); - - any_stream stream{&a}; // any_stream - + + capy::any_stream stream{&a}; // any_stream + bool result = false; // bool - test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); + capy::test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); assert(result == true); assert(b.data() == "HELLO\n"); @@ -85,14 +85,14 @@ void test_partial_reads() { std::cout << "Test: partial reads (1 byte at a time)\n"; - auto [a, b] = test::make_stream_pair(); + auto [a, b] = capy::test::make_stream_pair(); a.set_max_read_size(1); b.provide("hi\n"); - - any_stream stream{&a}; // any_stream - + + capy::any_stream stream{&a}; // any_stream + bool result = false; // bool - test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); + capy::test::run_blocking([&](bool r) { result = r; })(echo_line_uppercase(stream)); assert(result == true); assert(b.data() == "HI\n"); @@ -109,12 +109,12 @@ void test_with_error_injection() // fuse::armed runs the test repeatedly, failing at each // operation point until all paths are covered - test::fuse f; // test::fuse - auto r = f.armed([&](test::fuse&) -> task<> { // fuse::result - auto [a, b] = test::make_stream_pair(f); + capy::test::fuse f; // test::fuse + auto r = f.armed([&](capy::test::fuse&) -> capy::task<> { // fuse::result + auto [a, b] = capy::test::make_stream_pair(f); b.provide("test\n"); - any_stream stream{&a}; // any_stream + capy::any_stream stream{&a}; // any_stream // Run the protocol - fuse will inject errors at each step bool result = co_await echo_line_uppercase(stream); // bool diff --git a/example/parallel-fetch/parallel_fetch.cpp b/example/parallel-fetch/parallel_fetch.cpp index 3528714d..54f20887 100644 --- a/example/parallel-fetch/parallel_fetch.cpp +++ b/example/parallel-fetch/parallel_fetch.cpp @@ -12,36 +12,36 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; // Simulated async operations -task fetch_user_id(std::string username) +capy::task fetch_user_id(std::string username) { std::cout << "Fetching user ID for: " << username << "\n"; // In real code: co_await http_get("/users/" + username); co_return static_cast(username.length()) * 100; // Fake ID } -task fetch_user_name(int id) +capy::task fetch_user_name(int id) { std::cout << "Fetching name for user ID: " << id << "\n"; co_return "User" + std::to_string(id); } -task fetch_order_count(int user_id) +capy::task fetch_order_count(int user_id) { std::cout << "Fetching order count for user: " << user_id << "\n"; co_return user_id / 10; // Fake count } -task fetch_account_balance(int user_id) +capy::task fetch_account_balance(int user_id) { std::cout << "Fetching balance for user: " << user_id << "\n"; co_return user_id * 1.5; // Fake balance } // Fetch all user data in parallel -task<> fetch_user_dashboard(std::string username) +capy::task<> fetch_user_dashboard(std::string username) { std::cout << "\n=== Fetching dashboard for: " << username << " ===\n"; @@ -52,7 +52,7 @@ task<> fetch_user_dashboard(std::string username) // Now fetch all user data in parallel std::cout << "Starting parallel fetches...\n"; // name: std::string, orders: int, balance: double - auto [name, orders, balance] = co_await when_all( + auto [name, orders, balance] = co_await capy::when_all( fetch_user_name(user_id), fetch_order_count(user_id), fetch_account_balance(user_id) @@ -65,24 +65,24 @@ task<> fetch_user_dashboard(std::string username) } // Example with void tasks -task<> log_access(std::string resource) +capy::task<> log_access(std::string resource) { std::cout << "Logging access to: " << resource << "\n"; co_return; } -task<> update_metrics(std::string metric) +capy::task<> update_metrics(std::string metric) { std::cout << "Updating metric: " << metric << "\n"; co_return; } -task fetch_with_side_effects() +capy::task fetch_with_side_effects() { std::cout << "\n=== Fetch with side effects ===\n"; // void tasks don't contribute to result tuple - std::tuple results = co_await when_all( + std::tuple results = co_await capy::when_all( log_access("api/data"), // void - no result update_metrics("api_calls"), // void - no result fetch_user_name(42) // returns string @@ -94,7 +94,7 @@ task fetch_with_side_effects() } // Error handling example -task might_fail(bool should_fail, std::string name) +capy::task might_fail(bool should_fail, std::string name) { std::cout << "Task " << name << " starting\n"; @@ -107,14 +107,14 @@ task might_fail(bool should_fail, std::string name) co_return 42; } -task<> demonstrate_error_handling() +capy::task<> demonstrate_error_handling() { std::cout << "\n=== Error handling ===\n"; try { // a: int, b: int, c: int - auto [a, b, c] = co_await when_all( + auto [a, b, c] = co_await capy::when_all( might_fail(false, "A"), might_fail(true, "B"), // This one fails might_fail(false, "C") @@ -131,17 +131,17 @@ task<> demonstrate_error_handling() int main() { - thread_pool pool; + capy::thread_pool pool; std::latch done(3); // std::latch - wait for 3 tasks - + // Completion handlers signal the latch when each task finishes // Use generic lambda to accept any result type (or no result for task) auto on_complete = [&done](auto&&...) { done.count_down(); }; auto on_error = [&done](std::exception_ptr) { done.count_down(); }; - - run_async(pool.get_executor(), on_complete, on_error)(fetch_user_dashboard("alice")); - run_async(pool.get_executor(), on_complete, on_error)(fetch_with_side_effects()); - run_async(pool.get_executor(), on_complete, on_error)(demonstrate_error_handling()); + + capy::run_async(pool.get_executor(), on_complete, on_error)(fetch_user_dashboard("alice")); + capy::run_async(pool.get_executor(), on_complete, on_error)(fetch_with_side_effects()); + capy::run_async(pool.get_executor(), on_complete, on_error)(demonstrate_error_handling()); done.wait(); // Block until all tasks complete return 0; diff --git a/example/parallel-tasks/parallel_tasks.cpp b/example/parallel-tasks/parallel_tasks.cpp index 9d5f7a36..7e49927c 100644 --- a/example/parallel-tasks/parallel_tasks.cpp +++ b/example/parallel-tasks/parallel_tasks.cpp @@ -21,10 +21,10 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; // Sum integers in [lo, hi) -task partial_sum(int lo, int hi) +capy::task partial_sum(int lo, int hi) { std::ostringstream oss; oss << " range [" << lo << ", " << hi @@ -43,7 +43,7 @@ int main() constexpr int num_tasks = 4; constexpr int chunk = total / num_tasks; - thread_pool pool(num_tasks); + capy::thread_pool pool(num_tasks); std::latch done(1); auto on_complete = [&done](auto&&...) { done.count_down(); }; @@ -58,11 +58,11 @@ int main() done.count_down(); }; - auto compute = [&]() -> task<> { + auto compute = [&]() -> capy::task<> { std::cout << "Dispatching " << num_tasks << " parallel tasks...\n"; - auto [s0, s1, s2, s3] = co_await when_all( + auto [s0, s1, s2, s3] = co_await capy::when_all( partial_sum(0 * chunk, 1 * chunk), partial_sum(1 * chunk, 2 * chunk), partial_sum(2 * chunk, 3 * chunk), @@ -80,7 +80,7 @@ int main() << " (expected " << expected << ")\n"; }; - run_async(pool.get_executor(), on_complete, on_error)(compute()); + capy::run_async(pool.get_executor(), on_complete, on_error)(compute()); done.wait(); return 0; diff --git a/example/producer-consumer/producer_consumer.cpp b/example/producer-consumer/producer_consumer.cpp index 5dca0e52..4a9c7e1e 100644 --- a/example/producer-consumer/producer_consumer.cpp +++ b/example/producer-consumer/producer_consumer.cpp @@ -19,21 +19,21 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; int main() { - thread_pool pool; // thread_pool - strand s{pool.get_executor()}; // strand - serializes execution - std::latch done(1); // std::latch - wait for completion + capy::thread_pool pool; + capy::strand s{pool.get_executor()}; + std::latch done(1); - auto on_complete = [&done](auto&&...) { done.count_down(); }; // lambda - auto on_error = [&done](std::exception_ptr) { done.count_down(); }; // lambda + auto on_complete = [&done](auto&&...) { done.count_down(); }; + auto on_error = [&done](std::exception_ptr) { done.count_down(); }; - async_event data_ready; // async_event - int shared_value = 0; // int + capy::async_event data_ready; + int shared_value = 0; - auto producer = [&]() -> task<> { + auto producer = [&]() -> capy::task<> { std::cout << "Producer: preparing data...\n"; shared_value = 42; std::cout << "Producer: data ready, signaling\n"; @@ -41,7 +41,7 @@ int main() co_return; }; - auto consumer = [&]() -> task<> { + auto consumer = [&]() -> capy::task<> { std::cout << "Consumer: waiting for data...\n"; auto [ec] = co_await data_ready.wait(); (void)ec; @@ -52,11 +52,11 @@ int main() // Run both tasks concurrently using when_all, through a strand. // The strand serializes execution, ensuring thread-safe access // to the shared async_event and shared_value. - auto run_both = [&]() -> task<> { - co_await when_all(producer(), consumer()); + auto run_both = [&]() -> capy::task<> { + co_await capy::when_all(producer(), consumer()); }; - run_async(s, on_complete, on_error)(run_both()); + capy::run_async(s, on_complete, on_error)(run_both()); done.wait(); // Block until tasks complete return 0; diff --git a/example/strand-serialization/strand_serialization.cpp b/example/strand-serialization/strand_serialization.cpp index df837958..a806dc40 100644 --- a/example/strand-serialization/strand_serialization.cpp +++ b/example/strand-serialization/strand_serialization.cpp @@ -19,15 +19,15 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; int main() { constexpr int num_coroutines = 10; constexpr int increments_per_coro = 1000; - thread_pool pool(4); - strand s{pool.get_executor()}; + capy::thread_pool pool(4); + capy::strand s{pool.get_executor()}; std::latch done(1); auto on_complete = [&done](auto&&...) { done.count_down(); }; @@ -46,7 +46,7 @@ int main() // Each coroutine increments the shared counter without locks. // The strand ensures only one coroutine runs at a time. - auto increment = [&](int id) -> task<> { + auto increment = [&](int id) -> capy::task<> { for (int i = 0; i < increments_per_coro; ++i) ++counter; std::cout << "Coroutine " << id @@ -54,15 +54,15 @@ int main() co_return; }; - auto run_all = [&]() -> task<> { - co_await when_all( + auto run_all = [&]() -> capy::task<> { + co_await capy::when_all( increment(0), increment(1), increment(2), increment(3), increment(4), increment(5), increment(6), increment(7), increment(8), increment(9)); }; - run_async(s, on_complete, on_error)(run_all()); + capy::run_async(s, on_complete, on_error)(run_all()); done.wait(); int expected = num_coroutines * increments_per_coro; diff --git a/example/stream-pipeline/stream_pipeline.cpp b/example/stream-pipeline/stream_pipeline.cpp index 3b816923..a81d1b1a 100644 --- a/example/stream-pipeline/stream_pipeline.cpp +++ b/example/stream-pipeline/stream_pipeline.cpp @@ -33,7 +33,7 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; //------------------------------------------------------------------------------ // @@ -46,13 +46,13 @@ using namespace boost::capy; class uppercase_transform { - any_buffer_source* source_; // any_buffer_source* + capy::any_buffer_source* source_; // any_buffer_source* std::vector buffer_; // std::vector - transformed data std::size_t consumed_ = 0; // std::size_t - bytes consumed by downstream bool exhausted_ = false; // bool - upstream exhausted - + public: - explicit uppercase_transform(any_buffer_source& source) + explicit uppercase_transform(capy::any_buffer_source& source) : source_(&source) { } @@ -71,63 +71,63 @@ class uppercase_transform } // BufferSource::pull - returns task<> to enable co_await on upstream - io_task> - pull(std::span dest) + capy::io_task> + pull(std::span dest) { // Already have unconsumed data? if (consumed_ < buffer_.size()) { if (dest.empty()) - co_return {std::error_code{}, std::span{}}; - - dest[0] = const_buffer( + co_return {std::error_code{}, std::span{}}; + + dest[0] = capy::const_buffer( buffer_.data() + consumed_, buffer_.size() - consumed_); co_return {std::error_code{}, dest.first(1)}; } - + // Upstream exhausted? if (exhausted_) - co_return {error::eof, std::span{}}; - + co_return {capy::error::eof, std::span{}}; + // Pull from upstream buffer_.clear(); consumed_ = 0; - - const_buffer upstream[8]; // const_buffer[8] + + capy::const_buffer upstream[8]; // const_buffer[8] // ec: std::error_code, bufs: std::span auto [ec, bufs] = co_await source_->pull(upstream); - - if (ec == cond::eof) + + if (ec == capy::cond::eof) { exhausted_ = true; - co_return {error::eof, std::span{}}; + co_return {capy::error::eof, std::span{}}; } if (ec) - co_return {ec, std::span{}}; - + co_return {ec, std::span{}}; + // Transform: uppercase each byte for (auto const& buf : bufs) // const_buffer const& { auto const* data = static_cast(buf.data()); // char const* auto size = buf.size(); // std::size_t - + for (std::size_t i = 0; i < size; ++i) { buffer_.push_back(static_cast( std::toupper(static_cast(data[i])))); } } - + // Consume from upstream - source_->consume(buffer_size(bufs)); - + source_->consume(capy::buffer_size(bufs)); + // Return transformed data if (dest.empty() || buffer_.empty()) - co_return {std::error_code{}, std::span{}}; - - dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, std::span{}}; + + dest[0] = capy::const_buffer(buffer_.data(), buffer_.size()); co_return {std::error_code{}, dest.first(1)}; } }; @@ -143,15 +143,15 @@ class uppercase_transform class line_numbering_transform { - any_buffer_source* source_; // any_buffer_source* + capy::any_buffer_source* source_; // any_buffer_source* std::string buffer_; // std::string - transformed data std::size_t consumed_ = 0; // std::size_t - bytes consumed by downstream std::size_t line_num_ = 1; // std::size_t - current line number bool at_line_start_ = true; // bool - are we at start of a line? bool exhausted_ = false; // bool - upstream exhausted - + public: - explicit line_numbering_transform(any_buffer_source& source) + explicit line_numbering_transform(capy::any_buffer_source& source) : source_(&source) { } @@ -170,48 +170,48 @@ class line_numbering_transform } // BufferSource::pull - returns task<> to enable co_await on upstream - io_task> - pull(std::span dest) + capy::io_task> + pull(std::span dest) { // Already have unconsumed data? if (consumed_ < buffer_.size()) { if (dest.empty()) - co_return {std::error_code{}, std::span{}}; - - dest[0] = const_buffer( + co_return {std::error_code{}, std::span{}}; + + dest[0] = capy::const_buffer( buffer_.data() + consumed_, buffer_.size() - consumed_); co_return {std::error_code{}, dest.first(1)}; } - + // Upstream exhausted? if (exhausted_) - co_return {error::eof, std::span{}}; - + co_return {capy::error::eof, std::span{}}; + // Pull from upstream buffer_.clear(); consumed_ = 0; - - const_buffer upstream[8]; // const_buffer[8] + + capy::const_buffer upstream[8]; // const_buffer[8] // ec: std::error_code, bufs: std::span auto [ec, bufs] = co_await source_->pull(upstream); - - if (ec == cond::eof) + + if (ec == capy::cond::eof) { exhausted_ = true; - co_return {error::eof, std::span{}}; + co_return {capy::error::eof, std::span{}}; } if (ec) - co_return {ec, std::span{}}; - + co_return {ec, std::span{}}; + // Transform: add line numbers for (auto const& buf : bufs) // const_buffer const& { auto const* data = static_cast(buf.data()); // char const* auto size = buf.size(); // std::size_t - + for (std::size_t i = 0; i < size; ++i) { if (at_line_start_) @@ -224,15 +224,15 @@ class line_numbering_transform at_line_start_ = true; } } - + // Consume from upstream - source_->consume(buffer_size(bufs)); - + source_->consume(capy::buffer_size(bufs)); + // Return transformed data if (dest.empty() || buffer_.empty()) - co_return {std::error_code{}, std::span{}}; - - dest[0] = const_buffer(buffer_.data(), buffer_.size()); + co_return {std::error_code{}, std::span{}}; + + dest[0] = capy::const_buffer(buffer_.data(), buffer_.size()); co_return {std::error_code{}, dest.first(1)}; } }; @@ -243,22 +243,22 @@ class line_numbering_transform // //------------------------------------------------------------------------------ -task transfer(any_buffer_source& source, any_write_sink& sink) +capy::task transfer(capy::any_buffer_source& source, capy::any_write_sink& sink) { std::size_t total = 0; // std::size_t - const_buffer bufs[8]; // const_buffer[8] - + capy::const_buffer bufs[8]; // const_buffer[8] + for (;;) { // ec: std::error_code, spans: std::span auto [ec, spans] = co_await source.pull(bufs); - - if (ec == cond::eof) + + if (ec == capy::cond::eof) break; if (ec) throw std::system_error(ec); - + // Write each buffer to sink for (auto const& buf : spans) // const_buffer const& { @@ -268,15 +268,15 @@ task transfer(any_buffer_source& source, any_write_sink& sink) throw std::system_error(wec); total += n; } - + // Consume what we read - source.consume(buffer_size(spans)); + source.consume(capy::buffer_size(spans)); } - - io_result<> eof_result = co_await sink.write_eof(); + + capy::io_result<> eof_result = co_await sink.write_eof(); if (eof_result.ec) throw std::system_error(eof_result.ec); - + co_return total; } @@ -295,8 +295,8 @@ void demo_pipeline() std::cout << "Input:\n" << input << "\n"; // Create mock source with input data - test::fuse f; // test::fuse - test::buffer_source source(f); // test::buffer_source + capy::test::fuse f; // test::fuse + capy::test::buffer_source source(f); // test::buffer_source source.provide(input); // Build the pipeline using type-erased buffer sources: @@ -305,26 +305,26 @@ void demo_pipeline() // Stage 1: Wrap raw source as any_buffer_source. // Using pointer construction (&source) for reference semantics - the // wrapper does not take ownership, so source must outlive src. - any_buffer_source src{&source}; // any_buffer_source + capy::any_buffer_source src{&source}; // any_buffer_source // Stage 2: Uppercase transform wraps src. // Again using pointer construction so upper_src references upper // without taking ownership. uppercase_transform upper{src}; // uppercase_transform - any_buffer_source upper_src{&upper}; // any_buffer_source + capy::any_buffer_source upper_src{&upper}; // any_buffer_source // Stage 3: Line numbering transform wraps upper_src. line_numbering_transform numbered{upper_src}; // line_numbering_transform - any_buffer_source numbered_src{&numbered}; // any_buffer_source + capy::any_buffer_source numbered_src{&numbered}; // any_buffer_source // Create sink to collect output. // Pointer construction ensures sink outlives dst. - test::write_sink sink(f); // test::write_sink - any_write_sink dst{&sink}; // any_write_sink + capy::test::write_sink sink(f); // test::write_sink + capy::any_write_sink dst{&sink}; // any_write_sink // Run the pipeline std::size_t bytes = 0; // std::size_t - test::run_blocking([&](std::size_t n) { bytes = n; })( + capy::test::run_blocking([&](std::size_t n) { bytes = n; })( transfer(numbered_src, dst)); std::cout << "Output (" << bytes << " bytes):\n"; diff --git a/example/timeout-cancellation/timeout_cancellation.cpp b/example/timeout-cancellation/timeout_cancellation.cpp index 22298c12..b1c791bb 100644 --- a/example/timeout-cancellation/timeout_cancellation.cpp +++ b/example/timeout-cancellation/timeout_cancellation.cpp @@ -16,12 +16,12 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; // A slow operation that respects cancellation -task slow_fetch(int steps) +capy::task slow_fetch(int steps) { - auto token = co_await this_coro::stop_token; // std::stop_token + auto token = co_await capy::this_coro::stop_token; // std::stop_token std::string result; for (int i = 0; i < steps; ++i) @@ -49,9 +49,9 @@ task slow_fetch(int steps) } // Run with timeout (conceptual - real implementation needs timer) -task> fetch_with_timeout() +capy::task> fetch_with_timeout() { - auto token = co_await this_coro::stop_token; // std::stop_token + auto token = co_await capy::this_coro::stop_token; // std::stop_token try { @@ -70,11 +70,11 @@ void demo_normal_completion() { std::cout << "Demo: Normal completion\n"; - thread_pool pool; + capy::thread_pool pool; std::stop_source source; std::latch done(1); // std::latch - wait for 1 task - - run_async(pool.get_executor(), source.get_token(), + + capy::run_async(pool.get_executor(), source.get_token(), [&done](std::optional result) { if (result) std::cout << "Result: " << *result << "\n"; @@ -84,20 +84,20 @@ void demo_normal_completion() }, [&done](std::exception_ptr) { done.count_down(); } )(fetch_with_timeout()); - + done.wait(); // Block until task completes } void demo_cancellation() { std::cout << "\nDemo: Cancellation after 2 steps\n"; - - thread_pool pool; + + capy::thread_pool pool; std::stop_source source; std::latch done(1); // std::latch - wait for 1 task - + // Launch the task - run_async(pool.get_executor(), source.get_token(), + capy::run_async(pool.get_executor(), source.get_token(), [&done](std::optional result) { if (result) std::cout << "Result: " << *result << "\n"; @@ -120,9 +120,9 @@ void demo_cancellation() } // Example: Manual stop token checking -task process_items(std::vector const& items) +capy::task process_items(std::vector const& items) { - auto token = co_await this_coro::stop_token; // std::stop_token + auto token = co_await capy::this_coro::stop_token; // std::stop_token int sum = 0; for (auto item : items) // int diff --git a/example/type-erased-echo/echo.cpp b/example/type-erased-echo/echo.cpp index fe04912b..d1f67872 100644 --- a/example/type-erased-echo/echo.cpp +++ b/example/type-erased-echo/echo.cpp @@ -15,28 +15,28 @@ namespace myapp { -using namespace boost::capy; +namespace capy = boost::capy; -task<> echo_session(any_stream& stream) +capy::task<> echo_session(capy::any_stream& stream) { char buffer[1024]; - + for (;;) { // Read some data // ec: std::error_code, n: std::size_t - auto [ec, n] = co_await stream.read_some(make_buffer(buffer)); - - if (ec == cond::eof) + auto [ec, n] = co_await stream.read_some(capy::make_buffer(buffer)); + + if (ec == capy::cond::eof) co_return; // Client closed connection - + if (ec) throw std::system_error(ec); - + // Echo it back // wec: std::error_code, wn: std::size_t - auto [wec, wn] = co_await write(stream, const_buffer(buffer, n)); - + auto [wec, wn] = co_await capy::write(stream, capy::const_buffer(buffer, n)); + if (wec) throw std::system_error(wec); } diff --git a/example/type-erased-echo/main.cpp b/example/type-erased-echo/main.cpp index 9d096bec..9b9cb8ce 100644 --- a/example/type-erased-echo/main.cpp +++ b/example/type-erased-echo/main.cpp @@ -14,19 +14,19 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; void test_with_mock() { - auto [a, b] = test::make_stream_pair(); + auto [a, b] = capy::test::make_stream_pair(); b.provide("Hello, "); b.provide("World!\n"); b.close(); // Using pointer construction (&a) for reference semantics - the // wrapper does not take ownership, so a must outlive stream. - any_stream stream{&a}; // any_stream - test::run_blocking()(myapp::echo_session(stream)); + capy::any_stream stream{&a}; // any_stream + capy::test::run_blocking()(myapp::echo_session(stream)); std::cout << "Echo output: " << b.data() << "\n"; } diff --git a/example/when-any-cancellation/when_any_cancellation.cpp b/example/when-any-cancellation/when_any_cancellation.cpp index b2869072..8af437b6 100644 --- a/example/when-any-cancellation/when_any_cancellation.cpp +++ b/example/when-any-cancellation/when_any_cancellation.cpp @@ -32,14 +32,14 @@ #include #include -using namespace boost::capy; +namespace capy = boost::capy; // Simulates a data source that takes `steps` iterations to produce a result. // Each step checks the stop token so the task exits promptly when cancelled. -task fetch_from_source( +capy::task fetch_from_source( std::string name, int steps, int step_ms) { - auto token = co_await this_coro::stop_token; + auto token = co_await capy::this_coro::stop_token; for (int i = 0; i < steps; ++i) { @@ -62,14 +62,14 @@ task fetch_from_source( } // Race three sources — the fastest one wins, the rest get cancelled. -task<> race_data_sources() +capy::task<> race_data_sources() { std::cout << "=== Racing three data sources ===\n\n"; // source_a: 2 steps * 20ms = fast // source_b: 5 steps * 20ms = medium // source_c: 8 steps * 20ms = slow - auto result = co_await when_any( + auto result = co_await capy::when_any( fetch_from_source("source_a", 2, 20), fetch_from_source("source_b", 5, 20), fetch_from_source("source_c", 8, 20)); @@ -83,9 +83,9 @@ task<> race_data_sources() // A void task that loops until stopped. // Useful for background workers that run indefinitely. -task<> background_worker(std::string name, int step_ms) +capy::task<> background_worker(std::string name, int step_ms) { - auto token = co_await this_coro::stop_token; + auto token = co_await capy::this_coro::stop_token; int iteration = 0; while (!token.stop_requested()) @@ -101,7 +101,7 @@ task<> background_worker(std::string name, int step_ms) } // A task that finishes after a fixed delay (acts as a timeout). -task<> timeout(int ms) +capy::task<> timeout(int ms) { std::this_thread::sleep_for( std::chrono::milliseconds(ms)); @@ -112,11 +112,11 @@ task<> timeout(int ms) // Use when_any with a timeout to bound the lifetime of a background worker. // With void tasks, the variadic overload returns variant. // Use .index() to know which task completed first. -task<> timeout_a_worker() +capy::task<> timeout_a_worker() { std::cout << "\n=== Timeout a background worker ===\n\n"; - auto result = co_await when_any( + auto result = co_await capy::when_any( background_worker("worker", 30), timeout(100)); @@ -127,23 +127,23 @@ task<> timeout_a_worker() } // Race a vector of tasks (homogeneous range overload). -task<> race_vector_of_sources() +capy::task<> race_vector_of_sources() { std::cout << "\n=== Racing a vector of sources ===\n\n"; - std::vector> tasks; + std::vector> tasks; tasks.push_back(fetch_from_source("replica_1", 6, 20)); tasks.push_back(fetch_from_source("replica_2", 3, 20)); tasks.push_back(fetch_from_source("replica_3", 5, 20)); - auto [winner_index, value] = co_await when_any(std::move(tasks)); + auto [winner_index, value] = co_await capy::when_any(std::move(tasks)); std::cout << "\nFastest replica: index=" << winner_index << " value=\"" << value << "\"\n"; } // Run all demos sequentially so output is readable. -task<> run_demos() +capy::task<> run_demos() { co_await race_data_sources(); co_await timeout_a_worker(); @@ -152,10 +152,10 @@ task<> run_demos() int main() { - thread_pool pool; + capy::thread_pool pool; std::latch done(1); - run_async(pool.get_executor(), + capy::run_async(pool.get_executor(), [&done](auto&&...) { done.count_down(); }, [&done](std::exception_ptr ep) { try { std::rethrow_exception(ep); }