Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions include/exec/repeat_n.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,20 @@ namespace exec {
constexpr explicit __opstate_base(_Receiver &&__rcvr, std::size_t __count) noexcept
: __rcvr_{static_cast<_Receiver &&>(__rcvr)}
, __count_{__count} {
static_assert(
__nothrow_constructible_from<trampoline_scheduler>,
"trampoline_scheduler c'tor is always expected to be noexcept");
}

virtual constexpr void __cleanup() noexcept = 0;
virtual constexpr void __repeat() noexcept = 0;

_Receiver __rcvr_;
std::size_t __count_;
trampoline_scheduler __sched_;
trampoline_scheduler __sched_{};

protected:
~__opstate_base() noexcept = default;
};

template <class _ReceiverId>
Expand Down Expand Up @@ -95,11 +101,9 @@ namespace exec {
using __bouncy_sndr_t =
__result_of<exec::sequence, schedule_result_t<trampoline_scheduler>, _Child &>;
using __child_op_t = STDEXEC::connect_result_t<__bouncy_sndr_t, __receiver_t>;
static constexpr bool __nothrow_connect =
STDEXEC::__nothrow_connectable<__bouncy_sndr_t, __receiver_t>;

constexpr explicit __opstate(std::size_t __count, _Child __child, _Receiver __rcvr)
noexcept(__nothrow_connect)
noexcept(__nothrow_move_constructible<_Child> && noexcept(__connect()))
: __opstate_base<_Receiver>{static_cast<_Receiver &&>(__rcvr), __count}
, __child_(std::move(__child)) {
if (this->__count_ != 0) {
Expand All @@ -115,7 +119,10 @@ namespace exec {
}
}

constexpr auto __connect() noexcept(__nothrow_connect) -> __child_op_t & {
constexpr auto __connect() noexcept(
__nothrow_invocable<STDEXEC::schedule_t, trampoline_scheduler&>
&& __nothrow_invocable<sequence_t, schedule_result_t<trampoline_scheduler>, _Child &>
&& __nothrow_connectable<__bouncy_sndr_t, __receiver_t>) -> __child_op_t & {
return __child_op_.__emplace_from(
STDEXEC::connect,
exec::sequence(STDEXEC::schedule(this->__sched_), __child_),
Expand All @@ -137,7 +144,9 @@ namespace exec {
}
}
STDEXEC_CATCH_ALL {
STDEXEC::set_error(std::move(this->__rcvr_), std::current_exception());
if constexpr (!noexcept(__connect())) {
STDEXEC::set_error(std::move(this->__rcvr_), std::current_exception());
}
}
}

Expand Down Expand Up @@ -165,12 +174,20 @@ namespace exec {
template <class _Error>
using __error_t = completion_signatures<set_error_t(__decay_t<_Error>)>;

template <typename _Sender, typename... _Env>
using __with_eptr_completion_t = __eptr_completion_unless<
__cmplsigs::__partitions_of_t<
__completion_signatures_of_t<_Sender, _Env...>
>::__nothrow_decay_copyable::__errors::value
&& (__nothrow_connectable<_Sender, __receiver_archetype<_Env>> && ...)
>;

template <class _Child, class... _Env>
using __completions_t = STDEXEC::transform_completion_signatures<
__completion_signatures_of_t<_Child &, _Env...>,
STDEXEC::transform_completion_signatures<
__completion_signatures_of_t<STDEXEC::schedule_result_t<exec::trampoline_scheduler>, _Env...>,
__eptr_completion,
__completion_signatures_of_t<STDEXEC::schedule_result_t<trampoline_scheduler>, _Env...>,
__with_eptr_completion_t<_Child, _Env...>,
__cmplsigs::__default_set_value,
__error_t
>,
Expand Down
52 changes: 41 additions & 11 deletions include/exec/repeat_until.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@ namespace exec {

template <class _Receiver>
struct __opstate_base {
constexpr explicit __opstate_base(_Receiver &&__rcvr)
constexpr explicit __opstate_base(_Receiver &&__rcvr) noexcept
: __rcvr_{static_cast<_Receiver &&>(__rcvr)} {
static_assert(
__nothrow_constructible_from<trampoline_scheduler>,
"trampoline_scheduler c'tor is always expected to be noexcept");
}

virtual constexpr void __cleanup() noexcept = 0;
virtual constexpr void __repeat() noexcept = 0;

_Receiver __rcvr_;
trampoline_scheduler __sched_;
trampoline_scheduler __sched_{};

protected:
~__opstate_base() noexcept = default;
};

template <class _Boolean, bool _Expected>
Expand Down Expand Up @@ -128,20 +134,22 @@ namespace exec {
using __bouncy_sndr_t =
__result_of<exec::sequence, schedule_result_t<trampoline_scheduler>, _Child &>;
using __child_op_t = STDEXEC::connect_result_t<__bouncy_sndr_t, __receiver_t>;
static constexpr bool __nothrow_connect =
STDEXEC::__nothrow_connectable<__bouncy_sndr_t, __receiver_t>;

constexpr explicit __opstate(_Child __child, _Receiver __rcvr) noexcept(__nothrow_connect)
: __opstate_base<_Receiver>(static_cast<_Receiver &&>(__rcvr))
, __child_(static_cast<_Child &&>(__child)) {
constexpr explicit __opstate(_Child __child, _Receiver __rcvr)
noexcept(__nothrow_move_constructible<_Child> && noexcept(__connect()))
: __opstate_base<_Receiver>(std::move(__rcvr))
, __child_(std::move(__child)) {
__connect();
}

constexpr void start() noexcept {
STDEXEC::start(*__child_op_);
}

constexpr auto __connect() noexcept(__nothrow_connect) -> __child_op_t & {
constexpr auto __connect() noexcept(
__nothrow_invocable<STDEXEC::schedule_t, trampoline_scheduler &>
&& __nothrow_invocable<sequence_t, schedule_result_t<trampoline_scheduler>, _Child &>
&& __nothrow_connectable<__bouncy_sndr_t, __receiver_t>) -> __child_op_t & {
return __child_op_.__emplace_from(
STDEXEC::connect,
exec::sequence(STDEXEC::schedule(this->__sched_), __child_),
Expand All @@ -157,7 +165,9 @@ namespace exec {
STDEXEC::start(__connect());
}
STDEXEC_CATCH_ALL {
STDEXEC::set_error(static_cast<_Receiver &&>(this->__rcvr_), std::current_exception());
if constexpr (!noexcept(__connect())) {
STDEXEC::set_error(static_cast<_Receiver &&>(this->__rcvr_), std::current_exception());
}
}
}

Expand Down Expand Up @@ -188,15 +198,35 @@ namespace exec {
>
>;

template <class... _Booleans>
using __values_overload_nothrow_bool_convertible_t =
__mand<std::is_nothrow_convertible<_Booleans, bool>...>;

template <class _Sender, class... _Env>
using __values_nothrow_bool_convertible_t = __value_types_t<
__completion_signatures_of_t<_Sender, _Env...>, // sigs
__qq<__values_overload_nothrow_bool_convertible_t>, // tuple
__qq<__mand> // variant
>;

template <typename _Sender, typename... _Env>
using __with_eptr_completion_t = __eptr_completion_unless<
__values_nothrow_bool_convertible_t<_Sender, _Env...>::value
&& __cmplsigs::__partitions_of_t<
__completion_signatures_of_t<_Sender, _Env...>
>::__nothrow_decay_copyable::__errors::value
&& (__nothrow_connectable<_Sender, __receiver_archetype<_Env>> && ...)
>;

template <class...>
using __delete_set_value_t = completion_signatures<>;

template <class _Child, class... _Env>
using __completions_t = STDEXEC::transform_completion_signatures<
__completion_signatures_of_t<__decay_t<_Child> &, _Env...>,
STDEXEC::transform_completion_signatures<
__completion_signatures_of_t<STDEXEC::schedule_result_t<exec::trampoline_scheduler>, _Env...>,
__eptr_completion,
__completion_signatures_of_t<STDEXEC::schedule_result_t<trampoline_scheduler>, _Env...>,
__with_eptr_completion_t<_Child, _Env...>,
__delete_set_value_t
>,
__mbind_front_q<__values_t, _Child>::template __f
Expand Down
43 changes: 43 additions & 0 deletions test/exec/test_repeat_n.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,47 @@ namespace {
REQUIRE(called);
REQUIRE(!failed.load());
}

TEST_CASE("repeat_n conditionally adds set_error_t(exception)", "[adaptors][repeat_n]") {
SECTION("ensure exception isn't always added") {
ex::sender auto snd = ex::just() | exec::repeat_n(1);
static_assert(
std::same_as<ex::error_types_of_t<decltype(snd)>, ex::__detail::__not_a_variant>,
"Expected no errors ");
}

// There are two main cases that will contribute set_error_t(std::exception_ptr)
// 1. error's copy constructor could throw
// 2. connect() could throw
SECTION("error completion is added when an error's copy ctor can throw") {
// 1.
struct Error_with_throw_copy {
Error_with_throw_copy() noexcept = default;
Error_with_throw_copy(const Error_with_throw_copy&) noexcept(false) = default;
};
ex::sender auto snd = ex::just_error(Error_with_throw_copy{}) | exec::repeat_n(1);
static_assert(
std::same_as<
ex::error_types_of_t<decltype(snd)>,
std::variant<Error_with_throw_copy, std::exception_ptr>
>,
"Missing added set_error_t(std::exception_ptr)");
}

SECTION("error completion is added when connect can throw") {
// 2.
using Sender_connect_throws = just_with_env<ex::env<>>;
static_assert(
!ex::__error_types_t<
ex::completion_signatures_of_t<Sender_connect_throws>,
ex::__mcontains<ex::set_error_t(std::exception_ptr)>
>::value,
"Sender can't already emit exception to test if repeat_until() adds it");
ex::sender auto snd = Sender_connect_throws{} | exec::repeat_n(1);
static_assert(
std::same_as<ex::error_types_of_t<decltype(snd)>, std::variant<std::exception_ptr>>,
"Missing added set_error_t(std::exception_ptr)");
}
}

} // namespace
136 changes: 96 additions & 40 deletions test/exec/test_repeat_until.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#include "exec/repeat_until.hpp"
#include "exec/static_thread_pool.hpp"
#include "stdexec/__detail/__sync_wait.hpp"
#include "exec/trampoline_scheduler.hpp"
#include "stdexec/execution.hpp"

#include <test_common/receivers.hpp>
Expand All @@ -27,6 +27,7 @@

#include <catch2/catch.hpp>

#include <concepts>
#include <limits>
#include <memory>
#include <stdexcept>
Expand Down Expand Up @@ -262,45 +263,42 @@ namespace {
} while (!done);
}

TEST_CASE("repeat composes with completion signatures") {
ex::sender auto only_stopped = ex::just_stopped() | exec::repeat();
static_assert(
std::same_as<ex::value_types_of_t<decltype(only_stopped)>, ex::__detail::__not_a_variant>,
"Expect no value completions");
static_assert(
std::same_as<ex::error_types_of_t<decltype(only_stopped)>, std::variant<std::exception_ptr>>,
"Missing added set_error_t(std::exception_ptr)");
static_assert(
ex::sender_of<decltype(only_stopped), ex::set_stopped_t()>,
"Missing set_stopped_t() from upstream");

// operator| and sync_wait require valid completion signatures
ex::sync_wait(only_stopped | ex::upon_stopped([]() noexcept { return -1; }));


ex::sender auto only_error = ex::just_error(-1) | exec::repeat();
static_assert(
std::same_as<ex::value_types_of_t<decltype(only_error)>, ex::__detail::__not_a_variant>,
"Expect no value completions");
static_assert(
std::same_as<
ex::error_types_of_t<decltype(only_error)>,
std::variant<int, std::exception_ptr>
>,
"Missing added set_error_t(std::exception_ptr)");

// set_stopped_t is always added as a consequence of the internal trampoline_scheduler
using SC = ex::completion_signatures_of_t<ex::schedule_result_t<exec::trampoline_scheduler>>;
static_assert(
!ex::sender_of<SC, ex::set_stopped_t()>
|| ex::sender_of<decltype(only_error), ex::set_stopped_t()>,
"Missing added set_error_t(std::exception_ptr)");

// operator| and sync_wait require valid completion signatures
ex::sync_wait(
only_error //
| ex::upon_stopped([]() { return -1; })
| ex::upon_error([](const auto) { return -1; }));
TEST_CASE("repeat composes with completion signatures", "[adaptors][repeat]") {
SECTION("repeat composes with stopped upstream") {
ex::sender auto only_stopped = ex::just_stopped() | exec::repeat();
static_assert(
std::same_as<ex::value_types_of_t<decltype(only_stopped)>, ex::__detail::__not_a_variant>,
"Expect no value completions");
static_assert(
std::same_as<ex::error_types_of_t<decltype(only_stopped)>, ex::__detail::__not_a_variant>,
"Expect no value completions");
static_assert(
ex::sender_of<decltype(only_stopped), ex::set_stopped_t()>,
"Missing set_stopped_t() from upstream");

// operator| and sync_wait require valid completion signatures
ex::sync_wait(only_stopped | ex::upon_stopped([]() noexcept { return -1; }));
}

SECTION("repeat composes with errors upstream") {
ex::sender auto only_error = ex::just_error(-1) | exec::repeat();
static_assert(
std::same_as<ex::value_types_of_t<decltype(only_error)>, ex::__detail::__not_a_variant>,
"Expect no value completions");
static_assert(
std::same_as<ex::error_types_of_t<decltype(only_error)>, std::variant<int>>,
"Unexpected added set_error_t(std::exception_ptr)");

// set_stopped_t is always added as a consequence of the internal trampoline_scheduler
using SC = ex::completion_signatures_of_t<ex::schedule_result_t<exec::trampoline_scheduler>>;
static_assert(
!ex::sender_of<SC, ex::set_stopped_t()>
|| ex::sender_of<decltype(only_error), ex::set_stopped_t()>,
"Missing added set_stopped_t()");

// operator| and sync_wait require valid completion signatures
ex::sync_wait(only_error | ex::upon_error([](const auto) { return -1; }));
}
}

TEST_CASE(
Expand Down Expand Up @@ -353,4 +351,62 @@ namespace {
++throw_after;
} while (!done);
}

TEST_CASE("repeat_until conditionally adds set_error_t(exception)", "[adaptors][repeat_until]") {
SECTION("ensure exception isn't always added") {
ex::sender auto snd = ex::just(false) | exec::repeat_until();
static_assert(
std::same_as<ex::error_types_of_t<decltype(snd)>, ex::__detail::__not_a_variant>,
"Expected no errors ");
}

// There are three main cases that will contribute set_error_t(std::exception_ptr)
// 1. value's conversion to bool could throw
// 2. error's copy constructor could throw
// 3. connect() could throw
SECTION("error completion is added when an error's copy ctor can throw") {
// 1.
struct To_bool_can_throw {
[[nodiscard]]
operator bool() const noexcept(false) {
return true;
}
};
ex::sender auto snd = ex::just(To_bool_can_throw{}) | exec::repeat_until();
static_assert(
std::same_as<ex::error_types_of_t<decltype(snd)>, std::variant<std::exception_ptr>>,
"Missing added set_error_t(std::exception_ptr)");
}

SECTION("error completion is added when error->bool can throw") {
// 2.
struct Error_with_throw_copy {
Error_with_throw_copy() noexcept = default;
Error_with_throw_copy(const Error_with_throw_copy&) noexcept(false) = default;
};
ex::sender auto snd = ex::just_error(Error_with_throw_copy{}) | exec::repeat_until();
static_assert(
std::same_as<
ex::error_types_of_t<decltype(snd)>,
std::variant<Error_with_throw_copy, std::exception_ptr>
>,
"Missing added set_error_t(std::exception_ptr)");
}

SECTION("error completion is added when connect can throw") {
// 3.
using Sender_connect_throws = just_with_env<ex::env<>, bool>;
static_assert(
!ex::__error_types_t<
ex::completion_signatures_of_t<Sender_connect_throws>,
ex::__mcontains<ex::set_error_t(std::exception_ptr)>
>::value,
"Sender can't already emit exception to test if repeat_until() adds it");

ex::sender auto snd = Sender_connect_throws{{}, true} | exec::repeat_until();
static_assert(
std::same_as<ex::error_types_of_t<decltype(snd)>, std::variant<std::exception_ptr>>,
"Missing added set_error_t(std::exception_ptr)");
}
}
} // namespace
Loading