From c9d8486538b3658facb66ab7c25e15f9a4408dd1 Mon Sep 17 00:00:00 2001 From: "nweiz@google.com" <nweiz@google.com> Date: Wed, 30 Jan 2013 19:26:03 +0000 Subject: [PATCH] Roll back Pub stream changes. These changes are still breaking the Windows bots, so I'm rolling them back until we figure out what's going on. TBR Review URL: https://codereview.chromium.org//12095050 git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge@17867 260f80e4-7a28-3924-810f-c04153c831b5 --- lib/src/command_lish.dart | 2 +- lib/src/curl_client.dart | 45 ++-- lib/src/error_group.dart | 4 +- lib/src/hosted_source.dart | 2 +- lib/src/io.dart | 419 +++++++++++++++++++----------------- lib/src/log.dart | 25 +-- lib/src/utils.dart | 77 ------- test/curl_client_test.dart | 2 +- test/oauth2_test.dart | 4 +- test/pub_uploader_test.dart | 2 +- test/test_pub.dart | 174 ++++++--------- 11 files changed, 340 insertions(+), 416 deletions(-) diff --git a/lib/src/command_lish.dart b/lib/src/command_lish.dart index 8b83d66d..59bfbbf4 100644 --- a/lib/src/command_lish.dart +++ b/lib/src/command_lish.dart @@ -91,7 +91,7 @@ class LishCommand extends PubCommand { files = f; log.fine('Archiving and publishing ${entrypoint.root}.'); return createTarGz(files, baseDir: entrypoint.root.dir); - }).then((stream) => stream.toBytes()).then((packageBytes) { + }).then(consumeInputStream).then((packageBytes) { // Show the package contents so the user can verify they look OK. var package = entrypoint.root; log.message( diff --git a/lib/src/curl_client.dart b/lib/src/curl_client.dart index a1cc1ed3..7001a17c 100644 --- a/lib/src/curl_client.dart +++ b/lib/src/curl_client.dart @@ -41,10 +41,9 @@ class CurlClient extends http.BaseClient { var process; return startProcess(executable, arguments).then((process_) { process = process_; - return Future.wait([ - store(requestStream, process.stdin), - _waitForHeaders(process, expectBody: request.method != "HEAD") - ]); + return requestStream.pipe(wrapOutputStream(process.stdin)); + }).then((_) { + return _waitForHeaders(process, expectBody: request.method != "HEAD"); }).then((_) => new File(headerFile).readAsLines()) .then((lines) => _buildResponse(request, process, lines)); }); @@ -112,33 +111,44 @@ class CurlClient extends http.BaseClient { /// in stdout. However, that seems to be too early to successfully read the /// file (at least on Mac). Instead, this just waits until the entire process /// has completed. - Future _waitForHeaders(PubProcess process, {bool expectBody}) { - var future = process.exitCode.then((exitCode) { + Future _waitForHeaders(Process process, {bool expectBody}) { + var completer = new Completer(); + process.onExit = (exitCode) { log.io("Curl process exited with code $exitCode."); - if (exitCode == 0) return; - process.stderr.bytesToString().then((message) { + if (exitCode == 0) { + completer.complete(null); + return; + } + + chainToCompleter(consumeInputStream(process.stderr).then((stderrBytes) { + var message = new String.fromCharCodes(stderrBytes); log.fine('Got error reading headers from curl: $message'); if (exitCode == 47) { throw new RedirectLimitExceededException([]); } else { throw new HttpException(message); } - }); - }); - - if (expectBody) return future; + }), completer); + }; // If there's not going to be a response body (e.g. for HEAD requests), curl // prints the headers to stdout instead of the body. We want to wait until // all the headers are received to read them from the header file. - return Future.wait([process.stdout.toBytes(), future]); + if (!expectBody) { + return Future.wait([ + consumeInputStream(process.stdout), + completer.future + ]); + } + + return completer.future; } /// Returns a [http.StreamedResponse] from the response data printed by the /// `curl` [process]. [lines] are the headers that `curl` wrote to a file. http.StreamedResponse _buildResponse( - http.BaseRequest request, PubProcess process, List<String> lines) { + http.BaseRequest request, Process process, List<String> lines) { // When curl follows redirects, it prints the redirect headers as well as // the headers of the final request. Each block is separated by a blank // line. We just care about the last block. There is one trailing empty @@ -157,13 +167,18 @@ class CurlClient extends http.BaseClient { var split = split1(line, ":"); headers[split[0].toLowerCase()] = split[1].trim(); } + var responseStream = process.stdout; + if (responseStream.closed) { + responseStream = new ListInputStream(); + responseStream.markEndOfStream(); + } var contentLength = -1; if (headers.containsKey('content-length')) { contentLength = int.parse(headers['content-length']); } return new http.StreamedResponse( - process.stdout, status, contentLength, + wrapInputStream(responseStream), status, contentLength, request: request, headers: headers, isRedirect: isRedirect, diff --git a/lib/src/error_group.dart b/lib/src/error_group.dart index 0bdf5e55..b78ceacb 100644 --- a/lib/src/error_group.dart +++ b/lib/src/error_group.dart @@ -179,7 +179,7 @@ class _ErrorGroupFuture implements Future { _completer.future.catchError((_) {}); } - Future then(onValue(value), {onError(AsyncError asyncError)}) { + Future then(onValue(T value), {onError(AsyncError asyncError)}) { _hasListeners = true; return _completer.future.then(onValue, onError: onError); } @@ -194,7 +194,7 @@ class _ErrorGroupFuture implements Future { return _completer.future.whenComplete(action); } - Stream asStream() { + Stream<T> asStream() { _hasListeners = true; return _completer.future.asStream(); } diff --git a/lib/src/hosted_source.dart b/lib/src/hosted_source.dart index 27901362..81a80574 100644 --- a/lib/src/hosted_source.dart +++ b/lib/src/hosted_source.dart @@ -77,7 +77,7 @@ class HostedSource extends Source { .then((response) => response.stream), systemCache.createTempDir() ]).then((args) { - var stream = args[0]; + var stream = streamToInputStream(args[0]); tempDir = args[1]; return timeout(extractTarGz(stream, tempDir), HTTP_TIMEOUT, 'fetching URL "$fullUrl"'); diff --git a/lib/src/io.dart b/lib/src/io.dart index 60876c8f..79f0a065 100644 --- a/lib/src/io.dart +++ b/lib/src/io.dart @@ -12,14 +12,9 @@ import 'dart:json'; import 'dart:uri'; import '../../pkg/path/lib/path.dart' as path; -import '../../pkg/http/lib/http.dart' show ByteStream; -import 'error_group.dart'; -import 'exit_codes.dart' as exit_codes; import 'log.dart' as log; import 'utils.dart'; -export '../../pkg/http/lib/http.dart' show ByteStream; - final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?"); /// Joins a number of path string parts into a single path. Handles @@ -125,15 +120,44 @@ Future<File> deleteFile(file) { /// Writes [stream] to a new file at [path], which may be a [String] or a /// [File]. Will replace any file already at that path. Completes when the file /// is done being written. -Future<File> createFileFromStream(Stream<List<int>> stream, path) { +Future<File> createFileFromStream(InputStream stream, path) { path = _getPath(path); log.io("Creating $path from stream."); + var completer = new Completer<File>(); + var completed = false; var file = new File(path); - return stream.pipe(wrapOutputStream(file.openOutputStream())).then((_) { + var outputStream = file.openOutputStream(); + stream.pipe(outputStream); + + outputStream.onClosed = () { log.fine("Created $path from stream."); - }); + completed = true; + completer.complete(file); + }; + + // TODO(nweiz): remove this when issue 4061 is fixed. + var stackTrace; + try { + throw ""; + } catch (_, localStackTrace) { + stackTrace = localStackTrace; + } + + completeError(error) { + if (!completed) { + completed = true; + completer.completeError(error, stackTrace); + } else { + log.fine("Got error after stream was closed: $error"); + } + } + + stream.onError = completeError; + outputStream.onError = completeError; + + return completer.future; } /// Creates a directory [dir]. Returns a [Future] that completes when the @@ -409,33 +433,8 @@ String relativeToPub(String target) { return path.normalize(join(utilDir, 'pub', target)); } -// TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr -// nicer. - -/// A sink that writes to standard output. Errors piped to this stream will be -/// surfaced to the top-level error handler. -final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout"); - -/// A sink that writes to standard error. Errors piped to this stream will be -/// surfaced to the top-level error handler. -final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr"); - -/// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are -/// logged, and then the program is terminated. [name] is used for debugging. -StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) { - var pair = consumerToSink(wrapOutputStream(stream)); - pair.last.catchError((e) { - // This log may or may not work, depending on how the stream failed. Not - // much we can do about that. - log.error("Error writing to $name: $e"); - exit(exit_codes.IO); - }); - return pair.first; -} - -/// A line-by-line stream of standard input. -final Stream<String> stdinLines = - streamToLines(wrapInputStream(stdin).toStringStream()); +/// A StringInputStream reading from stdin. +final _stringStdin = new StringInputStream(stdin); /// Displays a message and reads a yes/no confirmation from the user. Returns /// a [Future] that completes to `true` if the user confirms or `false` if they @@ -445,28 +444,143 @@ final Stream<String> stdinLines = /// should just be a fragment like, "Are you sure you want to proceed". Future<bool> confirm(String message) { log.fine('Showing confirm message: $message'); - stdoutSink.add("$message (y/n)? ".charCodes); - return streamFirst(stdinLines) - .then((line) => new RegExp(r"^[yY]").hasMatch(line)); + stdout.writeString("$message (y/n)? "); + return readLine().then((line) => new RegExp(r"^[yY]").hasMatch(line)); +} + +/// Returns a single line read from a [StringInputStream]. By default, reads +/// from stdin. +/// +/// A [StringInputStream] passed to this should have no callbacks registered. +Future<String> readLine([StringInputStream stream]) { + if (stream == null) stream = _stringStdin; + if (stream.closed) return new Future.immediate(''); + void removeCallbacks() { + stream.onClosed = null; + stream.onLine = null; + stream.onError = null; + } + + // TODO(nweiz): remove this when issue 4061 is fixed. + var stackTrace; + try { + throw ""; + } catch (_, localStackTrace) { + stackTrace = localStackTrace; + } + + var completer = new Completer(); + stream.onClosed = () { + removeCallbacks(); + completer.complete(''); + }; + + stream.onLine = () { + removeCallbacks(); + var line = stream.readLine(); + log.io('Read line: $line'); + completer.complete(line); + }; + + stream.onError = (e) { + removeCallbacks(); + completer.completeError(e, stackTrace); + }; + + return completer.future; +} + +/// Takes all input from [source] and writes it to [sink]. +/// +/// Returns a future that completes when [source] is closed. +Future pipeInputToInput(InputStream source, ListInputStream sink) { + var completer = new Completer(); + source.onClosed = () { + sink.markEndOfStream(); + completer.complete(null); + }; + source.onData = () { + // Even if the sink is closed and we aren't going to do anything with more + // data, we still need to drain it from source to work around issue 7218. + var data = source.read(); + try { + if (!sink.closed) sink.write(data); + } on StreamException catch (e, stackTrace) { + // Ignore an exception to work around issue 4222. + log.io("Writing to an unclosed ListInputStream caused exception $e\n" + "$stackTrace"); + } + }; + // TODO(nweiz): propagate this error to the sink. See issue 3657. + source.onError = (e) { throw e; }; + return completer.future; +} + +/// Buffers all input from an InputStream and returns it as a future. +Future<List<int>> consumeInputStream(InputStream stream) { + if (stream.closed) return new Future.immediate(<int>[]); + + // TODO(nweiz): remove this when issue 4061 is fixed. + var stackTrace; + try { + throw ""; + } catch (_, localStackTrace) { + stackTrace = localStackTrace; + } + + var completer = new Completer<List<int>>(); + var buffer = <int>[]; + stream.onClosed = () => completer.complete(buffer); + stream.onData = () => buffer.addAll(stream.read()); + stream.onError = (e) => completer.completeError(e, stackTrace); + return completer.future; +} + +/// Buffers all input from a StringInputStream and returns it as a future. +Future<String> consumeStringInputStream(StringInputStream stream) { + if (stream.closed) return new Future.immediate(''); + + // TODO(nweiz): remove this when issue 4061 is fixed. + var stackTrace; + try { + throw ""; + } catch (_, localStackTrace) { + stackTrace = localStackTrace; + } + + var completer = new Completer<String>(); + var buffer = new StringBuffer(); + stream.onClosed = () => completer.complete(buffer.toString()); + stream.onData = () => buffer.add(stream.read()); + stream.onError = (e) => completer.completeError(e, stackTrace); + return completer.future; } /// Wraps [stream] in a single-subscription [Stream] that emits the same data. -ByteStream wrapInputStream(InputStream stream) { +Stream<List<int>> wrapInputStream(InputStream stream) { var controller = new StreamController(); if (stream.closed) { controller.close(); - return new ByteStream(controller.stream); + return controller.stream; } stream.onClosed = controller.close; stream.onData = () => controller.add(stream.read()); stream.onError = (e) => controller.signalError(new AsyncError(e)); - return new ByteStream(controller.stream); + return controller.stream; +} + +// TODO(nweiz): remove this ASAP (issue 7807). +/// Wraps [stream] in an [InputStream]. +InputStream streamToInputStream(Stream<List<int>> stream) { + var inputStream = new ListInputStream(); + stream.listen((chunk) => inputStream.write(chunk), + onDone: inputStream.markEndOfStream); + return inputStream; } /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it -/// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be -/// forwarded to the [Future] returned by [Stream.pipe]. +/// using [Stream.pipe]. StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => new _OutputStreamConsumer(stream); @@ -494,9 +608,6 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { if (!completed) completer.completeError(e, stack); completed = true; } - }, onError: (e) { - if (!completed) completer.completeError(e.error, e.stackTrace); - completed = true; }, onDone: () => _outputStream.close()); _outputStream.onError = (e) { @@ -513,43 +624,6 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { } } -/// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that -/// will succeed when [StreamSink] is closed or fail with any errors that occur -/// while writing. -Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) { - var controller = new StreamController(); - var done = controller.stream.pipe(consumer); - return new Pair<StreamSink, Future>(controller.sink, done); -} - -// TODO(nweiz): remove this when issue 7786 is fixed. -/// Pipes all data and errors from [stream] into [sink]. When [stream] is done, -/// the returned [Future] is completed and [sink] is closed if [closeSink] is -/// true. -/// -/// When an error occurs on [stream], that error is passed to [sink]. If -/// [unsubscribeOnError] is true, [Future] will be completed successfully and no -/// more data or errors will be piped from [stream] to [sink]. If -/// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be -/// closed. -Future store(Stream stream, StreamSink sink, - {bool unsubscribeOnError: true, closeSink: true}) { - var completer = new Completer(); - stream.listen(sink.add, - onError: (e) { - sink.signalError(e); - if (unsubscribeOnError) { - completer.complete(); - if (closeSink) sink.close(); - } - }, - onDone: () { - if (closeSink) sink.close(); - completer.complete(); - }, unsubscribeOnError: unsubscribeOnError); - return completer.future; -} - /// Spawns and runs the process located at [executable], passing in [args]. /// Returns a [Future] that will complete with the results of the process after /// it has ended. @@ -583,92 +657,41 @@ Future<PubProcessResult> runProcess(String executable, List<String> args, /// The spawned process will inherit its parent's environment variables. If /// [environment] is provided, that will be used to augment (not replace) the /// the inherited variables. -Future<PubProcess> startProcess(String executable, List<String> args, +Future<Process> startProcess(String executable, List<String> args, {workingDir, Map<String, String> environment}) => _doProcess(Process.start, executable, args, workingDir, environment) - .then((process) => new PubProcess(process)); + .then((process) => new _WrappedProcess(process)); -/// A wrapper around [Process] that exposes `dart:async`-style APIs. -class PubProcess { - /// The underlying `dart:io` [Process]. +/// A wrapper around [Process] that buffers the stdout and stderr to avoid +/// running into issue 7218. +class _WrappedProcess implements Process { final Process _process; + final InputStream stderr; + final InputStream stdout; - /// The mutable field for [stdin]. - StreamSink<List<int>> _stdin; - - /// The mutable field for [stdinClosed]. - Future _stdinClosed; - - /// The mutable field for [stdout]. - ByteStream _stdout; - - /// The mutable field for [stderr]. - ByteStream _stderr; - - /// The mutable field for [exitCode]. - Future<int> _exitCode; - - /// The sink used for passing data to the process's standard input stream. - /// Errors on this stream are surfaced through [stdinClosed], [stdout], - /// [stderr], and [exitCode], which are all members of an [ErrorGroup]. - StreamSink<List<int>> get stdin => _stdin; - - // TODO(nweiz): write some more sophisticated Future machinery so that this - // doesn't surface errors from the other streams/futures, but still passes its - // unhandled errors to them. Right now it's impossible to recover from a stdin - // error and continue interacting with the process. - /// A [Future] that completes when [stdin] is closed, either by the user or by - /// the process itself. - /// - /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any - /// error in process will be passed to it, but won't reach the top-level error - /// handler unless nothing has handled it. - Future get stdinClosed => _stdinClosed; - - /// The process's standard output stream. - /// - /// This is in an [ErrorGroup] with [stdinClosed], [stderr], and [exitCode], - /// so any error in process will be passed to it, but won't reach the - /// top-level error handler unless nothing has handled it. - ByteStream get stdout => _stdout; - - /// The process's standard error stream. - /// - /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [exitCode], - /// so any error in process will be passed to it, but won't reach the - /// top-level error handler unless nothing has handled it. - ByteStream get stderr => _stderr; - - /// A [Future] that will complete to the process's exit code once the process - /// has finished running. - /// - /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so - /// any error in process will be passed to it, but won't reach the top-level - /// error handler unless nothing has handled it. - Future<int> get exitCode => _exitCode; - - /// Creates a new [PubProcess] wrapping [process]. - PubProcess(Process process) - : _process = process { - var errorGroup = new ErrorGroup(); - - var pair = consumerToSink(wrapOutputStream(process.stdin)); - _stdin = pair.first; - _stdinClosed = errorGroup.registerFuture(pair.last); - - _stdout = new ByteStream( - errorGroup.registerStream(wrapInputStream(process.stdout))); - _stderr = new ByteStream( - errorGroup.registerStream(wrapInputStream(process.stderr))); - - var exitCodeCompleter = new Completer(); - _exitCode = errorGroup.registerFuture(exitCodeCompleter.future); - _process.onExit = (code) => exitCodeCompleter.complete(code); + OutputStream get stdin => _process.stdin; + + void set onExit(void callback(int exitCode)) { + _process.onExit = callback; } - /// Sends [signal] to the underlying process. + _WrappedProcess(Process process) + : _process = process, + stderr = _wrapInputStream(process.stderr), + stdout = _wrapInputStream(process.stdout); + bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => _process.kill(signal); + + /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source] + /// input stream. This is useful for spawned processes which will not exit + /// until their output streams have been drained. TODO(rnystrom): We should + /// use this logic anywhere we spawn a process. + static InputStream _wrapInputStream(InputStream source) { + var sink = new ListInputStream(); + pipeInputToInput(source, sink); + return sink; + } } /// Calls [fn] with appropriately modified arguments. [fn] should have the same @@ -745,7 +768,7 @@ Future withTempDir(Future fn(String path)) { /// Extracts a `.tar.gz` file from [stream] to [destination], which can be a /// directory or a path. Returns whether or not the extraction was successful. -Future<bool> extractTarGz(Stream<List<int>> stream, destination) { +Future<bool> extractTarGz(InputStream stream, destination) { destination = _getPath(destination); log.fine("Extracting .tar.gz stream to $destination."); @@ -754,29 +777,27 @@ Future<bool> extractTarGz(Stream<List<int>> stream, destination) { return _extractTarGzWindows(stream, destination); } - return startProcess("tar", - ["--extract", "--gunzip", "--directory", destination]).then((process) { - // Ignore errors on process.std{out,err}. They'll be passed to - // process.exitCode, and we don't want them being top-levelled by - // std{out,err}Sink. - store(process.stdout.handleError((_) {}), stdoutSink, closeSink: false); - store(process.stderr.handleError((_) {}), stderrSink, closeSink: false); - return Future.wait([ - store(stream, process.stdin), - process.exitCode - ]); - }).then((results) { - var exitCode = results[1]; - if (exitCode != 0) { - throw "Failed to extract .tar.gz stream to $destination (exit code " - "$exitCode)."; - } + var completer = new Completer<int>(); + var processFuture = startProcess("tar", + ["--extract", "--gunzip", "--directory", destination]); + processFuture.then((process) { + process.onExit = (exitCode) => completer.complete(exitCode); + stream.pipe(process.stdin); + process.stdout.pipe(stdout, close: false); + process.stderr.pipe(stderr, close: false); + }).catchError((e) { + completer.completeError(e.error, e.stackTrace); + }); + + return completer.future.then((exitCode) { log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode."); + // TODO(rnystrom): Does anything check this result value? If not, it should + // throw on a bad exit code. + return exitCode == 0; }); } -Future<bool> _extractTarGzWindows(Stream<List<int>> stream, - String destination) { +Future<bool> _extractTarGzWindows(InputStream stream, String destination) { // TODO(rnystrom): In the repo's history, there is an older implementation of // this that does everything in memory by piping streams directly together // instead of writing out temp files. The code is simpler, but unfortunately, @@ -839,8 +860,8 @@ Future<bool> _extractTarGzWindows(Stream<List<int>> stream, /// Create a .tar.gz archive from a list of entries. Each entry can be a /// [String], [Directory], or [File] object. The root of the archive is /// considered to be [baseDir], which defaults to the current working directory. -/// Returns a [ByteStream] that will emit the contents of the archive. -ByteStream createTarGz(List contents, {baseDir}) { +/// Returns an [InputStream] that will emit the contents of the archive. +InputStream createTarGz(List contents, {baseDir}) { var buffer = new StringBuffer(); buffer.add('Creating .tag.gz stream containing:\n'); contents.forEach((file) => buffer.add('$file\n')); @@ -848,7 +869,7 @@ ByteStream createTarGz(List contents, {baseDir}) { // TODO(nweiz): Propagate errors to the returned stream (including non-zero // exit codes). See issue 3657. - var controller = new StreamController<List<int>>(); + var stream = new ListInputStream(); if (baseDir == null) baseDir = path.current; baseDir = getFullPath(baseDir); @@ -867,14 +888,15 @@ ByteStream createTarGz(List contents, {baseDir}) { // the process choke, so at some point we should save the arguments to a // file and pass them in via --files-from for tar and -i@filename for 7zip. startProcess("tar", args).then((process) { - store(process.stdout, controller); - }).catchError((e) { - // We don't have to worry about double-signaling here, since the store() - // above will only be reached if startProcess succeeds. - controller.signalError(e.error, e.stackTrace); - controller.close(); + pipeInputToInput(process.stdout, stream); + + // Drain and discard 7zip's stderr. 7zip writes its normal output to + // stderr. We don't want to show that since it's meaningless. + // TODO(rnystrom): Should log this and display it if an actual error + // occurs. + consumeInputStream(process.stderr); }); - return new ByteStream(controller.stream); + return stream; } withTempDir((tempDir) { @@ -898,20 +920,15 @@ ByteStream createTarGz(List contents, {baseDir}) { args = ["a", "unused", "-tgzip", "-so", tarFile]; return startProcess(command, args); }).then((process) { - // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't - // want to show that since it's meaningless. - // - // TODO(rnystrom): Should log the stderr and display it if an actual error + // Drain and discard 7zip's stderr. 7zip writes its normal output to + // stderr. We don't want to show that since it's meaningless. + // TODO(rnystrom): Should log this and display it if an actual error // occurs. - store(process.stdout, controller); + consumeInputStream(process.stderr); + return pipeInputToInput(process.stdout, stream); }); - }).catchError((e) { - // We don't have to worry about double-signaling here, since the store() - // above will only be reached if everything succeeds. - controller.signalError(e.error, e.stackTrace); - controller.close(); }); - return new ByteStream(controller.stream); + return stream; } /// Exception thrown when an operation times out. diff --git a/lib/src/log.dart b/lib/src/log.dart index 43499ef2..ee1ad1e2 100644 --- a/lib/src/log.dart +++ b/lib/src/log.dart @@ -6,6 +6,7 @@ library log; import 'dart:async'; +import 'dart:io'; import 'io.dart'; typedef LogFn(Entry entry); @@ -153,11 +154,11 @@ void recordTranscript() { void dumpTranscript() { if (_transcript == null) return; - stderrSink.add('---- Log transcript ----\n'.charCodes); + stderr.writeString('---- Log transcript ----\n'); for (var entry in _transcript) { _logToStderrWithLabel(entry); } - stderrSink.add('---- End log transcript ----\n'.charCodes); + stderr.writeString('---- End log transcript ----\n'); } /// Sets the verbosity to "normal", which shows errors, warnings, and messages. @@ -190,38 +191,38 @@ void showAll() { /// Log function that prints the message to stdout. void _logToStdout(Entry entry) { - _logToStream(stdoutSink, entry, showLabel: false); + _logToStream(stdout, entry, showLabel: false); } /// Log function that prints the message to stdout with the level name. void _logToStdoutWithLabel(Entry entry) { - _logToStream(stdoutSink, entry, showLabel: true); + _logToStream(stdout, entry, showLabel: true); } /// Log function that prints the message to stderr. void _logToStderr(Entry entry) { - _logToStream(stderrSink, entry, showLabel: false); + _logToStream(stderr, entry, showLabel: false); } /// Log function that prints the message to stderr with the level name. void _logToStderrWithLabel(Entry entry) { - _logToStream(stderrSink, entry, showLabel: true); + _logToStream(stderr, entry, showLabel: true); } -void _logToStream(Sink<List<int>> sink, Entry entry, {bool showLabel}) { +void _logToStream(OutputStream stream, Entry entry, {bool showLabel}) { bool firstLine = true; for (var line in entry.lines) { if (showLabel) { if (firstLine) { - sink.add(entry.level.name.charCodes); - sink.add(': '.charCodes); + stream.writeString(entry.level.name); + stream.writeString(': '); } else { - sink.add(' | '.charCodes); + stream.writeString(' | '); } } - sink.add(line.charCodes); - sink.add('\n'.charCodes); + stream.writeString(line); + stream.writeString('\n'); firstLine = false; } diff --git a/lib/src/utils.dart b/lib/src/utils.dart index c807ea82..7710ac19 100644 --- a/lib/src/utils.dart +++ b/lib/src/utils.dart @@ -118,83 +118,6 @@ void chainToCompleter(Future future, Completer completer) { onError: (e) => completer.completeError(e.error, e.stackTrace)); } -// TODO(nweiz): remove this when issue 7964 is fixed. -/// Returns a [Future] that will complete to the first element of [stream]. -/// Unlike [Stream.first], this is safe to use with single-subscription streams. -Future streamFirst(Stream stream) { - var completer = new Completer(); - var subscription; - subscription = stream.listen((value) { - subscription.cancel(); - completer.complete(value); - }, - onError: (e) => completer.completeError(e.error, e.stackTrace), - onDone: () => completer.completeError(new StateError("No elements")), - unsubscribeOnError: true); - return completer.future; -} - -/// Returns a wrapped version of [stream] along with a [StreamSubscription] that -/// can be used to control the wrapped stream. -Pair<Stream, StreamSubscription> streamWithSubscription(Stream stream) { - var controller = stream.isSingleSubscription ? - new StreamController() : - new StreamController.multiSubscription(); - var subscription = stream.listen(controller.add, - onError: controller.signalError, - onDone: controller.close); - return new Pair<Stream, StreamSubscription>(controller.stream, subscription); -} - -// TODO(nweiz): remove this when issue 7787 is fixed. -/// Creates two single-subscription [Stream]s that each emit all values and -/// errors from [stream]. This is useful if [stream] is single-subscription but -/// multiple subscribers are necessary. -Pair<Stream, Stream> tee(Stream stream) { - var controller1 = new StreamController(); - var controller2 = new StreamController(); - stream.listen((value) { - controller1.add(value); - controller2.add(value); - }, onError: (error) { - controller1.signalError(error); - controller2.signalError(error); - }, onDone: () { - controller1.close(); - controller2.close(); - }); - return new Pair<Stream, Stream>(controller1.stream, controller2.stream); -} - -/// A regular expression matching a line termination character or character -/// sequence. -final RegExp _lineRegexp = new RegExp(r"\r\n|\r|\n"); - -/// Converts a stream of arbitrarily chunked strings into a line-by-line stream. -/// The lines don't include line termination characters. A single trailing -/// newline is ignored. -Stream<String> streamToLines(Stream<String> stream) { - var buffer = new StringBuffer(); - return stream.transform(new StreamTransformer.from( - onData: (chunk, sink) { - var lines = chunk.split(_lineRegexp); - var leftover = lines.removeLast(); - for (var line in lines) { - if (!buffer.isEmpty) { - buffer.add(line); - line = buffer.toString(); - buffer.clear(); - } - - sink.add(line); - } - buffer.add(leftover); - }, onDone: (sink) { - if (!buffer.isEmpty) sink.add(buffer.toString()); - sink.close(); - })); -} - // TODO(nweiz): unify the following functions with the utility functions in // pkg/http. diff --git a/test/curl_client_test.dart b/test/curl_client_test.dart index 2bb226a8..924e994e 100644 --- a/test/curl_client_test.dart +++ b/test/curl_client_test.dart @@ -87,7 +87,7 @@ void startServer() { }); _server.defaultRequestHandler = (request, response) { - wrapInputStream(request.inputStream).toBytes().then((requestBodyBytes) { + consumeInputStream(request.inputStream).then((requestBodyBytes) { response.statusCode = 200; response.headers.contentType = new ContentType("application", "json"); diff --git a/test/oauth2_test.dart b/test/oauth2_test.dart index 4529760e..e5ff4bca 100644 --- a/test/oauth2_test.dart +++ b/test/oauth2_test.dart @@ -64,7 +64,7 @@ main() { confirmPublish(pub); server.handle('POST', '/token', (request, response) { - return wrapInputStream(request.inputStream).toBytes().then((bytes) { + return consumeInputStream(request.inputStream).then((bytes) { var body = new String.fromCharCodes(bytes); expect(body, matches( new RegExp(r'(^|&)refresh_token=refresh\+token(&|$)'))); @@ -198,7 +198,7 @@ void authorizePub(ScheduledProcess pub, ScheduledServer server, void handleAccessTokenRequest(ScheduledServer server, String accessToken) { server.handle('POST', '/token', (request, response) { - return wrapInputStream(request.inputStream).toBytes().then((bytes) { + return consumeInputStream(request.inputStream).then((bytes) { var body = new String.fromCharCodes(bytes); expect(body, matches(new RegExp(r'(^|&)code=access\+code(&|$)'))); diff --git a/test/pub_uploader_test.dart b/test/pub_uploader_test.dart index 2ac8722d..fa585af0 100644 --- a/test/pub_uploader_test.dart +++ b/test/pub_uploader_test.dart @@ -54,7 +54,7 @@ main() { var pub = startPubUploader(server, ['--package', 'pkg', 'add', 'email']); server.handle('POST', '/packages/pkg/uploaders.json', (request, response) { - expect(wrapInputStream(request.inputStream).toBytes().then((bodyBytes) { + expect(consumeInputStream(request.inputStream).then((bodyBytes) { expect(new String.fromCharCodes(bodyBytes), equals('email=email')); response.headers.contentType = new ContentType("application", "json"); diff --git a/test/test_pub.dart b/test/test_pub.dart index 61dad292..98ba5ba3 100644 --- a/test/test_pub.dart +++ b/test/test_pub.dart @@ -110,7 +110,8 @@ void serve([List<Descriptor> contents]) { return; } - stream.toBytes().then((data) { + var future = consumeInputStream(stream); + future.then((data) { response.statusCode = 200; response.contentLength = data.length; response.outputStream.write(data); @@ -781,7 +782,7 @@ abstract class Descriptor { /// Loads the file at [path] from within this descriptor. If [path] is empty, /// loads the contents of the descriptor itself. - ByteStream load(List<String> path); + InputStream load(List<String> path); /// Schedules the directory to be created before Pub is run with /// [schedulePub]. The directory will be created relative to the sandbox @@ -901,13 +902,16 @@ class FileDescriptor extends Descriptor { } /// Loads the contents of the file. - ByteStream load(List<String> path) { + InputStream load(List<String> path) { if (!path.isEmpty) { var joinedPath = Strings.join(path, '/'); throw "Can't load $joinedPath from within $name: not a directory."; } - return new ByteStream.fromBytes(contents.charCodes); + var stream = new ListInputStream(); + stream.write(contents.charCodes); + stream.markEndOfStream(); + return stream; } } @@ -959,7 +963,7 @@ class DirectoryDescriptor extends Descriptor { } /// Loads [path] from within this directory. - ByteStream load(List<String> path) { + InputStream load(List<String> path) { if (path.isEmpty) { throw "Can't load the contents of $name: is a directory."; } @@ -989,10 +993,10 @@ class FutureDescriptor extends Descriptor { Future delete(dir) => _future.then((desc) => desc.delete(dir)); - ByteStream load(List<String> path) { - var controller = new StreamController<List<int>>(); - _future.then((desc) => store(desc.load(path), controller)); - return new ByteStream(controller.stream); + InputStream load(List<String> path) { + var resultStream = new ListInputStream(); + _future.then((desc) => pipeInputToInput(desc.load(path), resultStream)); + return resultStream; } } @@ -1085,7 +1089,7 @@ class TarFileDescriptor extends Descriptor { tempDir = _tempDir; return Future.wait(contents.mappedBy((child) => child.create(tempDir))); }).then((createdContents) { - return createTarGz(createdContents, baseDir: tempDir).toBytes(); + return consumeInputStream(createTarGz(createdContents, baseDir: tempDir)); }).then((bytes) { return new File(join(parentDir, _stringName)).writeAsBytes(bytes); }).then((file) { @@ -1104,13 +1108,13 @@ class TarFileDescriptor extends Descriptor { } /// Loads the contents of this tar file. - ByteStream load(List<String> path) { + InputStream load(List<String> path) { if (!path.isEmpty) { var joinedPath = Strings.join(path, '/'); throw "Can't load $joinedPath from within $name: not a directory."; } - var controller = new StreamController<List<int>>(); + var sinkStream = new ListInputStream(); var tempDir; // TODO(rnystrom): Use withTempDir() here. // TODO(nweiz): propagate any errors to the return value. See issue 3657. @@ -1119,11 +1123,11 @@ class TarFileDescriptor extends Descriptor { return create(tempDir); }).then((tar) { var sourceStream = tar.openInputStream(); - return store(wrapInputStream(sourceStream), controller).then((_) { + return pipeInputToInput(sourceStream, sinkStream).then((_) { tempDir.delete(recursive: true); }); }); - return new ByteStream(controller.stream); + return sinkStream; } } @@ -1142,7 +1146,7 @@ class NothingDescriptor extends Descriptor { }); } - ByteStream load(List<String> path) { + InputStream load(List<String> path) { if (path.isEmpty) { throw "Can't load the contents of $name: it doesn't exist."; } else { @@ -1212,7 +1216,7 @@ class ScheduledProcess { final String name; /// The process future that's scheduled to run. - Future<PubProcess> _processFuture; + Future<Process> _processFuture; /// The process that's scheduled to run. It may be null. Process _process; @@ -1220,35 +1224,13 @@ class ScheduledProcess { /// The exit code of the scheduled program. It may be null. int _exitCode; - /// A future that will complete to a list of all the lines emitted on the - /// process's standard output stream. This is independent of what data is read - /// from [_stdout]. - Future<List<String>> _stdoutLines; + /// A [StringInputStream] wrapping the stdout of the process that's scheduled + /// to run. + final Future<StringInputStream> _stdoutFuture; - /// A [Stream] of stdout lines emitted by the process that's scheduled to run. - /// It may be null. - Stream<String> _stdout; - - /// A [Future] that will resolve to [_stdout] once it's available. - Future get _stdoutFuture => _processFuture.then((_) => _stdout); - - /// A [StreamSubscription] that controls [_stdout]. - StreamSubscription _stdoutSubscription; - - /// A future that will complete to a list of all the lines emitted on the - /// process's standard error stream. This is independent of what data is read - /// from [_stderr]. - Future<List<String>> _stderrLines; - - /// A [Stream] of stderr lines emitted by the process that's scheduled to run. - /// It may be null. - Stream<String> _stderr; - - /// A [Future] that will resolve to [_stderr] once it's available. - Future get _stderrFuture => _processFuture.then((_) => _stderr); - - /// A [StreamSubscription] that controls [_stderr]. - StreamSubscription _stderrSubscription; + /// A [StringInputStream] wrapping the stderr of the process that's scheduled + /// to run. + final Future<StringInputStream> _stderrFuture; /// The exit code of the process that's scheduled to run. This will naturally /// only complete once the process has terminated. @@ -1265,57 +1247,42 @@ class ScheduledProcess { bool _endExpected = false; /// Wraps a [Process] [Future] in a scheduled process. - ScheduledProcess(this.name, Future<PubProcess> process) - : _processFuture = process { - var pairFuture = process.then((p) { + ScheduledProcess(this.name, Future<Process> process) + : _processFuture = process, + _stdoutFuture = process.then((p) => new StringInputStream(p.stdout)), + _stderrFuture = process.then((p) => new StringInputStream(p.stderr)) { + process.then((p) { _process = p; - - var stdoutTee = tee(p.stdout.handleError((e) { - registerException(e.error, e.stackTrace); - })); - var stdoutPair = streamWithSubscription(stdoutTee.last); - _stdout = stdoutPair.first; - _stdoutSubscription = stdoutPair.last; - - var stderrTee = tee(p.stderr.handleError((e) { - registerException(e.error, e.stackTrace); - })); - var stderrPair = streamWithSubscription(stderrTee.last); - _stderr = stderrPair.first; - _stderrSubscription = stderrPair.last; - - return new Pair(stdoutTee.first, stderrTee.first); }); - _stdoutLines = pairFuture.then((pair) => pair.first.toList()); - _stderrLines = pairFuture.then((pair) => pair.last.toList()); - _schedule((_) { if (!_endScheduled) { throw new StateError("Scheduled process $name must have shouldExit() " "or kill() called before the test is run."); } - return process.then((p) => p.exitCode).then((exitCode) { - if (_endExpected) { - _exitCode = exitCode; - _exitCodeCompleter.complete(exitCode); - return; - } - - // Sleep for half a second in case _endExpected is set in the next - // scheduled event. - sleep(500).then((_) { + return process.then((p) { + p.onExit = (c) { if (_endExpected) { - _exitCodeCompleter.complete(exitCode); + _exitCode = c; + _exitCodeCompleter.complete(c); return; } - return _printStreams().then((_) { - registerException(new ExpectException("Process $name ended " - "earlier than scheduled with exit code $exitCode")); + // Sleep for half a second in case _endExpected is set in the next + // scheduled event. + sleep(500).then((_) { + if (_endExpected) { + _exitCodeCompleter.complete(c); + return; + } + + _printStreams().then((_) { + registerException(new ExpectException("Process $name ended " + "earlier than scheduled with exit code $c")); + }); }); - }); + }; }); }); @@ -1335,15 +1302,15 @@ class ScheduledProcess { if (_process == null) return; // Ensure that the process is dead and we aren't waiting on any IO. _process.kill(); - _stdoutSubscription.cancel(); - _stderrSubscription.cancel(); + _process.stdout.close(); + _process.stderr.close(); }); } /// Reads the next line of stdout from the process. Future<String> nextLine() { return _scheduleValue((_) { - return timeout(_stdoutFuture.then((stream) => streamFirst(stream)), + return timeout(_stdoutFuture.then((stream) => readLine(stream)), _SCHEDULE_TIMEOUT, "waiting for the next stdout line from process $name"); }); @@ -1352,7 +1319,7 @@ class ScheduledProcess { /// Reads the next line of stderr from the process. Future<String> nextErrLine() { return _scheduleValue((_) { - return timeout(_stderrFuture.then((stream) => streamFirst(stream)), + return timeout(_stderrFuture.then((stream) => readLine(stream)), _SCHEDULE_TIMEOUT, "waiting for the next stderr line from process $name"); }); @@ -1367,8 +1334,7 @@ class ScheduledProcess { } return _scheduleValue((_) { - return timeout(_stdoutFuture.then((stream) => stream.toList()) - .then((lines) => lines.join("\n")), + return timeout(_stdoutFuture.then(consumeStringInputStream), _SCHEDULE_TIMEOUT, "waiting for the last stdout line from process $name"); }); @@ -1383,8 +1349,7 @@ class ScheduledProcess { } return _scheduleValue((_) { - return timeout(_stderrFuture.then((stream) => stream.toList()) - .then((lines) => lines.join("\n")), + return timeout(_stderrFuture.then(consumeStringInputStream), _SCHEDULE_TIMEOUT, "waiting for the last stderr line from process $name"); }); @@ -1393,7 +1358,7 @@ class ScheduledProcess { /// Writes [line] to the process as stdin. void writeLine(String line) { _schedule((_) => _processFuture.then( - (p) => p.stdin.write('$line\n'.charCodes))); + (p) => p.stdin.writeString('$line\n'))); } /// Kills the process, and waits until it's dead. @@ -1402,7 +1367,7 @@ class ScheduledProcess { _schedule((_) { _endExpected = true; _process.kill(); - timeout(_exitCodeFuture, _SCHEDULE_TIMEOUT, + timeout(_exitCodeCompleter.future, _SCHEDULE_TIMEOUT, "waiting for process $name to die"); }); } @@ -1413,7 +1378,7 @@ class ScheduledProcess { _endScheduled = true; _schedule((_) { _endExpected = true; - return timeout(_exitCodeFuture, _SCHEDULE_TIMEOUT, + return timeout(_exitCodeCompleter.future, _SCHEDULE_TIMEOUT, "waiting for process $name to exit").then((exitCode) { if (expectedExitCode != null) { expect(exitCode, equals(expectedExitCode)); @@ -1423,21 +1388,24 @@ class ScheduledProcess { } /// Prints the remaining data in the process's stdout and stderr streams. - /// Prints nothing if the streams are empty. + /// Prints nothing if the straems are empty. Future _printStreams() { - void printStream(String streamName, List<String> lines) { - if (lines.isEmpty) return; + Future printStream(String streamName, StringInputStream stream) { + return consumeStringInputStream(stream).then((output) { + if (output.isEmpty) return; - print('\nProcess $name $streamName:'); - for (var line in lines) { - print('| $line'); - } + print('\nProcess $name $streamName:'); + for (var line in output.trim().split("\n")) { + print('| $line'); + } + return; + }); } - return _stdoutLines.then((stdoutLines) { - printStream('stdout', stdoutLines); - return _stderrLines.then((stderrLines) { - printStream('stderr', stderrLines); + return _stdoutFuture.then((stdout) { + return _stderrFuture.then((stderr) { + return printStream('stdout', stdout) + .then((_) => printStream('stderr', stderr)); }); }); } -- GitLab