diff --git a/system/ulib/fit/include/lib/fit/promise.h b/system/ulib/fit/include/lib/fit/promise.h index 78127366c1a7dbdc1ba06ae47d9e785d65f52b34..7ef9c44adfa6dd5ff890631b06e2fc02d905fe4e 100644 --- a/system/ulib/fit/include/lib/fit/promise.h +++ b/system/ulib/fit/include/lib/fit/promise.h @@ -61,6 +61,8 @@ namespace fit { // |inspect()|: examine result of prior promise // |discard_result()|: discard result and unconditionally return // fit::result<> when prior promise completes +// |wrap_with()|: applies a wrapper to the promise +// |box()|: wraps the promise's continuation into a |fit::function| // |fit::join_promises()|: await multiple promises, once they all complete // return a tuple of their results // @@ -660,7 +662,57 @@ public: std::move(*this))); } - // Wraps the promise's continuation inside of a |fit::function|. + // Applies a |wrapper| to the promise. Invokes the wrapper's |wrap()| + // method, passes the promise to the wrapper by value followed by any + // additional |args| passed to |wrap_with()|, then returns the wrapper's + // result. + // + // |Wrapper| is a type that implements a method called |wrap()| which + // accepts a promise as its argument and produces a wrapped result of + // any type, such as another promise. + // + // Asserts that the promise is non-empty. + // This method consumes the promise's continuation, leaving it empty. + // + // EXAMPLE + // + // In this example, |fit::sequencer| is a wrapper type that imposes + // FIFO execution order onto a sequence of wrapped promises. + // + // // This wrapper type is intended to be applied to + // // a sequence of promises so we store it in a variable. + // fit::sequencer seq; + // + // // This task consists of some amount of work that must be + // // completed sequentially followed by other work that can + // // happen in any order. We use |wrap_with()| to wrap the + // // sequential work with the sequencer. + // fit::promise<> perform_complex_task() { + // return fit::make_promise([] { /* do sequential work */ }) + // .then([] (fit::result<> result) { /* this will also be wrapped */ }) + // .wrap_with(seq) + // .then([] (fit::result<> result) { /* do more work */ }); + // } + // + // This example can also be written without using |wrap_with()|. + // The behavior is equivalent but the syntax may seem more awkward. + // + // fit::sequencer seq; + // + // promise<> perform_complex_task() { + // return seq.wrap( + // fit::make_promise([] { /* sequential work */ }) + // ).then([] (fit::result<> result) { /* more work */ }); + // } + // + template <typename Wrapper, typename... Args> + decltype(auto) wrap_with(Wrapper& wrapper, Args... args) { + assert(state_.has_value()); + return wrapper.wrap(std::move(*this), + std::forward<Args>(args)...); + } + + // Wraps the promise's continuation into a |fit::function|. // // A boxed promise is easier to store and pass around than the unboxed // promises produced by |fit::make_promise()| and combinators, though boxing diff --git a/system/ulib/fit/include/lib/fit/sequencer.h b/system/ulib/fit/include/lib/fit/sequencer.h new file mode 100644 index 0000000000000000000000000000000000000000..2b106d5e073d27fc16c370fd6af17cd5d5e96674 --- /dev/null +++ b/system/ulib/fit/include/lib/fit/sequencer.h @@ -0,0 +1,80 @@ +// Copyright 2018 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef LIB_FIT_SEQUENCER_H_ +#define LIB_FIT_SEQUENCER_H_ + +#include <assert.h> + +#include <mutex> + +#include "bridge.h" +#include "thread_safety.h" + +namespace fit { + +// A sequencer imposes a first-in-first-out sequential execution order onto a +// sequence of promises. Each successively enqueued promise remains suspended +// until all previously enqueued promises complete or are abandoned. +// +// |fit::sequencer| is designed to be used either on its own or chained +// onto a promise using |fit::promise::wrap_with()|. +// +// EXAMPLE +// +// // This wrapper type is intended to be applied to +// // a sequence of promises so we store it in a variable. +// fit::sequencer seq; +// +// // This task consists of some amount of work that must be +// // completed sequentially followed by other work that can +// // happen in any order. We use |wrap_with()| to wrap the +// // sequential work with the sequencer. +// fit::promise<> perform_complex_task() { +// return fit::make_promise([] { /* do sequential work */ }) +// .then([] (fit::result<> result) { /* this will also be wrapped */ }) +// .wrap_with(seq) +// .then([] (fit::result<> result) { /* do more work */ }); +// } +// +class sequencer final { +public: + sequencer(); + ~sequencer(); + + // Returns a new promise which will invoke |promise| after all previously + // enqueued promises on this sequencer have completed or been abandoned. + // + // This method is thread-safe. + template <typename Promise> + decltype(auto) wrap(Promise promise) { + assert(promise); + + fit::bridge<> bridge; + fit::consumer<> prior = swap_prior(std::move(bridge.consumer())); + return prior.promise_or(fit::ok()) + .then([promise = std::move(promise), + completer = std::move(bridge.completer())]( + fit::context& context, fit::result<>) mutable { + // This handler will run once the completer associated + // with the |prior| promise is abandoned. Once the promise + // has finished, both the promise and completer will be + // destroyed thereby causing the next promise chained onto + // the |bridge|'s associated consumer to become runnable. + return promise(context); + }); + } + +private: + fit::consumer<> swap_prior(fit::consumer<> new_prior); + + std::mutex mutex_; + + // Holds the consumption capability of the most recently wrapped promise. + fit::consumer<> prior_ FIT_GUARDED(mutex_); +}; + +} // namespace fit + +#endif // LIB_FIT_SEQUENCER_H_ diff --git a/system/ulib/fit/rules.mk b/system/ulib/fit/rules.mk index 0f1de2110fd5a6a448ded7c72a36d4ab03f16d61..7a0481eac3abab6b4adfc39e450d674911a5d38e 100644 --- a/system/ulib/fit/rules.mk +++ b/system/ulib/fit/rules.mk @@ -7,8 +7,9 @@ LOCAL_DIR := $(GET_LOCAL_DIR) fit_srcs := \ $(LOCAL_DIR)/empty.c \ $(LOCAL_DIR)/promise.cpp \ - $(LOCAL_DIR)/sequential_executor.cpp \ $(LOCAL_DIR)/scheduler.cpp \ + $(LOCAL_DIR)/sequencer.cpp \ + $(LOCAL_DIR)/sequential_executor.cpp \ # # Userspace library. diff --git a/system/ulib/fit/sequencer.cpp b/system/ulib/fit/sequencer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..35c60d38601036cd25ef9d18047d16b3c2206a24 --- /dev/null +++ b/system/ulib/fit/sequencer.cpp @@ -0,0 +1,31 @@ +// Copyright 2018 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// Can't compile this for Zircon userspace yet since libstdc++ isn't available. +#ifndef FIT_NO_STD_FOR_ZIRCON_USERSPACE + +#include <lib/fit/sequencer.h> + +namespace fit { + +sequencer::sequencer() { + // Capture a new consumer and intentionally abandon its associated + // completer so that a promise chained onto the consumer using + // |promise_or()| will become immediately runnable. + fit::bridge<> bridge; + prior_ = std::move(bridge.consumer()); +} + +sequencer::~sequencer() = default; + +fit::consumer<> sequencer::swap_prior(fit::consumer<> new_prior) { + std::lock_guard lock(mutex_); + fit::consumer<> old_prior = std::move(prior_); + prior_ = std::move(new_prior); + return old_prior; +} + +} // namespace fit + +#endif // FIT_NO_STD_FOR_ZIRCON_USERSPACE diff --git a/system/utest/fit/promise_tests.cpp b/system/utest/fit/promise_tests.cpp index aee3468285647e7b50c9e66dcbe8e1f95f516a36..0442fe901aaccef0957f8381d832e0f880fbb02a 100644 --- a/system/utest/fit/promise_tests.cpp +++ b/system/utest/fit/promise_tests.cpp @@ -24,6 +24,22 @@ public: } }; +template <typename V = void, typename E = void> +class capture_result_wrapper { +public: + template <typename Promise> + decltype(auto) wrap(Promise promise) { + static_assert(std::is_same<V, typename Promise::value_type>::value, ""); + static_assert(std::is_same<E, typename Promise::error_type>::value, ""); + assert(promise); + return promise.then([this](fit::result<V, E>& result) { + last_result = std::move(result); + }); + } + + fit::result<V, E> last_result; +}; + struct move_only { move_only(const move_only&) = delete; move_only(move_only&&) = default; @@ -881,6 +897,36 @@ bool discard_result_combinator() { END_TEST; } +bool wrap_with_combinator() { + BEGIN_TEST; + + fake_context fake_context; + capture_result_wrapper<int, char> wrapper; + uint64_t successor_run_count = 0; + + // Apply a wrapper which steals a promise's result th + auto p = make_delayed_ok_promise(42) + .wrap_with(wrapper) + .then([&](fit::result<>) { successor_run_count++; }); + static_assert(std::is_same<void, decltype(p)::value_type>::value, ""); + static_assert(std::is_same<void, decltype(p)::error_type>::value, ""); + + fit::result<> result = p(fake_context); + EXPECT_TRUE(p); + EXPECT_EQ(fit::result_state::pending, result.state()); + EXPECT_EQ(fit::result_state::pending, wrapper.last_result.state()); + EXPECT_EQ(0, successor_run_count); + + result = p(fake_context); + EXPECT_FALSE(p); + EXPECT_EQ(fit::result_state::ok, result.state()); + EXPECT_EQ(fit::result_state::ok, wrapper.last_result.state()); + EXPECT_EQ(42, wrapper.last_result.value()); + EXPECT_EQ(1, successor_run_count); + + END_TEST; +} + bool box_combinator() { BEGIN_TEST; @@ -1225,6 +1271,7 @@ RUN_TEST(and_then_combinator) RUN_TEST(or_else_combinator) RUN_TEST(inspect_combinator) RUN_TEST(discard_result_combinator) +RUN_TEST(wrap_with_combinator) RUN_TEST(box_combinator) RUN_TEST(join_combinator) RUN_TEST(example1) diff --git a/system/utest/fit/rules.mk b/system/utest/fit/rules.mk index 0a3760a219ecc1916d8e1037a4c70b89c27afd3b..a487f596f2852721134d1920116ca3f848db44c7 100644 --- a/system/utest/fit/rules.mk +++ b/system/utest/fit/rules.mk @@ -21,6 +21,7 @@ fit_tests := \ $(LOCAL_DIR)/promise_tests.cpp \ $(LOCAL_DIR)/result_tests.cpp \ $(LOCAL_DIR)/scheduler_tests.cpp \ + $(LOCAL_DIR)/sequencer_tests.cpp \ $(LOCAL_DIR)/sequential_executor_tests.cpp \ $(LOCAL_DIR)/suspended_task_tests.cpp \ $(LOCAL_DIR)/traits_tests.cpp \ diff --git a/system/utest/fit/sequencer_tests.cpp b/system/utest/fit/sequencer_tests.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3bdc4f2a9cf0ae1be4bd83ccace23c71cb4305e8 --- /dev/null +++ b/system/utest/fit/sequencer_tests.cpp @@ -0,0 +1,111 @@ +// Copyright 2018 The Fuchsia Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include <unistd.h> + +#include <future> // for std::async +#include <string> + +#include <lib/fit/sequencer.h> +#include <lib/fit/sequential_executor.h> +#include <unittest/unittest.h> + +namespace { + +bool sequencing_tasks() { + BEGIN_TEST; + + fit::sequencer seq; + std::string str; + + // This promise writes ":a" sequentially then writes ":a2" later. + auto a = fit::make_promise([&] { str += ":a"; }) + .wrap_with(seq) + .then([&](fit::result<>) { str += ":a2"; }); + + // This promise writes ":b" sequentially then writes ":b2" and ":b3" later. + // Also schedules another sequential task that writes ":e". + auto b = fit::make_promise([&](fit::context& context) { + str += ":b"; + context.executor()->schedule_task( + fit::make_promise([&] { str += ":e"; }) + .wrap_with(seq)); + }) + .wrap_with(seq) + .then([&, count = 0](fit::context& context, fit::result<>) mutable + -> fit::result<> { + if (++count == 5) { + str += ":b3"; + return fit::error(); + } + str += ":b2"; + context.suspend_task().resume_task(); // immediately resume + return fit::pending(); + }); + + // This promise writes ":c" sequentially then abandons itself. + auto c = fit::make_promise([&](fit::context& context) { + str += ":c"; + context.suspend_task(); // abandon result + return fit::pending(); + }) + .wrap_with(seq) + .then([&](fit::result<>) { str += ":c2"; }); + + // This promise writes ":d" sequentially. + auto d = fit::make_promise([&] { str += ":d"; }) + .wrap_with(seq); + + // These promises just write ":z1" and ":z2" whenever they happen to run. + auto z1 = fit::make_promise([&] { str += ":z1"; }); + auto z2 = fit::make_promise([&] { str += ":z2"; }); + + // Schedule the promises in an order which intentionally does not + // match the sequencing. + fit::sequential_executor executor; + executor.schedule_task(std::move(z1)); + executor.schedule_task(std::move(b)); + executor.schedule_task(std::move(c)); + executor.schedule_task(std::move(a)); + executor.schedule_task(std::move(d)); + executor.schedule_task(std::move(z2)); + executor.run(); + + // Evaluate the promises and check the execution order. + EXPECT_STR_EQ(":z1:a:a2:z2:b:b2:c:b2:d:b2:e:b2:b3", str.c_str()); + + END_TEST; +} + +bool thread_safety() { + BEGIN_TEST; + + fit::sequencer seq; + fit::sequential_executor executor; + uint64_t run_count = 0; + + // Schedule work from a few threads, just to show that we can. + for (int i = 0; i < 4; i++) { + std::async(std::launch::async, [&]() mutable { + for (int j = 0; j < 100; j++) { + executor.schedule_task( + fit::make_promise([&] { run_count++; }).wrap_with(seq)); + sleep(0); + } + }); + } + + // Run the tasks. + executor.run(); + EXPECT_EQ(4 * 100, run_count); + + END_TEST; +} + +} // namespace + +BEGIN_TEST_CASE(sequencer_tests) +RUN_TEST(sequencing_tasks) +RUN_TEST(thread_safety) +END_TEST_CASE(sequencer_tests)