diff --git a/lib/src/error_group.dart b/lib/src/error_group.dart index 90edfa5179678e276142788b61472519e1f06280..16b96b19c36dd3c1ddcb70c54e7b535e1db6c865 100644 --- a/lib/src/error_group.dart +++ b/lib/src/error_group.dart @@ -232,19 +232,27 @@ class _ErrorGroupStream extends Stream { /// Whether [this] has any listeners. bool get _hasListeners => _controller.hasSubscribers; + // TODO(nweiz): Remove this when issue 8512 is fixed. + /// Whether the subscription has been cancelled. + bool _cancelled = false; + /// Creates a new [_ErrorGroupFuture] that's a child of [_group] and wraps /// [inner]. _ErrorGroupStream(this._group, Stream inner) : _controller = inner.isBroadcast ? new StreamController.broadcast() : new StreamController() { - _subscription = inner.listen(_controller.add, - onError: (e) => _group._signalError(e), - onDone: () { - _isDone = true; - _group._signalStreamComplete(this); - _controller.close(); - }); + _subscription = inner.listen((v) { + if (!_cancelled) _controller.add(v); + }, onError: (e) { + if (!_cancelled) _group._signalError(e); + }, onDone: () { + if (!_cancelled) { + _isDone = true; + _group._signalStreamComplete(this); + _controller.close(); + } + }); } StreamSubscription listen(void onData(value), @@ -260,6 +268,7 @@ class _ErrorGroupStream extends Stream { /// unless it's already complete. void _signalError(AsyncError e) { if (_isDone) return; + _cancelled = true; _subscription.cancel(); // Call these asynchronously to work around issue 7913. defer(() { diff --git a/lib/src/utils.dart b/lib/src/utils.dart index 729f55a0e0235380dad5be8198df6cbcc95ceb09..601290789e5dbbe1bdbf5cc3c257c307f5f54918 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -173,15 +173,25 @@ void chainToCompleter(Future future, Completer completer) { /// Returns a [Future] that will complete to the first element of [stream]. /// Unlike [Stream.first], this is safe to use with single-subscription streams. Future streamFirst(Stream stream) { + // TODO(nweiz): remove this when issue 8512 is fixed. + var cancelled = false; var completer = new Completer(); var subscription; subscription = stream.listen((value) { - subscription.cancel(); - completer.complete(value); - }, - onError: (e) => completer.completeError(e.error, e.stackTrace), - onDone: () => completer.completeError(new StateError("No elements")), - unsubscribeOnError: true); + if (!cancelled) { + cancelled = true; + subscription.cancel(); + completer.complete(value); + } + }, onError: (e) { + if (!cancelled) { + completer.completeError(e.error, e.stackTrace); + } + }, onDone: () { + if (!cancelled) { + completer.completeError(new StateError("No elements")); + } + }, unsubscribeOnError: true); return completer.future; }