From 07f48a701d02303cc38152da70a24e772b4d2720 Mon Sep 17 00:00:00 2001
From: Bartosz Przydatek <przydatek@google.com>
Date: Mon, 18 Sep 2017 13:21:08 +0200
Subject: [PATCH] Adding StreamingAeadFactory.

Change-Id: I3c8e0919f410c4ca48dd6f1f463ee876e0251e0f
ORIGINAL_AUTHOR=Bartosz Przydatek <przydatek@google.com>
GitOrigin-RevId: f02fc8097b8337adfc8ca70b7957922343a0ab45
---
 .../google/crypto/tink/streamingaead/BUILD    |   6 +
 .../ReadableByteChannelDecrypter.java         | 133 +++++++++++
 .../SeekableByteChannelDecrypter.java         | 183 ++++++++++++++
 .../streamingaead/StreamingAeadCatalogue.java |  65 +++++
 .../streamingaead/StreamingAeadConfig.java    |  77 ++++++
 .../streamingaead/StreamingAeadFactory.java   |  79 ++++++
 .../streamingaead/StreamingAeadHelper.java    |  68 ++++++
 .../java/com/google/crypto/tink/subtle/BUILD  |   2 +
 .../subtle/RewindableReadableByteChannel.java | 162 +++++++++++++
 .../google/crypto/tink/StreamingTestUtil.java | 142 +++++++++++
 .../java/com/google/crypto/tink/TestUtil.java |  45 ++++
 .../AesCtrHmacStreamingKeyManagerTest.java    |  48 +---
 .../AesGcmHkdfStreamingKeyManagerTest.java    |  48 +---
 .../StreamingAeadFactoryTest.java             | 144 +++++++++++
 .../tink/subtle/AesCtrHmacStreamingTest.java  |  54 +----
 .../RewindableReadableByteChannelTest.java    | 226 ++++++++++++++++++
 16 files changed, 1339 insertions(+), 143 deletions(-)
 create mode 100644 java/src/main/java/com/google/crypto/tink/streamingaead/ReadableByteChannelDecrypter.java
 create mode 100644 java/src/main/java/com/google/crypto/tink/streamingaead/SeekableByteChannelDecrypter.java
 create mode 100644 java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadCatalogue.java
 create mode 100644 java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadConfig.java
 create mode 100644 java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadFactory.java
 create mode 100644 java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadHelper.java
 create mode 100644 java/src/main/java/com/google/crypto/tink/subtle/RewindableReadableByteChannel.java
 create mode 100644 java/src/test/java/com/google/crypto/tink/streamingaead/StreamingAeadFactoryTest.java
 create mode 100644 java/src/test/java/com/google/crypto/tink/subtle/RewindableReadableByteChannelTest.java

diff --git a/java/src/main/java/com/google/crypto/tink/streamingaead/BUILD b/java/src/main/java/com/google/crypto/tink/streamingaead/BUILD
index 608df3b3f..bea20c31b 100644
--- a/java/src/main/java/com/google/crypto/tink/streamingaead/BUILD
+++ b/java/src/main/java/com/google/crypto/tink/streamingaead/BUILD
@@ -21,6 +21,7 @@ FULL_PROTOS = [
     "//proto:aes_ctr_hmac_streaming_java_proto",
     "//proto:aes_gcm_hkdf_streaming_java_proto",
     "//proto:common_java_proto",
+    "//proto:config_java_proto",
     "//proto:tink_java_proto",
 ]
 
@@ -28,6 +29,7 @@ LITE_PROTOS = [
     "//proto:aes_ctr_hmac_streaming_java_proto_lite",
     "//proto:aes_gcm_hkdf_streaming_java_proto_lite",
     "//proto:common_java_proto_lite",
+    "//proto:config_java_proto_lite",
     "//proto:tink_java_proto_lite",
 ]
 
@@ -37,9 +39,11 @@ java_library(
     javacopts = JAVACOPTS,
     deps = [
         "//java/src/main/java/com/google/crypto/tink",
+        "//java/src/main/java/com/google/crypto/tink/aead",
         "//java/src/main/java/com/google/crypto/tink/subtle",
         "//java/src/main/java/com/google/crypto/tink/subtle:streaming",
         "@com_google_protobuf_javalite//:protobuf_java_lite",
+        "@com_google_code_findbugs_jsr305//jar",
     ] + FULL_PROTOS,
 )
 
@@ -49,8 +53,10 @@ java_library(
     javacopts = JAVACOPTS,
     deps = [
         "//java/src/main/java/com/google/crypto/tink:android",
+        "//java/src/main/java/com/google/crypto/tink/aead:android",
         "//java/src/main/java/com/google/crypto/tink/subtle",
         "//java/src/main/java/com/google/crypto/tink/subtle:streaming",
         "@com_google_protobuf_javalite//:protobuf_java_lite",
+        "@com_google_code_findbugs_jsr305//jar",
     ] + LITE_PROTOS,
 )
