Newer
Older
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'stream_channel.dart';
/// A class that multiplexes multiple virtual channels across a single
/// underlying transport layer.
///
/// This should be connected to another [MultiChannel] on the other end of the
/// underlying channel. It starts with a single default virtual channel,
/// accessible via [stream] and [sink]. Additional virtual channels can be
/// created with [virtualChannel].
///
/// When a virtual channel is created by one endpoint, the other must connect to
/// it before messages may be sent through it. The first endpoint passes its
/// [VirtualChannel.id] to the second, which then creates a channel from that id
/// also using [virtualChannel]. For example:
///
/// ```dart
/// // First endpoint
/// var virtual = multiChannel.virtualChannel();
/// multiChannel.sink.add({
/// "channel": virtual.id
/// });
///
/// // Second endpoint
/// multiChannel.stream.listen((message) {
/// var virtual = multiChannel.virtualChannel(message["channel"]);
/// // ...
/// });
/// ```
///
/// Sending errors across a [MultiChannel] is not supported. Any errors from the
/// underlying stream will be reported only via the default
/// [MultiChannel.stream].
///
/// Each virtual channel may be closed individually. When all of them are
/// closed, the underlying [StreamSink] is closed automatically.
abstract class MultiChannel implements StreamChannel {
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/// The default input stream.
///
/// This connects to the remote [sink].
Stream get stream;
/// The default output stream.
///
/// This connects to the remote [stream]. If this is closed, the remote
/// [stream] will close, but other virtual channels will remain open and new
/// virtual channels may be opened.
StreamSink get sink;
/// Creates a new [MultiChannel] that sends messages over [innerStream] and
/// [innerSink].
///
/// The inner streams must take JSON-like objects.
factory MultiChannel(Stream innerStream, StreamSink innerSink) =>
new _MultiChannel(innerStream, innerSink);
/// Creates a new virtual channel.
///
/// If [id] is not passed, this creates a virtual channel from scratch. Before
/// it's used, its [VirtualChannel.id] must be sent to the remote endpoint
/// where [virtualChannel] should be called with that id.
///
/// If [id] is passed, this creates a virtual channel corresponding to the
/// channel with that id on the remote channel.
///
/// Throws an [ArgumentError] if a virtual channel already exists for [id].
/// Throws a [StateError] if the underlying channel is closed.
VirtualChannel virtualChannel([id]);
}
/// The implementation of [MultiChannel].
///
/// This is private so that [VirtualChannel] can inherit from [MultiChannel]
/// without having to implement all the private members.
class _MultiChannel extends StreamChannelMixin implements MultiChannel {
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/// The inner stream over which all communication is received.
///
/// This will be `null` if the underlying communication channel is closed.
Stream _innerStream;
/// The inner sink over which all communication is sent.
///
/// This will be `null` if the underlying communication channel is closed.
StreamSink _innerSink;
/// The subscription to [_innerStream].
StreamSubscription _innerStreamSubscription;
Stream get stream => _streamController.stream;
final _streamController = new StreamController(sync: true);
StreamSink get sink => _sinkController.sink;
final _sinkController = new StreamController(sync: true);
/// A map from virtual channel ids to [StreamController]s that should be used
/// to write messages received from those channels.
final _streamControllers = new Map<int, StreamController>();
/// A map from virtual channel ids to [StreamControllers]s that are used
/// to receive messages to write to those channels.
///
/// Note that this uses the same keys as [_streamControllers].
final _sinkControllers = new Map<int, StreamController>();
/// The next id to use for a local virtual channel.
///
/// Ids are used to identify virtual channels. Each message is tagged with an
/// id; the receiving [MultiChannel] uses this id to look up which
/// [VirtualChannel] the message should be dispatched to.
///
/// The id scheme for virtual channels is somewhat complicated. This is
/// necessary to ensure that there are no conflicts even when both endpoints
/// have virtual channels with the same id; since both endpoints can send and
/// receive messages across each virtual channel, a naïve scheme would make it
/// impossible to tell whether a message was from a channel that originated in
/// the remote endpoint or a reply on a channel that originated in the local
/// endpoint.
///
/// The trick is that each endpoint only uses odd ids for its own channels.
/// When sending a message over a channel that was created by the remote
/// endpoint, the channel's id plus one is used. This way each [MultiChannel]
/// knows that if an incoming message has an odd id, it's using the local id
/// scheme, but if it has an even id, it's using the remote id scheme.
var _nextId = 1;
_MultiChannel(this._innerStream, this._innerSink) {
// The default connection is a special case which has id 0 on both ends.
// This allows it to begin connected without having to send over an id.
_streamControllers[0] = _streamController;
_sinkControllers[0] = _sinkController;
_sinkController.stream.listen(
(message) => _innerSink.add([0, message]),
onDone: () => _closeChannel(0, 0));
_innerStreamSubscription = _innerStream.listen((message) {
var id = message[0];
var sink = _streamControllers[id];
// A sink might not exist if the channel was closed before an incoming
// message was processed.
if (sink == null) return;
if (message.length > 1) {
sink.add(message[1]);
return;
}
// A message without data indicates that the channel has been closed.
_sinkControllers[id].close();
}, onDone: _closeInnerChannel,
onError: _streamController.addError);
}
VirtualChannel virtualChannel([id]) {
if (_innerStream == null) {
throw new StateError("The underlying channel is closed.");
}
var inputId;
var outputId;
if (id != null) {
// Since the user is passing in an id, we're connected to a remote
// VirtualChannel. This means messages they send over this channel will
// have the original odd id, but our replies will have an even id.
inputId = id;
outputId = (id as int) + 1;
} else {
// Since we're generating an id, we originated this VirtualChannel. This
// means messages we send over this channel will have the original odd id,
// but the remote channel's replies will have an even id.
inputId = _nextId + 1;
outputId = _nextId;
_nextId += 2;
}
if (_streamControllers.containsKey(inputId)) {
throw new ArgumentError("A virtual channel with id $id already exists.");
}
var streamController = new StreamController(sync: true);
var sinkController = new StreamController(sync: true);
_streamControllers[inputId] = streamController;
_sinkControllers[inputId] = sinkController;
sinkController.stream.listen(
(message) => _innerSink.add([outputId, message]),
onDone: () => _closeChannel(inputId, outputId));
return new VirtualChannel._(
this, outputId, streamController.stream, sinkController.sink);
}
/// Closes the virtual channel for which incoming messages have [inputId] and
/// outgoing messages have [outputId].
void _closeChannel(int inputId, int outputId) {
// A message without data indicates that the virtual channel has been
// closed.
_streamControllers.remove(inputId).close();
_sinkControllers.remove(inputId).close();
if (_innerSink == null) return;
_innerSink.add([outputId]);
if (_streamControllers.isEmpty) _closeInnerChannel();
}
/// Closes the underlying communication channel.
void _closeInnerChannel() {
_innerSink.close();
_innerStreamSubscription.cancel();
_innerStream = null;
_innerSink = null;
for (var controller in _sinkControllers.values.toList()) {
controller.close();
}
}
}
/// A virtual channel created by [MultiChannel].
///
/// This implements [MultiChannel] for convenience.
/// [VirtualChannel.virtualChannel] is semantically identical to the parent's
/// [MultiChannel.virtualChannel].
class VirtualChannel extends StreamChannelMixin implements MultiChannel {
/// The [MultiChannel] that created this.
final MultiChannel _parent;
/// The identifier for this channel.
///
/// This can be sent across the [MultiChannel] to provide the remote endpoint
/// a means to connect to this channel. Nothing about this is guaranteed
/// except that it will be JSON-serializable.
final id;
final Stream stream;
final StreamSink sink;
VirtualChannel._(this._parent, this.id, this.stream, this.sink);
VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id);
}