// // experimental/basic_concurrent_channel.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP #define ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/config.hpp" #include "asio/detail/non_const_lvalue.hpp" #include "asio/detail/mutex.hpp" #include "asio/execution/executor.hpp" #include "asio/execution_context.hpp" #include "asio/experimental/detail/channel_send_functions.hpp" #include "asio/experimental/detail/channel_service.hpp" #include "asio/detail/push_options.hpp" namespace asio { namespace experimental { namespace detail { } // namespace detail /// A channel for messages. /** * The basic_concurrent_channel class template is used for sending messages * between different parts of the same application. A message is * defined as a collection of arguments to be passed to a completion handler, * and the set of messages supported by a channel is specified by its @c Traits * and Signatures... template parameters. Messages may be sent and * received using asynchronous or non-blocking synchronous operations. * * Unless customising the traits, applications will typically use the @c * experimental::concurrent_channel alias template. For example: * @code void send_loop(int i, steady_timer& timer, * concurrent_channel& ch) * { * if (i < 10) * { * timer.expires_after(chrono::seconds(1)); * timer.async_wait( * [i, &timer, &ch](error_code error) * { * if (!error) * { * ch.async_send(error_code(), i, * [i, &timer, &ch](error_code error) * { * if (!error) * { * send_loop(i + 1, timer, ch); * } * }); * } * }); * } * else * { * ch.close(); * } * } * * void receive_loop(concurent_channel& ch) * { * ch.async_receive( * [&ch](error_code error, int i) * { * if (!error) * { * std::cout << "Received " << i << "\n"; * receive_loop(ch); * } * }); * } @endcode * * @par Thread Safety * @e Distinct @e objects: Safe.@n * @e Shared @e objects: Safe. * * The basic_concurrent_channel class template is thread-safe, and would * typically be used for passing messages between application code that run on * different threads. Consider using @ref basic_channel, and its alias template * @c experimental::channel, to pass messages between code running in a single * thread or on the same strand. */ template class basic_concurrent_channel #if !defined(GENERATING_DOCUMENTATION) : public detail::channel_send_functions< basic_concurrent_channel, Executor, Signatures...> #endif // !defined(GENERATING_DOCUMENTATION) { private: class initiate_async_send; class initiate_async_receive; typedef detail::channel_service service_type; typedef typename service_type::template implementation_type< Traits, Signatures...>::payload_type payload_type; template auto do_async_receive(detail::channel_payload*, ASIO_MOVE_ARG(CompletionToken) token) -> decltype( async_initiate( declval(), token)) { return async_initiate( initiate_async_receive(this), token); } public: /// The type of the executor associated with the channel. typedef Executor executor_type; /// Rebinds the channel type to another executor. template struct rebind_executor { /// The channel type when rebound to the specified executor. typedef basic_concurrent_channel other; }; /// The traits type associated with the channel. typedef typename Traits::template rebind::other traits_type; /// Construct a basic_concurrent_channel. /** * This constructor creates and channel. * * @param ex The I/O executor that the channel will use, by default, to * dispatch handlers for any asynchronous operations performed on the channel. * * @param max_buffer_size The maximum number of messages that may be buffered * in the channel. */ basic_concurrent_channel(const executor_type& ex, std::size_t max_buffer_size = 0) : service_(&asio::use_service( basic_concurrent_channel::get_context(ex))), impl_(), executor_(ex) { service_->construct(impl_, max_buffer_size); } /// Construct and open a basic_concurrent_channel. /** * This constructor creates and opens a channel. * * @param context An execution context which provides the I/O executor that * the channel will use, by default, to dispatch handlers for any asynchronous * operations performed on the channel. * * @param max_buffer_size The maximum number of messages that may be buffered * in the channel. */ template basic_concurrent_channel(ExecutionContext& context, std::size_t max_buffer_size = 0, typename constraint< is_convertible::value, defaulted_constraint >::type = defaulted_constraint()) : service_(&asio::use_service(context)), impl_(), executor_(context.get_executor()) { service_->construct(impl_, max_buffer_size); } #if defined(ASIO_HAS_MOVE) || defined(GENERATING_DOCUMENTATION) /// Move-construct a basic_concurrent_channel from another. /** * This constructor moves a channel from one object to another. * * @param other The other basic_concurrent_channel object from which the move * will occur. * * @note Following the move, the moved-from object is in the same state as if * constructed using the @c basic_concurrent_channel(const executor_type&) * constructor. */ basic_concurrent_channel(basic_concurrent_channel&& other) : service_(other.service_), executor_(other.executor_) { service_->move_construct(impl_, other.impl_); } /// Move-assign a basic_concurrent_channel from another. /** * This assignment operator moves a channel from one object to another. * Cancels any outstanding asynchronous operations associated with the target * object. * * @param other The other basic_concurrent_channel object from which the move * will occur. * * @note Following the move, the moved-from object is in the same state as if * constructed using the @c basic_concurrent_channel(const executor_type&) * constructor. */ basic_concurrent_channel& operator=(basic_concurrent_channel&& other) { if (this != &other) { service_->move_assign(impl_, *other.service_, other.impl_); executor_.~executor_type(); new (&executor_) executor_type(other.executor_); service_ = other.service_; } return *this; } // All channels have access to each other's implementations. template friend class basic_concurrent_channel; /// Move-construct a basic_concurrent_channel from another. /** * This constructor moves a channel from one object to another. * * @param other The other basic_concurrent_channel object from which the move * will occur. * * @note Following the move, the moved-from object is in the same state as if * constructed using the @c basic_concurrent_channel(const executor_type&) * constructor. */ template basic_concurrent_channel( basic_concurrent_channel&& other, typename constraint< is_convertible::value >::type = 0) : service_(other.service_), executor_(other.executor_) { service_->move_construct(impl_, other.impl_); } /// Move-assign a basic_concurrent_channel from another. /** * This assignment operator moves a channel from one object to another. * Cancels any outstanding asynchronous operations associated with the target * object. * * @param other The other basic_concurrent_channel object from which the move * will occur. * * @note Following the move, the moved-from object is in the same state as if * constructed using the @c basic_concurrent_channel(const executor_type&) * constructor. */ template typename constraint< is_convertible::value, basic_concurrent_channel& >::type operator=( basic_concurrent_channel&& other) { if (this != &other) { service_->move_assign(impl_, *other.service_, other.impl_); executor_.~executor_type(); new (&executor_) executor_type(other.executor_); service_ = other.service_; } return *this; } #endif // defined(ASIO_HAS_MOVE) || defined(GENERATING_DOCUMENTATION) /// Destructor. ~basic_concurrent_channel() { service_->destroy(impl_); } /// Get the executor associated with the object. const executor_type& get_executor() ASIO_NOEXCEPT { return executor_; } /// Get the capacity of the channel's buffer. std::size_t capacity() ASIO_NOEXCEPT { return service_->capacity(impl_); } /// Determine whether the channel is open. bool is_open() const ASIO_NOEXCEPT { return service_->is_open(impl_); } /// Reset the channel to its initial state. void reset() { service_->reset(impl_); } /// Close the channel. void close() { service_->close(impl_); } /// Cancel all asynchronous operations waiting on the channel. /** * All outstanding send operations will complete with the error * @c asio::experimental::error::channel_cancelled. Outstanding receive * operations complete with the result as determined by the channel traits. */ void cancel() { service_->cancel(impl_); } /// Determine whether a message can be received without blocking. bool ready() const ASIO_NOEXCEPT { return service_->ready(impl_); } #if defined(GENERATING_DOCUMENTATION) /// Try to send a message without blocking. /** * Fails if the buffer is full and there are no waiting receive operations. * * @returns @c true on success, @c false on failure. */ template bool try_send(ASIO_MOVE_ARG(Args)... args); /// Try to send a number of messages without blocking. /** * @returns The number of messages that were sent. */ template std::size_t try_send_n(std::size_t count, ASIO_MOVE_ARG(Args)... args); /// Asynchronously send a message. template auto async_send(ASIO_MOVE_ARG(Args)... args, ASIO_MOVE_ARG(CompletionToken) token); #endif // defined(GENERATING_DOCUMENTATION) /// Try to receive a message without blocking. /** * Fails if the buffer is full and there are no waiting receive operations. * * @returns @c true on success, @c false on failure. */ template bool try_receive(ASIO_MOVE_ARG(Handler) handler) { return service_->try_receive(impl_, ASIO_MOVE_CAST(Handler)(handler)); } /// Asynchronously receive a message. template auto async_receive( ASIO_MOVE_ARG(CompletionToken) token ASIO_DEFAULT_COMPLETION_TOKEN(Executor)) #if !defined(GENERATING_DOCUMENTATION) -> decltype( this->do_async_receive(static_cast(0), ASIO_MOVE_CAST(CompletionToken)(token))) #endif // !defined(GENERATING_DOCUMENTATION) { return this->do_async_receive(static_cast(0), ASIO_MOVE_CAST(CompletionToken)(token)); } private: // Disallow copying and assignment. basic_concurrent_channel( const basic_concurrent_channel&) ASIO_DELETED; basic_concurrent_channel& operator=( const basic_concurrent_channel&) ASIO_DELETED; template friend class detail::channel_send_functions; // Helper function to get an executor's context. template static execution_context& get_context(const T& t, typename enable_if::value>::type* = 0) { return asio::query(t, execution::context); } // Helper function to get an executor's context. template static execution_context& get_context(const T& t, typename enable_if::value>::type* = 0) { return t.context(); } class initiate_async_send { public: typedef Executor executor_type; explicit initiate_async_send(basic_concurrent_channel* self) : self_(self) { } const executor_type& get_executor() const ASIO_NOEXCEPT { return self_->get_executor(); } template void operator()(ASIO_MOVE_ARG(SendHandler) handler, ASIO_MOVE_ARG(payload_type) payload) const { asio::detail::non_const_lvalue handler2(handler); self_->service_->async_send(self_->impl_, ASIO_MOVE_CAST(payload_type)(payload), handler2.value, self_->get_executor()); } private: basic_concurrent_channel* self_; }; class initiate_async_receive { public: typedef Executor executor_type; explicit initiate_async_receive(basic_concurrent_channel* self) : self_(self) { } const executor_type& get_executor() const ASIO_NOEXCEPT { return self_->get_executor(); } template void operator()(ASIO_MOVE_ARG(ReceiveHandler) handler) const { asio::detail::non_const_lvalue handler2(handler); self_->service_->async_receive(self_->impl_, handler2.value, self_->get_executor()); } private: basic_concurrent_channel* self_; }; // The service associated with the I/O object. service_type* service_; // The underlying implementation of the I/O object. typename service_type::template implementation_type< Traits, Signatures...> impl_; // The associated executor. Executor executor_; }; } // namespace experimental } // namespace asio #include "asio/detail/pop_options.hpp" #endif // ASIO_EXPERIMENTAL_BASIC_CONCURRENT_CHANNEL_HPP