diff --git a/java/src/main/java/com/google/crypto/tink/streamingaead/ReadableByteChannelDecrypter.java b/java/src/main/java/com/google/crypto/tink/streamingaead/ReadableByteChannelDecrypter.java
new file mode 100644
index 000000000..4f07a35a4
--- /dev/null
+++ b/java/src/main/java/com/google/crypto/tink/streamingaead/ReadableByteChannelDecrypter.java
@@ -0,0 +1,133 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.streamingaead;
+
+import com.google.crypto.tink.PrimitiveSet;
+import com.google.crypto.tink.StreamingAead;
+import com.google.crypto.tink.subtle.RewindableReadableByteChannel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A decrypter for ciphertext given in a {@link ReadableByteChannel}.
+ */
+final class ReadableByteChannelDecrypter implements ReadableByteChannel {
+  @GuardedBy("this")
+  boolean attemptedMatching;
+  @GuardedBy("this")
+  ReadableByteChannel matchingChannel;
+  @GuardedBy("this")
+  RewindableReadableByteChannel ciphertextChannel;
+
+  PrimitiveSet<StreamingAead> primitives;
+  byte[] associatedData;
+
+  /**
+   * Constructs a new decrypter for {@code ciphertextChannel}.
+   *
+   * <p>The decrypter picks a matching {@code StreamingAead}-primitive from {@code primitives},
+   * and uses it for decryption.  The matching happens as follows:
+   * upon first {@code read()}-call each candidate primitive reads an initial portion
+   * of the channel, until it can determine whether the channel matches the key of the primitive.
+   * If a canditate does not match, then the channel is reset to its initial position,
+   * and the next candiate can attempt matching.  The first successful candidate
+   * is then used exclusively on subsequent {@code read()}-calls.
+   *
+   * <p> The matching process uses a buffering wrapper around {@code ciphertextChannel}
+   * to enable resetting of the channel to the initial position.  The buffering
+   * is removed once the matching is successful.
+   */
+  public ReadableByteChannelDecrypter(PrimitiveSet<StreamingAead> primitives,
+      ReadableByteChannel ciphertextChannel, final byte[] associatedData) {
+    this.attemptedMatching = false;
+    this.matchingChannel = null;
+    this.primitives = primitives;
+    this.ciphertextChannel = new RewindableReadableByteChannel(ciphertextChannel);
+    this.associatedData = associatedData.clone();
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized int read(ByteBuffer dst) throws IOException {
+    if (dst.remaining() == 0) {
+      return 0;
+    }
+    if (matchingChannel != null) {
+      return matchingChannel.read(dst);
+    } else {
+      if (attemptedMatching) {
+        throw new IOException("No matching key found for the ciphertext in the stream.");
+      }
+      attemptedMatching = true;
+      List<PrimitiveSet.Entry<StreamingAead>> entries;
+      try {
+        entries = primitives.getRawPrimitives();
+      } catch (GeneralSecurityException e) {
+        throw new IOException("Keyset failure: ", e);
+      }
+      for (PrimitiveSet.Entry<StreamingAead> entry : entries) {
+        try {
+          ReadableByteChannel attemptedChannel =
+              entry.getPrimitive().newDecryptingChannel(ciphertextChannel, associatedData);
+          int retValue = attemptedChannel.read(dst);
+          if (retValue > 0) {
+            // Found a matching channel
+            matchingChannel = attemptedChannel;
+            ciphertextChannel.disableRewinding();
+          } else if (retValue == 0) {
+            // Not clear whether the channel could be matched: it might be
+            // that the underlying channel didn't provide sufficiently many bytes
+            // to check the header, or maybe the header was checked, but there
+            // were no actual encrypted bytes in the channel yet.
+            // Should try again.
+            ciphertextChannel.rewind();
+            attemptedMatching = false;
+          }
+          return retValue;
+        } catch (IOException e) {
+          // Try another key.
+          // IOException is thrown e.g. when MAC is incorrect, but also in case
+          // of I/O failures.
+          // TODO(b/66098906): Use a subclass of IOException.
+          ciphertextChannel.rewind();
+          continue;
+        } catch (GeneralSecurityException e) {
+          // Try another key.
+          ciphertextChannel.rewind();
+          continue;
+        }
+      }
+      throw new IOException("No matching key found for the ciphertext in the stream.");
+    }
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized void close() throws IOException {
+    ciphertextChannel.close();
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized boolean isOpen() {
+    return ciphertextChannel.isOpen();
+  }
+}
diff --git a/java/src/main/java/com/google/crypto/tink/streamingaead/SeekableByteChannelDecrypter.java b/java/src/main/java/com/google/crypto/tink/streamingaead/SeekableByteChannelDecrypter.java
new file mode 100644
index 000000000..acbfb01a0
--- /dev/null
+++ b/java/src/main/java/com/google/crypto/tink/streamingaead/SeekableByteChannelDecrypter.java
@@ -0,0 +1,183 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.streamingaead;
+
+import com.google.crypto.tink.PrimitiveSet;
+import com.google.crypto.tink.StreamingAead;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.SeekableByteChannel;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A decrypter for ciphertext given in a {@link SeekableByteChannel}.
+ */
+final class SeekableByteChannelDecrypter implements SeekableByteChannel {
+  @GuardedBy("this")
+  boolean attemptedMatching;
+  @GuardedBy("this")
+  SeekableByteChannel matchingChannel;
+  @GuardedBy("this")
+  SeekableByteChannel ciphertextChannel;
+  @GuardedBy("this")
+  long cachedPosition;    // Position to which matchingChannel should be set before 1st read();
+  @GuardedBy("this")
+  long startingPosition;  // Position at which the ciphertext should begin.
+
+  PrimitiveSet<StreamingAead> primitives;
+  byte[] associatedData;
+
+  /**
+   * Constructs a new decrypter for {@code ciphertextChannel}.
+   *
+   * <p>The decrypter picks a matching {@code StreamingAead}-primitive from {@code primitives},
+   * and uses it for decryption.  The matching happens as follows:
+   * upon first {@code read()}-call each candidate primitive reads an initial portion
+   * of the channel, until it can determine whether the channel matches the key of the primitive.
+   * If a canditate does not match, then the channel is reset to its initial position,
+   * and the next candiate can attempt matching.  The first successful candidate
+   * is then used exclusively on subsequent {@code read()}-calls.
+   */
+  public SeekableByteChannelDecrypter(PrimitiveSet<StreamingAead> primitives,
+      SeekableByteChannel ciphertextChannel, final byte[] associatedData) throws IOException {
+    this.attemptedMatching = false;
+    this.matchingChannel = null;
+    this.primitives = primitives;
+    this.ciphertextChannel = ciphertextChannel;
+    this.cachedPosition = -1;
+    this.startingPosition = ciphertextChannel.position();
+    this.associatedData = associatedData.clone();
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized int read(ByteBuffer dst) throws IOException {
+    if (dst.remaining() == 0) {
+      return 0;
+    }
+    if (matchingChannel != null) {
+      return matchingChannel.read(dst);
+    } else {
+      if (attemptedMatching) {
+        throw new IOException("No matching key found for the ciphertext in the stream.");
+      }
+      attemptedMatching = true;
+      List<PrimitiveSet.Entry<StreamingAead>> entries;
+      try {
+        entries = primitives.getRawPrimitives();
+      } catch (GeneralSecurityException e) {
+        throw new IOException("Keyset failure: ", e);
+      }
+      for (PrimitiveSet.Entry<StreamingAead> entry : entries) {
+        try {
+          SeekableByteChannel attemptedChannel =
+              entry.getPrimitive().newSeekableDecryptingChannel(ciphertextChannel, associatedData);
+          if (cachedPosition >= 0) {  // Caller did set new position before 1st read().
+            attemptedChannel.position(cachedPosition);
+          }
+          int retValue = attemptedChannel.read(dst);
+          if (retValue > 0) {
+            // Found a matching channel.
+            matchingChannel = attemptedChannel;
+          } else if (retValue == 0) {
+            // Not clear whether the channel could be matched: it might be
+            // that the underlying channel didn't provide sufficiently many bytes
+            // to check the header, or maybe the header was checked, but there
+            // were no actual encrypted bytes in the channel yet.
+            // Should try again.
+            ciphertextChannel.position(startingPosition);
+            attemptedMatching = false;
+          }
+          matchingChannel = attemptedChannel;
+          return retValue;
+        } catch (IOException e) {
+          // Try another key.
+          // IOException is thrown e.g. when MAC is incorrect, but also in case
+          // of I/O failures.
+          // TODO(b/66098906): Use a subclass of IOException.
+          ciphertextChannel.position(startingPosition);
+          continue;
+        } catch (GeneralSecurityException e) {
+          // Try another key.
+          ciphertextChannel.position(startingPosition);
+          continue;
+        }
+      }
+      throw new IOException("No matching key found for the ciphertext in the stream.");
+    }
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized SeekableByteChannel position(long newPosition) throws IOException {
+    if (matchingChannel != null) {
+      matchingChannel.position(newPosition);
+    } else {
+      if (newPosition < 0) {
+        throw new IllegalArgumentException("Position must be non-negative");
+      }
+      cachedPosition = newPosition;
+    }
+    return this;
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized long position() throws IOException {
+    if (matchingChannel != null) {
+      return matchingChannel.position();
+    } else {
+      return cachedPosition;
+    }
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized long size() throws IOException {
+    if (matchingChannel != null) {
+      return matchingChannel.size();
+    } else {
+      throw new IOException("Cannot determine size before first read()-call.");
+    }
+  }
+
+  @Override
+  public SeekableByteChannel truncate(long size) throws IOException {
+    throw new NonWritableChannelException();
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    throw new NonWritableChannelException();
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized void close() throws IOException {
+    ciphertextChannel.close();
+  }
+
+
+  @Override
+  @GuardedBy("this")
+  public synchronized boolean isOpen() {
+    return ciphertextChannel.isOpen();
+  }
+}
diff --git a/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadCatalogue.java b/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadCatalogue.java
new file mode 100644
index 000000000..922c0ddf1
--- /dev/null
+++ b/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadCatalogue.java
@@ -0,0 +1,65 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.streamingaead;
+
+import com.google.crypto.tink.Catalogue;
+import com.google.crypto.tink.KeyManager;
+import com.google.crypto.tink.StreamingAead;
+import java.security.GeneralSecurityException;
+
+/** A catalogue of {@link StreamingAead} key managers. */
+class StreamingAeadCatalogue implements Catalogue {
+  public StreamingAeadCatalogue() {}
+
+  /**
+   * @return a KeyManager for the given {@code typeUrl}, {@code primitiveName} and version at least
+   *     {@code minVersion} (if it exists in the catalogue).
+   */
+  @Override
+  @SuppressWarnings("rawtypes")
+  public KeyManager getKeyManager(String typeUrl, String primitiveName, int minVersion)
+      throws GeneralSecurityException {
+    KeyManager keyManager;
+    switch (primitiveName.toLowerCase()) {
+      case "streamingaead":
+        keyManager = streamingAeadKeyManager(typeUrl);
+        break;
+      default:
+        throw new GeneralSecurityException(
+            String.format("No support for primitive '%s'.", primitiveName));
+    }
+    if (keyManager.getVersion() < minVersion) {
+      throw new GeneralSecurityException(
+          String.format(
+              "No key manager for key type '%s' with version at least %d.", typeUrl, minVersion));
+    }
+    return keyManager;
+  }
+
+  private KeyManager<StreamingAead> streamingAeadKeyManager(String typeUrl)
+      throws GeneralSecurityException {
+    switch (typeUrl) {
+      case AesCtrHmacStreamingKeyManager.TYPE_URL:
+        return new AesCtrHmacStreamingKeyManager();
+      case AesGcmHkdfStreamingKeyManager.TYPE_URL:
+        return new AesGcmHkdfStreamingKeyManager();
+      default:
+        throw new GeneralSecurityException(
+            String.format("No support for primitive 'StreamingAead' with key type '%s'.", typeUrl));
+    }
+  }
+}
diff --git a/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadConfig.java b/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadConfig.java
new file mode 100644
index 000000000..3f3b0638b
--- /dev/null
+++ b/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadConfig.java
@@ -0,0 +1,77 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.streamingaead;
+
+import com.google.crypto.tink.Config;
+import com.google.crypto.tink.Registry;
+import com.google.crypto.tink.aead.AeadConfig;
+import com.google.crypto.tink.proto.RegistryConfig;
+import java.security.GeneralSecurityException;
+
+/**
+ * Static methods and constants for registering with the {@link Registry} all instances
+ * of {@link com.google.crypto.tink.StreamingAead} key types supported in a particular
+ * release of Tink.
+ *
+ * <p>To register all StreamingAead key types provided in Tink release 1.1.0 one can do:
+ * <pre>{@code
+ * Config.register(StreamingAeadConfig.TINK_1_1_0);
+ * }</pre>
+ *
+ * <p>For more information on how to obtain and use instances of StreamingAead,
+ * see {@link StreamingAeadFactory}.
+ */
+public final class StreamingAeadConfig {
+  public static final String AES_CTR_HMAC_STREAMINGAEAD_TYPE_URL =
+      AesCtrHmacStreamingKeyManager.TYPE_URL;
+  public static final String AES_GCM_HKDF_STREAMINGAEAD_TYPE_URL =
+      AesGcmHkdfStreamingKeyManager.TYPE_URL;
+
+  private static final String CATALOGUE_NAME = "TinkStreamingAead";
+  private static final String PRIMITIVE_NAME = "StreamingAead";
+
+  public static final RegistryConfig TINK_1_1_0 = RegistryConfig.newBuilder()
+      .mergeFrom(AeadConfig.TINK_1_0_0)
+      .addEntry(Config.getTinkKeyTypeEntry(
+          CATALOGUE_NAME, PRIMITIVE_NAME, "AesCtrHmacStreamingKey", 0, true))
+      .addEntry(Config.getTinkKeyTypeEntry(
+          CATALOGUE_NAME, PRIMITIVE_NAME, "AesGcmHkdfStreamingKey", 0, true))
+      .setConfigName("TINK_STREAMINGAEAD_1_1_0")
+      .build();
+
+  static {
+    try {
+      init();
+    } catch (GeneralSecurityException e) {
+      throw new ExceptionInInitializerError(e);
+    }
+  }
+
+  /**
+   * Tries to register with the {@link Registry} all instances of
+   * {@link com.google.crypto.tink.Catalogue} needed to handle StreamingAead key types
+   * supported in Tink.
+   *
+   * <p>Because StreamingAead key types depend on {@link com.google.crypto.tink.Aead} and
+   * {@link com.google.crypto.tink.Mac} key types, this method also
+   * registers all Aead and Mac catalogues.
+   */
+  public static void init() throws GeneralSecurityException {
+    Registry.addCatalogue(CATALOGUE_NAME, new StreamingAeadCatalogue());
+    AeadConfig.init();  // includes Mac
+  }
+}
diff --git a/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadFactory.java b/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadFactory.java
new file mode 100644
index 000000000..8dfbed747
--- /dev/null
+++ b/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadFactory.java
@@ -0,0 +1,79 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.streamingaead;
+
+import com.google.crypto.tink.KeyManager;
+import com.google.crypto.tink.KeysetHandle;
+import com.google.crypto.tink.PrimitiveSet;
+import com.google.crypto.tink.Registry;
+import com.google.crypto.tink.StreamingAead;
+import java.security.GeneralSecurityException;
+import java.util.logging.Logger;
+
+/**
+ * Static methods for obtaining {@link StreamingAead} instances.
+ *
+ * <p>Usage:
+ *
+ * <pre>{@code
+ *
+ * KeysetHandle keysetHandle = ...;
+ * StreamingAead streamingAead = StreamingAeadFactory.getPrimitive(keysetHandle);
+ * java.nio.channels.FileChannel ciphertextDesitnation =
+ *     FileChannel.open(path, java.nio.file.StandardOpenOption.CREATE,
+ *                            java.nio.file.StandardOpenOption.WRITE);
+ * byte[] aad = ...
+ * WritableByteChannel encryptingChannel = s.newEncryptingChannel(ciphertextDesitnation, aad);
+ *
+ * while ( ... ) {
+ *   int r = encryptingChannel.write(buffer);
+ *   ...
+ * }
+ * encryptingChannel.close();
+ *
+ * }</pre>
+ *
+ * <p>The returned primitive works with a keyset (rather than a single key).
+ * To encrypt a plaintext, it uses the primary key in the keyset.
+ * To decrypt, the primitive tries the enabled keys from the keyset to select
+ * the right key for decryption.  All keys in a keyset of StreamingAead have type
+ * {@link com.google.crypto.tink.proto.OutputPrefixType#RAW}.
+ */
+public final class StreamingAeadFactory {
+  private static final Logger logger = Logger.getLogger(StreamingAeadFactory.class.getName());
+
+  /**
+   * @return a StreamingAead primitive from a {@code keysetHandle}.
+   * @throws GeneralSecurityException
+   */
+  public static StreamingAead getPrimitive(KeysetHandle keysetHandle)
+      throws GeneralSecurityException {
+    return getPrimitive(keysetHandle, /* keyManager= */ null);
+  }
+
+  /**
+   * @return a StreamingAead primitive from a {@code keysetHandle} and a custom {@code keyManager}.
+   * @throws GeneralSecurityException
+   */
+  public static StreamingAead getPrimitive(
+      KeysetHandle keysetHandle,
+      final KeyManager<StreamingAead> keyManager)
+      throws GeneralSecurityException {
+    final PrimitiveSet<StreamingAead> primitives = Registry.getPrimitives(keysetHandle, keyManager);
+    return new StreamingAeadHelper(primitives);
+  }
+}
diff --git a/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadHelper.java b/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadHelper.java
new file mode 100644
index 000000000..ea81fed0f
--- /dev/null
+++ b/java/src/main/java/com/google/crypto/tink/streamingaead/StreamingAeadHelper.java
@@ -0,0 +1,68 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.streamingaead;
+
+import com.google.crypto.tink.PrimitiveSet;
+import com.google.crypto.tink.StreamingAead;
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.security.GeneralSecurityException;
+
+/**
+ * A helper for creating {@link StreamingAead}-primitives from keysets.
+ */
+final class StreamingAeadHelper implements StreamingAead {
+  PrimitiveSet<StreamingAead> primitives;
+
+  /**
+   * Creates a helper that uses the provided primitives for encryption
+   * and decryption of data provided via channels.
+   * For encryption it uses the primitive corresponding to the primary key.
+   * For decryption it uses an enabled primitive that matches the given ciphertext.
+   */
+  public StreamingAeadHelper(PrimitiveSet<StreamingAead> primitives)
+      throws GeneralSecurityException {
+    if (primitives.getPrimary() == null) {
+      throw new GeneralSecurityException("Missing primary primitive.");
+    }
+    this.primitives = primitives;
+  }
+
+  @Override
+  public WritableByteChannel newEncryptingChannel(
+      WritableByteChannel ciphertextDestination, byte[] associatedData)
+      throws GeneralSecurityException, IOException {
+    return primitives.getPrimary().getPrimitive()
+        .newEncryptingChannel(ciphertextDestination, associatedData);
+  }
+
+  @Override
+  public ReadableByteChannel newDecryptingChannel(
+      ReadableByteChannel ciphertextChannel, byte[] associatedData)
+      throws GeneralSecurityException, IOException {
+    return new ReadableByteChannelDecrypter(primitives, ciphertextChannel, associatedData);
+  }
+
+  @Override
+  public SeekableByteChannel newSeekableDecryptingChannel(
+      SeekableByteChannel ciphertextChannel, byte[] associatedData)
+      throws GeneralSecurityException, IOException {
+    return new SeekableByteChannelDecrypter(primitives, ciphertextChannel, associatedData);
+  }
+}
diff --git a/java/src/main/java/com/google/crypto/tink/subtle/BUILD b/java/src/main/java/com/google/crypto/tink/subtle/BUILD
index 83632f1d8..891b6139f 100644
--- a/java/src/main/java/com/google/crypto/tink/subtle/BUILD
+++ b/java/src/main/java/com/google/crypto/tink/subtle/BUILD
@@ -21,11 +21,13 @@ java_library(
         "Hex.java",
         "ImmutableByteArray.java",
         "Random.java",
+        "RewindableReadableByteChannel.java",
         "SubtleUtil.java",
         "Validators.java",
     ],
     javacopts = JAVACOPTS,
     deps = [
+        "@com_google_code_findbugs_jsr305//jar",
         "@com_google_errorprone_error_prone_annotations//jar",
     ],
 )
diff --git a/java/src/main/java/com/google/crypto/tink/subtle/RewindableReadableByteChannel.java b/java/src/main/java/com/google/crypto/tink/subtle/RewindableReadableByteChannel.java
new file mode 100644
index 000000000..19738f688
--- /dev/null
+++ b/java/src/main/java/com/google/crypto/tink/subtle/RewindableReadableByteChannel.java
@@ -0,0 +1,162 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.subtle;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A wrapper around {@link ReadableByteChannel} that provides rewinding feature:
+ * it caches the read bytes so that after reading some initial part of the channel,
+ * one can "rewind" the channel and again read the bytes from the beginning.
+ * Once the rewinding feature is not needed any more, it can be disabled via
+ * {@link disableRewinding()}: this frees the cache memory and forwadrds
+ * the subsequent read()-calls directly to the wrapped channel.
+ */
+public final class RewindableReadableByteChannel implements ReadableByteChannel {
+  @GuardedBy("this")
+  final ReadableByteChannel baseChannel;
+  @GuardedBy("this")
+  ByteBuffer buffer;  // Buffer for caching initial portion of baseChannel, to enable rewinding.
+  @GuardedBy("this")
+  boolean canRewind;  // True iff this channel still has rewinding enabled.
+  @GuardedBy("this")
+  boolean directRead;  // True iff the read-operations should go directly to baseChannel.
+
+  /**
+   * Constructs a wrapper around {@code baseChannel}.
+   * After wrapping {@code baseChannel} should not be manipulated externally.
+   */
+  public RewindableReadableByteChannel(ReadableByteChannel baseChannel) {
+    this.baseChannel = baseChannel;
+    this.buffer = null;
+    this.canRewind = true;
+    this.directRead = false;
+  }
+
+  /**
+   * Disables the rewinding feature.  After calling this method the
+   * attempts to rewind this channel will fail, and the subsequent
+   * read()-calls will be forwarded directly to the wrapped
+   * channel (after the currently buffered bytes are read).
+   */
+  @GuardedBy("this")
+  public synchronized void disableRewinding() {
+    this.canRewind = false;
+  }
+
+  /**
+   * Rewinds this buffer to the beginning (if rewinding is still enabled).
+   */
+  @GuardedBy("this")
+  public synchronized void rewind() throws IOException {
+    if (!canRewind) {
+      throw new IOException("Cannot rewind anymore.");
+    }
+    if (buffer != null) {
+      buffer.position(0);
+    }
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized int read(ByteBuffer dst) throws IOException {
+    if (directRead) {
+      return baseChannel.read(dst);
+    }
+    int bytesToReadCount = dst.remaining();
+    if (bytesToReadCount == 0) {
+      return 0;
+    }
+    if (buffer == null) {  // The first read, no cached data yet.
+      if (!canRewind) {
+        directRead = true;
+        return baseChannel.read(dst);
+      }
+      buffer = ByteBuffer.allocate(bytesToReadCount);
+      int readBytesCount = baseChannel.read(buffer);
+      if (readBytesCount > 0) {  // Copy the read bytes to destination.
+        buffer.flip();
+        dst.put(buffer);
+      }
+      return readBytesCount;
+    } else {  // Subsequent read, some data might be in the buffer.
+      if (buffer.remaining() >= bytesToReadCount) {
+        // Copy from the buffer and advance the buffer.
+        byte[] toDst = new byte[bytesToReadCount];
+        buffer.get(toDst);
+        dst.put(toDst);
+        if (!canRewind && buffer.remaining() == 0) {
+          directRead = true;
+        }
+        return bytesToReadCount;
+      } else {
+        int bytesFromBufferCount = buffer.remaining();
+        int stillToReadCount = bytesToReadCount - bytesFromBufferCount;
+
+        // Copy the remaining bytes from the current buffer to dst.
+        dst.put(buffer);
+
+        // Read the extra bytes needed, and copy them to dst.
+        ByteBuffer extraBuffer = ByteBuffer.allocate(stillToReadCount);
+        int readBytesCount = baseChannel.read(extraBuffer);
+        if (readBytesCount > 0) {
+          extraBuffer.flip();
+          dst.put(extraBuffer);
+        }
+
+        // If rewind still suported, update the buffer...
+        if (canRewind) {
+          int newBufferSize = buffer.limit() + stillToReadCount;
+          // Allocate a larger buffer and copy the entire current buffer.
+          ByteBuffer newBuffer = ByteBuffer.allocate(newBufferSize);
+          buffer.flip();
+          newBuffer.put(buffer);
+          if (readBytesCount > 0) {
+            extraBuffer.flip();
+            newBuffer.put(extraBuffer);
+          }
+          // Record that all buffered data has been consumed already.
+          newBuffer.flip();
+          newBuffer.position(newBuffer.limit());
+          buffer = newBuffer;
+        } else {  // ... otherwise free the buffer memory.
+          buffer = null;
+          directRead = true;
+        }
+
+        return bytesFromBufferCount + readBytesCount;
+      }
+    }
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized void close() throws IOException {
+    canRewind = false;
+    directRead = true;
+    baseChannel.close();
+  }
+
+  @Override
+  @GuardedBy("this")
+  public synchronized boolean isOpen() {
+    return baseChannel.isOpen();
+  }
+}
diff --git a/java/src/test/java/com/google/crypto/tink/StreamingTestUtil.java b/java/src/test/java/com/google/crypto/tink/StreamingTestUtil.java
index 2e950baa5..671f59bf7 100644
--- a/java/src/test/java/com/google/crypto/tink/StreamingTestUtil.java
+++ b/java/src/test/java/com/google/crypto/tink/StreamingTestUtil.java
@@ -16,11 +16,20 @@
 
 package com.google.crypto.tink;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import com.google.crypto.tink.subtle.Random;
+
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.FileSystems;
 import java.nio.file.Path;
 import java.security.SecureRandom;
@@ -148,6 +157,139 @@ public class StreamingTestUtil {
     }
   }
 
+  /**
+   * Implements a ReadableByteChannel for testing.
+   *
+   * The implementation is backed by an array of bytes of size {@code BLOCK_SIZE},
+   * which upon read()-operation is repeated until the specified size of the channel.
+   */
+  public static class PseudorandomReadableByteChannel implements ReadableByteChannel {
+    private long size;
+    private long position;
+    private boolean open;
+    private byte[] repeatedBlock;
+    public static final int BLOCK_SIZE = 1024;
+
+    /** Returns a plaintext of a given size. */
+    private byte[] generatePlaintext(int size) {
+      byte[] plaintext = new byte[size];
+      for (int i = 0; i < size; i++) {
+        plaintext[i] = (byte) (i % 253);
+      }
+      return plaintext;
+    }
+
+    public PseudorandomReadableByteChannel(long size) {
+      this.size = size;
+      this.position = 0;
+      this.open = true;
+      this.repeatedBlock = generatePlaintext(BLOCK_SIZE);
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      if (!open) {
+        throw new ClosedChannelException();
+      }
+      if (position == size) {
+        return -1;
+      }
+      long start = position;
+      long end = java.lang.Math.min(size, start + dst.remaining());
+      long firstBlock = start / BLOCK_SIZE;
+      long lastBlock = end / BLOCK_SIZE;
+      int startOffset = (int) (start % BLOCK_SIZE);
+      int endOffset = (int) (end % BLOCK_SIZE);
+      if (firstBlock == lastBlock) {
+        dst.put(repeatedBlock, startOffset, endOffset - startOffset);
+      } else {
+        dst.put(repeatedBlock, startOffset, BLOCK_SIZE - startOffset);
+        for (long block = firstBlock + 1; block < lastBlock; block++) {
+          dst.put(repeatedBlock);
+        }
+        dst.put(repeatedBlock, 0, endOffset);
+      }
+      position = end;
+      return (int) (position - start);
+    }
+
+    @Override
+    public void close() {
+      this.open = false;
+    }
+
+    @Override
+    public boolean isOpen() {
+      return this.open;
+    }
+  }
+
+  /**
+   * Tests encryption and decryption functionalities using {@code encryptionStreamingAead}
+   * for encryption and  {@code decryptionStreamingAead} for decryption.
+   */
+  public static void testEncryptionAndDecryption(
+      StreamingAead encryptionStreamingAead, StreamingAead decryptionStreamingAead)
+      throws Exception {
+    byte[] aad = Random.randBytes(15);
+    // Short plaintext.
+    byte[] shortPlaintext = Random.randBytes(10);
+    testEncryptionAndDecryption(
+        encryptionStreamingAead, decryptionStreamingAead, shortPlaintext, aad);
+    // Long plaintext.
+    byte[] longPlaintext = Random.randBytes(1100);
+    testEncryptionAndDecryption(
+        encryptionStreamingAead, decryptionStreamingAead, longPlaintext, aad);
+  }
+
+  /** Tests encryption and decryption functionalities of {@code streamingAead}. */
+  public static void testEncryptionAndDecryption(StreamingAead streamingAead) throws Exception {
+    testEncryptionAndDecryption(streamingAead, streamingAead);
+  }
+
+  /**
+   * Tests encryption and decryption functionalities using {@code encryptionStreamingAead}
+   * for encryption and  {@code decryptionStreamingAead} for decryption on inputs
+   * {@code plaintext} and {@code aad}.
+   */
+  public static void testEncryptionAndDecryption(
+      StreamingAead encryptionStreamingAead, StreamingAead decryptionStreamingAead,
+      byte[] plaintext, byte[] aad) throws Exception {
+
+    // Encrypt plaintext.
+    ByteArrayOutputStream ciphertext = new ByteArrayOutputStream();
+    WritableByteChannel encChannel =
+        encryptionStreamingAead.newEncryptingChannel(Channels.newChannel(ciphertext), aad);
+    encChannel.write(ByteBuffer.wrap(plaintext));
+    encChannel.close();
+
+    // Decrypt ciphertext via ReadableByteChannel.
+    {
+      ByteBufferChannel ciphertextChannel = new ByteBufferChannel(ciphertext.toByteArray());
+      ReadableByteChannel decChannel =
+          decryptionStreamingAead.newDecryptingChannel(ciphertextChannel, aad);
+      ByteBuffer decrypted = ByteBuffer.allocate(plaintext.length);
+      int readCount = decChannel.read(decrypted);
+
+      // Compare results;
+      assertEquals(plaintext.length, readCount);
+      assertArrayEquals(plaintext, decrypted.array());
+    }
+
+    // Decrypt ciphertext via SeekableByteChannel.
+    {
+      ByteBufferChannel ciphertextChannel = new ByteBufferChannel(ciphertext.toByteArray());
+      SeekableByteChannel decChannel =
+          decryptionStreamingAead.newSeekableDecryptingChannel(ciphertextChannel, aad);
+      ByteBuffer decrypted = ByteBuffer.allocate(plaintext.length);
+      int readCount = decChannel.read(decrypted);
+
+      // Compare results;
+      assertEquals(plaintext.length, readCount);
+      assertArrayEquals(plaintext, decrypted.array());
+    }
+  }
+
   /** Generates and returns a random, temporary file path. */
   public static Path generateRandomPath(String prefix) {
     String tmpDir = java.lang.System.getenv("TEST_TMPDIR");
diff --git a/java/src/test/java/com/google/crypto/tink/TestUtil.java b/java/src/test/java/com/google/crypto/tink/TestUtil.java
index 776b22235..d47ec297c 100644
--- a/java/src/test/java/com/google/crypto/tink/TestUtil.java
+++ b/java/src/test/java/com/google/crypto/tink/TestUtil.java
@@ -26,10 +26,14 @@ import com.google.crypto.tink.aead.AeadFactory;
 import com.google.crypto.tink.hybrid.HybridKeyTemplates;
 import com.google.crypto.tink.mac.MacConfig;
 import com.google.crypto.tink.proto.AesCtrHmacAeadKey;
+import com.google.crypto.tink.proto.AesCtrHmacStreamingKey;
+import com.google.crypto.tink.proto.AesCtrHmacStreamingParams;
 import com.google.crypto.tink.proto.AesCtrKey;
 import com.google.crypto.tink.proto.AesCtrParams;
 import com.google.crypto.tink.proto.AesEaxKey;
 import com.google.crypto.tink.proto.AesEaxParams;
+import com.google.crypto.tink.proto.AesGcmHkdfStreamingKey;
+import com.google.crypto.tink.proto.AesGcmHkdfStreamingParams;
 import com.google.crypto.tink.proto.AesGcmKey;
 import com.google.crypto.tink.proto.EcPointFormat;
 import com.google.crypto.tink.proto.EcdsaParams;
@@ -51,11 +55,13 @@ import com.google.crypto.tink.proto.Keyset;
 import com.google.crypto.tink.proto.Keyset.Key;
 import com.google.crypto.tink.proto.KeysetInfo;
 import com.google.crypto.tink.proto.OutputPrefixType;
+import com.google.crypto.tink.streamingaead.StreamingAeadConfig;
 import com.google.crypto.tink.subtle.EllipticCurves;
 import com.google.crypto.tink.subtle.Hex;
 import com.google.crypto.tink.subtle.Random;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.MessageLite;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.security.GeneralSecurityException;
@@ -170,6 +176,45 @@ public class TestUtil {
         .build();
   }
 
+  /** @return a {@code KeyData} containing a {@code AesCtrHmacStreamingKey}. */
+  public static KeyData createAesCtrHmacStreamingKeyData(
+      byte[] keyValue, int derivedKeySize, int ciphertextSegmentSize) throws Exception {
+    HmacParams hmacParams = HmacParams.newBuilder()
+        .setHash(HashType.SHA256)
+        .setTagSize(16)
+        .build();
+    AesCtrHmacStreamingParams keyParams = AesCtrHmacStreamingParams.newBuilder()
+        .setCiphertextSegmentSize(ciphertextSegmentSize)
+        .setDerivedKeySize(derivedKeySize)
+        .setHkdfHashType(HashType.SHA256)
+        .setHmacParams(hmacParams)
+        .build();
+    AesCtrHmacStreamingKey keyProto = AesCtrHmacStreamingKey.newBuilder()
+        .setVersion(0)
+        .setKeyValue(ByteString.copyFrom(keyValue))
+        .setParams(keyParams)
+        .build();
+    return createKeyData(keyProto, StreamingAeadConfig.AES_CTR_HMAC_STREAMINGAEAD_TYPE_URL,
+        KeyData.KeyMaterialType.SYMMETRIC);
+  }
+
+  /** @return a {@code KeyData} containing a {@code AesGcmHkdfStreamingKey}. */
+  public static KeyData createAesGcmHkdfStreamingKeyData(
+      byte[] keyValue, int derivedKeySize, int ciphertextSegmentSize) throws Exception {
+    AesGcmHkdfStreamingParams keyParams = AesGcmHkdfStreamingParams.newBuilder()
+        .setCiphertextSegmentSize(ciphertextSegmentSize)
+        .setDerivedKeySize(derivedKeySize)
+        .setHkdfHashType(HashType.SHA256)
+        .build();
+    AesGcmHkdfStreamingKey keyProto = AesGcmHkdfStreamingKey.newBuilder()
+        .setVersion(0)
+        .setKeyValue(ByteString.copyFrom(keyValue))
+        .setParams(keyParams)
+        .build();
+    return createKeyData(keyProto, StreamingAeadConfig.AES_GCM_HKDF_STREAMINGAEAD_TYPE_URL,
+        KeyData.KeyMaterialType.SYMMETRIC);
+  }
+
   /** @return a {@code KeyData} containing a {@code AesCtrHmacAeadKey}. */
   public static KeyData createAesCtrHmacAeadKeyData(
       byte[] aesCtrKeyValue, int ivSize, byte[] hmacKeyValue, int tagSize) throws Exception {
diff --git a/java/src/test/java/com/google/crypto/tink/streamingaead/AesCtrHmacStreamingKeyManagerTest.java b/java/src/test/java/com/google/crypto/tink/streamingaead/AesCtrHmacStreamingKeyManagerTest.java
index 9aa0b6215..741a68955 100644
--- a/java/src/test/java/com/google/crypto/tink/streamingaead/AesCtrHmacStreamingKeyManagerTest.java
+++ b/java/src/test/java/com/google/crypto/tink/streamingaead/AesCtrHmacStreamingKeyManagerTest.java
@@ -16,12 +16,11 @@
 
 package com.google.crypto.tink.streamingaead;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import com.google.crypto.tink.StreamingAead;
-import com.google.crypto.tink.StreamingTestUtil.ByteBufferChannel;
+import com.google.crypto.tink.StreamingTestUtil;
 import com.google.crypto.tink.TestUtil;
 import com.google.crypto.tink.proto.AesCtrHmacStreamingKey;
 import com.google.crypto.tink.proto.AesCtrHmacStreamingKeyFormat;
@@ -31,11 +30,6 @@ import com.google.crypto.tink.proto.HmacParams;
 import com.google.crypto.tink.proto.KeyData;
 import com.google.crypto.tink.subtle.Random;
 import com.google.protobuf.ByteString;
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
 import java.security.GeneralSecurityException;
 import java.util.Set;
 import java.util.TreeSet;
@@ -65,42 +59,6 @@ public class AesCtrHmacStreamingKeyManagerTest {
     keyManager = new AesCtrHmacStreamingKeyManager();
   }
 
-  /** Tests encryption and decryption functionalities of {@code streamingAead}. */
-  private void testEncryptionAndDecryption(StreamingAead streamingAead) throws Exception {
-    byte[] aad = Random.randBytes(15);
-    // Short plaintext.
-    byte[] shortPlaintext = Random.randBytes(10);
-    testEncryptionAndDecryption(streamingAead, shortPlaintext, aad);
-    // Long plaintext.
-    byte[] longPlaintext = Random.randBytes(1100);
-    testEncryptionAndDecryption(streamingAead, longPlaintext, aad);
-  }
-
-  /**
-   * Tests encryption and decryption functionalities of {@code streamingAead} using {@code
-   * plaintext} and {@code aad}.
-   */
-  private void testEncryptionAndDecryption(
-      StreamingAead streamingAead, byte[] plaintext, byte[] aad) throws Exception {
-    // Encrypt plaintext.
-    ByteArrayOutputStream ciphertext = new ByteArrayOutputStream();
-    WritableByteChannel encChannel =
-        streamingAead.newEncryptingChannel(Channels.newChannel(ciphertext), aad);
-    encChannel.write(ByteBuffer.wrap(plaintext));
-    encChannel.close();
-
-    // Decrypt ciphertext.
-    ByteBufferChannel ciphertextChannel = new ByteBufferChannel(ciphertext.toByteArray());
-    SeekableByteChannel decChannel =
-        streamingAead.newSeekableDecryptingChannel(ciphertextChannel, aad);
-    ByteBuffer decrypted = ByteBuffer.allocate(plaintext.length);
-    int readCount = decChannel.read(decrypted);
-
-    // Compare results;
-    assertEquals(plaintext.length, readCount);
-    assertArrayEquals(plaintext, decrypted.array());
-  }
-
   @Test
   public void testBasic() throws Exception {
     // Create primitive from a given key.
@@ -111,7 +69,7 @@ public class AesCtrHmacStreamingKeyManagerTest {
             .setParams(keyParams)
             .build();
     StreamingAead streamingAead = keyManager.getPrimitive(key);
-    testEncryptionAndDecryption(streamingAead);
+    StreamingTestUtil.testEncryptionAndDecryption(streamingAead);
 
     // Create a key from KeyFormat, and use the key.
     AesCtrHmacStreamingKeyFormat keyFormat =
@@ -119,7 +77,7 @@ public class AesCtrHmacStreamingKeyManagerTest {
     ByteString serializedKeyFormat = ByteString.copyFrom(keyFormat.toByteArray());
     key = (AesCtrHmacStreamingKey) keyManager.newKey(serializedKeyFormat);
     streamingAead = keyManager.getPrimitive(key);
-    testEncryptionAndDecryption(streamingAead);
+    StreamingTestUtil.testEncryptionAndDecryption(streamingAead);
   }
 
   @Test
diff --git a/java/src/test/java/com/google/crypto/tink/streamingaead/AesGcmHkdfStreamingKeyManagerTest.java b/java/src/test/java/com/google/crypto/tink/streamingaead/AesGcmHkdfStreamingKeyManagerTest.java
index 717d4c3ac..dd3052abc 100644
--- a/java/src/test/java/com/google/crypto/tink/streamingaead/AesGcmHkdfStreamingKeyManagerTest.java
+++ b/java/src/test/java/com/google/crypto/tink/streamingaead/AesGcmHkdfStreamingKeyManagerTest.java
@@ -16,12 +16,11 @@
 
 package com.google.crypto.tink.streamingaead;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import com.google.crypto.tink.StreamingAead;
-import com.google.crypto.tink.StreamingTestUtil.ByteBufferChannel;
+import com.google.crypto.tink.StreamingTestUtil;
 import com.google.crypto.tink.TestUtil;
 import com.google.crypto.tink.proto.AesGcmHkdfStreamingKey;
 import com.google.crypto.tink.proto.AesGcmHkdfStreamingKeyFormat;
@@ -30,11 +29,6 @@ import com.google.crypto.tink.proto.HashType;
 import com.google.crypto.tink.proto.KeyData;
 import com.google.crypto.tink.subtle.Random;
 import com.google.protobuf.ByteString;
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
 import java.security.GeneralSecurityException;
 import java.util.Set;
 import java.util.TreeSet;
@@ -61,42 +55,6 @@ public class AesGcmHkdfStreamingKeyManagerTest {
     keyManager = new AesGcmHkdfStreamingKeyManager();
   }
 
-  /** Tests encryption and decryption functionalities of {@code streamingAead}. */
-  private void testEncryptionAndDecryption(StreamingAead streamingAead) throws Exception {
-    byte[] aad = Random.randBytes(15);
-    // Short plaintext.
-    byte[] shortPlaintext = Random.randBytes(10);
-    testEncryptionAndDecryption(streamingAead, shortPlaintext, aad);
-    // Long plaintext.
-    byte[] longPlaintext = Random.randBytes(1100);
-    testEncryptionAndDecryption(streamingAead, longPlaintext, aad);
-  }
-
-  /**
-   * Tests encryption and decryption functionalities of {@code streamingAead} using {@code
-   * plaintext} and {@code aad}.
-   */
-  private void testEncryptionAndDecryption(
-      StreamingAead streamingAead, byte[] plaintext, byte[] aad) throws Exception {
-    // Encrypt plaintext.
-    ByteArrayOutputStream ciphertext = new ByteArrayOutputStream();
-    WritableByteChannel encChannel =
-        streamingAead.newEncryptingChannel(Channels.newChannel(ciphertext), aad);
-    encChannel.write(ByteBuffer.wrap(plaintext));
-    encChannel.close();
-
-    // Decrypt ciphertext.
-    ByteBufferChannel ciphertextChannel = new ByteBufferChannel(ciphertext.toByteArray());
-    SeekableByteChannel decChannel =
-        streamingAead.newSeekableDecryptingChannel(ciphertextChannel, aad);
-    ByteBuffer decrypted = ByteBuffer.allocate(plaintext.length);
-    int readCount = decChannel.read(decrypted);
-
-    // Compare results;
-    assertEquals(plaintext.length, readCount);
-    assertArrayEquals(plaintext, decrypted.array());
-  }
-
   @Test
   public void testBasic() throws Exception {
     // Create primitive from a given key.
@@ -107,7 +65,7 @@ public class AesGcmHkdfStreamingKeyManagerTest {
             .setParams(keyParams)
             .build();
     StreamingAead streamingAead = keyManager.getPrimitive(key);
-    testEncryptionAndDecryption(streamingAead);
+    StreamingTestUtil.testEncryptionAndDecryption(streamingAead);
 
     // Create a key from KeyFormat, and use the key.
     AesGcmHkdfStreamingKeyFormat keyFormat =
@@ -115,7 +73,7 @@ public class AesGcmHkdfStreamingKeyManagerTest {
     ByteString serializedKeyFormat = ByteString.copyFrom(keyFormat.toByteArray());
     key = (AesGcmHkdfStreamingKey) keyManager.newKey(serializedKeyFormat);
     streamingAead = keyManager.getPrimitive(key);
-    testEncryptionAndDecryption(streamingAead);
+    StreamingTestUtil.testEncryptionAndDecryption(streamingAead);
   }
 
   @Test
diff --git a/java/src/test/java/com/google/crypto/tink/streamingaead/StreamingAeadFactoryTest.java b/java/src/test/java/com/google/crypto/tink/streamingaead/StreamingAeadFactoryTest.java
new file mode 100644
index 000000000..7cfc949e9
--- /dev/null
+++ b/java/src/test/java/com/google/crypto/tink/streamingaead/StreamingAeadFactoryTest.java
@@ -0,0 +1,144 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.streamingaead;
+
+import static com.google.crypto.tink.TestUtil.assertExceptionContains;
+import static org.junit.Assert.fail;
+
+import com.google.crypto.tink.Config;
+import com.google.crypto.tink.KeysetHandle;
+import com.google.crypto.tink.StreamingAead;
+import com.google.crypto.tink.StreamingTestUtil;
+import com.google.crypto.tink.TestUtil;
+import com.google.crypto.tink.proto.KeyStatusType;
+import com.google.crypto.tink.proto.Keyset.Key;
+import com.google.crypto.tink.proto.OutputPrefixType;
+import com.google.crypto.tink.subtle.Random;
+import java.io.IOException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for StreamingAeadFactory. */
+@RunWith(JUnit4.class)
+public class StreamingAeadFactoryTest {
+  private static final int AES_KEY_SIZE = 16;
+  //  private static final int HMAC_KEY_SIZE = 20;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Config.register(StreamingAeadConfig.TINK_1_1_0);
+  }
+
+  @Test
+  public void testBasicAesCtrHmacStreamingAead() throws Exception {
+    byte[] keyValue = Random.randBytes(AES_KEY_SIZE);
+    int derivedKeySize = AES_KEY_SIZE;
+    int ciphertextSegmentSize = 128;
+    KeysetHandle keysetHandle =
+        TestUtil.createKeysetHandle(
+            TestUtil.createKeyset(
+                TestUtil.createKey(
+                    TestUtil.createAesCtrHmacStreamingKeyData(
+                        keyValue, derivedKeySize, ciphertextSegmentSize),
+                    42,
+                    KeyStatusType.ENABLED,
+                    OutputPrefixType.RAW)));
+    StreamingAead streamingAead = StreamingAeadFactory.getPrimitive(keysetHandle);
+    StreamingTestUtil.testEncryptionAndDecryption(streamingAead);
+  }
+
+  @Test
+  public void testBasicAesGcmHkdfStreamingAead() throws Exception {
+    byte[] keyValue = Random.randBytes(AES_KEY_SIZE);
+    int derivedKeySize = AES_KEY_SIZE;
+    int ciphertextSegmentSize = 128;
+    KeysetHandle keysetHandle =
+        TestUtil.createKeysetHandle(
+            TestUtil.createKeyset(
+                TestUtil.createKey(
+                    TestUtil.createAesGcmHkdfStreamingKeyData(
+                        keyValue, derivedKeySize, ciphertextSegmentSize),
+                    42,
+                    KeyStatusType.ENABLED,
+                    OutputPrefixType.RAW)));
+    StreamingAead streamingAead = StreamingAeadFactory.getPrimitive(keysetHandle);
+    StreamingTestUtil.testEncryptionAndDecryption(streamingAead);
+  }
+
+
+  @Test
+  public void testMultipleKeys() throws Exception {
+    byte[] primaryKeyValue = Random.randBytes(AES_KEY_SIZE);
+    byte[] otherKeyValue = Random.randBytes(AES_KEY_SIZE);
+    byte[] anotherKeyValue = Random.randBytes(AES_KEY_SIZE);
+    int derivedKeySize = AES_KEY_SIZE;
+    int ciphertextSegmentSize = 128;
+
+    Key primaryKey = TestUtil.createKey(
+        TestUtil.createAesGcmHkdfStreamingKeyData(
+            primaryKeyValue, derivedKeySize, ciphertextSegmentSize),
+        42,
+        KeyStatusType.ENABLED,
+        OutputPrefixType.RAW);
+    Key otherKey = TestUtil.createKey(
+        TestUtil.createAesCtrHmacStreamingKeyData(
+            otherKeyValue, derivedKeySize, ciphertextSegmentSize),
+        43,
+        KeyStatusType.ENABLED,
+        OutputPrefixType.RAW);
+    Key anotherKey = TestUtil.createKey(
+        TestUtil.createAesGcmHkdfStreamingKeyData(
+            anotherKeyValue, derivedKeySize, ciphertextSegmentSize),
+        72,
+        KeyStatusType.ENABLED,
+        OutputPrefixType.RAW);
+
+    KeysetHandle keysetHandle =
+        TestUtil.createKeysetHandle(TestUtil.createKeyset(primaryKey, otherKey, anotherKey));
+    StreamingAead streamingAead = StreamingAeadFactory.getPrimitive(keysetHandle);
+
+    StreamingAead primaryAead = StreamingAeadFactory.getPrimitive(
+        TestUtil.createKeysetHandle(TestUtil.createKeyset(primaryKey)));
+    StreamingAead otherAead = StreamingAeadFactory.getPrimitive(
+        TestUtil.createKeysetHandle(TestUtil.createKeyset(otherKey)));
+    StreamingAead anotherAead = StreamingAeadFactory.getPrimitive(
+        TestUtil.createKeysetHandle(TestUtil.createKeyset(anotherKey)));
+
+    StreamingTestUtil.testEncryptionAndDecryption(streamingAead, streamingAead);
+    StreamingTestUtil.testEncryptionAndDecryption(streamingAead, primaryAead);
+    StreamingTestUtil.testEncryptionAndDecryption(primaryAead, streamingAead);
+    StreamingTestUtil.testEncryptionAndDecryption(otherAead, streamingAead);
+    StreamingTestUtil.testEncryptionAndDecryption(anotherAead, streamingAead);
+    StreamingTestUtil.testEncryptionAndDecryption(primaryAead, primaryAead);
+    StreamingTestUtil.testEncryptionAndDecryption(otherAead, otherAead);
+    StreamingTestUtil.testEncryptionAndDecryption(anotherAead, anotherAead);
+    try {
+      StreamingTestUtil.testEncryptionAndDecryption(otherAead, primaryAead);
+      fail("No matching key, should have thrown an exception");
+    } catch (IOException expected) {
+      assertExceptionContains(expected, "No matching key");
+    }
+    try {
+      StreamingTestUtil.testEncryptionAndDecryption(anotherAead, primaryAead);
+      fail("No matching key, should have thrown an exception");
+    } catch (IOException expected) {
+      assertExceptionContains(expected, "No matching key");
+    }
+  }
+}
diff --git a/java/src/test/java/com/google/crypto/tink/subtle/AesCtrHmacStreamingTest.java b/java/src/test/java/com/google/crypto/tink/subtle/AesCtrHmacStreamingTest.java
index 8b8992846..80386809d 100644
--- a/java/src/test/java/com/google/crypto/tink/subtle/AesCtrHmacStreamingTest.java
+++ b/java/src/test/java/com/google/crypto/tink/subtle/AesCtrHmacStreamingTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 
 import com.google.crypto.tink.StreamingTestUtil;
 import com.google.crypto.tink.StreamingTestUtil.ByteBufferChannel;
+import com.google.crypto.tink.StreamingTestUtil.PseudorandomReadableByteChannel;
 import com.google.crypto.tink.TestUtil;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -32,7 +33,6 @@ import java.io.Reader;
 import java.io.Writer;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
@@ -97,58 +97,6 @@ public class AesCtrHmacStreamingTest {
     assertByteBufferContains("", expected, buffer);
   }
 
-  class PseudorandomReadableByteChannel implements ReadableByteChannel {
-    private long size;
-    private long position;
-    private boolean open;
-    private byte[] repeatedBlock;
-    private static final int BLOCK_SIZE = 1024;
-
-    public PseudorandomReadableByteChannel(long size) {
-      this.size = size;
-      this.position = 0;
-      this.open = true;
-      this.repeatedBlock = generatePlaintext(BLOCK_SIZE);
-    }
-
-    @Override
-    public int read(ByteBuffer dst) throws IOException {
-      if (!open) {
-        throw new ClosedChannelException();
-      }
-      if (position == size) {
-        return -1;
-      }
-      long start = position;
-      long end = java.lang.Math.min(size, start + dst.remaining());
-      long firstBlock = start / BLOCK_SIZE;
-      long lastBlock = end / BLOCK_SIZE;
-      int startOffset = (int) (start % BLOCK_SIZE);
-      int endOffset = (int) (end % BLOCK_SIZE);
-      if (firstBlock == lastBlock) {
-        dst.put(repeatedBlock, startOffset, endOffset - startOffset);
-      } else {
-        dst.put(repeatedBlock, startOffset, BLOCK_SIZE - startOffset);
-        for (long block = firstBlock + 1; block < lastBlock; block++) {
-          dst.put(repeatedBlock);
-        }
-        dst.put(repeatedBlock, 0, endOffset);
-      }
-      position = end;
-      return (int) (position - start);
-    }
-
-    @Override
-    public void close() {
-      this.open = false;
-    }
-
-    @Override
-    public boolean isOpen() {
-      return this.open;
-    }
-  }
-
   /** Returns a plaintext of a given size. */
   private byte[] generatePlaintext(int size) {
     byte[] plaintext = new byte[size];
diff --git a/java/src/test/java/com/google/crypto/tink/subtle/RewindableReadableByteChannelTest.java b/java/src/test/java/com/google/crypto/tink/subtle/RewindableReadableByteChannelTest.java
new file mode 100644
index 000000000..eeee7c64a
--- /dev/null
+++ b/java/src/test/java/com/google/crypto/tink/subtle/RewindableReadableByteChannelTest.java
@@ -0,0 +1,226 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package com.google.crypto.tink.subtle;
+
+import static com.google.crypto.tink.TestUtil.assertExceptionContains;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.crypto.tink.StreamingTestUtil.PseudorandomReadableByteChannel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for RewindableReadableByteChannel */
+@RunWith(JUnit4.class)
+public class RewindableReadableByteChannelTest {
+
+  @Test
+  public void testSingleReadsOfVariousLengths() throws Exception {
+    int inputSize = 1234;
+    ReadableByteChannel baseChannel = new PseudorandomReadableByteChannel(inputSize);
+    assertTrue(baseChannel.isOpen());
+    RewindableReadableByteChannel rewindableChannel =
+        new RewindableReadableByteChannel(baseChannel);
+    assertTrue(rewindableChannel.isOpen());
+
+    // Read some initial bytes.
+    int buffer1Size = 42;
+    ByteBuffer buffer1 = ByteBuffer.allocate(buffer1Size);
+    assertEquals(buffer1Size, rewindableChannel.read(buffer1));
+
+    // Rewind, and read a shorter sequence of initial bytes.
+    rewindableChannel.rewind();
+    int buffer2Size = 40;
+    ByteBuffer buffer2 = ByteBuffer.allocate(buffer2Size);
+    assertEquals(buffer2Size, rewindableChannel.read(buffer2));
+    assertArrayEquals(buffer2.array(), Arrays.copyOfRange(buffer1.array(), 0, buffer2Size));
+
+    // Rewind, and read a longer sequence of initial bytes.
+    rewindableChannel.rewind();
+    int buffer3Size = 60;
+    ByteBuffer buffer3 = ByteBuffer.allocate(buffer3Size);
+    assertEquals(buffer3Size, rewindableChannel.read(buffer3));
+    assertArrayEquals(buffer1.array(), Arrays.copyOfRange(buffer3.array(), 0, buffer1Size));
+
+    // Read all the remaining bytes.
+    int buffer4Size = inputSize - buffer3Size;
+    ByteBuffer buffer4 = ByteBuffer.allocate(buffer4Size);
+    assertEquals(buffer4Size, rewindableChannel.read(buffer4));
+
+    // Check that no more bytes are left.
+    ByteBuffer buffer5 = ByteBuffer.allocate(inputSize);
+    assertEquals(-1, rewindableChannel.read(buffer5));
+
+    // Rewind, and read the entire file again.
+    rewindableChannel.rewind();
+    assertEquals(inputSize, rewindableChannel.read(buffer5));
+    assertArrayEquals(buffer4.array(),
+        Arrays.copyOfRange(buffer5.array(), buffer3Size, inputSize));
+
+    // Close the channel.
+    rewindableChannel.close();
+    assertFalse(rewindableChannel.isOpen());
+    assertFalse(baseChannel.isOpen());
+
+    // Try rewinding or reading after closing.
+    try {
+      rewindableChannel.rewind();
+      fail("Should have thrown exception, as cannot rewind after closing.");
+    } catch (IOException expected) {
+      assertExceptionContains(expected, "Cannot rewind");
+    }
+    ByteBuffer buffer6 = ByteBuffer.allocate(42);
+    try {
+      int readCount = rewindableChannel.read(buffer6);
+      fail("Should have thrown exception, as cannot read after closing.");
+    } catch (ClosedChannelException expected) {
+    }
+  }
+
+  @Test
+  public void testSubsequentReads() throws Exception {
+    int inputSize = 1234;
+    ReadableByteChannel baseChannel = new PseudorandomReadableByteChannel(inputSize);
+    assertTrue(baseChannel.isOpen());
+    RewindableReadableByteChannel rewindableChannel =
+        new RewindableReadableByteChannel(baseChannel);
+    assertTrue(rewindableChannel.isOpen());
+
+    // Read some initial bytes.
+    int buffer1Size = 105;
+    ByteBuffer buffer1 = ByteBuffer.allocate(buffer1Size);
+    int limit1 = 42;
+    buffer1.limit(limit1);
+    assertEquals(limit1, rewindableChannel.read(buffer1));
+
+    // Continue reading until the buffer is full.
+    buffer1.limit(buffer1.capacity());
+    assertEquals(buffer1Size - limit1, rewindableChannel.read(buffer1));
+
+    // Rewind, and read a longer sequence of initial bytes.
+    rewindableChannel.rewind();
+    int buffer2Size = 160;
+    ByteBuffer buffer2 = ByteBuffer.allocate(buffer2Size);
+    assertEquals(buffer2Size, rewindableChannel.read(buffer2));
+    assertArrayEquals(buffer1.array(), Arrays.copyOfRange(buffer2.array(), 0, buffer1Size));
+
+    // Rewind, and read a longer sequence in multiple steps.
+    rewindableChannel.rewind();
+    int buffer3Size = 150;
+    ByteBuffer buffer3 = ByteBuffer.allocate(buffer3Size);
+    int stepCount = 5;
+    int blockSize = buffer3Size / stepCount;
+    for (int i = 1; i <= stepCount; i++) {
+      buffer3.limit(i * blockSize);
+      assertEquals(blockSize, rewindableChannel.read(buffer3));
+    }
+    assertArrayEquals(buffer3.array(), Arrays.copyOfRange(buffer2.array(), 0, buffer3Size));
+
+    // Read the remaining bytes and check the size;
+    ByteBuffer buffer4 = ByteBuffer.allocate(inputSize);
+    assertEquals(inputSize - buffer3Size, rewindableChannel.read(buffer4));
+    assertEquals(-1, rewindableChannel.read(buffer4));
+
+    // Close the channel.
+    rewindableChannel.close();
+    assertFalse(rewindableChannel.isOpen());
+    assertFalse(baseChannel.isOpen());
+  }
+
+  @Test
+  public void testDisableRewind() throws Exception {
+    int blockSize = PseudorandomReadableByteChannel.BLOCK_SIZE;
+    int extraSize = 123;
+    int blockCount = 5;
+    int inputSize = blockSize * blockCount + extraSize;
+    ReadableByteChannel baseChannel = new PseudorandomReadableByteChannel(inputSize);
+    assertTrue(baseChannel.isOpen());
+    RewindableReadableByteChannel rewindableChannel =
+        new RewindableReadableByteChannel(baseChannel);
+    assertTrue(rewindableChannel.isOpen());
+
+    // Read two blocks.
+    ByteBuffer twoBlocksBuffer = ByteBuffer.allocate(2 * blockSize);
+    assertEquals(2 * blockSize, rewindableChannel.read(twoBlocksBuffer));
+    // Verify that the read bytes are not all the same.
+    assertFalse(Arrays.equals(Arrays.copyOfRange(twoBlocksBuffer.array(), 0, 42),
+            Arrays.copyOfRange(twoBlocksBuffer.array(), 42, 2 * 42)));
+
+    // Rewind and read 1 block + extraSize;
+    rewindableChannel.rewind();
+    ByteBuffer blockAndExtraBuffer = ByteBuffer.allocate(blockSize + extraSize);
+    assertEquals(blockSize + extraSize, rewindableChannel.read(blockAndExtraBuffer));
+    assertArrayEquals(blockAndExtraBuffer.array(),
+        Arrays.copyOfRange(twoBlocksBuffer.array(), 0, blockSize + extraSize));
+
+    // Disable the rewinding feature, and continue reading.
+    rewindableChannel.disableRewinding();
+    try {
+      rewindableChannel.rewind();
+      fail("Should have thrown exception, as rewinding has been dropped");
+    } catch (IOException expected) {
+      assertExceptionContains(expected, "Cannot rewind");
+    }
+    ByteBuffer oneBlockBuffer = ByteBuffer.allocate(blockSize);
+    assertEquals(blockSize, rewindableChannel.read(oneBlockBuffer));
+    assertArrayEquals(oneBlockBuffer.array(),
+        Arrays.copyOfRange(twoBlocksBuffer.array(), extraSize, blockSize + extraSize));
+
+    int remainingSize = (blockCount - 2) * blockSize;
+    ByteBuffer remainingBuffer = ByteBuffer.allocate(remainingSize);
+    assertEquals(remainingSize, rewindableChannel.read(remainingBuffer));
+    assertArrayEquals(blockAndExtraBuffer.array(),
+        Arrays.copyOfRange(remainingBuffer.array(),
+            remainingSize - blockSize - extraSize, remainingSize));
+
+    // Check EOF.
+    ByteBuffer buffer = ByteBuffer.allocate(42);
+    assertEquals(-1, rewindableChannel.read(buffer));
+
+    // Close the channel.
+    rewindableChannel.close();
+    assertFalse(rewindableChannel.isOpen());
+    assertFalse(baseChannel.isOpen());
+  }
+
+  @Test
+  public void testExceptions() throws Exception {
+    int inputSize = 1234;
+    ReadableByteChannel baseChannel = new PseudorandomReadableByteChannel(inputSize);
+    baseChannel.close();
+    assertFalse(baseChannel.isOpen());
+    RewindableReadableByteChannel rewindableChannel =
+        new RewindableReadableByteChannel(baseChannel);
+    assertFalse(rewindableChannel.isOpen());
+
+    ByteBuffer buffer = ByteBuffer.allocate(42);
+    try {
+      int readCount = rewindableChannel.read(buffer);
+      fail("Should have thrown exception, as cannot read after closing.");
+    } catch (ClosedChannelException expected) {
+    }
+  }
+}
-- 
GitLab