diff --git a/zircon/system/ulib/io-scheduler/README.md b/zircon/system/ulib/io-scheduler/README.md index fcebfbc17a21407938ad3ed6ff4c0d7c1653a03a..caf53880308e4d26460d05d44148509473386494 100644 --- a/zircon/system/ulib/io-scheduler/README.md +++ b/zircon/system/ulib/io-scheduler/README.md @@ -10,7 +10,7 @@ The Fuchsia system architecture pushes subsystems that would be inside a traditi * **Client** - The code using this library. This document defines the API surface used by the client code. -* **Op** - The library schedules operations, or `ops`. These are of type ``struct SchedulerOp``. An IO operation is a discrete unit of IO that is meaningful to that driver stack. Examples are: reading of a sequence of bytes, scatter-gather writing a set of buffers, or a flush of caches. An op may be completed synchronously or asynchronously. An op can be in the following states: +* **Op** - The library schedules operations, or `ops`. These are of type ``class StreamOp``. An IO operation is a discrete unit of IO that is meaningful to that driver stack. Examples are: reading of a sequence of bytes, scatter-gather writing a set of buffers, or a flush of caches. An op may be completed synchronously or asynchronously. An op can be in the following states: * **Acquired** - an op arrives into the scheduler from a request source. * **Issued** - the op is transmitted to the client-provided execution callback for immediate execution. The execution may be synchronous or asynchronous. * **Completed** - The client has reported that the operation completed with either success or error. diff --git a/zircon/system/ulib/io-scheduler/include/io-scheduler/io-scheduler.h b/zircon/system/ulib/io-scheduler/include/io-scheduler/io-scheduler.h index 27450d34e01d7c0f22a944e433ceaabeccf4508e..7bef413ef64600eec241cfb3f92e847a4dcc6603 100644 --- a/zircon/system/ulib/io-scheduler/include/io-scheduler/io-scheduler.h +++ b/zircon/system/ulib/io-scheduler/include/io-scheduler/io-scheduler.h @@ -10,6 +10,7 @@ #include <fbl/condition_variable.h> #include <fbl/function.h> +#include <fbl/intrusive_double_list.h> #include <fbl/mutex.h> #include <fbl/vector.h> #include <zircon/types.h> @@ -48,7 +49,6 @@ constexpr uint32_t kMaxPriority = 31; // Suggested default priority for a stream. constexpr uint32_t kDefaultPriority = 8; - // Callback interface from Scheduler to client. Callbacks are made from within // the Scheduler library to the client implementation. All callbacks are made // with no locks held and are allowed to block. Any callbacks may be invoked @@ -68,7 +68,8 @@ public: // Acquire // Read zero or more ops from the client for intake into the - // Scheduler. + // Scheduler. Every op obtained through Acquire will be returned to the client + // via the Release callback. The Scheduler will never attempt to free these pointers. // Args: // sop_list - an empty array of op pointers to be filled. // list_count - number of entries in sop_list @@ -137,7 +138,7 @@ public: // before it can be used. // The Scheduler holds a pointer to |client| until Shutdown() has returned. It does not // manage the lifetime of this pointer and does not free it. - zx_status_t Init(SchedulerClient* client, uint32_t options); + zx_status_t Init(SchedulerClient* client, uint32_t options) __TA_EXCLUDES(stream_lock_); // Open a new stream with the requested ID and priority. It is safe to invoke // this function from a Scheduler callback context, except from Fatal(). @@ -148,21 +149,21 @@ public: // ZX_ERR_ALREADY_EXISTS if stream with same |id| is already open. // ZX_ERR_INVALID_ARGS if |priority| is out of range. // Other error status for internal errors. - zx_status_t StreamOpen(uint32_t id, uint32_t priority); + zx_status_t StreamOpen(uint32_t id, uint32_t priority) __TA_EXCLUDES(stream_lock_); // Close an open stream. All ops in the stream will be issued before the stream // is closed. New incoming ops to the closed stream will be released with // an error. - zx_status_t StreamClose(uint32_t id); + zx_status_t StreamClose(uint32_t id) __TA_EXCLUDES(stream_lock_); // Begin scheduler service. This creates the worker threads that will invoke // the callbacks in SchedulerCallbacks. - zx_status_t Serve(); + zx_status_t Serve() __TA_EXCLUDES(stream_lock_); // End scheduler service. This function blocks until all outstanding ops in // all streams are completed and closes all streams. Shutdown should not be invoked from a // callback function. To reuse the scheduler, call Init() again. - void Shutdown(); + void Shutdown() __TA_EXCLUDES(stream_lock_); // Client API - asynchronous calls. @@ -172,28 +173,60 @@ public: // asynchronously, this function should be called. The status of the operation // should be set in |sop|’s result field. This function is non-blocking and // safe to call from an interrupt handler context. - void AsyncComplete(StreamOp* sop); + void AsyncComplete(StreamOp* sop) __TA_EXCLUDES(stream_lock_); // API invoked by worker threads. // -------------------------------- SchedulerClient* client() { return client_; } + // Insert a list of ops into the scheduler queue. + // + // Ownership: + // Ops are exclusively retained by the Scheduler if they were successfully enqueued. Ops that + // encounter enqueueing errors will be added to |out_list| for caller to release. + // + // |in_list| and |out_list| may point to the same buffer. + // |out_num_ready| is an optional parameter that returns how many ops are ready to be dequeued + // in all streams. + zx_status_t Enqueue(UniqueOp* in_list, size_t in_count, + UniqueOp* out_list, size_t* out_actual, + size_t* out_num_ready = nullptr) __TA_EXCLUDES(stream_lock_); + + // Remove an op from the scheduler queue. + // + // Ownership: + // If successful, ownership of the op is transferred to the caller. + // + // If no ops are available: + // returns ZX_ERR_CANCELED if shutdown has started. + // returns ZX_ERR_SHOULD_WAIT if |wait| is false. + // otherwise returns ZX_ERR_SHOULD_WAIT. + zx_status_t Dequeue(UniqueOp* op_out, bool wait) __TA_EXCLUDES(stream_lock_); + private: using StreamIdMap = Stream::WAVLTreeSortById; using StreamList = Stream::ListUnsorted; + zx_status_t FindStreamLocked(uint32_t id, StreamRef* out) __TA_REQUIRES(stream_lock_); + SchedulerClient* client_ = nullptr; // Client-supplied callback interface. uint32_t options_ = 0; // Ordering options. fbl::Mutex stream_lock_; + // Set when shutdown has been called and workers should exit. + bool shutdown_initiated_ __TA_GUARDED(stream_lock_) = true; // Number of existing streams. uint32_t num_streams_ __TA_GUARDED(stream_lock_) = 0; // Number of streams that have ops that need to be issued or completed. uint32_t active_streams_ __TA_GUARDED(stream_lock_) = 0; + // Total number of acquired ops in all streams. + uint32_t acquired_ops_ __TA_GUARDED(stream_lock_) = 0; // Map of id to stream. Contains all streams. StreamIdMap stream_map_ __TA_GUARDED(stream_lock_); // List of streams that have ops ready to be scheduled. StreamList active_list_ __TA_GUARDED(stream_lock_); + // Event notifying worker threads that active streams are available. + fbl::ConditionVariable active_available_ __TA_GUARDED(stream_lock_); fbl::Vector<fbl::unique_ptr<Worker>> workers_; }; diff --git a/zircon/system/ulib/io-scheduler/include/io-scheduler/stream-op.h b/zircon/system/ulib/io-scheduler/include/io-scheduler/stream-op.h index 1e3a49aa8528ef4091398c43d8d99b9f7ce592f5..df0fa5625ea4155d106f2c157e42f332d09b1732 100644 --- a/zircon/system/ulib/io-scheduler/include/io-scheduler/stream-op.h +++ b/zircon/system/ulib/io-scheduler/include/io-scheduler/stream-op.h @@ -6,6 +6,7 @@ #include <stdint.h> +#include <fbl/intrusive_double_list.h> #include <zircon/types.h> namespace ioscheduler { @@ -55,14 +56,51 @@ constexpr uint32_t kOpFlagGroupLeader = (1u << 8); constexpr uint32_t kOpGroupNone = 0; -struct StreamOp { - OpType type; // Type of operation. - uint32_t flags; // Flags. Should be zero. - uint32_t stream_id; // Stream into which this op is queued. - uint32_t group_id; // Group of operations. - uint32_t group_members; // Number of members in the group. - zx_status_t result; // Status code of the released operation. - void* cookie; // User-defined per-op cookie. +// class StreamOp. +// The library schedules operations, or ops of type StreamOp. An IO operation is a discrete +// unit of IO that is meaningful to the client. StreamOps are allocated and freed by the client. +// The Scheduler interacts with these via the SchedulerClient interface. A reference to each op +// acquired through this interface is retained until the Release() method is called. +class StreamOp : public fbl::DoublyLinkedListable<StreamOp*> { +public: + StreamOp() { + StreamOp(OpType::kOpTypeUnknown, 0, kOpGroupNone, 0, nullptr); + } + + StreamOp(OpType type, uint32_t stream_id, uint32_t group_id, uint32_t group_members, + void* cookie) + : type_(type), stream_id_(stream_id), group_id_(group_id), + group_members_(group_members), result_(ZX_OK), cookie_(cookie) {} + + DISALLOW_COPY_ASSIGN_AND_MOVE(StreamOp); + + OpType type() { return type_; } + void set_ype(OpType type) { type_ = type; } + + uint32_t stream() { return stream_id_; } + void set_stream(uint32_t stream_id) { stream_id_ = stream_id; } + + uint32_t group() { return group_id_; } + void set_group(uint32_t gid) { group_id_ = gid; } + + uint32_t members() { return group_members_; } + void set_members(uint32_t group_members) { group_members_ = group_members; } + + zx_status_t result() { return result_; } + void set_result(zx_status_t result) { result_ = result; } + + void* cookie() { return cookie_; } + void set_cookie(void* cookie) { cookie_ = cookie; } + +private: + fbl::DoublyLinkedListNodeState<StreamOp*> node_state_; + + OpType type_; // Type of operation. + uint32_t stream_id_; // Stream into which this op is queued. + uint32_t group_id_; // Group of operations. + uint32_t group_members_; // Number of members in the group. + zx_status_t result_; // Status code of the released operation. + void* cookie_; // User-defined per-op cookie. }; // UniqueOp is a wrapper around StreamOp designed to clarify the ownership of an op pointer. @@ -81,7 +119,6 @@ public: // Assignment UniqueOp& operator=(const UniqueOp& r) = delete; - // Move construction. UniqueOp(UniqueOp&& r) : op_(r.op_) { r.op_ = nullptr; } @@ -91,10 +128,17 @@ public: // Move assignment. UniqueOp& operator=(UniqueOp&& r) { - UniqueOp(std::move(r)); + ZX_DEBUG_ASSERT(op_ == nullptr); + op_ = r.op_; + r.op_ = nullptr; return *this; } + void set(StreamOp* op) { + ZX_DEBUG_ASSERT(op_ == nullptr); + op_ = op; + } + StreamOp* release() { StreamOp* old = op_; op_ = nullptr; diff --git a/zircon/system/ulib/io-scheduler/include/io-scheduler/stream.h b/zircon/system/ulib/io-scheduler/include/io-scheduler/stream.h index 510a179a3b02319d03f593b889ea78aa185a3000..ce612e0601ae9c106d76171b96574b163dbefbc9 100644 --- a/zircon/system/ulib/io-scheduler/include/io-scheduler/stream.h +++ b/zircon/system/ulib/io-scheduler/include/io-scheduler/stream.h @@ -9,14 +9,19 @@ #include <fbl/macros.h> #include <fbl/ref_counted.h> #include <fbl/ref_ptr.h> +#include <zircon/types.h> #include <io-scheduler/stream-op.h> namespace ioscheduler { +class Scheduler; class Stream; using StreamRef = fbl::RefPtr<Stream>; +// Stream - a logical sequence of ops. +// The Stream class is not thread safe, streams depend on the scheduler for synchronization. +// Certain calls must be performed with the Scheduler's stream_lock_ held. class Stream : public fbl::RefCounted<Stream> { public: Stream(uint32_t id, uint32_t pri); @@ -26,11 +31,24 @@ public: uint32_t Id() { return id_; } uint32_t Priority() { return priority_; } - // Functions requiring the Scheduler stream lock be held. - bool IsActive() { return active_; } - void SetActive(bool active) { active_ = active; } void Close(); + // Functions requiring the Scheduler stream lock be held. + // --------------------------------------------------------- + + // Insert an op into the tail of the stream (subject to reordering). + // On error op's error status is set and it is moved to |*op_err|. + zx_status_t Push(UniqueOp op, UniqueOp* op_err); + + // Fetch an op from the head of the stream. + UniqueOp Pop(); + + // Does the stream contain any ops that are not yet issued? + bool IsEmpty() { return (num_acquired_ == 0); } + + // --------------------------------------------------------- + // End functions requiring stream lock. + // WAVL Tree support. using WAVLTreeNodeState = fbl::WAVLTreeNodeState<StreamRef>; struct WAVLTreeNodeTraitsSortById { @@ -63,7 +81,9 @@ private: uint32_t id_; uint32_t priority_; bool open_ = true; // Stream is open, can accept more ops. - bool active_ = false; // Stream has ops and is being scheduled. + + uint32_t num_acquired_ = 0; // Number of ops acquired and waiting for issue. + fbl::DoublyLinkedList<StreamOp*> acquired_list_; }; diff --git a/zircon/system/ulib/io-scheduler/include/io-scheduler/worker.h b/zircon/system/ulib/io-scheduler/include/io-scheduler/worker.h index 19472971119e2b872cd452efea764ce6b5f24fb3..2aee5eccc2eb69d24d039fa2577fda76a5cee0f3 100644 --- a/zircon/system/ulib/io-scheduler/include/io-scheduler/worker.h +++ b/zircon/system/ulib/io-scheduler/include/io-scheduler/worker.h @@ -33,6 +33,7 @@ private: Scheduler* sched_ = nullptr; uint32_t id_; + bool cancelled_ = false; // Exit has been requested. bool thread_started_ = false; thrd_t thread_; }; diff --git a/zircon/system/ulib/io-scheduler/scheduler.cpp b/zircon/system/ulib/io-scheduler/scheduler.cpp index 1889f6d3b3d0c8f0a2c462d90ba2ff9125f8d669..5e945f626722a8e052b49ffbbb022f073b763c64 100644 --- a/zircon/system/ulib/io-scheduler/scheduler.cpp +++ b/zircon/system/ulib/io-scheduler/scheduler.cpp @@ -11,6 +11,8 @@ namespace ioscheduler { zx_status_t Scheduler::Init(SchedulerClient* client, uint32_t options) { client_ = client; options_ = options; + fbl::AutoLock lock(&stream_lock_); + shutdown_initiated_ = false; return ZX_OK; } @@ -24,11 +26,16 @@ void Scheduler::Shutdown() { client_->CancelAcquire(); { - // Close all streams. fbl::AutoLock lock(&stream_lock_); + shutdown_initiated_ = true; + + // Close all streams. for (auto& stream : stream_map_) { stream.Close(); } + + // Wake all workers blocking on the queue. They will observe shutdown_initiated_ and exit. + active_available_.Broadcast(); } // Block until all worker threads exit. @@ -40,7 +47,6 @@ void Scheduler::Shutdown() { ZX_DEBUG_ASSERT(active_list_.is_empty()); // Delete any existing stream in the case where no worker threads were launched. stream_map_.clear(); - ZX_DEBUG_ASSERT(stream_map_.is_empty()); num_streams_ = 0; } @@ -77,12 +83,11 @@ zx_status_t Scheduler::StreamClose(uint32_t id) { StreamRef stream = iter.CopyPointer(); stream->Close(); // Once closed, the stream cannot transition from idle to active. - if (!stream->IsActive()) { + if (stream->IsEmpty()) { // Stream is inactive, delete here. // Otherwise, it will be deleted by the worker that drains it. stream_map_.erase(*stream); } - return ZX_OK; } @@ -106,7 +111,7 @@ zx_status_t Scheduler::Serve() { } void Scheduler::AsyncComplete(StreamOp* sop) { - + ZX_DEBUG_ASSERT(false); // Not yet implemented. } Scheduler::~Scheduler() { @@ -115,4 +120,86 @@ Scheduler::~Scheduler() { ZX_DEBUG_ASSERT(active_streams_ == 0); } +zx_status_t Scheduler::Enqueue(UniqueOp* in_list, size_t in_count, + UniqueOp* out_list, size_t* out_actual, size_t* out_num_ready) { + fbl::AutoLock lock(&stream_lock_); + StreamRef stream = nullptr; + size_t out_num = 0; + for (size_t i = 0; i < in_count; i++) { + UniqueOp op = std::move(in_list[i]); + + // Initialize op fields modified by scheduler. + op->set_result(ZX_OK); + + // Find stream if not already cached. + if ((stream == nullptr) || (stream->Id() != op->stream())) { + stream.reset(); + if (FindStreamLocked(op->stream(), &stream) != ZX_OK) { + // No stream, mark as failed and leave in list for caller to clean up. + op->set_result(ZX_ERR_INVALID_ARGS); + out_list[out_num++] = std::move(op); + continue; + } + } + bool was_empty = stream->IsEmpty(); + zx_status_t status = stream->Push(std::move(op), &out_list[out_num]); + if (status != ZX_OK) { + // Stream is closed, cannot add ops. Op has been added to out_list[out_num] + out_num++; + continue; + } + if (was_empty) { + // Add to active list. + active_list_.push_back(stream); + active_streams_++; + } + acquired_ops_++; + } + *out_actual = out_num; + if (out_num_ready) { + *out_num_ready = acquired_ops_; + } + if (acquired_ops_ > 0) { + active_available_.Broadcast(); // Wake all worker threads waiting for more work. + } + return ZX_OK; +} + +zx_status_t Scheduler::Dequeue(UniqueOp* op_out, bool wait) { + fbl::AutoLock lock(&stream_lock_); + while (acquired_ops_ == 0) { + ZX_DEBUG_ASSERT(active_list_.is_empty()); + if (shutdown_initiated_) { + return ZX_ERR_CANCELED; + } + if (!wait) { + return ZX_ERR_SHOULD_WAIT; + } + active_available_.Wait(&stream_lock_); + } + ZX_DEBUG_ASSERT(!active_list_.is_empty()); + StreamRef stream = active_list_.pop_front(); + ZX_DEBUG_ASSERT(stream != nullptr); + + *op_out = stream->Pop(); + acquired_ops_--; + if (stream->IsEmpty()) { + // Stream has been removed from active list. + active_streams_--; + } else { + // Return stream to tail of active list. + active_list_.push_back(std::move(stream)); + } + return ZX_OK; +} + +zx_status_t Scheduler::FindStreamLocked(uint32_t id, StreamRef* out) { + auto iter = stream_map_.find(id); + if (!iter.IsValid()) { + return ZX_ERR_NOT_FOUND; + } + *out = iter.CopyPointer(); + return ZX_OK; +} + } // namespace ioscheduler diff --git a/zircon/system/ulib/io-scheduler/stream.cpp b/zircon/system/ulib/io-scheduler/stream.cpp index 03c9c351f377b561b5e526b435507fb6119ee96a..c3c66af7a60b0ec2b5bb017af1023566b7997ce3 100644 --- a/zircon/system/ulib/io-scheduler/stream.cpp +++ b/zircon/system/ulib/io-scheduler/stream.cpp @@ -3,14 +3,38 @@ // found in the LICENSE file. #include <io-scheduler/stream.h> +#include <io-scheduler/io-scheduler.h> namespace ioscheduler { Stream::Stream(uint32_t id, uint32_t pri) : id_(id), priority_(pri) {} -Stream::~Stream() {} + +Stream::~Stream() { + ZX_DEBUG_ASSERT(open_ == false); + ZX_DEBUG_ASSERT(acquired_list_.is_empty()); +} void Stream::Close() { open_ = false; } +zx_status_t Stream::Push(UniqueOp op, UniqueOp* op_err) { + ZX_DEBUG_ASSERT(op != nullptr); + if (!open_) { + op->set_result(ZX_ERR_BAD_STATE); + *op_err = std::move(op); + return ZX_ERR_BAD_STATE; + } + acquired_list_.push_back(op.release()); + num_acquired_++; + return ZX_OK; +} + +UniqueOp Stream::Pop() { + UniqueOp op(acquired_list_.pop_front()); + ZX_DEBUG_ASSERT(op != nullptr); + num_acquired_--; + return op; +} + } // namespace ioscheduler \ No newline at end of file diff --git a/zircon/system/ulib/io-scheduler/test/main.cpp b/zircon/system/ulib/io-scheduler/test/main.cpp index 6f0e29318f92fc92c6a05bc0e5def93a77688f51..d53a66a31653b7c36e8d7bb2369f6a925b4d3585 100644 --- a/zircon/system/ulib/io-scheduler/test/main.cpp +++ b/zircon/system/ulib/io-scheduler/test/main.cpp @@ -23,23 +23,18 @@ using SchedOp = ioscheduler::StreamOp; class TestOp : public fbl::DoublyLinkedListable<fbl::RefPtr<TestOp>>, public fbl::RefCounted<TestOp> { public: - TestOp(uint32_t id, uint32_t stream_id, uint32_t group = ioscheduler::kOpGroupNone) : id_(id) { - sop_.stream_id = stream_id; - sop_.flags = 0; - sop_.group_id = group; - sop_.result = ZX_OK; // Nonsense value. - sop_.cookie = this; - } + TestOp(uint32_t id, uint32_t stream_id, uint32_t group = ioscheduler::kOpGroupNone) : id_(id), + sop_(ioscheduler::OpType::kOpTypeUnknown, stream_id, group, 0, this) {} - void SetId(uint32_t id) { id_ = id; } + void set_id(uint32_t id) { id_ = id; } uint32_t id() { return id_; } - bool ShouldFail() { return should_fail_; } - void SetShouldFail(bool should_fail) { should_fail_ = should_fail; } - void SetResult(zx_status_t result) { sop_.result = result; } + bool should_fail() { return should_fail_; } + void set_should_fail(bool should_fail) { should_fail_ = should_fail; } + void set_result(zx_status_t result) { sop_.set_result(result); } bool CheckExpected() { - return (should_fail_ ? (sop_.result != ZX_OK) : (sop_.result == ZX_OK)); + return (should_fail_ ? (sop_.result() != ZX_OK) : (sop_.result() == ZX_OK)); } - zx_status_t result() { return sop_.result; } + zx_status_t result() { return sop_.result(); } SchedOp* sop() { return &sop_; } bool async() { return async_; } @@ -199,7 +194,7 @@ zx_status_t IOSchedTestFixture::Acquire(SchedOp** sop_list, size_t list_count, zx_status_t IOSchedTestFixture::Issue(SchedOp* sop) { fbl::AutoLock lock(&lock_); - TopRef top(static_cast<TestOp*>(sop->cookie)); + TopRef top(static_cast<TestOp*>(sop->cookie())); acquired_list_.erase(*top); issued_total_++; if (top->async()) { @@ -210,10 +205,10 @@ zx_status_t IOSchedTestFixture::Issue(SchedOp* sop) { // Executing op here... // Todo: pretend to do work here. - if (top->ShouldFail()) { - top->SetResult(ZX_ERR_BAD_PATH); // Error unlikely to be generated by IO Scheduler. + if (top->should_fail()) { + top->set_result(ZX_ERR_BAD_PATH); // Error unlikely to be generated by IO Scheduler. } else { - top->SetResult(ZX_OK); + top->set_result(ZX_OK); } completed_list_.push_back(std::move(top)); completed_total_++; @@ -222,7 +217,7 @@ zx_status_t IOSchedTestFixture::Issue(SchedOp* sop) { void IOSchedTestFixture::Release(SchedOp* sop) { fbl::AutoLock lock(&lock_); - TopRef top(static_cast<TestOp*>(sop->cookie)); + TopRef top(static_cast<TestOp*>(sop->cookie())); completed_list_.erase(*top); released_list_.push_back(std::move(top)); released_total_++; @@ -286,13 +281,12 @@ void IOSchedTestFixture::DoServeTest(uint32_t num_ops, bool async, uint32_t fail for (uint32_t i = 0; i < num_ops; i++) { TopRef top = fbl::AdoptRef(new TestOp(i, 0)); if (fail_pct) { - if((static_cast<uint32_t>(rand()) % 100u) <= fail_pct) { - top->SetShouldFail(true); + if((static_cast<uint32_t>(rand()) % 100u) < fail_pct) { + top->set_should_fail(true); } } InsertOp(std::move(top)); } - ASSERT_OK(sched_->Serve(), "Failed to begin service"); // Wait until all ops have been acquired. @@ -317,6 +311,44 @@ TEST_F(IOSchedTestFixture, ServeTestMultiFailures) { DoServeTest(200, false, 10); } +TEST_F(IOSchedTestFixture, ServeTestMultistream) { + zx_status_t status = sched_->Init(this, ioscheduler::kOptionStrictlyOrdered); + ASSERT_OK(status, "Failed to init scheduler"); + const uint32_t num_streams = 5; + for (uint32_t i = 0; i < num_streams; i++) { + status = sched_->StreamOpen(i, ioscheduler::kDefaultPriority); + ASSERT_OK(status, "Failed to open stream"); + } + + const uint32_t num_ops = num_streams * 1000; + uint32_t op_id; + // Add half of the ops before starting the server. + for (op_id = 0; op_id < num_ops / 2; op_id++) { + uint32_t stream_id = (static_cast<uint32_t>(rand()) % num_streams); + TopRef top = fbl::AdoptRef(new TestOp(op_id, stream_id)); + InsertOp(std::move(top)); + } + + ASSERT_OK(sched_->Serve(), "Failed to begin service"); + + // Add other half while running. + for ( ; op_id < num_ops; op_id++) { + uint32_t stream_id = (static_cast<uint32_t>(rand()) % num_streams); + TopRef top = fbl::AdoptRef(new TestOp(op_id, stream_id)); + InsertOp(std::move(top)); + } + + // Wait until all ops have been acquired. + WaitAcquire(); + + ASSERT_OK(sched_->StreamClose(0), "Failed to close stream"); + // Other streams intentionally left open. Will be closed by Shutdown(). + sched_->Shutdown(); + + // Assert all ops completed. + CheckExpectedResult(); +} + } // namespace diff --git a/zircon/system/ulib/io-scheduler/worker.cpp b/zircon/system/ulib/io-scheduler/worker.cpp index 1aa07e577f21f010cee51db97e1883ea76145462..241b4a9db494db6a9f39e2a99429f0213b726f9a 100644 --- a/zircon/system/ulib/io-scheduler/worker.cpp +++ b/zircon/system/ulib/io-scheduler/worker.cpp @@ -47,10 +47,13 @@ void Worker::WorkerLoop() { const size_t max_ops = 10; SchedulerClient* client = sched_->client(); zx_status_t status; - for ( ; ; ) { - size_t actual_count = 0; + while (!cancelled_) { + + // Fetch ops from the client. + + size_t acquire_count = 0; StreamOp* op_list[max_ops]; - status = client->Acquire(op_list, max_ops, &actual_count, true); + status = client->Acquire(op_list, max_ops, &acquire_count, true); if (status == ZX_ERR_CANCELED) { // Cancel received, no more ops to read. Drain the streams and exit. break; @@ -60,12 +63,61 @@ void Worker::WorkerLoop() { client->Fatal(); break; } - // Dummy issue loop. In the future, ops will be added to the scheduler. - for (size_t i = 0; i < actual_count; i++) { - UniqueOp ref(op_list[i]); - status = client->Issue(ref.get()); - ZX_DEBUG_ASSERT(status == ZX_OK); // Require synchronous completion, for now. - client->Release(ref.release()); + + // Containerize all ops for safety. + UniqueOp uop_list[max_ops]; + for (size_t i = 0; i < acquire_count; i++) { + uop_list[i].set(op_list[i]); + } + + // Enqueue ops in the scheduler's priority queue. + + size_t num_ready = 0; + size_t num_error = 0; + sched_->Enqueue(uop_list, acquire_count, uop_list, &num_error, &num_ready); + // Any ops remaining in the list have encountered an error and should be released. + for (size_t i = 0; i < num_error; i++) { + client->Release(uop_list[i].release()); + } + + // Drain the priority queue. + + for ( ; ; ) { + + // Fetch an op. + + UniqueOp op; + status = sched_->Dequeue(&op, false); + if (status == ZX_ERR_SHOULD_WAIT) { + // No more ops. + break; + } else if (status == ZX_ERR_CANCELED) { + // Shutdown initiated. + cancelled_ = true; + break; + } else if (status != ZX_OK) { + fprintf(stderr, "Dequeue() failed %d\n", status); + return; + } + + // Execute it. + + status = client->Issue(op.get()); + if (status == ZX_OK) { + // Op completed successfully or encountered a synchronous error. + client->Release(op.release()); + } else if (status == ZX_ERR_ASYNC) { + // Op queued for async completion. Released when completed. + + // Todo: transfer op to pending list to await async completion. + // sched_->AddAsync(std::move(op)); + ZX_DEBUG_ASSERT(false); + } else { + fprintf(stderr, "Unexpected return status from Issue() %d\n", status); + // Mark op as failed. + op->set_result(ZX_ERR_IO); + client->Release(op.release()); + } } } }