Skip to content
Snippets Groups Projects
Commit bbdac51e authored by nweiz@google.com's avatar nweiz@google.com
Browse files

Work around issue 8512 in pub.

Review URL: https://codereview.chromium.org//12225157

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge@18427 260f80e4-7a28-3924-810f-c04153c831b5
parent fb0e90cb
No related branches found
No related tags found
No related merge requests found
...@@ -232,19 +232,27 @@ class _ErrorGroupStream extends Stream { ...@@ -232,19 +232,27 @@ class _ErrorGroupStream extends Stream {
/// Whether [this] has any listeners. /// Whether [this] has any listeners.
bool get _hasListeners => _controller.hasSubscribers; bool get _hasListeners => _controller.hasSubscribers;
// TODO(nweiz): Remove this when issue 8512 is fixed.
/// Whether the subscription has been cancelled.
bool _cancelled = false;
/// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps
/// [inner]. /// [inner].
_ErrorGroupStream(this._group, Stream inner) _ErrorGroupStream(this._group, Stream inner)
: _controller = inner.isBroadcast ? : _controller = inner.isBroadcast ?
new StreamController.broadcast() : new StreamController.broadcast() :
new StreamController() { new StreamController() {
_subscription = inner.listen(_controller.add, _subscription = inner.listen((v) {
onError: (e) => _group._signalError(e), if (!_cancelled) _controller.add(v);
onDone: () { }, onError: (e) {
_isDone = true; if (!_cancelled) _group._signalError(e);
_group._signalStreamComplete(this); }, onDone: () {
_controller.close(); if (!_cancelled) {
}); _isDone = true;
_group._signalStreamComplete(this);
_controller.close();
}
});
} }
StreamSubscription listen(void onData(value), StreamSubscription listen(void onData(value),
...@@ -260,6 +268,7 @@ class _ErrorGroupStream extends Stream { ...@@ -260,6 +268,7 @@ class _ErrorGroupStream extends Stream {
/// unless it's already complete. /// unless it's already complete.
void _signalError(AsyncError e) { void _signalError(AsyncError e) {
if (_isDone) return; if (_isDone) return;
_cancelled = true;
_subscription.cancel(); _subscription.cancel();
// Call these asynchronously to work around issue 7913. // Call these asynchronously to work around issue 7913.
defer(() { defer(() {
......
...@@ -173,15 +173,25 @@ void chainToCompleter(Future future, Completer completer) { ...@@ -173,15 +173,25 @@ void chainToCompleter(Future future, Completer completer) {
/// Returns a [Future] that will complete to the first element of [stream]. /// Returns a [Future] that will complete to the first element of [stream].
/// Unlike [Stream.first], this is safe to use with single-subscription streams. /// Unlike [Stream.first], this is safe to use with single-subscription streams.
Future streamFirst(Stream stream) { Future streamFirst(Stream stream) {
// TODO(nweiz): remove this when issue 8512 is fixed.
var cancelled = false;
var completer = new Completer(); var completer = new Completer();
var subscription; var subscription;
subscription = stream.listen((value) { subscription = stream.listen((value) {
subscription.cancel(); if (!cancelled) {
completer.complete(value); cancelled = true;
}, subscription.cancel();
onError: (e) => completer.completeError(e.error, e.stackTrace), completer.complete(value);
onDone: () => completer.completeError(new StateError("No elements")), }
unsubscribeOnError: true); }, onError: (e) {
if (!cancelled) {
completer.completeError(e.error, e.stackTrace);
}
}, onDone: () {
if (!cancelled) {
completer.completeError(new StateError("No elements"));
}
}, unsubscribeOnError: true);
return completer.future; return completer.future;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment