Skip to content
Snippets Groups Projects
Commit fb37c7a4 authored by Natalie Weizenbaum's avatar Natalie Weizenbaum
Browse files

Add a MultiChannel class.

This class multiplexes communication over a single underlying channel.
It will be useful for communicating individually with test suites and
tests over the server :left_right_arrow: browser WebSocket connection.

See #5

R=kevmoo@google.com

Review URL: https://codereview.chromium.org//957583002
parent aede514b
No related branches found
No related tags found
No related merge requests found
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library unittest.multi_channel;
import 'dart:async';
/// A class that multiplexes multiple virtual channels across a single
/// underlying transport layer.
///
/// This should be connected to another [MultiChannel] on the other end of the
/// underlying channel. It starts with a single default virtual channel,
/// accessible via [stream] and [sink]. Additional virtual channels can be
/// created with [virtualChannel].
///
/// When a virtual channel is created by one endpoint, the other must connect to
/// it before messages may be sent through it. The first endpoint passes its
/// [VirtualChannel.id] to the second, which then creates a channel from that id
/// also using [virtualChannel]. For example:
///
/// ```dart
/// // First endpoint
/// var virtual = multiChannel.virtualChannel();
/// multiChannel.sink.add({
/// "channel": virtual.id
/// });
///
/// // Second endpoint
/// multiChannel.stream.listen((message) {
/// var virtual = multiChannel.virtualChannel(message["channel"]);
/// // ...
/// });
/// ```
///
/// Sending errors across a [MultiChannel] is not supported. Any errors from the
/// underlying stream will be reported only via the default
/// [MultiChannel.stream].
///
/// Each virtual channel may be closed individually. When all of them are
/// closed, the underlying [StreamSink] is closed automatically.
abstract class MultiChannel {
/// The default input stream.
///
/// This connects to the remote [sink].
Stream get stream;
/// The default output stream.
///
/// This connects to the remote [stream]. If this is closed, the remote
/// [stream] will close, but other virtual channels will remain open and new
/// virtual channels may be opened.
StreamSink get sink;
/// Creates a new [MultiChannel] that sends messages over [innerStream] and
/// [innerSink].
///
/// The inner streams must take JSON-like objects.
factory MultiChannel(Stream innerStream, StreamSink innerSink) =>
new _MultiChannel(innerStream, innerSink);
/// Creates a new virtual channel.
///
/// If [id] is not passed, this creates a virtual channel from scratch. Before
/// it's used, its [VirtualChannel.id] must be sent to the remote endpoint
/// where [virtualChannel] should be called with that id.
///
/// If [id] is passed, this creates a virtual channel corresponding to the
/// channel with that id on the remote channel.
///
/// Throws an [ArgumentError] if a virtual channel already exists for [id].
/// Throws a [StateError] if the underlying channel is closed.
VirtualChannel virtualChannel([id]);
}
/// The implementation of [MultiChannel].
///
/// This is private so that [VirtualChannel] can inherit from [MultiChannel]
/// without having to implement all the private members.
class _MultiChannel implements MultiChannel {
/// The inner stream over which all communication is received.
///
/// This will be `null` if the underlying communication channel is closed.
Stream _innerStream;
/// The inner sink over which all communication is sent.
///
/// This will be `null` if the underlying communication channel is closed.
StreamSink _innerSink;
/// The subscription to [_innerStream].
StreamSubscription _innerStreamSubscription;
Stream get stream => _streamController.stream;
final _streamController = new StreamController(sync: true);
StreamSink get sink => _sinkController.sink;
final _sinkController = new StreamController(sync: true);
/// A map from virtual channel ids to [StreamController]s that should be used
/// to write messages received from those channels.
final _streamControllers = new Map<int, StreamController>();
/// A map from virtual channel ids to [StreamControllers]s that are used
/// to receive messages to write to those channels.
///
/// Note that this uses the same keys as [_streamControllers].
final _sinkControllers = new Map<int, StreamController>();
/// The next id to use for a local virtual channel.
///
/// Ids are used to identify virtual channels. Each message is tagged with an
/// id; the receiving [MultiChannel] uses this id to look up which
/// [VirtualChannel] the message should be dispatched to.
///
/// The id scheme for virtual channels is somewhat complicated. This is
/// necessary to ensure that there are no conflicts even when both endpoints
/// have virtual channels with the same id; since both endpoints can send and
/// receive messages across each virtual channel, a naïve scheme would make it
/// impossible to tell whether a message was from a channel that originated in
/// the remote endpoint or a reply on a channel that originated in the local
/// endpoint.
///
/// The trick is that each endpoint only uses odd ids for its own channels.
/// When sending a message over a channel that was created by the remote
/// endpoint, the channel's id plus one is used. This way each [MultiChannel]
/// knows that if an incoming message has an odd id, it's using the local id
/// scheme, but if it has an even id, it's using the remote id scheme.
var _nextId = 1;
_MultiChannel(this._innerStream, this._innerSink) {
// The default connection is a special case which has id 0 on both ends.
// This allows it to begin connected without having to send over an id.
_streamControllers[0] = _streamController;
_sinkControllers[0] = _sinkController;
_sinkController.stream.listen(
(message) => _innerSink.add([0, message]),
onDone: () => _closeChannel(0, 0));
_innerStreamSubscription = _innerStream.listen((message) {
var id = message[0];
var sink = _streamControllers[id];
// A sink might not exist if the channel was closed before an incoming
// message was processed.
if (sink == null) return;
if (message.length > 1) {
sink.add(message[1]);
return;
}
// A message without data indicates that the channel has been closed.
_sinkControllers[id].close();
}, onDone: _closeInnerChannel,
onError: _streamController.addError);
}
VirtualChannel virtualChannel([id]) {
if (_innerStream == null) {
throw new StateError("The underlying channel is closed.");
}
var inputId;
var outputId;
if (id != null) {
// Since the user is passing in an id, we're connected to a remote
// VirtualChannel. This means messages they send over this channel will
// have the original odd id, but our replies will have an even id.
inputId = id;
outputId = (id as int) + 1;
} else {
// Since we're generating an id, we originated this VirtualChannel. This
// means messages we send over this channel will have the original odd id,
// but the remote channel's replies will have an even id.
inputId = _nextId + 1;
outputId = _nextId;
_nextId += 2;
}
if (_streamControllers.containsKey(inputId)) {
throw new ArgumentError("A virtual channel with id $id already exists.");
}
var streamController = new StreamController(sync: true);
var sinkController = new StreamController(sync: true);
_streamControllers[inputId] = streamController;
_sinkControllers[inputId] = sinkController;
sinkController.stream.listen(
(message) => _innerSink.add([outputId, message]),
onDone: () => _closeChannel(inputId, outputId));
return new VirtualChannel._(
this, outputId, streamController.stream, sinkController.sink);
}
/// Closes the virtual channel for which incoming messages have [inputId] and
/// outgoing messages have [outputId].
void _closeChannel(int inputId, int outputId) {
// A message without data indicates that the virtual channel has been
// closed.
_streamControllers.remove(inputId).close();
_sinkControllers.remove(inputId).close();
if (_innerSink == null) return;
_innerSink.add([outputId]);
if (_streamControllers.isEmpty) _closeInnerChannel();
}
/// Closes the underlying communication channel.
void _closeInnerChannel() {
_innerSink.close();
_innerStreamSubscription.cancel();
_innerStream = null;
_innerSink = null;
for (var controller in _sinkControllers.values.toList()) {
controller.close();
}
}
}
/// A virtual channel created by [MultiChannel].
///
/// This implements [MultiChannel] for convenience.
/// [VirtualChannel.virtualChannel] is semantically identical to the parent's
/// [MultiChannel.virtualChannel].
class VirtualChannel implements MultiChannel {
/// The [MultiChannel] that created this.
final MultiChannel _parent;
/// The identifier for this channel.
///
/// This can be sent across the [MultiChannel] to provide the remote endpoint
/// a means to connect to this channel. Nothing about this is guaranteed
/// except that it will be JSON-serializable.
final id;
final Stream stream;
final StreamSink sink;
VirtualChannel._(this._parent, this.id, this.stream, this.sink);
VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id);
}
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:unittest/src/util/multi_channel.dart';
import 'package:unittest/unittest.dart';
import 'utils.dart';
void main() {
var oneToTwo;
var twoToOne;
var channel1;
var channel2;
setUp(() {
oneToTwo = new StreamController();
twoToOne = new StreamController();
channel1 = new MultiChannel(twoToOne.stream, oneToTwo.sink);
channel2 = new MultiChannel(oneToTwo.stream, twoToOne.sink);
});
group("the default virtual channel", () {
test("begins connected", () {
var first = true;
channel2.stream.listen(expectAsync((message) {
if (first) {
expect(message, equals("hello"));
first = false;
} else {
expect(message, equals("world"));
}
}, count: 2));
channel1.sink.add("hello");
channel1.sink.add("world");
});
test("closes the remote virtual channel when it closes", () {
expect(channel2.stream.toList(), completion(isEmpty));
expect(channel2.sink.done, completes);
channel1.sink.close();
});
test("closes the local virtual channel when it closes", () {
expect(channel1.stream.toList(), completion(isEmpty));
expect(channel1.sink.done, completes);
channel1.sink.close();
});
test("doesn't closes the local virtual channel when the stream "
"subscription is canceled", () {
channel1.sink.done.then(expectAsync((_) {}, count: 0));
channel1.stream.listen((_) {}).cancel();
// Ensure that there's enough time for the channel to close if it's going
// to.
return pumpEventQueue();
});
test("closes the underlying channel when it closes without any other "
"virtual channels", () {
expect(oneToTwo.done, completes);
expect(twoToOne.done, completes);
channel1.sink.close();
});
test("doesn't close the underlying channel when it closes with other "
"virtual channels", () {
oneToTwo.done.then(expectAsync((_) {}, count: 0));
twoToOne.done.then(expectAsync((_) {}, count: 0));
// Establish another virtual connection which should keep the underlying
// connection open.
channel2.virtualChannel(channel1.virtualChannel().id);
channel1.sink.close();
// Ensure that there's enough time for the underlying channel to complete
// if it's going to.
return pumpEventQueue();
});
});
group("a locally-created virtual channel", () {
var virtual1;
var virtual2;
setUp(() {
virtual1 = channel1.virtualChannel();
virtual2 = channel2.virtualChannel(virtual1.id);
});
test("sends messages only to the other virtual channel", () {
var first = true;
virtual2.stream.listen(expectAsync((message) {
if (first) {
expect(message, equals("hello"));
first = false;
} else {
expect(message, equals("world"));
}
}, count: 2));
// No other virtual channels should receive the message.
for (var i = 0; i < 10; i++) {
var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
virtual.stream.listen(expectAsync((_) {}, count: 0));
}
channel2.stream.listen(expectAsync((_) {}, count: 0));
virtual1.sink.add("hello");
virtual1.sink.add("world");
});
test("closes the remote virtual channel when it closes", () {
expect(virtual2.stream.toList(), completion(isEmpty));
expect(virtual2.sink.done, completes);
virtual1.sink.close();
});
test("closes the local virtual channel when it closes", () {
expect(virtual1.stream.toList(), completion(isEmpty));
expect(virtual1.sink.done, completes);
virtual1.sink.close();
});
test("doesn't closes the local virtual channel when the stream "
"subscription is canceled", () {
virtual1.sink.done.then(expectAsync((_) {}, count: 0));
virtual1.stream.listen((_) {}).cancel();
// Ensure that there's enough time for the channel to close if it's going
// to.
return pumpEventQueue();
});
test("closes the underlying channel when it closes without any other "
"virtual channels", () {
// First close the default channel so we can test the new channel as the
// last living virtual channel.
channel1.sink.close();
return channel2.stream.toList().then((_) {
expect(oneToTwo.done, completes);
expect(twoToOne.done, completes);
virtual1.sink.close();
});
});
test("doesn't close the underlying channel when it closes with other "
"virtual channels", () {
oneToTwo.done.then(expectAsync((_) {}, count: 0));
twoToOne.done.then(expectAsync((_) {}, count: 0));
virtual1.sink.close();
// Ensure that there's enough time for the underlying channel to complete
// if it's going to.
return pumpEventQueue();
});
test("doesn't conflict with a remote virtual channel", () {
var virtual3 = channel2.virtualChannel();
var virtual4 = channel1.virtualChannel(virtual3.id);
// This is an implementation detail, but we assert it here to make sure
// we're properly testing two channels with the same id.
expect(virtual1.id, equals(virtual3.id));
virtual2.stream.listen(
expectAsync((message) => expect(message, equals("hello"))));
virtual4.stream.listen(
expectAsync((message) => expect(message, equals("goodbye"))));
virtual1.sink.add("hello");
virtual3.sink.add("goodbye");
});
});
group("a remotely-created virtual channel", () {
var virtual1;
var virtual2;
setUp(() {
virtual1 = channel1.virtualChannel();
virtual2 = channel2.virtualChannel(virtual1.id);
});
test("sends messages only to the other virtual channel", () {
var first = true;
virtual1.stream.listen(expectAsync((message) {
if (first) {
expect(message, equals("hello"));
first = false;
} else {
expect(message, equals("world"));
}
}, count: 2));
// No other virtual channels should receive the message.
for (var i = 0; i < 10; i++) {
var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
virtual.stream.listen(expectAsync((_) {}, count: 0));
}
channel1.stream.listen(expectAsync((_) {}, count: 0));
virtual2.sink.add("hello");
virtual2.sink.add("world");
});
test("closes the remote virtual channel when it closes", () {
expect(virtual1.stream.toList(), completion(isEmpty));
expect(virtual1.sink.done, completes);
virtual2.sink.close();
});
test("closes the local virtual channel when it closes", () {
expect(virtual2.stream.toList(), completion(isEmpty));
expect(virtual2.sink.done, completes);
virtual2.sink.close();
});
test("doesn't closes the local virtual channel when the stream "
"subscription is canceled", () {
virtual2.sink.done.then(expectAsync((_) {}, count: 0));
virtual2.stream.listen((_) {}).cancel();
// Ensure that there's enough time for the channel to close if it's going
// to.
return pumpEventQueue();
});
test("closes the underlying channel when it closes without any other "
"virtual channels", () {
// First close the default channel so we can test the new channel as the
// last living virtual channel.
channel2.sink.close();
return channel1.stream.toList().then((_) {
expect(oneToTwo.done, completes);
expect(twoToOne.done, completes);
virtual2.sink.close();
});
});
test("doesn't close the underlying channel when it closes with other "
"virtual channels", () {
oneToTwo.done.then(expectAsync((_) {}, count: 0));
twoToOne.done.then(expectAsync((_) {}, count: 0));
virtual2.sink.close();
// Ensure that there's enough time for the underlying channel to complete
// if it's going to.
return pumpEventQueue();
});
test("doesn't allow another virtual channel with the same id", () {
expect(() => channel2.virtualChannel(virtual1.id),
throwsArgumentError);
});
});
group("when the underlying stream", () {
var virtual1;
var virtual2;
setUp(() {
virtual1 = channel1.virtualChannel();
virtual2 = channel2.virtualChannel(virtual1.id);
});
test("closes, all virtual channels close", () {
expect(channel1.stream.toList(), completion(isEmpty));
expect(channel1.sink.done, completes);
expect(channel2.stream.toList(), completion(isEmpty));
expect(channel2.sink.done, completes);
expect(virtual1.stream.toList(), completion(isEmpty));
expect(virtual1.sink.done, completes);
expect(virtual2.stream.toList(), completion(isEmpty));
expect(virtual2.sink.done, completes);
oneToTwo.close();
});
test("closes, no more virtual channels may be created", () {
expect(channel1.sink.done.then((_) => channel1.virtualChannel()),
throwsStateError);
expect(channel2.sink.done.then((_) => channel2.virtualChannel()),
throwsStateError);
oneToTwo.close();
});
test("emits an error, the error is sent only to the default channel", () {
channel1.stream.listen(expectAsync((_) {}, count: 0),
onError: expectAsync((error) => expect(error, equals("oh no"))));
virtual1.stream.listen(expectAsync((_) {}, count: 0),
onError: expectAsync((_) {}, count: 0));
twoToOne.addError("oh no");
});
});
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment