Source code for carlogtt_python_library.utils.encryption

# ======================================================================
# MODULE DETAILS
# This section provides metadata about the module, including its
# creation date, author, copyright information, and a brief description
# of the module's purpose and functionality.
# ======================================================================

#   __|    \    _ \  |      _ \   __| __ __| __ __|
#  (      _ \     /  |     (   | (_ |    |      |
# \___| _/  _\ _|_\ ____| \___/ \___|   _|     _|

# src/carlogtt_python_library/utils/encryption.py
# Created 10/2/23 - 9:25 AM UK Time (London) by carlogtt

"""
Utilities for authenticated encryption and password hashing.
"""

# ======================================================================
# EXCEPTIONS
# This section documents any exceptions made or code quality rules.
# These exceptions may be necessary due to specific coding requirements
# or to bypass false positives.
# ======================================================================
#

# ======================================================================
# IMPORTS
# Importing required libraries and modules for the application.
# ======================================================================

# Standard Library Imports
import ast
import base64
import enum
import logging
import os
import secrets
import string
import warnings
from collections.abc import Sequence
from typing import Optional, TypedDict, Union

# Third Party Library Imports
import cryptography.fernet
import cryptography.hazmat.primitives.ciphers.aead
import cryptography.hazmat.primitives.ciphers.algorithms
import cryptography.hazmat.primitives.ciphers.modes
import cryptography.hazmat.primitives.hashes
import cryptography.hazmat.primitives.hmac
import cryptography.hazmat.primitives.kdf.hkdf
import cryptography.hazmat.primitives.kdf.scrypt
import cryptography.hazmat.primitives.padding

# Local Folder (Relative) Imports
from .. import exceptions

# END IMPORTS
# ======================================================================


# List of public names in the module
__all__ = [
    'KeyType',
    'KeyOutputType',
    'EncryptionAlgorithm',
    'Cryptography',
]

# Setting up logger for current module
module_logger = logging.getLogger(__name__)

# Type aliases
#


class ScryptParamsType(TypedDict):
    n: int
    r: int
    p: int
    length: int


[docs] class KeyType(enum.Enum): """ Enumeration to specify the key type based on its length. Attributes: AES128: Represents a key length of 128 bits (16 bytes) for AES encryption. This size is often used for its balance between security and performance. AES256: Represents a key length of 256 bits (32 bytes) for AES encryption. Offers a high level of security and is recommended for situations requiring enhanced data protection. INITIALIZATION_VECTOR: A 128-bit (16-byte) IV used in various encryption modes to ensure ciphertext uniqueness. Suitable for use with AES and other block ciphers in modes like CBC. """ AES128 = 16 AES256 = 32 INITIALIZATION_VECTOR = 16
[docs] class KeyOutputType(enum.Enum): """ Enumeration to specify the output format of the generated key. Attributes: BYTES: The key is returned as raw bytes. BASE64: The key is returned as URL-safe Base64-encoded bytes. """ BYTES = enum.auto() BASE64 = enum.auto()
[docs] class EncryptionAlgorithm(enum.Enum): """ Defines supported encryption algorithms anf their envelope prefixes. """ AES_128 = "v1:fernet:" AES_256 = "v1:cbc:" AES_GCM = "v2:gcm:"
[docs] @classmethod def all_alg_prefixes(cls) -> tuple[str, ...]: return tuple(alg.value for alg in cls)
[docs] class Cryptography: """ Provides cryptographic functionalities including key generation, serialization for storage, encryption, decryption, and signing. Utilizes symmetric encryption (AES-256) and HMAC for signing. :param logger: The logging.Logger instance to be used for logging the execution time of the decorated function. If not explicitly provided, the function uses Python's standard logging module as a default logger. """ def __init__(self, logger: logging.Logger = module_logger): self.logger = logger
[docs] def create_key(self, key_type: KeyType, key_output: KeyOutputType) -> bytes: """ Generates a cryptographic key of specified type and returns it in the specified output format. :param key_type: The type of key to generate, affecting its length. :param key_output: The output format of the generated key. :returns: The generated key in the specified output format. """ if not isinstance(key_type, KeyType): raise exceptions.CryptographyError(f"Invalid key type: {key_type}") # Create a random bytes key = self._rand_bytes(n=key_type.value) # Return the key based on the output format self.logger.debug( f"{key_type.value}-Bytes Key for {key_type.name} - {key_output.name} created" ) if key_output is KeyOutputType.BYTES: return key elif key_output is KeyOutputType.BASE64: return base64.urlsafe_b64encode(key) else: raise exceptions.CryptographyError(f"Invalid key output type: {key_output}")
[docs] def serialize_key_for_str_storage(self, key: bytes) -> str: """ Converts a bytes object (key) to a string representation suitable for storage. Utilizes Python's repr() function to create a string that represents the bytes object. :param key: The bytes object to be serialized. :return: A string representation of the bytes object, including the bytes literal prefix b''. """ self.logger.debug("Serializing key for storage") return repr(key)
[docs] def deserialize_key_from_str_storage(self, key: str) -> bytes: """ Converts a string representation of a bytes object back into the original bytes object. This function is intended to be used in conjunction with serialize_key_for_str_storage, allowing for the retrieval of the original bytes object from a stored string. :param key: The string representation of the bytes object, expected to include the bytes literal prefix b''. :return: The original bytes object. """ self.logger.debug("Deserializing key from storage") response = ast.literal_eval(key) assert isinstance(response, bytes) return response
[docs] def hash_string(self, raw_string: str, key: bytes) -> str: """ DEPRECATED — use `hash_string_v2()`. Note: the key parameter is ignored in this deprecated API. Hashes a given string using the Scrypt Key Derivation Function :param raw_string: The raw string to be hashed. Typically, this would be a password or any other sensitive information requiring secure handling. :param key: The secret key used for hashing the hashed string. This key should be generated and managed using secure cryptographic practices and must be a 32-byte key for AES-256. :return: The hash of the input string. """ msg = ( f"[DEPRECATED] '{self.hash_string.__name__}' is deprecated in Class" f" '{self.__class__.__name__}'. Use the new method '{self.hash_string_v2.__name__}()'" " instead." ) warnings.warn(msg, DeprecationWarning, stacklevel=3) module_logger.warning(msg) return self.hash_string_v2(raw_string=raw_string)
[docs] def hash_string_v2( self, raw_string: str, scrypt_params: Optional[ScryptParamsType] = None ) -> str: """ Hash a given string with scrypt and return a PHC-style string: scrypt$ln=<n2exp>,r=<r>,p=<p>$<salt_b64>$<hash_b64> A new 16-byte random salt is generated per given string. :param raw_string: The raw string to be hashed. Typically, this would be a password or any other sensitive information requiring secure handling. :param scrypt_params: The scrypt parameters to be used for hashing. If not provided, secure defaults are used. :return: The hash of the input string PHC-style scrypt string. """ if not scrypt_params: scrypt_params = { "n": 2**17, "r": 8, "p": 1, "length": 32, } n = scrypt_params['n'] if n <= 1 or (n & (n - 1)) != 0: raise ValueError("scrypt 'n' must be a power of two > 1") # since n is power of two ln = n.bit_length() - 1 salt = self._rand_bytes(16) kdf = cryptography.hazmat.primitives.kdf.scrypt.Scrypt( salt=salt, length=scrypt_params['length'], n=scrypt_params['n'], r=scrypt_params['r'], p=scrypt_params['p'], ) # Derive the key dk = kdf.derive(raw_string.encode()) self.logger.debug("Scrypt key derived successfully") hash_string = f"scrypt$ln={ln},r={scrypt_params['r']},p={scrypt_params['p']}${self._b64u_encode(salt)}${self._b64u_encode(dk)}" # noqa return hash_string
[docs] def validate_hash_match(self, raw_string: str, hashed_to_match: str, key: bytes) -> bool: """ DEPRECATED — use `validate_hash_match_v2()`. Note: the key parameter is ignored in this deprecated API. Validates whether a provided raw string matches the hashed string stored. :param raw_string: The plaintext string provided by the user, typically a password or sensitive information that needs validation against a stored, hashed version. :param hashed_to_match: The hashed data that the raw_string is compared against. :param key: The secret key used for hashing the hashed string. :return: True if the raw_string, when hashed and processed, matches the hashed_string_to_match; False otherwise. """ msg = ( f"[DEPRECATED] '{self.validate_hash_match.__name__}' is deprecated in Class" f" '{self.__class__.__name__}'. Use the new method" f" '{self.validate_hash_match_v2.__name__}()' instead." ) warnings.warn(msg, DeprecationWarning, stacklevel=3) module_logger.warning(msg) return self.validate_hash_match_v2(raw_string=raw_string, hashed_to_match=hashed_to_match)
[docs] def validate_hash_match_v2(self, raw_string: str, hashed_to_match: str) -> bool: """ Verify a provided raw string against a PHC-style scrypt string in constant time. :param raw_string: The plaintext string provided by the user, typically a password or sensitive information that needs validation against a stored, hashed version. :param hashed_to_match: The hashed data that the raw_string is compared against. :return: True if the raw_string, when hashed and processed, matches the hashed_string_to_match; False otherwise. """ if not raw_string and hashed_to_match: return False try: scheme, param_str, salt_b64, dk_b64 = hashed_to_match.split("$", 3) if scheme != "scrypt": return False scrypt_params = dict(kv.split("=", 1) for kv in param_str.split(",")) n = 1 << int(scrypt_params["ln"]) r = int(scrypt_params["r"]) p = int(scrypt_params["p"]) salt = self._b64u_decode(salt_b64) expected = self._b64u_decode(dk_b64) length = len(expected) kdf = cryptography.hazmat.primitives.kdf.scrypt.Scrypt( salt=salt, length=length, n=n, r=r, p=p ) # Attempt to verify the derived key kdf.verify(raw_string.encode(), expected) self.logger.debug("Scrypt key verified successfully") return True except Exception as ex: self.logger.warning(f"Hash validation failed w/ error: {ex}") return False
[docs] def sign(self, data_to_sign: bytes, key: bytes) -> bytes: """ Signs the given data using HMAC with SHA256 hash function. First, the data is encoded to Base64. Then, an HMAC signature is generated using the provided key and the SHA256 hash function. The data and its HMAC signature are concatenated, base64 encoded, and returned. :param data_to_sign: Data to be signed. :param key: Secret key used for HMAC. :returns: Base64 encoded signature of the data. """ # Encode the data_to_sign to Base64 data_to_sign_base64 = base64.urlsafe_b64encode(data_to_sign) # Create Hash-based message authentication codes (HMAC) hmac_hash = cryptography.hazmat.primitives.hmac.HMAC( key, cryptography.hazmat.primitives.hashes.SHA256() ) # Hash the data to create the signature hmac_hash.update(data_to_sign) hash_signature = hmac_hash.finalize() hash_signature_base64 = base64.urlsafe_b64encode(hash_signature) # The Hashing Signature need to be stored with the data data_and_hash_signature = data_to_sign_base64 + b"&" + hash_signature_base64 # Base64 encode the data and the signature signature = base64.urlsafe_b64encode(data_and_hash_signature) self.logger.debug("Data signed successfully") return signature
[docs] def verify_signature(self, signature: bytes, key: bytes) -> dict[str, Union[str, bool, bytes]]: """ Verifies the signature of the provided data. Decodes the signature from Base64, extracts the data and its HMAC signature, and verifies it using HMAC with SHA256. Returns a dict indicating whether the signature is valid, the original data, and the hash signature. :param signature: Base64 encoded data and signature. :param key: Secret key used for HMAC verification. :return: A dictionary containing the verification result, the original data, and the hash signature with the keys 'data', 'signature', 'signature_valid', and possibly 'response_info' if an error occurs. """ response: dict[str, Union[str, bool, bytes]] = { 'data': b"", 'signature': b"", 'signature_valid': False, } try: # Decode the Base64 encoded data data_and_hash_signature = base64.urlsafe_b64decode(signature) # Extract the data_to_sign_base64 and the # hash_signature_base64 data_base64, hash_signature_base64 = data_and_hash_signature.split(b"&") # Decode the Base64 data = base64.urlsafe_b64decode(data_base64) hash_signature = base64.urlsafe_b64decode(hash_signature_base64) # Update return dict with data and hash_signature response.update(data=data, signature=hash_signature) except Exception as ex: # Update the return dict with failed validation response.update(response_info=repr(ex)) return response try: # Create Hash-based message authentication codes (HMAC) hmac_hash = cryptography.hazmat.primitives.hmac.HMAC( key, cryptography.hazmat.primitives.hashes.SHA256() ) # Validate the Signature hmac_hash.update(data) hmac_hash.verify(hash_signature) self.logger.debug("Signature verified successfully") # Update the return dict with successful validation response.update(signature_valid=True) return response except Exception as ex: self.logger.warning(f"Signature verification failed w/ error: {ex}") # Update the return dict with failed validation response.update(response_info=repr(ex)) return response
[docs] def encrypt_string( self, plaintext: str, key: bytes, algorithm: EncryptionAlgorithm = EncryptionAlgorithm.AES_GCM, associated_data: Optional[bytes] = None, ) -> str: """ Encrypts plaintext using the selected algorithm and returns a versioned, Base64-url-safe string. Envelope prefixes: - v2:gcm:... → AES-256-GCM (default) - v1:cbc:... → AES-256-CBC + HMAC-SHA256 (legacy) - v1:fernet:... → Fernet (legacy) - AES_GCM: expects a 32-byte key (AES-256-GCM). Optional AAD. - AES_256 (CBC+HMAC): expects a 32-byte key; integrity via HMAC. - AES_128 (Fernet): expects a 44-byte urlsafe base64-encoded key. :param plaintext: The plaintext string to be encrypted. :param key: The secret key used for encryption. The format and length depend on the algorithm being used. :param algorithm: An instance of the EncryptionAlgorithm enum indicating the encryption algorithm to use. Default EncryptionAlgorithm.AES_GCM :param associated_data: Additional authenticated data (AAD) to be authenticated but not encrypted. It must be provided as a byte string if used. :return: The encrypted string, encoded with Base64 to ensure the encrypted data is text-safe. """ if algorithm is EncryptionAlgorithm.AES_GCM: return self._encrypt_aes_gcm(plaintext=plaintext, key=key, aad=associated_data) elif algorithm is EncryptionAlgorithm.AES_256: return self._encrypt_aes256(plaintext=plaintext, key=key) elif algorithm is EncryptionAlgorithm.AES_128: return self._encrypt_aes128(plaintext=plaintext, fernet_key=key) else: raise ValueError(f"algorthm must be one of the following {list(EncryptionAlgorithm)}")
[docs] def decrypt_string( self, ciphertext: str, key: bytes, algorithm: EncryptionAlgorithm = EncryptionAlgorithm.AES_GCM, associated_data: Optional[bytes] = None, ) -> str: """ Decrypts a string previously encrypted by the `encrypt_string` function. If a version prefix is present, auto-detect the algorithm: - v2:gcm:... → AES-GCM - v1:cbc:... → AES-CBC + HMAC (legacy) - v1:fernet:... → Fernet (legacy) Otherwise, fall back to the provided `algorithm`. :param ciphertext: The encrypted string to be decrypted. It is expected to be Base64 encoded. :param key: The secret key used for decryption. Must match the key used for encryption and be appropriate for the specified algorithm. :param algorithm: An instance of the EncryptionAlgorithm enum indicating the encryption algorithm to use. Default EncryptionAlgorithm.AES_GCM :param associated_data: Additional authenticated data (AAD) to be authenticated but not encrypted. It must be provided as a byte string if used. :return: The decrypted plaintext string. Returns an empty string and logs a warning if decryption fails. """ # Try auto-detect alg first then fall back to user provided alg if ciphertext.startswith(EncryptionAlgorithm.AES_GCM.value): return self._decrypt_aes_gcm(ciphertext=ciphertext, key=key, aad=associated_data) if ciphertext.startswith(EncryptionAlgorithm.AES_256.value): return self._decrypt_aes256(ciphertext=ciphertext, key=key) if ciphertext.startswith(EncryptionAlgorithm.AES_128.value): return self._decrypt_aes128(ciphertext=ciphertext, fernet_key=key) # manual selection if algorithm is EncryptionAlgorithm.AES_GCM: return self._decrypt_aes_gcm(ciphertext=ciphertext, key=key, aad=associated_data) elif algorithm is EncryptionAlgorithm.AES_256: return self._decrypt_aes256(ciphertext=ciphertext, key=key) elif algorithm is EncryptionAlgorithm.AES_128: return self._decrypt_aes128(ciphertext=ciphertext, fernet_key=key) else: raise ValueError(f"algorthm must be one of the following {list(EncryptionAlgorithm)}")
[docs] def re_encrypt_string( self, ciphertext_to_re_encrypt: str, old_key: bytes, new_key: bytes, old_algorithm: EncryptionAlgorithm = EncryptionAlgorithm.AES_256, new_algorithm: EncryptionAlgorithm = EncryptionAlgorithm.AES_256, ) -> str: """ Re-encrypts data, transitioning it from an old key to a new key. This function first decrypts the given encrypted data with the old key, then re-encrypts it using the new key. It supports key rotation and the update of the encryption scheme for securely stored data. Ideal for key rotation or encryption scheme updates, ensuring data remains secure during key transitions or algorithm updates. :param ciphertext_to_re_encrypt: Encrypted data with old key. :param old_key: The old encryption key. :param new_key: The new encryption key for re-encryption. :param old_algorithm: An instance of the EncryptionAlgorithm enum indicating the encryption algorithm to use. Default EncryptionAlgorithm.AES_256 :param new_algorithm: An instance of the EncryptionAlgorithm enum indicating the encryption algorithm to use. Default EncryptionAlgorithm.AES_256 :return: Re-encrypted data as a string with the new key. """ # Decrypt data with old key plaintext = self.decrypt_string( ciphertext_to_re_encrypt, old_key, old_algorithm, ) # Encrypt the data with the new key ciphertext_re_encrypted = self.encrypt_string( plaintext, new_key, new_algorithm, ) return ciphertext_re_encrypted
[docs] def create_token( self, length: int, validity_secs: Union[int, float], now_epoch: float, key: bytes, population: Sequence = (), ) -> dict[str, Union[str, float]]: """ Generates a secure token and its expiry time. The token is encrypted with a given key to produce a cipher token. :param length: Length of the random token. :param validity_secs: Time in seconds until the token expires. :param key: The secret key used for encryption. Must be a 32-byte key for AES-256. :param now_epoch: Current time in seconds since the epoch. :param population: Lets you define a different set of characters that the token can be composed of. Default letters and digits :raise ValueError: If length is greater than 62. :return: A dictionary with 'token', 'expiry', and 'ciphertoken' keys. """ alphabet = string.ascii_letters + string.digits population = population or alphabet random_string = "".join(secrets.choice(population) for _ in range(length)) random_string_hashed = self.hash_string_v2(raw_string=random_string) random_string_hashed_base64 = base64.urlsafe_b64encode(random_string_hashed.encode()) expires = float(now_epoch) + float(validity_secs) expires_base64 = base64.urlsafe_b64encode(str(expires).encode()) combined = random_string_hashed_base64 + b"&" + expires_base64 combined_encrypted = self.encrypt_string(plaintext=combined.decode(), key=key) response: dict[str, Union[str, float]] = { 'token': random_string, 'expiry': expires, 'ciphertoken': combined_encrypted, } return response
[docs] def verify_token( self, token: str, ciphertoken: str, now_epoch: float, key: bytes ) -> dict[str, Union[str, float, bool]]: """ Verifies the validity of the given token by decrypting the cipher token using the provided key, and checks if it's expired based on the current time and the expiry time embedded in the cipher token. :param token: The original token to be validated. :param ciphertoken: The encrypted string containing the token and expiry. :param now_epoch: Current time in seconds since the epoch. :param key: The secret key used for encryption. Must be a 32-byte key for AES-256. :return: A dictionary with the keys 'token_valid', 'token', 'expiry', and possibly 'response_info' if an error occurs. """ # Initializing code validity response and override if True response: dict[str, Union[str, float, bool]] = { 'token_valid': False, 'token': "", 'expiry': 0.0, } combined = self.decrypt_string(ciphertext=ciphertoken, key=key).encode() try: random_string_hashed_base64, expiry_base64 = combined.split(b"&") random_string_hashed = base64.urlsafe_b64decode(random_string_hashed_base64).decode() expiry = float(base64.urlsafe_b64decode(expiry_base64).decode()) response.update(token=token, expiry=expiry) except Exception as ex: response.update(response_info=repr(ex)) return response if float(now_epoch) < expiry: confirmation_code_valid = self.validate_hash_match_v2( raw_string=token, hashed_to_match=random_string_hashed ) response.update(token_valid=confirmation_code_valid) else: response.update(response_info="Token expired") return response
def _encrypt_aes128(self, plaintext: str, fernet_key: bytes) -> str: """ Encrypts a string using Fernet symmetric encryption. This function encrypts a given string with Fernet, encoding the input to bytes, then encrypting. :param plaintext: The plaintext string to be encrypted. :param fernet_key: The secret key used for encryption. Must be a URL-safe base64-encoded 32-byte key. :return: The encrypted string, encoded with Base64 to ensure the encrypted data is text-safe. """ # Ensure the Fernet key length is valid # For a bytes_length of 32, the length of the Base64-encoded # string without stripping padding would be 44 characters. self._ensure_len( key=fernet_key, n=44, exc_msg="Fernet key must be a URL-safe base64-encoded and 32-byte key", ) cipher_suite = cryptography.fernet.Fernet(key=fernet_key) b_string_to_encrypt = plaintext.encode() b_encrypted = cipher_suite.encrypt(b_string_to_encrypt) s_encrypted_clean = b_encrypted.decode() # Add alg info prefix alg_prefix = EncryptionAlgorithm.AES_128.value s_encrypted_clean_with_alg = f"{alg_prefix}{s_encrypted_clean}" return s_encrypted_clean_with_alg def _decrypt_aes128(self, ciphertext: str, fernet_key: bytes) -> str: """ Decrypts a string previously encrypted by _encrypt_string_aes128 Attempts to decrypt the provided string using the given Fernet key. If decryption fails, it catches the exception, logs a warning, and returns an empty string to indicate failure. :param ciphertext: The encrypted string to be decrypted. :param fernet_key: The secret key used for decryption. Must match the key used for encryption. :return: The decrypted plaintext string. Returns an empty string and logs a warning if decryption fails. """ # Ensure the Fernet key length is valid # For a bytes_length of 32, the length of the Base64-encoded # string without stripping padding would be 44 characters. self._ensure_len( key=fernet_key, n=44, exc_msg="Fernet key must be a URL-safe base64-encoded and 32-byte key", ) ciphertext = self._get_encrypted_ciphertext(ciphertext=ciphertext) cipher_suite = cryptography.fernet.Fernet(key=fernet_key) b_encrypted = ciphertext.encode() try: b_decrypted = cipher_suite.decrypt(b_encrypted) s_decrypted = b_decrypted.decode() return s_decrypted except Exception as ex: self.logger.warning(f"Decryption failed w/ error: {ex}") return "" def _encrypt_aes256(self, plaintext: str, key: bytes) -> str: """ Encrypts a string using AES-256 symmetric encryption in CBC mode and adds an HMAC for message authentication. The function performs AES-256 encryption on the input string with a given key, then computes an HMAC signature of the encrypted data for verification. This approach provides both confidentiality and integrity/authentication of the message. :param plaintext: The plaintext string to be encrypted. :param key: The secret key used for encryption. Must be a 32-byte key for AES-256. :return: The encrypted string, encoded with Base64 to ensure the encrypted data is text-safe. """ # Ensure the AES key length is valid for AES-256 self._ensure_len(key=key, n=32, exc_msg="AES-256 must be 32-byte key") # Generate a random salt used to randomizes the KDF’s output # Used to derive 2 keys, one for encryption and one for signing salt = self.create_key(KeyType.AES256, KeyOutputType.BYTES) # Encode the salt to url-safe byte array salt_base64 = base64.urlsafe_b64encode(salt) # Derive keys from master key aes_key, hash_key = self._derive_key_hkdf(salt, key) # Generate a random 16-byte IV (Initialization Vector) iv = self.create_key(KeyType.INITIALIZATION_VECTOR, KeyOutputType.BYTES) b_plaintext = plaintext.encode() # Pad the input string to ensure it's a multiple of block size padder = cryptography.hazmat.primitives.padding.PKCS7( cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size ).padder() b_plaintext_padded = padder.update(b_plaintext) + padder.finalize() # Create a Cipher object for AES-256 in CBC mode cipher = cryptography.hazmat.primitives.ciphers.Cipher( cryptography.hazmat.primitives.ciphers.algorithms.AES(aes_key), cryptography.hazmat.primitives.ciphers.modes.CBC(iv), ) # Encrypt the padded data encryptor = cipher.encryptor() b_ciphertext = encryptor.update(b_plaintext_padded) + encryptor.finalize() # The IV needs to be stored with the ciphertext for decryption b_ciphertext_iv = iv + b_ciphertext # Sign the encrypted data with IV b_ciphertext_signed_base64 = self.sign(b_ciphertext_iv, hash_key) # Add the salt used to derive the 2 keys to the data so that the # decrypt function can derive the same keys again b_ciphertext_signed_salt = b_ciphertext_signed_base64 + b"&" + salt_base64 # Encode base64 to remove the & ciphertext = self._b64u_encode(b_ciphertext_signed_salt) # Add alg info prefix alg_prefix = EncryptionAlgorithm.AES_256.value ciphertext = f"{alg_prefix}{ciphertext}" return ciphertext def _decrypt_aes256(self, ciphertext: str, key: bytes) -> str: """ Decrypts a string that was encrypted using AES-256 symmetric encryption in CBC mode and verifies its HMAC signature. This function attempts to decrypt a provided string using a specified AES key. Before decryption, it verifies the HMAC to ensure the message's integrity and authenticity. If the HMAC verification fails, the function logs a warning and returns an empty string, indicating a potential tampering or authenticity issue. :param ciphertext: The encrypted string to be decrypted. :param key: The secret key used for encryption. Must be a 32-byte key for AES-256. :return: The decrypted plaintext string. Returns an empty string and logs a warning if decryption fails. """ # Ensure the AES key length is valid for AES-256 self._ensure_len(key=key, n=32, exc_msg="AES-256 must be 32-byte key") ciphertext = self._get_encrypted_ciphertext(ciphertext=ciphertext) # Decode base64 to reveal the & b_ciphertext_signed_salt = self._b64u_decode(ciphertext) # Split the data and salt at the & b_ciphertext_signed_base64, salt_base64 = b_ciphertext_signed_salt.split(b"&") # Decode the salt to bytes salt = base64.urlsafe_b64decode(salt_base64) # Derive keys from master key aes_key, hash_key = self._derive_key_hkdf(salt, key) # Verify signature response = self.verify_signature(b_ciphertext_signed_base64, hash_key) if not response['signature_valid']: return "" # If verification is successful get the data b_ciphertext_iv = response['data'] assert isinstance(b_ciphertext_iv, bytes) # Extract the IV (Initialization Vector) and the encrypted # string iv = b_ciphertext_iv[:16] b_ciphertext = b_ciphertext_iv[16:] # Create a Cipher object for AES-256 in CBC mode cipher = cryptography.hazmat.primitives.ciphers.Cipher( cryptography.hazmat.primitives.ciphers.algorithms.AES(aes_key), cryptography.hazmat.primitives.ciphers.modes.CBC(iv), ) # Decrypt the data decryptor = cipher.decryptor() b_plaintext_padded = decryptor.update(b_ciphertext) + decryptor.finalize() # Remove padding unpadder = cryptography.hazmat.primitives.padding.PKCS7( cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size ).unpadder() b_plaintext = unpadder.update(b_plaintext_padded) + unpadder.finalize() # Decode plaintext = b_plaintext.decode() return plaintext def _encrypt_aes_gcm(self, plaintext: str, key: bytes, aad: Optional[bytes] = None) -> str: """ Encrypts a string using AES-GCM symmetric encryption. :param plaintext: The plaintext string to be encrypted. :param key: The secret key used for encryption. Must be a 32-byte key. :param aad: Additional authenticated data (AAD) to be authenticated but not encrypted. It must be provided as a byte string if used. :return: The encrypted string, encoded with Base64 to ensure the encrypted data is text-safe. """ # Ensure the AES key length is valid for AES-GCM self._ensure_len(key=key, n=32, exc_msg="AES-GCM must be 32-byte key") # 96-bit recommended nonce = self._rand_bytes(12) aead = cryptography.hazmat.primitives.ciphers.aead.AESGCM(key) ct = aead.encrypt(nonce=nonce, data=plaintext.encode(), associated_data=aad) # Envelope: nonce || ct (ct already includes tag) blob = self._b64u_encode(nonce + ct) # Add alg info prefix alg_prefix = EncryptionAlgorithm.AES_GCM.value encrypted = f"{alg_prefix}{blob}" return encrypted def _decrypt_aes_gcm(self, ciphertext: str, key: bytes, aad: Optional[bytes] = None) -> str: """ Decrypts a string that was encrypted using AES-GCM symmetric encryption. :param ciphertext: The encrypted string to be decrypted. :param key: The secret key used for encryption. Must be a 32-byte key. :param aad: Additional authenticated data (AAD) that was authenticated but not encrypted. It must be provided as a byte string if used. :return: The decrypted plaintext string. """ # Ensure the AES key length is valid for AES-GCM self._ensure_len(key=key, n=32, exc_msg="AES-GCM must be 32-byte key") ciphertext = self._get_encrypted_ciphertext(ciphertext=ciphertext) blob = self._b64u_decode(ciphertext) nonce, ct = blob[:12], blob[12:] aead = cryptography.hazmat.primitives.ciphers.aead.AESGCM(key) try: pt = aead.decrypt(nonce=nonce, data=ct, associated_data=aad) except Exception as ex: self.logger.warning(f"GCM decryption failed (bad tag/AAD/nonce): {ex}") return "" decrypted = pt.decode() return decrypted def _derive_key_hkdf(self, salt: bytes, key_material: bytes) -> tuple[bytes, bytes]: """ Derives two distinct keys (an AES encryption key and a hash key) from a given master key. This function uses the HMAC-based Key Derivation Function (HKDF) with SHA-256 hash algorithm to derive two separate 32-byte keys from the provided master key. A 16-bytes salt is use in the HKDF, ensuring the uniqueness of the derived keys even when the same master key is used. The 'info' parameter is utilized to differentiate the purpose of each derived key. :param salt: A byte string used to salt the key derivation to prevent rainbow table attacks. The salt should be unique for each credential to be protected but does not need to be kept secret. :param key_material: The master key from which the AES and hash keys are derived. :return: A tuple containing two bytes objects: the AES key and the hash key. """ # Derive the AES encryption key aes_key_hkdf = cryptography.hazmat.primitives.kdf.hkdf.HKDF( algorithm=cryptography.hazmat.primitives.hashes.SHA256(), length=32, salt=salt, info=b"aes-key", ) aes_key = aes_key_hkdf.derive(key_material) self.logger.debug("AES key derived successfully") # Derive the hash key hash_key_hkdf = cryptography.hazmat.primitives.kdf.hkdf.HKDF( algorithm=cryptography.hazmat.primitives.hashes.SHA256(), length=32, salt=salt, info=b"hash-key", ) hash_key = hash_key_hkdf.derive(key_material) self.logger.debug("Hashing key derived successfully") return aes_key, hash_key def _get_encrypted_ciphertext(self, ciphertext: str) -> str: """ Extracts the encrypted ciphertext from the input string. :param ciphertext: The input string containing the encrypted ciphertext and other information. :return: The encrypted ciphertext as a string. """ self.logger.debug("Getting encrypted ciphertext") # Remove alg info prefix # Using if for older stored values for alg in EncryptionAlgorithm.all_alg_prefixes(): if ciphertext.startswith(alg): start = len(alg) ciphertext = ciphertext[start:] break # Correct the Base64 padding if necessary missing_padding = len(ciphertext) % 4 if missing_padding: ciphertext += "=" * (4 - missing_padding) return ciphertext def _b64u_encode(self, b: bytes) -> str: """ Encodes a byte string to a Base64 URL-safe string. :param b: The byte string to be encoded. :return: The Base64 URL-safe encoded string. """ encoded = base64.urlsafe_b64encode(b).decode() return encoded def _b64u_decode(self, s: str) -> bytes: """ Decodes a Base64 URL-safe encoded string to a byte string. :param s: The Base64 URL-safe encoded string to be decoded. :return: The decoded byte string. """ decoded = base64.urlsafe_b64decode(s.encode()) return decoded def _rand_bytes(self, n: int) -> bytes: """ Generates cryptographically secure random bytes. :param n: The number of random bytes to generate. :return: A byte string containing the generated random bytes. """ self.logger.debug(f"Generating {n} random bytes") b = os.urandom(n) return b def _ensure_len(self, key: bytes, n: int, exc_msg: str) -> None: """ Ensures that a byte string is of a specific length. :param key: The byte string to be checked. :param n: The expected length of the byte string. :param exc_msg: The exception message to be raised if the length check fails. :raise: ValueError is raised if the byte string's length does not match the expected length. """ self.logger.debug(f"Ensuring key length is {n} bytes") if len(key) != n: self.logger.error(exc_msg) raise ValueError(exc_msg)