diff --git a/python/core/__init__.py b/python/core/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..9287f2de1cea9798a392ff2f38c6e766ad65f933 --- /dev/null +++ b/python/core/__init__.py @@ -0,0 +1,54 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Core package.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tink.python.core import crypto_format as _crypto_format +from tink.python.core import key_manager +from tink.python.core import keyset_handle +from tink.python.core import keyset_reader +from tink.python.core import keyset_writer +from tink.python.core import primitive_set +from tink.python.core import primitive_wrapper +from tink.python.core import registry +from tink.python.core import tink_error + + +KeyManager = key_manager.KeyManager +PrivateKeyManager = key_manager.PrivateKeyManager + +new_keyset_handle = keyset_handle.generate_new +read_keyset_handle = keyset_handle.read +KeysetHandle = keyset_handle.KeysetHandle + +KeysetReader = keyset_reader.KeysetReader +JsonKeysetReader = keyset_reader.JsonKeysetReader +BinaryKeysetReader = keyset_reader.BinaryKeysetReader + +KeysetWriter = keyset_writer.KeysetWriter +JsonKeysetWriter = keyset_writer.JsonKeysetWriter +BinaryKeysetWriter = keyset_writer.BinaryKeysetWriter + +Registry = registry.Registry + +TinkError = tink_error.TinkError + +new_primitive_set = primitive_set.new_primitive_set +PrimitiveSet = primitive_set.PrimitiveSet +PrimitiveWrapper = primitive_wrapper.PrimitiveWrapper + +crypto_format = _crypto_format diff --git a/python/core/crypto_format.py b/python/core/crypto_format.py new file mode 100644 index 0000000000000000000000000000000000000000..35fb768c92be5df37487791002691a5c70a35691 --- /dev/null +++ b/python/core/crypto_format.py @@ -0,0 +1,47 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Constants and convenience methods for the outputs handled by Tink.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import struct + +from tink.proto import tink_pb2 +from tink.python.core import tink_error + +TINK_START_BYTE = b'\x01' +LEGACY_START_BYTE = b'\x00' +RAW_PREFIX_SIZE = 0 +NON_RAW_PREFIX_SIZE = 5 +TINK_PREFIX_SIZE = NON_RAW_PREFIX_SIZE +RAW_PREFIX = b'' + + +def output_prefix(key: tink_pb2.Keyset.Key) -> bytes: + """Generates the prefix for the outputs handled by the specified key.""" + if key.output_prefix_type == tink_pb2.TINK: + return struct.pack('>cL', TINK_START_BYTE, key.key_id) + elif (key.output_prefix_type == tink_pb2.CRUNCHY or + key.output_prefix_type == tink_pb2.LEGACY): + return struct.pack('>cL', LEGACY_START_BYTE, key.key_id) + elif key.output_prefix_type == tink_pb2.RAW: + return b'' + else: + raise tink_error.TinkError( + 'The given key has invalid OutputPrefixType {}.'.format( + key.output_prefix_type)) diff --git a/python/core/crypto_format_test.py b/python/core/crypto_format_test.py new file mode 100644 index 0000000000000000000000000000000000000000..9894946996fdedf49e46386f751679e000f13d15 --- /dev/null +++ b/python/core/crypto_format_test.py @@ -0,0 +1,84 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for tink.python.crypto_format.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import unittest +from tink.proto import tink_pb2 +from tink.python import core + + +def to_byte(c): + # items in byte strings are of type str in Python v2.7 and int in v3.5 + if isinstance(c, str): + return c.encode() + else: + return chr(c).encode() + + +class CryptoFormatTest(googletest.TestCase): + + def test_tink_prefix(self): + key = tink_pb2.Keyset.Key() + key.output_prefix_type = tink_pb2.TINK + key.key_id = 0x00040695 + prefix = core.crypto_format.output_prefix(key) + self.assertLen(prefix, core.crypto_format.TINK_PREFIX_SIZE) + self.assertEqual( + to_byte(prefix[0]), bytes(core.crypto_format.TINK_START_BYTE)) + # key_id in big-endian format. + self.assertEqual(prefix[1:5], b'\x00\x04\x06\x95') + + def test_legacy_prefix(self): + key = tink_pb2.Keyset.Key() + key.output_prefix_type = tink_pb2.LEGACY + key.key_id = 0xFF7F1058 + prefix = core.crypto_format.output_prefix(key) + self.assertLen(prefix, core.crypto_format.NON_RAW_PREFIX_SIZE) + self.assertEqual(to_byte(prefix[0]), core.crypto_format.LEGACY_START_BYTE) + # key_id in big-endian format. + self.assertEqual(prefix[1:5], b'\xFF\x7F\x10\x58') + + def test_crunchy_prefix(self): + key = tink_pb2.Keyset.Key() + key.output_prefix_type = tink_pb2.CRUNCHY + key.key_id = 0x12AAB1 + prefix = core.crypto_format.output_prefix(key) + self.assertLen(prefix, core.crypto_format.NON_RAW_PREFIX_SIZE) + self.assertEqual(to_byte(prefix[0]), core.crypto_format.LEGACY_START_BYTE) + # key_id in big-endian format. + self.assertEqual(prefix[1:5], b'\x00\x12\xAA\xB1') + + def test_raw_prefix(self): + key = tink_pb2.Keyset.Key() + key.output_prefix_type = tink_pb2.RAW + key.key_id = 0x74EB33 + prefix = core.crypto_format.output_prefix(key) + self.assertLen(prefix, core.crypto_format.RAW_PREFIX_SIZE) + + def test_invalid_output_prefix(self): + with self.assertRaisesRegex( + core.TinkError, 'The given key has invalid OutputPrefixType 42.'): + key = tink_pb2.Keyset.Key() + key.output_prefix_type = 42 + key.key_id = 0x11223344 + _ = core.crypto_format.output_prefix(key) + + +if __name__ == '__main__': + googletest.main() diff --git a/python/core/key_manager.py b/python/core/key_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..7422d630e7802e387824bd21a5f859d5df64dfba --- /dev/null +++ b/python/core/key_manager.py @@ -0,0 +1,163 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This module defines the basic types and abstract classes in Tink.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import abc + +import google3 + +from typing import Any, Generic, Text, Type, TypeVar + +from tink.proto import tink_pb2 +from tink.python.core import tink_error + + +P = TypeVar('P') + + +class KeyManager(Generic[P]): + """Generates keys and provides primitives for the keys. + + A KeyManager "understands" keys of a specific key types: it can generate keys + of a supported type and create primitives for supported keys. A key type is + identified by the global name of the protocol buffer that holds the + corresponding key material, and is given by type_url-field of KeyData-protocol + buffer. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def primitive_class(self) -> Type[P]: + """The class of the primitive it uses. Used for internal management.""" + pass + + @abc.abstractmethod + def primitive(self, key_data: tink_pb2.KeyData) -> P: + """Constructs an primitive for the given key. + + Args: + key_data: KeyData protocol buffer + Returns: + A primitive, for example an instance of Aead or Mac. + Raises: + google3.third_party.tink.python.tink_error.TinkError if getting the + primitive fails. + """ + pass + + @abc.abstractmethod + def key_type(self) -> Text: + """Returns the type_url identifying the key type handled by this manager.""" + pass + + @abc.abstractmethod + def new_key_data(self, + key_template: tink_pb2.KeyTemplate) -> tink_pb2.KeyData: + """Generates a new random key, based on the specified key_template. + + Args: + key_template: KeyTemplate protocol buffer + Returns: + A KeyData protocol buffer that contains the key. + Raises: + google3.third_party.tink.python.error.TinkError if the key generation + fails. + """ + pass + + def does_support(self, type_url: Text) -> bool: + return self.key_type() == type_url + + +class PrivateKeyManager(KeyManager[P]): + """Generates keys and provides primitives for the keys.""" + + @abc.abstractmethod + def public_key_data( + self, private_key_data: tink_pb2.KeyData) -> tink_pb2.KeyData: + """Generates a new random key, based on the specified key_template. + + Args: + private_key_data: KeyData protocol buffer + Returns: + A KeyData protocol buffer that contains the public key. + Raises: + google3.third_party.tink.python.error.TinkError if the key generation + fails. + """ + pass + + +class KeyManagerCcToPyWrapper(KeyManager[P]): + """Transforms cliffed C++ KeyManager into a Python KeyManager.""" + + def __init__(self, + cc_key_manager: Any, # A cliffed CcKeyManager<P> instance + primitive_class: Type[P], + primitive_py_wrapper: Type[P]): + self._cc_key_manager = cc_key_manager + self._primitive_class = primitive_class + self._primitive_py_wrapper = primitive_py_wrapper + + def primitive_class(self) -> Type[P]: + return self._primitive_class + + @tink_error.use_tink_errors + def primitive(self, key_data: tink_pb2.KeyData) -> P: + return self._primitive_py_wrapper(self._cc_key_manager.primitive(key_data)) + + def key_type(self) -> Text: + return self._cc_key_manager.key_type() + + @tink_error.use_tink_errors + def new_key_data(self, + key_template: tink_pb2.KeyTemplate) -> tink_pb2.KeyData: + return self._cc_key_manager.new_key_data(key_template) + + +class PrivateKeyManagerCcToPyWrapper(PrivateKeyManager[P]): + """Transforms cliffed C++ KeyManager into a Python KeyManager.""" + + def __init__(self, + cc_key_manager: Any, # A cliffed CcKeyManager<P> instance + primitive_class: Type[P], + primitive_py_wrapper: Type[P]): + self._cc_key_manager = cc_key_manager + self._primitive_class = primitive_class + self._primitive_py_wrapper = primitive_py_wrapper + + def primitive_class(self) -> Type[P]: + return self._primitive_class + + @tink_error.use_tink_errors + def primitive(self, key_data: tink_pb2.KeyData) -> P: + return self._primitive_py_wrapper(self._cc_key_manager.primitive(key_data)) + + def key_type(self) -> Text: + return self._cc_key_manager.key_type() + + @tink_error.use_tink_errors + def new_key_data(self, + key_template: tink_pb2.KeyTemplate) -> tink_pb2.KeyData: + return self._cc_key_manager.new_key_data(key_template) + + @tink_error.use_tink_errors + def public_key_data(self, key_data: tink_pb2.KeyData) -> tink_pb2.KeyData: + return self._cc_key_manager.public_key_data(key_data) diff --git a/python/core/keyset_handle.py b/python/core/keyset_handle.py new file mode 100644 index 0000000000000000000000000000000000000000..e3151362ba6c260e90723d9825da29b8b970fd25 --- /dev/null +++ b/python/core/keyset_handle.py @@ -0,0 +1,230 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This module defines KeysetHandle.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import random + +from typing import Type, TypeVar + +from google3.net.proto2.python.public import message +from tink.proto import tink_pb2 +from tink.python.aead import aead +from tink.python.core import keyset_reader as reader +from tink.python.core import keyset_writer as writer +from tink.python.core import primitive_set +from tink.python.core import registry +from tink.python.core import tink_error + +P = TypeVar('P') + +MAX_INT32 = 2147483647 # = 2^31 - 1 + + +class KeysetHandle(object): + """A KeysetHandle provides abstracted access to Keyset. + + KeysetHandle limits the exposure of actual protocol buffers that hold + sensitive key material. This class allows reading and writing encrypted + keysets. + """ + + def __init__(self, keyset: tink_pb2.Keyset): + self._keyset = keyset + + def keyset_info(self) -> tink_pb2.KeysetInfo: + """Returns the KeysetInfo that doesn't contain actual key material.""" + return _keyset_info(self._keyset) + + def write(self, keyset_writer: writer.KeysetWriter, + master_key_primitive: aead.Aead) -> None: + """Serializes, encrypts with master_key_primitive and writes the keyset.""" + encrypted_keyset = _encrypt(self._keyset, master_key_primitive) + keyset_writer.write_encrypted(encrypted_keyset) + + def public_keyset_handle(self) -> 'KeysetHandle': + """Returns a new KeysetHandle for the corresponding public keys.""" + public_keyset = tink_pb2.Keyset() + for key in self._keyset.key: + public_key = public_keyset.key.add() + public_key.CopyFrom(key) + public_key.key_data.CopyFrom( + registry.Registry.public_key_data(key.key_data)) + _validate_key(public_key) + public_keyset.primary_key_id = self._keyset.primary_key_id + return KeysetHandle(public_keyset) + + def primitive(self, primitive_class: Type[P]) -> P: + """Returns a wrapped primitive from this KeysetHandle. + + Uses the KeyManager and the PrimitiveWrapper objects in the global + registry.Registry + to create the primitive. This function is the most common way of creating a + primitive. + + Args: + primitive_class: The class of the primitive. + + Returns: + The primitive. + Raises: + google3.third_party.tink.python.error.tink_error.TinkError if creation of + the + primitive fails, for example if primitive_class cannot be used with this + KeysetHandle. + """ + _validate_keyset(self._keyset) + pset = primitive_set.PrimitiveSet(primitive_class) + for key in self._keyset.key: + if key.status == tink_pb2.ENABLED: + primitive = registry.Registry.primitive(key.key_data, primitive_class) + entry = pset.add_primitive(primitive, key) + if key.key_id == self._keyset.primary_key_id: + pset.set_primary(entry) + return registry.Registry.wrap(pset) + + +def generate_new(key_template: tink_pb2.KeyTemplate) -> KeysetHandle: + """Return a new KeysetHandle. + + It contains a single fresh key generated according to key_template. + + Args: + key_template: A tink_pb2.KeyTemplate object. + + Returns: + A new KeysetHandle. + """ + keyset = tink_pb2.Keyset() + key_data = registry.Registry.new_key_data(key_template) + key_id = _generate_unused_key_id(keyset) + key = keyset.key.add() + key.key_data.CopyFrom(key_data) + key.status = tink_pb2.ENABLED + key.key_id = key_id + key.output_prefix_type = key_template.output_prefix_type + keyset.primary_key_id = key_id + return KeysetHandle(keyset) + + +def read(keyset_reader: reader.KeysetReader, + master_key_aead: aead.Aead) -> KeysetHandle: + """Tries to create a KeysetHandle from an encrypted keyset.""" + encrypted_keyset = keyset_reader.read_encrypted() + _assert_enough_encrypted_key_material(encrypted_keyset) + return KeysetHandle(_decrypt(encrypted_keyset, master_key_aead)) + + +def _keyset_info(keyset: tink_pb2.Keyset) -> tink_pb2.KeysetInfo: + keyset_info = tink_pb2.KeysetInfo(primary_key_id=keyset.primary_key_id) + for key in keyset.key: + key_info = keyset_info.key_info.add() + key_info.type_url = key.key_data.type_url + key_info.status = key.status + key_info.output_prefix_type = key.output_prefix_type + key_info.key_id = key.key_id + return keyset_info + + +def _encrypt(keyset: tink_pb2.Keyset, + master_key_primitive: aead.Aead) -> tink_pb2.EncryptedKeyset: + """Encrypts a Keyset and returns an EncryptedKeyset.""" + encrypted_keyset = master_key_primitive.encrypt(keyset.SerializeToString(), + b'') + # Check if we can decrypt, to detect errors + try: + keyset2 = tink_pb2.Keyset.FromString( + master_key_primitive.decrypt(encrypted_keyset, b'')) + if keyset != keyset2: + raise tink_error.TinkError('cannot encrypt keyset: %s != %s' % + (keyset, keyset2)) + except message.DecodeError: + raise tink_error.TinkError('invalid keyset, corrupted key material') + return tink_pb2.EncryptedKeyset( + encrypted_keyset=encrypted_keyset, keyset_info=_keyset_info(keyset)) + + +def _decrypt(encrypted_keyset: tink_pb2.EncryptedKeyset, + master_key_aead: aead.Aead) -> tink_pb2.Keyset: + """Decrypts an EncryptedKeyset and returns a Keyset.""" + try: + keyset = tink_pb2.Keyset.FromString( + master_key_aead.decrypt(encrypted_keyset.encrypted_keyset, b'')) + # Check emptiness here too, in case the encrypted keys unwrapped to nothing? + _assert_enough_key_material(keyset) + return keyset + except message.DecodeError: + raise tink_error.TinkError('invalid keyset, corrupted key material') + + +def _validate_keyset(keyset: tink_pb2.Keyset): + """Raises tink_error.TinkError if keyset is not valid.""" + for key in keyset.key: + if key.status != tink_pb2.DESTROYED: + _validate_key(key) + num_non_destroyed_keys = sum( + 1 for key in keyset.key if key.status != tink_pb2.DESTROYED) + num_non_public_key_material = sum( + 1 for key in keyset.key + if key.key_data.key_material_type != tink_pb2.KeyData.ASYMMETRIC_PUBLIC) + num_primary_keys = sum( + 1 for key in keyset.key + if key.status == tink_pb2.ENABLED and key.key_id == keyset.primary_key_id) + if num_non_destroyed_keys == 0: + raise tink_error.TinkError('empty keyset') + if num_primary_keys > 1: + raise tink_error.TinkError('keyset contains multiple primary keys') + if num_primary_keys == 0 and num_non_public_key_material > 0: + raise tink_error.TinkError('keyset does not contain a valid primary key') + + +def _validate_key(key: tink_pb2.Keyset.Key): + """Raises tink_error.TinkError if key is not valid.""" + if not key.HasField('key_data'): + raise tink_error.TinkError('key {} has no key data'.format(key.key_id)) + if key.output_prefix_type == tink_pb2.UNKNOWN_PREFIX: + raise tink_error.TinkError('key {} has unknown prefix'.format(key.key_id)) + if key.status == tink_pb2.UNKNOWN_STATUS: + raise tink_error.TinkError('key {} has unknown status'.format(key.key_id)) + + +def _assert_no_secret_key_material(keyset: tink_pb2.Keyset): + for key in keyset.key: + if (key.key_data.key_material_type == tink_pb2.KeyData.UNKNOWN_KEYMATERIAL + or key.key_data.key_material_type == tink_pb2.KeyData.SYMMETRIC or + key.key_data.key_material_type == tink_pb2.KeyData.ASYMMETRIC_PRIVATE): + raise tink_error.TinkError('keyset contains secret key material') + + +def _assert_enough_key_material(keyset: tink_pb2.Keyset): + if not keyset or not keyset.key: + raise tink_error.TinkError('empty keyset') + + +def _assert_enough_encrypted_key_material( + encrypted_keyset: tink_pb2.EncryptedKeyset): + if not encrypted_keyset or not encrypted_keyset.encrypted_keyset: + raise tink_error.TinkError('empty keyset') + + +def _generate_unused_key_id(keyset: tink_pb2.Keyset) -> int: + while True: + key_id = random.randint(1, MAX_INT32) + if key_id not in {key.key_id for key in keyset.key}: + return key_id diff --git a/python/core/keyset_handle_test.py b/python/core/keyset_handle_test.py new file mode 100644 index 0000000000000000000000000000000000000000..a28d3c73825e8d9ff724622ee0dcd0e7606f10e2 --- /dev/null +++ b/python/core/keyset_handle_test.py @@ -0,0 +1,299 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for tink.python.keyset_handle.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import io + +import unittest +from tink.proto import tink_pb2 +from tink.python import aead +from tink.python import core +from tink.python import hybrid +from tink.python import mac +from tink.python.core import tink_config +from tink.python.testing import helper + + +def setUpModule(): + tink_config.register() + + +class FaultyAead(aead.Aead): + + def encrypt(self, plaintext: bytes, associated_data: bytes) -> bytes: + raise core.TinkError('encrypt failed.') + + def decrypt(self, plaintext: bytes, associated_data: bytes) -> bytes: + raise core.TinkError('decrypt failed.') + + +class BadAead1(aead.Aead): + + def encrypt(self, plaintext: bytes, associated_data: bytes) -> bytes: + return b'ciphertext' + + def decrypt(self, plaintext: bytes, associated_data: bytes) -> bytes: + return b'plaintext' + + +class BadAead2(aead.Aead): + + def encrypt(self, plaintext: bytes, associated_data: bytes) -> bytes: + return b'ciphertext' + + def decrypt(self, plaintext: bytes, associated_data: bytes) -> bytes: + return tink_pb2.Keyset(primary_key_id=42).SerializeToString() + + +def _master_key_aead(): + return core.Registry.primitive( + core.Registry.new_key_data(aead.aead_key_templates.AES128_EAX), + aead.Aead) + + +class KeysetHandleTest(googletest.TestCase): + + def test_generate_new(self): + keyset_info = core.new_keyset_handle( + mac.mac_key_templates.HMAC_SHA256_128BITTAG).keyset_info() + self.assertLen(keyset_info.key_info, 1) + key_info = keyset_info.key_info[0] + self.assertEqual(key_info.status, tink_pb2.ENABLED) + self.assertEqual( + key_info.output_prefix_type, + mac.mac_key_templates.HMAC_SHA256_128BITTAG.output_prefix_type) + self.assertEqual(keyset_info.primary_key_id, key_info.key_id) + + def test_generate_new_key_id_is_randomized(self): + handle1 = core.new_keyset_handle( + mac.mac_key_templates.HMAC_SHA256_128BITTAG) + handle2 = core.new_keyset_handle( + mac.mac_key_templates.HMAC_SHA256_128BITTAG) + self.assertNotEqual(handle1.keyset_info().key_info[0].key_id, + handle2.keyset_info().key_info[0].key_id) + + def test_write_encrypted(self): + handle = core.new_keyset_handle( + mac.mac_key_templates.HMAC_SHA256_128BITTAG) + # Encrypt the keyset with Aead. + master_key_aead = _master_key_aead() + output_stream = io.BytesIO() + writer = core.BinaryKeysetWriter(output_stream) + handle.write(writer, master_key_aead) + reader = core.BinaryKeysetReader(output_stream.getvalue()) + handle2 = core.read_keyset_handle(reader, master_key_aead) + # Check that handle2 has the same primitive as handle. + handle2.primitive(mac.Mac).verify_mac( + handle.primitive(mac.Mac).compute_mac(b'data'), b'data') + + def test_write_raises_error_when_encrypt_failed(self): + handle = core.new_keyset_handle( + mac.mac_key_templates.HMAC_SHA256_128BITTAG) + writer = core.BinaryKeysetWriter(io.BytesIO()) + with self.assertRaisesRegex(core.TinkError, 'encrypt failed'): + handle.write(writer, FaultyAead()) + + def test_write_raises_error_when_decrypt_not_possible(self): + handle = core.new_keyset_handle( + mac.mac_key_templates.HMAC_SHA256_128BITTAG) + writer = core.BinaryKeysetWriter(io.BytesIO()) + with self.assertRaisesRegex(core.TinkError, + 'invalid keyset, corrupted key material'): + handle.write(writer, BadAead1()) + + def test_write_raises_error_when_decrypt_to_wrong_keyset(self): + handle = core.new_keyset_handle( + mac.mac_key_templates.HMAC_SHA256_128BITTAG) + writer = core.BinaryKeysetWriter(io.BytesIO()) + with self.assertRaisesRegex(core.TinkError, 'cannot encrypt keyset:'): + handle.write(writer, BadAead2()) + + def test_read_empty_keyset_fails(self): + with self.assertRaisesRegex(core.TinkError, 'No keyset found'): + core.read_keyset_handle( + core.BinaryKeysetReader(b''), _master_key_aead()) + + def test_public_keyset_handle(self): + private_handle = core.new_keyset_handle( + hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM) + public_handle = private_handle.public_keyset_handle() + hybrid_dec = private_handle.primitive(hybrid.HybridDecrypt) + hybrid_enc = public_handle.primitive(hybrid.HybridEncrypt) + + self.assertEqual(public_handle.keyset_info().primary_key_id, + private_handle.keyset_info().primary_key_id) + self.assertLen(public_handle.keyset_info().key_info, 1) + self.assertEqual( + public_handle.keyset_info().key_info[0].type_url, + 'type.googleapis.com/google.crypto.tink.EciesAeadHkdfPublicKey') + + ciphertext = hybrid_enc.encrypt(b'some plaintext', b'some context info') + self.assertEqual( + hybrid_dec.decrypt(ciphertext, b'some context info'), b'some plaintext') + + def test_primitive_success(self): + keyset = tink_pb2.Keyset() + key = keyset.key.add() + key.key_data.CopyFrom( + core.Registry.new_key_data(aead.aead_key_templates.AES128_EAX)) + key.output_prefix_type = tink_pb2.TINK + key.key_id = 1 + key.status = tink_pb2.ENABLED + keyset.primary_key_id = 1 + handle = core.KeysetHandle(keyset) + aead_primitive = handle.primitive(aead.Aead) + self.assertEqual( + aead_primitive.decrypt( + aead_primitive.encrypt(b'message', b'aad'), b'aad'), b'message') + + def test_primitive_fails_on_empty_keyset(self): + keyset = tink_pb2.Keyset() + keyset.key.extend([helper.fake_key(key_id=1, status=tink_pb2.DESTROYED)]) + keyset.primary_key_id = 1 + handle = core.KeysetHandle(keyset) + with self.assertRaisesRegex(core.TinkError, 'empty keyset'): + handle.primitive(aead.Aead) + + def test_primitive_fails_on_key_without_keydata(self): + keyset = tink_pb2.Keyset() + key = helper.fake_key(key_id=123) + key.ClearField('key_data') + keyset.key.extend([key]) + keyset.primary_key_id = 123 + handle = core.KeysetHandle(keyset) + with self.assertRaisesRegex(core.TinkError, + 'key 123 has no key data'): + handle.primitive(aead.Aead) + + def test_primitive_fails_on_key_with_unknown_prefix(self): + keyset = tink_pb2.Keyset() + keyset.key.extend([ + helper.fake_key(key_id=12, output_prefix_type=tink_pb2.UNKNOWN_PREFIX) + ]) + keyset.primary_key_id = 12 + handle = core.KeysetHandle(keyset) + with self.assertRaisesRegex(core.TinkError, + 'key 12 has unknown prefix'): + handle.primitive(aead.Aead) + + def test_primitive_fails_on_key_with_unknown_status(self): + keyset = tink_pb2.Keyset() + keyset.key.extend( + [helper.fake_key(key_id=1234, status=tink_pb2.UNKNOWN_STATUS)]) + keyset.primary_key_id = 1234 + handle = core.KeysetHandle(keyset) + with self.assertRaisesRegex(core.TinkError, + 'key 1234 has unknown status'): + handle.primitive(aead.Aead) + + def test_primitive_fails_on_multiple_primary_keys(self): + keyset = tink_pb2.Keyset() + keyset.key.extend( + [helper.fake_key(key_id=12345), + helper.fake_key(key_id=12345)]) + keyset.primary_key_id = 12345 + handle = core.KeysetHandle(keyset) + with self.assertRaisesRegex(core.TinkError, + 'keyset contains multiple primary keys'): + handle.primitive(aead.Aead) + + def test_primitive_fails_without_primary_key_present(self): + keyset = tink_pb2.Keyset() + key = keyset.key.add() + key.key_data.CopyFrom( + core.Registry.new_key_data(aead.aead_key_templates.AES128_EAX)) + key.output_prefix_type = tink_pb2.TINK + key.key_id = 2 + key.status = tink_pb2.ENABLED + keyset.primary_key_id = 1 + handle = core.KeysetHandle(keyset) + with self.assertRaisesRegex(core.TinkError, + 'keyset does not contain a valid primary key'): + handle.primitive(aead.Aead) + + def test_primitive_fails_on_wrong_primitive_class(self): + keyset = tink_pb2.Keyset() + key = keyset.key.add() + key.key_data.CopyFrom( + core.Registry.new_key_data(aead.aead_key_templates.AES128_EAX)) + key.output_prefix_type = tink_pb2.TINK + key.key_id = 1 + key.status = tink_pb2.ENABLED + keyset.primary_key_id = 1 + handle = core.KeysetHandle(keyset) + with self.assertRaisesRegex(core.TinkError, 'Wrong primitive class'): + handle.primitive(mac.Mac) + + def test_primitive_wrapped_correctly(self): + keydata2 = core.Registry.new_key_data( + aead.aead_key_templates.AES128_EAX) + keyset = tink_pb2.Keyset() + key = keyset.key.add() + key.key_data.CopyFrom( + core.Registry.new_key_data(aead.aead_key_templates.AES128_EAX)) + key.output_prefix_type = tink_pb2.TINK + key.key_id = 1 + key.status = tink_pb2.ENABLED + key = keyset.key.add() + key.key_data.CopyFrom(keydata2) + key.output_prefix_type = tink_pb2.RAW + key.key_id = 2 + key.status = tink_pb2.ENABLED + keyset.primary_key_id = 1 + handle = core.KeysetHandle(keyset) + aead_primitive = handle.primitive(aead.Aead) + aead_primitive2 = core.Registry.primitive(keydata2, aead.Aead) + self.assertEqual( + aead_primitive.decrypt( + aead_primitive2.encrypt(b'message', b'aad'), b'aad'), b'message') + + def test_keyset_info(self): + keyset = tink_pb2.Keyset(primary_key_id=2) + keyset.key.extend([ + helper.fake_key( + value=b'v1', + type_url='t1', + key_id=1, + status=tink_pb2.ENABLED, + output_prefix_type=tink_pb2.TINK), + helper.fake_key( + value=b'v2', + type_url='t2', + key_id=2, + status=tink_pb2.DESTROYED, + output_prefix_type=tink_pb2.RAW) + ]) + handle = core.KeysetHandle(keyset) + expected_keyset_info = tink_pb2.KeysetInfo(primary_key_id=2) + info1 = expected_keyset_info.key_info.add() + info1.type_url = 't1' + info1.status = tink_pb2.ENABLED + info1.output_prefix_type = tink_pb2.TINK + info1.key_id = 1 + info2 = expected_keyset_info.key_info.add() + info2.type_url = 't2' + info2.status = tink_pb2.DESTROYED + info2.output_prefix_type = tink_pb2.RAW + info2.key_id = 2 + self.assertEqual(expected_keyset_info, handle.keyset_info()) + + +if __name__ == '__main__': + googletest.main() diff --git a/python/core/keyset_reader.py b/python/core/keyset_reader.py new file mode 100644 index 0000000000000000000000000000000000000000..ef69644652744dffb5d77e484d6b08e40a64bd25 --- /dev/null +++ b/python/core/keyset_reader.py @@ -0,0 +1,92 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Reads KeySets from File.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import abc + +from typing import Text + +from tink.proto import tink_pb2 +from tink.python.core import tink_error +from google3.net.proto2.python.public import json_format +from google3.net.proto2.python.public import message + + +class KeysetReader(object): + """Reads a Keyset.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def read(self) -> tink_pb2.Keyset: + """Reads and returns a (cleartext) tink_pb2.Keyset from its source.""" + pass + + @abc.abstractmethod + def read_encrypted(self) -> tink_pb2.EncryptedKeyset: + """Reads and returns an tink_pb2.EncryptedKeyset from its source.""" + pass + + +class JsonKeysetReader(KeysetReader): + """Reads a JSON Keyset.""" + + def __init__(self, serialized_keyset: Text): + self._serialized_keyset = serialized_keyset + + def read(self) -> tink_pb2.Keyset: + try: + return json_format.Parse(self._serialized_keyset, tink_pb2.Keyset()) + except json_format.ParseError as e: + raise tink_error.TinkError(e) + + def read_encrypted(self) -> tink_pb2.EncryptedKeyset: + try: + return json_format.Parse(self._serialized_keyset, + tink_pb2.EncryptedKeyset()) + except json_format.ParseError as e: + raise tink_error.TinkError(e) + + +class BinaryKeysetReader(KeysetReader): + """Reads a binary Keyset.""" + + def __init__(self, serialized_keyset: bytes): + self._serialized_keyset = serialized_keyset + + def read(self) -> tink_pb2.Keyset: + if not self._serialized_keyset: + raise tink_error.TinkError('No keyset found') + try: + keyset = tink_pb2.Keyset() + keyset.ParseFromString(self._serialized_keyset) + return keyset + except message.DecodeError as e: + raise tink_error.TinkError(e) + + def read_encrypted(self) -> tink_pb2.EncryptedKeyset: + if not self._serialized_keyset: + raise tink_error.TinkError('No keyset found') + try: + encrypted_keyset = tink_pb2.EncryptedKeyset() + encrypted_keyset.ParseFromString(self._serialized_keyset) + return encrypted_keyset + except message.DecodeError as e: + raise tink_error.TinkError(e) diff --git a/python/core/keyset_reader_test.py b/python/core/keyset_reader_test.py new file mode 100644 index 0000000000000000000000000000000000000000..b7880d591fe422d40e955f1f3d5365009f9ce4f9 --- /dev/null +++ b/python/core/keyset_reader_test.py @@ -0,0 +1,146 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for tink.python.keyset_reader.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import unittest +from tink.proto import tink_pb2 +from tink.python import core + + +class JsonKeysetReaderTest(googletest.TestCase): + + def test_read(self): + json_keyset = """ + { + "primaryKeyId": 42, + "key": [ + { + "keyData": { + "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", + "keyMaterialType": "SYMMETRIC", + "value": "GhCS/1+ejWpx68NfGt6ziYHd" + }, + "outputPrefixType": "TINK", + "keyId": 42, + "status": "ENABLED" + } + ] + }""" + reader = core.JsonKeysetReader(json_keyset) + keyset = reader.read() + self.assertEqual(keyset.primary_key_id, 42) + self.assertLen(keyset.key, 1) + + def test_read_invalid(self): + with self.assertRaisesRegex(core.TinkError, 'Failed to load JSON'): + reader = core.JsonKeysetReader('not json') + reader.read() + + def test_read_encrypted(self): + # encryptedKeyset is a base64-encoding of 'some ciphertext with keyset' + json_encrypted_keyset = """ + { + "encryptedKeyset": "c29tZSBjaXBoZXJ0ZXh0IHdpdGgga2V5c2V0", + "keysetInfo": { + "primaryKeyId": 42, + "keyInfo": [ + { + "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", + "outputPrefixType": "TINK", + "keyId": 42, + "status": "ENABLED" + } + ] + } + }""" + reader = core.JsonKeysetReader(json_encrypted_keyset) + enc_keyset = reader.read_encrypted() + self.assertEqual(enc_keyset.encrypted_keyset, + b'some ciphertext with keyset') + self.assertLen(enc_keyset.keyset_info.key_info, 1) + self.assertEqual(enc_keyset.keyset_info.key_info[0].type_url, + 'type.googleapis.com/google.crypto.tink.AesGcmKey') + + def test_read_encrypted_invalid(self): + with self.assertRaisesRegex(core.TinkError, 'Failed to load JSON'): + reader = core.JsonKeysetReader('not json') + reader.read_encrypted() + + +class BinaryKeysetReaderTest(googletest.TestCase): + + def test_read(self): + keyset = tink_pb2.Keyset() + keyset.primary_key_id = 42 + key = keyset.key.add() + key.key_data.type_url = 'type.googleapis.com/google.crypto.tink.AesGcmKey' + key.key_data.key_material_type = tink_pb2.KeyData.SYMMETRIC + key.key_data.value = b'GhCS/1+ejWpx68NfGt6ziYHd' + key.output_prefix_type = tink_pb2.TINK + key.key_id = 42 + key.status = tink_pb2.ENABLED + reader = core.BinaryKeysetReader(keyset.SerializeToString()) + self.assertEqual(keyset, reader.read()) + + def test_read_none(self): + with self.assertRaisesRegex(core.TinkError, 'No keyset found'): + reader = core.BinaryKeysetReader(None) + reader.read() + + def test_read_empty(self): + with self.assertRaisesRegex(core.TinkError, 'No keyset found'): + reader = core.BinaryKeysetReader('') + reader.read() + + def test_read_invalid(self): + with self.assertRaisesRegex(core.TinkError, 'Wrong wire type'): + reader = core.BinaryKeysetReader(b'some weird data') + reader.read() + + def test_read_encrypted(self): + encrypted_keyset = tink_pb2.EncryptedKeyset() + encrypted_keyset.encrypted_keyset = b'c29tZSBjaXBoZXJ0ZXh0IHdpdGgga2V5c2V0' + encrypted_keyset.keyset_info.primary_key_id = 42 + key_info = encrypted_keyset.keyset_info.key_info.add() + key_info.type_url = 'type.googleapis.com/google.crypto.tink.AesGcmKey' + key_info.output_prefix_type = tink_pb2.TINK + key_info.key_id = 42 + key_info.status = tink_pb2.ENABLED + reader = core.BinaryKeysetReader( + encrypted_keyset.SerializeToString()) + self.assertEqual(encrypted_keyset, reader.read_encrypted()) + + def test_read_encrypted_none(self): + with self.assertRaisesRegex(core.TinkError, 'No keyset found'): + reader = core.BinaryKeysetReader(None) + reader.read_encrypted() + + def test_read_encrypted_empty(self): + with self.assertRaisesRegex(core.TinkError, 'No keyset found'): + reader = core.BinaryKeysetReader('') + reader.read_encrypted() + + def test_read_encrypted_invalid(self): + with self.assertRaisesRegex(core.TinkError, 'Wrong wire type'): + reader = core.BinaryKeysetReader(b'some weird data') + reader.read_encrypted() + + +if __name__ == '__main__': + googletest.main() diff --git a/python/core/keyset_writer.py b/python/core/keyset_writer.py new file mode 100644 index 0000000000000000000000000000000000000000..aaae8844b254668e72ef896aab4ad3d4bfb3270b --- /dev/null +++ b/python/core/keyset_writer.py @@ -0,0 +1,97 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Reads KeySets from File.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import abc +import io + +from google3.net.proto2.python.public import json_format +from tink.proto import tink_pb2 +from tink.python.core import tink_error + + +class KeysetWriter(object): + """Knows how to write keysets to some storage system.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def write(self, keyset: tink_pb2.Keyset) -> None: + """Tries to write a tink_pb2.Keyset to some storage system.""" + pass + + @abc.abstractmethod + def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None: + """Tries to write an tink_pb2.EncryptedKeyset to some storage system.""" + pass + + +class JsonKeysetWriter(KeysetWriter): + """Writes keysets in proto JSON wire format to some storage system. + + cf. https://developers.google.com/protocol-buffers/docs/encoding + """ + + def __init__(self, text_io_stream: io.TextIOBase): + self._io_stream = text_io_stream + + def write(self, keyset: tink_pb2.Keyset) -> None: + if not isinstance(keyset, tink_pb2.Keyset): + raise tink_error.TinkError('invalid keyset.') + json_keyset = json_format.MessageToJson(keyset) + # Needed for python 2.7 compatibility. StringIO expects unicode, but + # MessageToJson outputs UTF-8. + if isinstance(json_keyset, bytes): + json_keyset = json_keyset.decode('utf-8') + self._io_stream.write(json_keyset) + self._io_stream.flush() + + def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None: + if not isinstance(encrypted_keyset, tink_pb2.EncryptedKeyset): + raise tink_error.TinkError('invalid encrypted keyset.') + json_keyset = json_format.MessageToJson(encrypted_keyset) + # Needed for python 2.7 compatibility. StringIO expects unicode, but + # MessageToJson outputs UTF-8. + if isinstance(json_keyset, bytes): + json_keyset = json_keyset.decode('utf-8') + self._io_stream.write(json_keyset) + self._io_stream.flush() + + +class BinaryKeysetWriter(KeysetWriter): + """Writes keysets in proto binary wire format to some storage system. + + cf. https://developers.google.com/protocol-buffers/docs/encoding + """ + + def __init__(self, binary_io_stream: io.BufferedIOBase): + self._io_stream = binary_io_stream + + def write(self, keyset: tink_pb2.Keyset) -> None: + if not isinstance(keyset, tink_pb2.Keyset): + raise tink_error.TinkError('invalid keyset.') + self._io_stream.write(keyset.SerializeToString()) + self._io_stream.flush() + + def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None: + if not isinstance(encrypted_keyset, tink_pb2.EncryptedKeyset): + raise tink_error.TinkError('invalid encrypted keyset.') + self._io_stream.write(encrypted_keyset.SerializeToString()) + self._io_stream.flush() diff --git a/python/core/keyset_writer_test.py b/python/core/keyset_writer_test.py new file mode 100644 index 0000000000000000000000000000000000000000..6297341aaee3f26cdc703349ee0188342972d834 --- /dev/null +++ b/python/core/keyset_writer_test.py @@ -0,0 +1,128 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for tink.python.core.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import io + +import unittest +from tink.proto import tink_pb2 +from tink.python import core + + +def example_keyset(): + keyset = tink_pb2.Keyset() + keyset.primary_key_id = 42 + key = keyset.key.add() + key.key_data.type_url = u'type.googleapis.com/google.crypto.tink.AesGcmKey' + key.key_data.key_material_type = tink_pb2.KeyData.SYMMETRIC + key.key_data.value = b'GhCS/1+ejWpx68NfGt6ziYHd' + key.output_prefix_type = tink_pb2.TINK + key.key_id = 42 + key.status = tink_pb2.ENABLED + return keyset + + +def example_encrypted_keyset(): + encrypted_keyset = tink_pb2.EncryptedKeyset() + encrypted_keyset.encrypted_keyset = b'c29tZSBjaXBoZXJ0ZXh0IHdpdGgga2V5c2V0' + encrypted_keyset.keyset_info.primary_key_id = 42 + key_info = encrypted_keyset.keyset_info.key_info.add() + key_info.type_url = 'type.googleapis.com/google.crypto.tink.AesGcmKey' + key_info.output_prefix_type = tink_pb2.TINK + key_info.key_id = 42 + key_info.status = tink_pb2.ENABLED + return encrypted_keyset + + +class JsonKeysetWriterTest(googletest.TestCase): + + def test_write_read(self): + keyset = example_keyset() + stream = io.StringIO() + writer = core.JsonKeysetWriter(stream) + writer.write(keyset) + reader = core.JsonKeysetReader(stream.getvalue()) + self.assertEqual(keyset, reader.read()) + + def test_write_encrypted_read_encrypted(self): + encrypted_keyset = example_encrypted_keyset() + stream = io.StringIO() + writer = core.JsonKeysetWriter(stream) + writer.write_encrypted(encrypted_keyset) + reader = core.JsonKeysetReader(stream.getvalue()) + self.assertEqual(encrypted_keyset, reader.read_encrypted()) + + def test_write_read_with_unicode_chars(self): + keyset = tink_pb2.Keyset() + key = keyset.key.add() + key.key_data.type_url = ( + u'\xe3\x82\xb3\xe3\x83\xb3\xe3\x83\x8b\xe3\x83\x81\xe3\x83\x8f') + stream = io.StringIO() + writer = core.JsonKeysetWriter(stream) + writer.write(keyset) + reader = core.JsonKeysetReader(stream.getvalue()) + self.assertEqual(keyset, reader.read()) + + def test_write_invalid_fails(self): + with self.assertRaisesRegex(core.TinkError, 'invalid keyset'): + stream = io.StringIO() + writer = core.JsonKeysetWriter(stream) + writer.write(example_encrypted_keyset()) + + def test_write_encrypted_invalid_fails(self): + with self.assertRaisesRegex(core.TinkError, 'invalid encrypted keyset'): + stream = io.StringIO() + writer = core.JsonKeysetWriter(stream) + writer.write_encrypted(example_keyset()) + + +class BinaryKeysetReaderTest(googletest.TestCase): + + def test_write_read(self): + keyset = example_keyset() + stream = io.BytesIO() + writer = core.BinaryKeysetWriter(stream) + writer.write(keyset) + reader = core.BinaryKeysetReader(stream.getvalue()) + self.assertEqual(keyset, reader.read()) + + def test_write_encrypted_read_encrypted(self): + encrypted_keyset = example_encrypted_keyset() + stream = io.BytesIO() + writer = core.BinaryKeysetWriter(stream) + writer.write_encrypted(encrypted_keyset) + reader = core.BinaryKeysetReader(stream.getvalue()) + self.assertEqual(encrypted_keyset, reader.read_encrypted()) + + def test_write_invalid_fails(self): + with self.assertRaisesRegex(core.TinkError, 'invalid keyset'): + stream = io.BytesIO() + writer = core.BinaryKeysetWriter(stream) + writer.write(example_encrypted_keyset()) + + def test_write_encrypted_invalid_fails(self): + with self.assertRaisesRegex(core.TinkError, 'invalid encrypted keyset'): + stream = io.BytesIO() + writer = core.BinaryKeysetWriter(stream) + writer.write_encrypted(example_keyset()) + + +if __name__ == '__main__': + googletest.main() diff --git a/python/core/primitive_set.py b/python/core/primitive_set.py new file mode 100644 index 0000000000000000000000000000000000000000..69a15a6f74178677b903bc6a5e29267443b47f2b --- /dev/null +++ b/python/core/primitive_set.py @@ -0,0 +1,92 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A container class for a set of primitives.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import collections +from typing import Generic, List, Type, TypeVar + +from tink.proto import tink_pb2 +from tink.python.core import crypto_format +from tink.python.core import tink_error + +P = TypeVar('P') +Entry = collections.namedtuple( + 'Entry', 'primitive, identifier, status, output_prefix_type') + + +def new_primitive_set(primitive_class): + return PrimitiveSet(primitive_class) + + +class PrimitiveSet(Generic[P]): + """A container class for a set of primitives. + + PrimitiveSet is an auxiliary class used for supporting key rotation: + primitives in a set correspond to keys in a keyset. Users will usually work + with primitive instances, which essentially wrap primitive sets. For example + an instance of an Aead-primitive for a given keyset holds a set of + Aead-primitives corresponding to the keys in the keyset, and uses the set + members to do the actual crypto operations: to encrypt data the primary + Aead-primitive from the set is used, and upon decryption the ciphertext's + prefix determines the id of the primitive from the set. + """ + + def __init__(self, primitive_class: Type[P]): + # map from identifier to a list of Entry + self._primitives = {} + self._primary = None + self._primitive_class = primitive_class + + def primitive_class(self) -> Type[P]: + return self._primitive_class + + def primitive_from_identifier(self, identifier: bytes) -> List[P]: + """Returns a copy of the list of entries for a given identifier.""" + # Copy the list so that if the user modifies the list, it does not affect + # the internal data structure. + return self._primitives.get(identifier, [])[:] + + def primitive(self, key: tink_pb2.Keyset.Key) -> List[P]: + """Returns a copy of the list of primitives for a given identifier.""" + return self.primitive_from_identifier(crypto_format.output_prefix(key)) + + def raw_primitives(self) -> List[P]: + """Returns a copy of the list of primitives for a given identifier.""" + # All raw keys have the same identifier, which is just b''. + return self.primitive_from_identifier(crypto_format.RAW_PREFIX) + + def add_primitive(self, primitive: P, key: tink_pb2.Keyset.Key) -> Entry: + """Adds a new primitive and key entry to the set, and returns the entry.""" + if not isinstance(primitive, self._primitive_class): + raise tink_error.TinkError( + 'The primitive is not an instance of {}'.format( + self._primitive_class)) + identifier = crypto_format.output_prefix(key) + + entry = Entry(primitive, identifier, key.status, key.output_prefix_type) + entries = self._primitives.setdefault(identifier, []) + entries.append(entry) + return entry + + def set_primary(self, entry: Entry) -> None: + self._primary = entry + + def primary(self) -> Entry: + return self._primary diff --git a/python/core/primitive_set_test.py b/python/core/primitive_set_test.py new file mode 100644 index 0000000000000000000000000000000000000000..f803e3c15c35c17fba50b59cf58b67e283a88065 --- /dev/null +++ b/python/core/primitive_set_test.py @@ -0,0 +1,170 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for tink.python.core.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import unittest +from tink.proto import tink_pb2 +from tink.python import aead +from tink.python import core +from tink.python import mac +from tink.python.core import crypto_format +from tink.python.testing import helper + + +class PrimitiveSetTest(googletest.TestCase): + + def test_primitive_returns_entry(self): + key = helper.fake_key(key_id=1) + fake_mac = helper.FakeMac('FakeMac') + primitive_set = core.new_primitive_set(mac.Mac) + primitive_set.add_primitive(fake_mac, key) + entries = primitive_set.primitive(key) + self.assertLen(entries, 1) + entry = entries[0] + self.assertEqual(fake_mac, entry.primitive) + self.assertEqual(tink_pb2.ENABLED, entry.status) + self.assertEqual(crypto_format.output_prefix(key), entry.identifier) + + def test_unknown_key_returns_empty_list(self): + primitive_set = core.new_primitive_set(mac.Mac) + unknown_key = helper.fake_key(key_id=1) + self.assertEqual(primitive_set.primitive(unknown_key), []) + + def test_primitive_from_identifier_returns_entry(self): + primitive_set = core.new_primitive_set(mac.Mac) + key = helper.fake_key(key_id=1) + fake_mac = helper.FakeMac('FakeMac') + primitive_set.add_primitive(fake_mac, key) + + ident = crypto_format.output_prefix(key) + entries = primitive_set.primitive_from_identifier(ident) + self.assertLen(entries, 1) + entry = entries[0] + self.assertEqual(fake_mac, entry.primitive) + self.assertEqual(tink_pb2.ENABLED, entry.status) + self.assertEqual(ident, entry.identifier) + + def test_list_of_entries_can_be_modified(self): + primitive_set = core.new_primitive_set(mac.Mac) + key = helper.fake_key(key_id=1) + primitive_set.add_primitive(helper.FakeMac('FakeMac'), key) + entries = primitive_set.primitive(key) + entries.append('Something') + self.assertLen(primitive_set.primitive(key), 1) + + def test_primary_returns_primary(self): + primitive_set = core.new_primitive_set(mac.Mac) + key = helper.fake_key(key_id=1) + fake_mac = helper.FakeMac('FakeMac') + entry = primitive_set.add_primitive(fake_mac, key) + primitive_set.set_primary(entry) + + entry = primitive_set.primary() + self.assertEqual(fake_mac, entry.primitive) + self.assertEqual(tink_pb2.ENABLED, entry.status) + self.assertEqual(crypto_format.output_prefix(key), entry.identifier) + + def test_primary_returns_none(self): + primitive_set = core.new_primitive_set(mac.Mac) + primitive_set.add_primitive( + helper.FakeMac('FakeMac'), helper.fake_key(key_id=1)) + self.assertEqual(primitive_set.primary(), None) + + def test_same_key_id_and_prefix_type(self): + primitive_set = core.new_primitive_set(mac.Mac) + key1 = helper.fake_key(key_id=1, status=tink_pb2.ENABLED) + fake_mac1 = helper.FakeMac('FakeMac1') + primitive_set.add_primitive(fake_mac1, key1) + key2 = helper.fake_key(key_id=1, status=tink_pb2.DISABLED) + fake_mac2 = helper.FakeMac('FakeMac2') + primitive_set.add_primitive(fake_mac2, key2) + + expected_ident = crypto_format.output_prefix(key1) + entries = primitive_set.primitive(key1) + self.assertLen(entries, 2) + self.assertEqual(fake_mac1, entries[0].primitive) + self.assertEqual(fake_mac2, entries[1].primitive) + self.assertEqual(tink_pb2.ENABLED, entries[0].status) + self.assertEqual(tink_pb2.DISABLED, entries[1].status) + self.assertEqual(expected_ident, entries[0].identifier) + self.assertEqual(expected_ident, entries[1].identifier) + self.assertLen(primitive_set.primitive(key2), 2) + + def test_same_key_id_but_different_prefix_type(self): + primitive_set = core.new_primitive_set(mac.Mac) + key1 = helper.fake_key(key_id=1, output_prefix_type=tink_pb2.TINK) + fake_mac1 = helper.FakeMac('FakeMac1') + primitive_set.add_primitive(fake_mac1, key1) + key2 = helper.fake_key(key_id=1, output_prefix_type=tink_pb2.LEGACY) + fake_mac2 = helper.FakeMac('FakeMac2') + primitive_set.add_primitive(fake_mac2, key2) + + entries1 = primitive_set.primitive(key1) + self.assertLen(entries1, 1) + self.assertEqual(fake_mac1, entries1[0].primitive) + self.assertEqual(tink_pb2.ENABLED, entries1[0].status) + self.assertEqual(crypto_format.output_prefix(key1), entries1[0].identifier) + + entries2 = primitive_set.primitive(key2) + self.assertLen(entries2, 1) + self.assertEqual(fake_mac2, entries2[0].primitive) + self.assertEqual(tink_pb2.ENABLED, entries2[0].status) + self.assertEqual(crypto_format.output_prefix(key2), entries2[0].identifier) + + def test_add_invalid_key_fails(self): + primitive_set = core.new_primitive_set(mac.Mac) + key = helper.fake_key() + key.ClearField('output_prefix_type') + with self.assertRaisesRegex(core.TinkError, 'invalid OutputPrefixType'): + primitive_set.add_primitive(helper.FakeMac(), key) + + def test_add_wrong_primitive_fails(self): + primitive_set = core.new_primitive_set(aead.Aead) + with self.assertRaisesRegex(core.TinkError, + 'The primitive is not an instance of '): + primitive_set.add_primitive(helper.FakeMac(), helper.fake_key()) + + def test_primitive_class(self): + primitive_set = core.new_primitive_set(mac.Mac) + self.assertEqual(primitive_set.primitive_class(), mac.Mac) + + def test_raw_primitives(self): + primitive_set = core.new_primitive_set(mac.Mac) + primitive_set.add_primitive( + helper.FakeMac('FakeMac1'), helper.fake_key(key_id=1)) + key2 = helper.fake_key(key_id=1, output_prefix_type=tink_pb2.RAW) + fake_mac2 = helper.FakeMac('FakeMac2') + primitive_set.add_primitive(fake_mac2, key2) + key3 = helper.fake_key( + key_id=3, status=tink_pb2.DISABLED, output_prefix_type=tink_pb2.RAW) + fake_mac3 = helper.FakeMac('FakeMac3') + primitive_set.add_primitive(fake_mac3, key3) + + entries = primitive_set.raw_primitives() + self.assertLen(entries, 2) + self.assertEqual(fake_mac2, entries[0].primitive) + self.assertEqual(tink_pb2.ENABLED, entries[0].status) + self.assertEqual(crypto_format.RAW_PREFIX, entries[0].identifier) + self.assertEqual(fake_mac3, entries[1].primitive) + self.assertEqual(tink_pb2.DISABLED, entries[1].status) + self.assertEqual(crypto_format.RAW_PREFIX, entries[1].identifier) + + +if __name__ == '__main__': + googletest.main() diff --git a/python/core/primitive_wrapper.py b/python/core/primitive_wrapper.py new file mode 100644 index 0000000000000000000000000000000000000000..f19a1f9af9aeac8faccc9233ec83c5549712d54f --- /dev/null +++ b/python/core/primitive_wrapper.py @@ -0,0 +1,49 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Basic interface for wrapping a primitive.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +import abc +from typing import Generic, Type, TypeVar + +from tink.python.core import primitive_set + +P = TypeVar('P') + + +class PrimitiveWrapper(Generic[P]): + """Basic interface for wrapping a primitive. + + A PrimitiveSet can be wrapped by a single primitive in order to fulfill a + cryptographic task. This is done by the PrimitiveWrapper. Whenever a new + primitive type is added to Tink, the user should define a new PrimitiveWrapper + and register it by calling registry.registerPrimitiveWrapper(). + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def wrap(self, pset: primitive_set.PrimitiveSet) -> P: + """Wraps a PrimitiveSet and returns a single primitive instance.""" + pass + + @abc.abstractmethod + def primitive_class(self) -> Type[P]: + """Returns the primitive class of the primitive managed.""" + pass diff --git a/python/core/registry.py b/python/core/registry.py new file mode 100644 index 0000000000000000000000000000000000000000..b91c3ee448b7a698850cd4828ccdcf5dd33c63c0 --- /dev/null +++ b/python/core/registry.py @@ -0,0 +1,207 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tink Registry.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import google_type_annotations +from __future__ import print_function + +from typing import Any, Text, Tuple, Type, TypeVar + +from tink.proto import tink_pb2 +from tink.python.core import key_manager as km_module +from tink.python.core import primitive_set as pset_module +from tink.python.core import primitive_wrapper +from tink.python.core import tink_error + +P = TypeVar('P') + + +class Registry(object): + """A global container of key managers. + + Registry maps supported key types to a corresponding KeyManager object, + which 'understands' the key type (i.e., the KeyManager can instantiate the + primitive corresponding to given key, or can generate new keys of the + supported key type). Keeping KeyManagers for all primitives in a single + Registry (rather than having a separate KeyManager per primitive) enables + modular construction of compound primitives from 'simple' ones, + e.g., AES-CTR-HMAC AEAD encryption uses IND-CPA encryption and a MAC. + + Registry is initialized at startup, and is later used to instantiate + primitives for given keys or keysets. + """ + + _key_managers = {} # type: dict[Text, Tuple[km_module.KeyManager, bool]] + _wrappers = {} # type: dict[Type, primitive_wrapper.PrimitiveWrapper] + + @classmethod + def reset(cls) -> None: + """Resets the registry.""" + cls._key_managers = {} + cls._wrappers = {} + + @classmethod + def _key_manager_internal( + cls, type_url: Text) -> Tuple[km_module.KeyManager, bool]: + """Returns a key manager, new_key_allowed pair for the given type_url.""" + if type_url not in cls._key_managers: + raise tink_error.TinkError( + 'No manager for type {} has been registered.'.format(type_url)) + return cls._key_managers[type_url] + + @classmethod + def key_manager(cls, type_url: Text) -> km_module.KeyManager: + """Returns a key manager for the given type_url and primitive_class. + + Args: + type_url: Key type string + + Returns: + A KeyManager object + """ + key_mgr, _ = cls._key_manager_internal(type_url) + return key_mgr + + @classmethod + def register_key_manager(cls, + key_manager: km_module.KeyManager, + new_key_allowed: bool = True) -> None: + """Tries to register a key_manager for the given key_manager.key_type(). + + Args: + key_manager: A KeyManager object + new_key_allowed: If new_key_allowed is true, users can generate new keys + with this manager using Registry.new_key() + """ + key_managers = cls._key_managers + type_url = key_manager.key_type() + primitive_class = key_manager.primitive_class() + + if not key_manager.does_support(type_url): + raise tink_error.TinkError( + 'The manager does not support its own type {}.'.format(type_url)) + + if type_url in key_managers: + existing, existing_new_key = key_managers[type_url] + if (type(existing) != type(key_manager) or # pylint: disable=unidiomatic-typecheck + existing.primitive_class() != primitive_class): + raise tink_error.TinkError( + 'A manager for type {} has been already registered.'.format( + type_url)) + else: + if not existing_new_key and new_key_allowed: + raise tink_error.TinkError( + ('A manager for type {} has been already registered ' + 'with forbidden new key operation.').format(type_url)) + key_managers[type_url] = (existing, new_key_allowed) + else: + key_managers[type_url] = (key_manager, new_key_allowed) + + @classmethod + def primitive(cls, key_data: tink_pb2.KeyData, primitive_class: Type[P]) -> P: + """Creates a new primitive for the key given in key_data. + + It looks up a KeyManager identified by key_data.type_url, + and calls manager's primitive(key_data) method. + + Args: + key_data: KeyData object + primitive_class: The expected primitive class + + Returns: + A primitive for the given key_data + Raises: + Error if primitive_class does not match the registered primitive class. + """ + key_mgr = cls.key_manager(key_data.type_url) + if key_mgr.primitive_class() != primitive_class: + raise tink_error.TinkError( + 'Wrong primitive class: type {} uses primitive {}, and not {}.' + .format(key_data.type_url, key_mgr.primitive_class().__name__, + primitive_class.__name__)) + return key_mgr.primitive(key_data) + + @classmethod + def new_key_data(cls, key_template: tink_pb2.KeyTemplate) -> tink_pb2.KeyData: + """Generates a new key for the specified key_template.""" + key_mgr, new_key_allowed = cls._key_manager_internal( + key_template.type_url) + + if not new_key_allowed: + raise tink_error.TinkError( + 'KeyManager for type {} does not allow for creation of new keys.' + .format(key_template.type_url)) + + return key_mgr.new_key_data(key_template) + + @classmethod + def public_key_data(cls, + private_key_data: tink_pb2.KeyData) -> tink_pb2.KeyData: + """Generates a new key for the specified key_template.""" + if (private_key_data.key_material_type != + tink_pb2.KeyData.ASYMMETRIC_PRIVATE): + raise tink_error.TinkError('The keyset contains a non-private key') + key_mgr = cls.key_manager(private_key_data.type_url) + if not isinstance(key_mgr, km_module.PrivateKeyManager): + raise tink_error.TinkError( + 'manager for key type {} is not a PrivateKeyManager' + .format(private_key_data.type_url)) + return key_mgr.public_key_data(private_key_data) + + @classmethod + def register_primitive_wrapper( + cls, wrapper: primitive_wrapper.PrimitiveWrapper) -> None: + """Tries to register a PrimitiveWrapper. + + Args: + wrapper: A PrimitiveWrapper object. + Raises: + Error if a different wrapper has already been registered for the same + Primitive. + """ + if (wrapper.primitive_class() in cls._wrappers and + type(cls._wrappers[wrapper.primitive_class()]) != type(wrapper)): # pylint: disable=unidiomatic-typecheck + raise tink_error.TinkError( + 'A wrapper for primitive {} has already been added.'.format( + wrapper.primitive_class().__name__)) + wrapped = wrapper.wrap( + pset_module.PrimitiveSet(wrapper.primitive_class())) + if not isinstance(wrapped, wrapper.primitive_class()): + raise tink_error.TinkError( + 'Wrapper for primitive {} generates incompatibe primitve of type {}' + .format(wrapper.primitive_class().__name__, + type(wrapped).__name__)) + cls._wrappers[wrapper.primitive_class()] = wrapper + + @classmethod + def wrap( + cls, primitive_set: pset_module.PrimitiveSet) -> Any: # -> Primitive + """Tries to register a PrimitiveWrapper. + + Args: + primitive_set: A PrimitiveSet object. + Returns: + A primitive that wraps the primitives in primitive_set. + Raises: + Error if no wrapper for this primitive class is registered. + """ + if primitive_set.primitive_class() not in cls._wrappers: + raise tink_error.TinkError( + 'No PrimitiveWrapper registered for primitive {}.' + .format(primitive_set.primitive_class() .__name__)) + wrapper = cls._wrappers[primitive_set.primitive_class()] + return wrapper.wrap(primitive_set) diff --git a/python/core/registry_test.py b/python/core/registry_test.py new file mode 100644 index 0000000000000000000000000000000000000000..152bc8ab9269783c098bd82aece2fef6d893cf3f --- /dev/null +++ b/python/core/registry_test.py @@ -0,0 +1,247 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for tink.python.registry.""" +import unittest + +from tink.proto import tink_pb2 + +from tink.python import aead +from tink.python import core +from tink.python import mac +from tink.python.testing import helper + + +class DummyKeyManager(core.KeyManager): + + def __init__(self, type_url, primitive_class=aead.Aead): + self._type_url = type_url + self._primitive_class = primitive_class + + def primitive_class(self): + return self._primitive_class + + def primitive(self, key_data): + return helper.FakeAead() + + def key_type(self): + return self._type_url + + def new_key_data(self, key_template): + return tink_pb2.KeyData(type_url=key_template.type_url) + + +class DummyPrivateKeyManager(core.PrivateKeyManager): + + def __init__(self, type_url): + self._type_url = type_url + + def primitive_class(self): + return None + + def primitive(self, key_data): + return None + + def key_type(self): + return self._type_url + + def new_key_data(self, key_template): + return None + + def public_key_data(self, private_key_data): + return tink_pb2.KeyData(type_url='public_' + private_key_data.type_url) + + +class DummyMacWrapper(core.PrimitiveWrapper): + + def wrap(self, _): + return helper.FakeMac() + + def primitive_class(self): + return mac.Mac + + +class InconsistentWrapper(core.PrimitiveWrapper): + + def wrap(self, _): + return helper.FakeAead() + + def primitive_class(self): + return mac.Mac + + +def _mac_set(mac_list): + """Converts a List of Mac in a PrimitiveSet and sets the last primary.""" + mac_set = core.new_primitive_set(mac.Mac) + for i, primitive in enumerate(mac_list): + mac_set.set_primary( + mac_set.add_primitive( + primitive, + helper.fake_key(key_id=i, output_prefix_type=tink_pb2.RAW))) + return mac_set + + +class RegistryTest(googletest.TestCase): + + def setUp(self): + super(RegistryTest, self).setUp() + self.reg = core.Registry() + self.reg.reset() + + def test_key_manager_no_exist(self): + with self.assertRaises(core.TinkError): + self.reg.key_manager('invalid') + + def test_key_manager_register(self): + dummy_key_manager = DummyKeyManager('dummy_type_url') + self.reg.register_key_manager(dummy_key_manager) + self.assertEqual(dummy_key_manager, self.reg.key_manager('dummy_type_url')) + + def test_key_manager_reset(self): + dummy_key_manager = DummyKeyManager('dummy_type_url') + self.reg.register_key_manager(dummy_key_manager) + self.reg.reset() + with self.assertRaises(core.TinkError): + self.reg.key_manager('dummy_type_url') + + def test_register_same_key_manager_twice(self): + self.reg.register_key_manager(DummyKeyManager('dummy_type_url', aead.Aead)) + self.reg.register_key_manager(DummyKeyManager('dummy_type_url', aead.Aead)) + + def test_key_manager_replace_fails(self): + self.reg.register_key_manager(DummyKeyManager('dummy_type_url', aead.Aead)) + # Replacing the primitive_class for a type_url not allowed. + with self.assertRaises(core.TinkError): + self.reg.register_key_manager( + DummyKeyManager('dummy_type_url', mac.Mac), new_key_allowed=False) + + def test_key_manager_disable_new_key_enable_fails(self): + self.reg.register_key_manager(DummyKeyManager('dummy_type_url')) + # Disable new keys. + self.reg.register_key_manager( + DummyKeyManager('dummy_type_url'), new_key_allowed=False) + # Check new keys can't be enabled again. + with self.assertRaises(core.TinkError): + self.reg.register_key_manager( + DummyKeyManager('dummy_type_url'), new_key_allowed=True) + + def test_primitive_ok(self): + self.reg.register_key_manager(DummyKeyManager('dummy_type_url', aead.Aead)) + primitive = self.reg.primitive( + tink_pb2.KeyData(type_url='dummy_type_url'), aead.Aead) + self.assertIsInstance(primitive, helper.FakeAead) + + def test_primitive_fails_on_wrong_primitive(self): + self.reg.register_key_manager(DummyKeyManager('dummy_type_url', aead.Aead)) + with self.assertRaisesRegex(core.TinkError, + 'uses primitive Aead, and not Mac'): + self.reg.primitive(tink_pb2.KeyData(type_url='dummy_type_url'), mac.Mac) + + def test_primitive_fails_on_subclass(self): + self.reg.register_key_manager( + DummyKeyManager('dummy_type_url', helper.FakeAead)) + with self.assertRaisesRegex(core.TinkError, + 'uses primitive FakeAead, and not Aead'): + self.reg.primitive(tink_pb2.KeyData(type_url='dummy_type_url'), aead.Aead) + + def test_new_key_data_success(self): + self.reg.register_key_manager(DummyKeyManager('dummy_type_url')) + key_template = tink_pb2.KeyTemplate(type_url='dummy_type_url') + key_data = self.reg.new_key_data(key_template) + self.assertEqual(key_data.type_url, 'dummy_type_url') + + def test_new_key_data_wrong_type_url(self): + self.reg.register_key_manager(DummyKeyManager('dummy_type_url')) + unknown_key_template = tink_pb2.KeyTemplate(type_url='unknown_type_url') + with self.assertRaisesRegex(core.TinkError, + 'No manager for type unknown_type_url'): + self.reg.new_key_data(unknown_key_template) + + def test_new_key_data_no_new_key_allowed(self): + self.reg.register_key_manager( + DummyKeyManager('dummy_type_url'), new_key_allowed=False) + key_template = tink_pb2.KeyTemplate(type_url='dummy_type_url') + with self.assertRaisesRegex(core.TinkError, + 'does not allow for creation of new keys'): + self.reg.new_key_data(key_template) + + def test_public_key_data_success(self): + self.reg.register_key_manager(DummyPrivateKeyManager('dummy_type_url')) + key_data = tink_pb2.KeyData( + type_url='dummy_type_url', + key_material_type=tink_pb2.KeyData.ASYMMETRIC_PRIVATE) + public_key_data = self.reg.public_key_data(key_data) + self.assertEqual(public_key_data.type_url, 'public_dummy_type_url') + + def test_public_key_data_fails_for_non_asymmetric_private_key(self): + self.reg.register_key_manager(DummyPrivateKeyManager('dummy_type_url')) + key_data = tink_pb2.KeyData( + type_url='dummy_type_url', + key_material_type=tink_pb2.KeyData.ASYMMETRIC_PUBLIC) + with self.assertRaisesRegex(core.TinkError, + 'contains a non-private key'): + self.reg.public_key_data(key_data) + + def test_public_key_data_fails_for_non_private_key_manager(self): + self.reg.register_key_manager(DummyKeyManager('dummy_type_url')) + key_data = tink_pb2.KeyData( + type_url='dummy_type_url', + key_material_type=tink_pb2.KeyData.ASYMMETRIC_PRIVATE) + with self.assertRaisesRegex(core.TinkError, + 'is not a PrivateKeyManager'): + self.reg.public_key_data(key_data) + + def test_wrap_success(self): + self.reg.register_primitive_wrapper(mac.MacWrapper()) + mac1 = helper.FakeMac('FakeMac1') + mac2 = helper.FakeMac('FakeMac2') + wrapped_mac = self.reg.wrap(_mac_set([mac1, mac2])) + wrapped_mac.verify_mac(mac1.compute_mac(b'data1'), b'data1') + wrapped_mac.verify_mac(mac2.compute_mac(b'data2'), b'data2') + wrapped_mac.verify_mac(wrapped_mac.compute_mac(b'data'), b'data') + + def test_wrap_unknown_primitive(self): + with self.assertRaisesRegex( + core.TinkError, + 'No PrimitiveWrapper registered for primitive Mac.'): + self.reg.wrap(_mac_set([helper.FakeMac()])) + + def test_primitive_wrapper_reset(self): + self.reg.register_primitive_wrapper(mac.MacWrapper()) + self.reg.reset() + with self.assertRaisesRegex( + core.TinkError, + 'No PrimitiveWrapper registered for primitive Mac.'): + self.reg.wrap(_mac_set([helper.FakeMac()])) + + def test_register_same_primitive_wrapper_twice(self): + self.reg.register_primitive_wrapper(mac.MacWrapper()) + self.reg.register_primitive_wrapper(mac.MacWrapper()) + + def test_register_different_primitive_wrappers_twice_fails(self): + self.reg.register_primitive_wrapper(mac.MacWrapper()) + with self.assertRaisesRegex( + core.TinkError, + 'A wrapper for primitive Mac has already been added.'): + self.reg.register_primitive_wrapper(DummyMacWrapper()) + + def test_register_inconsistent_wrapper_fails(self): + with self.assertRaisesRegex( + core.TinkError, + 'Wrapper for primitive Mac generates incompatibe primitve'): + self.reg.register_primitive_wrapper(InconsistentWrapper()) + + +if __name__ == '__main__': + googletest.main() diff --git a/python/core/tink_config.py b/python/core/tink_config.py new file mode 100644 index 0000000000000000000000000000000000000000..0ba8523cc023b5f85b4cb72e03b9437b3f8b1287 --- /dev/null +++ b/python/core/tink_config.py @@ -0,0 +1,75 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Static methods for handling of Tink configurations.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tink.python import aead +from tink.python import daead +from tink.python import hybrid +from tink.python import mac +from tink.python import signature +from tink.python.aead import aead_key_manager +from tink.python.cc.clif import cc_tink_config +from tink.python.core import registry +from tink.python.daead import deterministic_aead_key_manager +from tink.python.hybrid import hybrid_decrypt_key_manager +from tink.python.hybrid import hybrid_encrypt_key_manager +from tink.python.mac import mac_key_manager +from tink.python.signature import public_key_sign_key_manager +from tink.python.signature import public_key_verify_key_manager + + +KEY_MANAGER_GENERATORS = { + 'Aead': aead_key_manager.from_cc_registry, + 'DeterministicAead': deterministic_aead_key_manager.from_cc_registry, + 'HybridDecrypt': hybrid_decrypt_key_manager.from_cc_registry, + 'HybridEncrypt': hybrid_encrypt_key_manager.from_cc_registry, + 'Mac': mac_key_manager.from_cc_registry, + 'PublicKeySign': public_key_sign_key_manager.from_cc_registry, + 'PublicKeyVerify': public_key_verify_key_manager.from_cc_registry, +} + + +def register(): + cc_tink_config.register() + _register_key_managers() + _register_primitive_wrappers() + + +def latest(): + return cc_tink_config.latest() + + +def _register_key_managers(): + for entry in cc_tink_config.latest().entry: + if entry.primitive_name in KEY_MANAGER_GENERATORS: + registry.Registry.register_key_manager( + KEY_MANAGER_GENERATORS[entry.primitive_name](entry.type_url), + entry.new_key_allowed) + + +def _register_primitive_wrappers(): + """Registers all primitive wrappers.""" + register_primitive_wrapper = registry.Registry.register_primitive_wrapper + register_primitive_wrapper(aead.AeadWrapper()) + register_primitive_wrapper(daead.DeterministicAeadWrapper()) + register_primitive_wrapper(hybrid.HybridDecryptWrapper()) + register_primitive_wrapper(hybrid.HybridEncryptWrapper()) + register_primitive_wrapper(mac.MacWrapper()) + register_primitive_wrapper(signature.PublicKeySignWrapper()) + register_primitive_wrapper(signature.PublicKeyVerifyWrapper()) diff --git a/python/core/tink_config_test.py b/python/core/tink_config_test.py new file mode 100644 index 0000000000000000000000000000000000000000..2ef5fbdba4ca30054db19c35c4b24a1b493b0f91 --- /dev/null +++ b/python/core/tink_config_test.py @@ -0,0 +1,225 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for tink.python.tink_config.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import unittest +from tink.proto import tink_pb2 +from tink.python import aead +from tink.python import core +from tink.python import daead +from tink.python import hybrid +from tink.python import mac +from tink.python import signature +from tink.python.core import tink_config + + +def setUpModule(): + tink_config.register() + + +def _primitive_and_key(key_data, primitive_class, output_prefix_type): + primitive = core.Registry.primitive(key_data, primitive_class) + key = tink_pb2.Keyset.Key( + key_id=1, status=tink_pb2.ENABLED, output_prefix_type=output_prefix_type) + key.key_data.CopyFrom(key_data) + return primitive, key + + +def _new_primitive_and_key(template, primitive_class, output_prefix_type): + return _primitive_and_key( + core.Registry.new_key_data(template), primitive_class, output_prefix_type) + + +def _public_primitive_and_key(private_key, primitive_class, output_prefix_type): + return _primitive_and_key( + core.Registry.public_key_data(private_key.key_data), primitive_class, + output_prefix_type) + + +class TinkConfigTest(googletest.TestCase): + + def test_all_aead_templates_are_registered(self): + for template in [ + aead.aead_key_templates.AES128_EAX, + aead.aead_key_templates.AES256_EAX, + aead.aead_key_templates.AES128_GCM, + aead.aead_key_templates.AES256_GCM, + aead.aead_key_templates.AES128_CTR_HMAC_SHA256, + aead.aead_key_templates.AES256_CTR_HMAC_SHA256, + aead.aead_key_templates.XCHACHA20_POLY1305 + ]: + key_data = core.Registry.new_key_data(template) + primitive = core.Registry.primitive(key_data, aead.Aead) + self.assertEqual( + primitive.decrypt(primitive.encrypt(b'message', b'ad'), b'ad'), + b'message') + + def test_all_mac_templates_are_registered(self): + for template in [ + mac.mac_key_templates.HMAC_SHA256_128BITTAG, + mac.mac_key_templates.HMAC_SHA256_256BITTAG + ]: + key_data = core.Registry.new_key_data(template) + primitive = core.Registry.primitive(key_data, mac.Mac) + self.assertIsNone( + primitive.verify_mac(primitive.compute_mac(b'data'), b'data')) + + def test_all_deterministic_aead_templates_are_registered(self): + key_data = core.Registry.new_key_data( + daead.deterministic_aead_key_templates.AES256_SIV) + daead_primitive = core.Registry.primitive(key_data, daead.DeterministicAead) + ciphertext = daead_primitive.encrypt_deterministically(b'message', b'ad') + self.assertEqual( + daead_primitive.decrypt_deterministically(ciphertext, b'ad'), + b'message') + + def test_aead_wrapper_is_correctly_registered(self): + aead1, key1 = _new_primitive_and_key(aead.aead_key_templates.AES128_EAX, + aead.Aead, tink_pb2.RAW) + aead2, key2 = _new_primitive_and_key(aead.aead_key_templates.AES256_GCM, + aead.Aead, tink_pb2.TINK) + pset = core.PrimitiveSet(aead.Aead) + pset.add_primitive(aead1, key1) + pset.set_primary(pset.add_primitive(aead2, key2)) + wrapped_aead = core.Registry.wrap(pset) + + self.assertEqual( + wrapped_aead.decrypt(aead1.encrypt(b'plaintext1', b'ad1'), b'ad1'), + b'plaintext1') + self.assertEqual( + wrapped_aead.decrypt( + wrapped_aead.encrypt(b'plaintext2', b'ad2'), b'ad2'), b'plaintext2') + + def test_mac_wrapper_is_correctly_registered(self): + mac1, key1 = _new_primitive_and_key( + mac.mac_key_templates.HMAC_SHA256_128BITTAG, mac.Mac, tink_pb2.RAW) + mac2, key2 = _new_primitive_and_key( + mac.mac_key_templates.HMAC_SHA256_256BITTAG, mac.Mac, tink_pb2.TINK) + pset = core.PrimitiveSet(mac.Mac) + pset.add_primitive(mac1, key1) + pset.set_primary(pset.add_primitive(mac2, key2)) + wrapped_mac = core.Registry.wrap(pset) + + self.assertIsNone( + wrapped_mac.verify_mac(mac1.compute_mac(b'data1'), b'data1')) + self.assertIsNone( + wrapped_mac.verify_mac(wrapped_mac.compute_mac(b'data2'), b'data2')) + + def test_deterministic_aead_wrapper_is_correctly_registered(self): + daead1, key1 = _new_primitive_and_key( + daead.deterministic_aead_key_templates.AES256_SIV, + daead.DeterministicAead, tink_pb2.RAW) + daead2, key2 = _new_primitive_and_key( + daead.deterministic_aead_key_templates.AES256_SIV, + daead.DeterministicAead, tink_pb2.TINK) + pset = core.PrimitiveSet(daead.DeterministicAead) + pset.add_primitive(daead1, key1) + pset.set_primary(pset.add_primitive(daead2, key2)) + wrapped_daead = core.Registry.wrap(pset) + + self.assertEqual( + wrapped_daead.decrypt_deterministically( + daead1.encrypt_deterministically(b'plaintext1', b'ad1'), b'ad1'), + b'plaintext1') + self.assertEqual( + wrapped_daead.decrypt_deterministically( + wrapped_daead.encrypt_deterministically(b'plaintext2', b'ad2'), + b'ad2'), b'plaintext2') + + def test_hybrid_wrappers_are_correctly_registered(self): + dec1, dec1_key = _new_primitive_and_key( + hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM, + hybrid.HybridDecrypt, tink_pb2.RAW) + enc1, enc1_key = _public_primitive_and_key(dec1_key, hybrid.HybridEncrypt, + tink_pb2.RAW) + + dec2, dec2_key = _new_primitive_and_key( + hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM, + hybrid.HybridDecrypt, tink_pb2.RAW) + enc2, enc2_key = _public_primitive_and_key(dec2_key, hybrid.HybridEncrypt, + tink_pb2.RAW) + + dec_pset = core.PrimitiveSet(hybrid.HybridDecrypt) + dec_pset.add_primitive(dec1, dec1_key) + dec_pset.set_primary(dec_pset.add_primitive(dec2, dec2_key)) + wrapped_dec = core.Registry.wrap(dec_pset) + + enc_pset = core.PrimitiveSet(hybrid.HybridEncrypt) + enc_pset.add_primitive(enc1, enc1_key) + enc_pset.set_primary(enc_pset.add_primitive(enc2, enc2_key)) + wrapped_enc = core.Registry.wrap(enc_pset) + + self.assertEqual( + wrapped_dec.decrypt(enc1.encrypt(b'plaintext1', b'ad1'), b'ad1'), + b'plaintext1') + self.assertEqual( + wrapped_dec.decrypt(wrapped_enc.encrypt(b'plaintext2', b'ad2'), b'ad2'), + b'plaintext2') + + def test_key_managers_for_signature_templates_are_registered(self): + key_templates = signature.signature_key_templates + for template in [ + key_templates.ECDSA_P256, key_templates.ECDSA_P384, + key_templates.ECDSA_P521, key_templates.ECDSA_P256_IEEE_P1363, + key_templates.ECDSA_P256_IEEE_P1363, + key_templates.ECDSA_P521_IEEE_P1363, key_templates.ED25519, + key_templates.RSA_SSA_PSS_3072_SHA256_SHA256_32_F4, + key_templates.RSA_SSA_PSS_4096_SHA512_SHA512_64_F4, + key_templates.RSA_SSA_PKCS1_3072_SHA256_F4, + key_templates.RSA_SSA_PKCS1_4096_SHA512_F4 + ]: + key_data = core.Registry.new_key_data(template) + primitive = core.Registry.primitive(key_data, signature.PublicKeySign) + sig = primitive.sign(b'data') + + public_key = core.Registry.public_key_data(key_data) + primitive_verify = core.Registry.primitive(public_key, + signature.PublicKeyVerify) + + primitive_verify.verify(sig, b'data') + + def test_signature_wrapper_is_correctly_registered(self): + sig1, key1 = _new_primitive_and_key( + signature.signature_key_templates.ECDSA_P256, signature.PublicKeySign, + tink_pb2.TINK) + sig2, key2 = _new_primitive_and_key( + signature.signature_key_templates.ECDSA_P256, signature.PublicKeySign, + tink_pb2.TINK) + + ver1, pubkey1 = _public_primitive_and_key(key1, signature.PublicKeyVerify, + tink_pb2.TINK) + ver2, pubkey2 = _public_primitive_and_key(key2, signature.PublicKeyVerify, + tink_pb2.TINK) + + pset = core.PrimitiveSet(signature.PublicKeySign) + pset.add_primitive(sig1, key1) + pset.set_primary(pset.add_primitive(sig2, key2)) + wrapped_sig = core.Registry.wrap(pset) + + pset_verify = core.new_primitive_set(signature.PublicKeyVerify) + pset_verify.add_primitive(ver1, pubkey1) + pset_verify.set_primary(pset_verify.add_primitive(ver2, pubkey2)) + wrapped_ver = core.Registry.wrap(pset_verify) + + sig = wrapped_sig.sign(b'data') + wrapped_ver.verify(sig, b'data') + + +if __name__ == '__main__': + googletest.main() diff --git a/python/core/tink_error.py b/python/core/tink_error.py new file mode 100644 index 0000000000000000000000000000000000000000..c20581e4b15c0096b757305dc4354dcd2ce2a004 --- /dev/null +++ b/python/core/tink_error.py @@ -0,0 +1,35 @@ +# Copyright 2019 Google LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This module defines basic exceptions in tink.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from tink.util import error as clif_error + + +def use_tink_errors(func): + """Transforms StatusNotOk errors into TinkErrors.""" + def wrapper(*args): + try: + return func(*args) + except clif_error.StatusNotOk as e: + raise TinkError(e) + return wrapper + + +class TinkError(Exception): + """Common exception in Tink."""