import base64
from typing import Union, Dict, Tuple, Optional, Any
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
from cryptography.exceptions import InvalidSignature
from cryptography.fernet import Fernet
import json
class CryptUtility:
def __init__(self):
self.private_key = None
self.public_key = None
def load_private_key(self, private_key_data: Union[str, bytes], password: Optional[Union[str, bytes]] = None) -> Any:
"""
Load a private key from PEM formatted data.
Args:
private_key_data: PEM encoded private key data
password: Optional password if the key is encrypted
Returns:
The loaded private key object
"""
if isinstance(private_key_data, str):
private_key_data = private_key_data.encode()
self.private_key = serialization.load_pem_private_key(
private_key_data,
password=password,
backend=default_backend()
)
self.public_key = self.private_key.public_key()
return self.private_key
def load_public_key(self, public_key_data: Union[str, bytes]) -> Any:
"""
Load a public key from PEM formatted data.
Args:
public_key_data: PEM encoded public key data
Returns:
The loaded public key object
"""
if isinstance(public_key_data, str):
public_key_data = public_key_data.encode()
self.public_key = serialization.load_pem_public_key(
public_key_data,
backend=default_backend()
)
return self.public_key
def decrypt(self, encrypted_package: Union[str, bytes], private_key: Any = None) -> str:
"""
Decrypt an encrypted package using hybrid encryption.
Args:
encrypted_package: The encrypted package containing the encrypted key and message
private_key: The private key to use for decryption (uses self.private_key if None)
Returns:
The decrypted message as a string
"""
if isinstance(encrypted_package, str):
package = json.loads(base64.b64decode(
encrypted_package).decode('utf-8'))
else:
package = json.loads(
encrypted_package.decode('utf-8'))
key_to_use = private_key if private_key is not None else self.private_key
if key_to_use is None:
raise ValueError(
"No private key available. Generate or load one first.")
encrypted_key = base64.b64decode(package["key"])
encrypted_message = base64.b64decode(package["message"])
key = key_to_use.decrypt(
encrypted_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
f = Fernet(key)
decrypted_message = f.decrypt(encrypted_message)
return decrypted_message.decode('utf-8')
def verify(self, message: Union[str, bytes], signature: Union[str, bytes], public_key: Any = None) -> bool:
"""
Verify a signature for a message.
Args:
message: The message to verify
signature: The signature to verify
public_key: The public key to use for verification (uses self.public_key if None)
Returns:
True if the signature is valid, False otherwise
"""
if isinstance(message, str):
message = message.encode()
if isinstance(signature, str):
signature = base64.b64decode(signature)
key = public_key if public_key is not None else self.public_key
if key is None:
raise ValueError(
"No public key available. Generate or load one first.")
try:
key.verify(
signature,
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except InvalidSignature:
return False
def verify_secure_message(self, secure_message: Dict[str, str], sender_public_key: Any) -> Tuple[str, bool]:
"""
Verify and decrypt a secure message.
Args:
secure_message: The secure message package with ciphertext and signature
sender_public_key: The sender's public key
Returns:
tuple: (decrypted_message, is_authentic)
"""
if self.private_key is None:
raise ValueError("Private key required to decrypt the message")
decrypted_message = self.decrypt(secure_message["encrypted_message"])
is_authentic = self.verify(
decrypted_message,
secure_message["signature"],
public_key=sender_public_key
)
return decrypted_message, is_authentic