diff --git a/mypy.ini b/mypy.ini index 04ba591a..c14e6ab5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -32,3 +32,9 @@ ignore_missing_imports = True [mypy-azure.*] ignore_missing_imports = True + +[mypy-boto3.*] +ignore_missing_imports = True + +[mypy-botocore.*] +ignore_missing_imports = True diff --git a/pyproject.toml b/pyproject.toml index 6f35e62e..fdcaeaa4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ Issues = "https://github.com/secure-systems-lab/securesystemslib/issues" crypto = ["cryptography>=40.0.0"] gcpkms = ["google-cloud-kms", "cryptography>=40.0.0"] azurekms = ["azure-identity", "azure-keyvault-keys", "cryptography>=40.0.0"] +awskms = ["boto3", "botocore", "cryptography>=40.0.0"] hsm = ["asn1crypto", "cryptography>=40.0.0", "PyKCS11"] pynacl = ["pynacl>1.2.0"] PySPX = ["PySPX>=0.5.0"] diff --git a/securesystemslib/signer/__init__.py b/securesystemslib/signer/__init__.py index 66b93e92..f28bd445 100644 --- a/securesystemslib/signer/__init__.py +++ b/securesystemslib/signer/__init__.py @@ -4,6 +4,7 @@ This module provides extensible interfaces for public keys and signers: Some implementations are provided by default but more can be added by users. """ +from securesystemslib.signer._aws_signer import AWSSigner from securesystemslib.signer._azure_signer import AzureSigner from securesystemslib.signer._gcp_signer import GCPSigner from securesystemslib.signer._gpg_signer import GPGKey, GPGSigner @@ -32,6 +33,7 @@ HSMSigner.SCHEME: HSMSigner, GPGSigner.SCHEME: GPGSigner, AzureSigner.SCHEME: AzureSigner, + AWSSigner.SCHEME: AWSSigner, } ) diff --git a/securesystemslib/signer/_aws_signer.py b/securesystemslib/signer/_aws_signer.py new file mode 100644 index 00000000..e13ebd74 --- /dev/null +++ b/securesystemslib/signer/_aws_signer.py @@ -0,0 +1,240 @@ +"""Signer implementation for AWS Key Management Service""" + +import logging +from typing import Optional, Tuple +from urllib import parse + +import securesystemslib.hash as sslib_hash +from securesystemslib import exceptions +from securesystemslib.exceptions import UnsupportedLibraryError +from securesystemslib.signer._key import Key +from securesystemslib.signer._signer import ( + SecretsHandler, + Signature, + Signer, + SSlibKey, +) + +logger = logging.getLogger(__name__) + +AWS_IMPORT_ERROR = None +try: + import boto3 + from botocore.exceptions import BotoCoreError, ClientError + from cryptography.hazmat.primitives import serialization +except ImportError: + AWS_IMPORT_ERROR = "Signing with AWS KMS requires aws-kms and cryptography." + + +class AWSSigner(Signer): + """AWS Key Management Service Signer + + This Signer uses AWS KMS to sign and supports signing with RSA/EC keys and + uses "ambient" credentials typically environment variables such as + AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. These will + be recognized by the boto3 SDK, which underlies the aws_kms Python module. + + For more details on AWS authentication, refer to the AWS Command Line + Interface User Guide: + https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html + + Some practical authentication options include: + AWS CLI: https://aws.amazon.com/cli/ + AWS SDKs: https://aws.amazon.com/tools/ + + The specific permissions that AWS KMS signer needs are: + kms:Sign for sign() + kms:GetPublicKey for import() + + Arguments: + aws_key_id (str): AWS KMS key ID or alias. + public_key (Key): The related public key instance. + + Returns: + AWSSigner: An instance of the AWSSigner class. + + Raises: + UnsupportedAlgorithmError: If the payload hash algorithm is unsupported. + BotoCoreError: Errors from the botocore.exceptions library. + ClientError: Errors related to AWS KMS client. + UnsupportedLibraryError: If necessary libraries for AWS KMS are not available. + """ + + SCHEME = "awskms" + + def __init__(self, aws_key_id: str, public_key: Key): + if AWS_IMPORT_ERROR: + raise UnsupportedLibraryError(AWS_IMPORT_ERROR) + + self.hash_algorithm = self._get_hash_algorithm(public_key) + self.aws_key_id = aws_key_id + self.public_key = public_key + self.client = boto3.client("kms") + self.aws_algo = self._get_aws_signing_algo(self.public_key.scheme) + + @classmethod + def from_priv_key_uri( + cls, + priv_key_uri: str, + public_key: Key, + secrets_handler: Optional[SecretsHandler] = None, + ) -> "AWSSigner": + uri = parse.urlparse(priv_key_uri) + + if uri.scheme != cls.SCHEME: + raise ValueError(f"AWSSigner does not support {priv_key_uri}") + + return cls(uri.path, public_key) + + @classmethod + def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]: + """Loads a key and signer details from AWS KMS. + + Returns the private key uri and the public key. This method should only + be called once per key: the uri and Key should be stored for later use. + + Arguments: + aws_key_id (str): AWS KMS key ID. + local_scheme (str): Local scheme to use. + + Returns: + Tuple[str, Key]: A tuple where the first element is a string + representing the private key URI, and the second element is an + instance of the public key. + + Raises: + UnsupportedAlgorithmError: If the AWS KMS signing algorithm is + unsupported. + BotoCoreError: Errors from the botocore.exceptions library. + ClientError: Errors related to AWS KMS client. + """ + if AWS_IMPORT_ERROR: + raise UnsupportedLibraryError(AWS_IMPORT_ERROR) + + client = boto3.client("kms") + request = client.get_public_key(KeyId=aws_key_id) + kms_pubkey = serialization.load_der_public_key(request["PublicKey"]) + + public_key_pem = kms_pubkey.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ).decode("utf-8") + try: + keytype = cls._get_keytype_for_scheme(local_scheme) + except KeyError as e: + raise exceptions.UnsupportedAlgorithmError( + f"{local_scheme} is not a supported signing algorithm" + ) from e + + keyval = {"public": public_key_pem} + keyid = cls._get_keyid(keytype, local_scheme, keyval) + public_key = SSlibKey(keyid, keytype, local_scheme, keyval) + return f"{cls.SCHEME}:{aws_key_id}", public_key + + @staticmethod + def _get_keytype_for_scheme( + scheme: str, + ) -> str: + """Returns the Secure Systems Library key type. + + Arguments: + (str): The Secure Systems Library scheme. + + Returns: + str: The Secure Systems Library key type. + """ + keytype_for_scheme = { + "ecdsa-sha2-nistp256": "ecdsa", + "ecdsa-sha2-nistp384": "ecdsa", + "ecdsa-sha2-nistp512": "ecdsa", + "rsassa-pss-sha256": "rsa", + "rsassa-pss-sha384": "rsa", + "rsassa-pss-sha512": "rsa", + "rsa-pkcs1v15-sha256": "rsa", + "rsa-pkcs1v15-sha384": "rsa", + "rsa-pkcs1v15-sha512": "rsa", + } + return keytype_for_scheme[scheme] + + @staticmethod + def _get_aws_signing_algo( + scheme: str, + ) -> str: + """Returns AWS signing algorithm + + Arguments: + scheme (str): The Secure Systems Library signing scheme. + + Returns: + str: AWS signing scheme. + """ + aws_signing_algorithms = { + "ecdsa-sha2-nistp256": "ECDSA_SHA_256", + "ecdsa-sha2-nistp384": "ECDSA_SHA_384", + "ecdsa-sha2-nistp512": "ECDSA_SHA_512", + "rsassa-pss-sha256": "RSASSA_PSS_SHA_256", + "rsassa-pss-sha384": "RSASSA_PSS_SHA_384", + "rsassa-pss-sha512": "RSASSA_PSS_SHA_512", + "rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256", + "rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384", + "rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512", + } + return aws_signing_algorithms[scheme] + + @staticmethod + def _get_hash_algorithm(public_key: Key) -> str: + """Helper function to return payload hash algorithm used for this key + + Arguments: + public_key (Key): Public key object + + Returns: + str: Hash algorithm + """ + if public_key.keytype == "rsa": + # hash algorithm is encoded as last scheme portion + algo = public_key.scheme.split("-")[-1] + if public_key.keytype in [ + "ecdsa", + "ecdsa-sha2-nistp256", + "ecdsa-sha2-nistp384", + ]: + # nistp256 uses sha-256, nistp384 uses sha-384 + bits = public_key.scheme.split("-nistp")[-1] + algo = f"sha{bits}" + + # trigger UnsupportedAlgorithm if appropriate + _ = sslib_hash.digest(algo) + return algo + + def sign(self, payload: bytes) -> Signature: + """Sign the payload with the AWS KMS key + + Arguments: + payload: bytes to be signed. + + Raises: + BotoCoreError: Errors from the botocore.exceptions library. + ClientError: Errors related to AWS KMS client. + + Returns: + Signature. + """ + try: + request = self.client.sign( + KeyId=self.aws_key_id, + Message=payload, + MessageType="RAW", + SigningAlgorithm=self.aws_algo, + ) + + hasher = sslib_hash.digest(self.hash_algorithm) + hasher.update(payload) + logger.debug("signing response %s", request) + response = request["Signature"] + logger.debug("signing response %s", response) + + return Signature(self.public_key.keyid, response.hex()) + except (BotoCoreError, ClientError) as e: + logger.error("Failed to sign with AWS KMS: %s", str(e)) + raise e diff --git a/tests/check_aws_signer.py b/tests/check_aws_signer.py new file mode 100644 index 00000000..e2603cff --- /dev/null +++ b/tests/check_aws_signer.py @@ -0,0 +1,62 @@ +"""This module confirms that signing using AWS KMS keys works. + +The purpose is to do a smoke test, not to exhaustively test every possible key +and environment combination. + +For AWS, the requirements to successfully test are: +* AWS authentication details +have to be available in the environment +* The key defined in the test has to be +available to the authenticated user + +Remember to replace the REDACTED fields to include the necessary values: +* keyid: Hash of the public key +* public: The public key, refer to other KMS tests to see the format +* aws_id: AWS KMS ID or alias +""" + +import unittest + +from securesystemslib.exceptions import UnverifiedSignatureError +from securesystemslib.signer import AWSSigner, Key, Signer + + +class TestAWSKMSKeys(unittest.TestCase): + """Test that AWS KMS keys can be used to sign.""" + + pubkey = Key.from_dict( + "REDACTED", + { + "keytype": "rsa", + "scheme": "rsassa-pss-sha256", + "keyval": { + "public": "-----BEGIN PUBLIC KEY-----\nREDACTED\n-----END PUBLIC KEY-----\n" + }, + }, + ) + aws_key_id = "REDACTED" + + def test_aws_sign(self): + """Test that AWS KMS key works for signing""" + + data = "data".encode("utf-8") + + signer = Signer.from_priv_key_uri( + f"awskms:{self.aws_key_id}", self.pubkey + ) + sig = signer.sign(data) + + self.pubkey.verify_signature(sig, data) + with self.assertRaises(UnverifiedSignatureError): + self.pubkey.verify_signature(sig, b"NOT DATA") + + def test_aws_import(self): + """Test that AWS KMS key can be imported""" + + uri, key = AWSSigner.import_(self.aws_key_id, self.pubkey.scheme) + self.assertEqual(key.keytype, self.pubkey.keytype) + self.assertEqual(uri, f"awskms:{self.aws_key_id}") + + +if __name__ == "__main__": + unittest.main(verbosity=1) diff --git a/tox.ini b/tox.ini index 805aefbd..b1c6bc7f 100644 --- a/tox.ini +++ b/tox.ini @@ -21,7 +21,7 @@ deps = commands = python -m tests.check_gpg_available coverage run tests/aggregate_tests.py - coverage report -m --fail-under 85 + coverage report -m --fail-under 83 [testenv:purepy311] deps =