Move SymmetricEncryption to corelibs_encryption module
This commit is contained in:
@@ -9,6 +9,7 @@ dependencies = [
|
|||||||
"corelibs-datetime>=1.0.1",
|
"corelibs-datetime>=1.0.1",
|
||||||
"corelibs-debug>=1.0.0",
|
"corelibs-debug>=1.0.0",
|
||||||
"corelibs-dump-data>=1.0.0",
|
"corelibs-dump-data>=1.0.0",
|
||||||
|
"corelibs-encryption>=1.0.0",
|
||||||
"corelibs-enum-base>=1.0.0",
|
"corelibs-enum-base>=1.0.0",
|
||||||
"corelibs-file>=1.0.0",
|
"corelibs-file>=1.0.0",
|
||||||
"corelibs-regex-checks>=1.0.0",
|
"corelibs-regex-checks>=1.0.0",
|
||||||
|
|||||||
@@ -4,24 +4,11 @@ Will be moved to CoreLibs
|
|||||||
TODO: set key per encryption run
|
TODO: set key per encryption run
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os
|
import warnings
|
||||||
import json
|
from corelibs_encryption.symmetric import SymmetricEncryption as CorelibsSymmetricEncryption
|
||||||
import base64
|
|
||||||
import hashlib
|
|
||||||
from typing import TypedDict, cast
|
|
||||||
from cryptography.fernet import Fernet
|
|
||||||
from cryptography.hazmat.primitives import hashes
|
|
||||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
||||||
|
|
||||||
|
|
||||||
class PackageData(TypedDict):
|
class SymmetricEncryption(CorelibsSymmetricEncryption):
|
||||||
"""encryption package"""
|
|
||||||
encrypted_data: str
|
|
||||||
salt: str
|
|
||||||
key_hash: str
|
|
||||||
|
|
||||||
|
|
||||||
class SymmetricEncryption:
|
|
||||||
"""
|
"""
|
||||||
simple encryption
|
simple encryption
|
||||||
|
|
||||||
@@ -29,124 +16,7 @@ class SymmetricEncryption:
|
|||||||
key from the password to decrypt
|
key from the password to decrypt
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, password: str):
|
|
||||||
if not password:
|
|
||||||
raise ValueError("A password must be set")
|
|
||||||
self.password = password
|
|
||||||
self.password_hash = hashlib.sha256(password.encode('utf-8')).hexdigest()
|
|
||||||
|
|
||||||
def __derive_key_from_password(self, password: str, salt: bytes) -> bytes:
|
warnings.warn("Use corelibs_encryption.symmetric.SymmetricEncryption instead", DeprecationWarning, stacklevel=2)
|
||||||
_password = password.encode('utf-8')
|
|
||||||
kdf = PBKDF2HMAC(
|
|
||||||
algorithm=hashes.SHA256(),
|
|
||||||
length=32,
|
|
||||||
salt=salt,
|
|
||||||
iterations=100000,
|
|
||||||
)
|
|
||||||
key = base64.urlsafe_b64encode(kdf.derive(_password))
|
|
||||||
return key
|
|
||||||
|
|
||||||
def __encrypt_with_metadata(self, data: str | bytes) -> PackageData:
|
|
||||||
"""Encrypt data and include salt if password-based"""
|
|
||||||
# convert to bytes (for encoding)
|
|
||||||
if isinstance(data, str):
|
|
||||||
data = data.encode('utf-8')
|
|
||||||
|
|
||||||
# generate salt and key from password
|
|
||||||
salt = os.urandom(16)
|
|
||||||
key = self.__derive_key_from_password(self.password, salt)
|
|
||||||
# init the cypher suit
|
|
||||||
cipher_suite = Fernet(key)
|
|
||||||
|
|
||||||
encrypted_data = cipher_suite.encrypt(data)
|
|
||||||
|
|
||||||
# If using password, include salt in the result
|
|
||||||
return {
|
|
||||||
'encrypted_data': base64.urlsafe_b64encode(encrypted_data).decode('utf-8'),
|
|
||||||
'salt': base64.urlsafe_b64encode(salt).decode('utf-8'),
|
|
||||||
'key_hash': hashlib.sha256(key).hexdigest()
|
|
||||||
}
|
|
||||||
|
|
||||||
def encrypt_with_metadata(self, data: str | bytes, return_as: str = 'str') -> str | bytes | PackageData:
|
|
||||||
"""encrypt with metadata, but returns data in string"""
|
|
||||||
match return_as:
|
|
||||||
case 'str':
|
|
||||||
return self.encrypt_with_metadata_return_str(data)
|
|
||||||
case 'json':
|
|
||||||
return self.encrypt_with_metadata_return_str(data)
|
|
||||||
case 'bytes':
|
|
||||||
return self.encrypt_with_metadata_return_bytes(data)
|
|
||||||
case 'dict':
|
|
||||||
return self.encrypt_with_metadata_return_dict(data)
|
|
||||||
case _:
|
|
||||||
# default is string json
|
|
||||||
return self.encrypt_with_metadata_return_str(data)
|
|
||||||
|
|
||||||
def encrypt_with_metadata_return_dict(self, data: str | bytes) -> PackageData:
|
|
||||||
"""encrypt with metadata, but returns data as PackageData dict"""
|
|
||||||
return self.__encrypt_with_metadata(data)
|
|
||||||
|
|
||||||
def encrypt_with_metadata_return_str(self, data: str | bytes) -> str:
|
|
||||||
"""encrypt with metadata, but returns data in string"""
|
|
||||||
return json.dumps(self.__encrypt_with_metadata(data))
|
|
||||||
|
|
||||||
def encrypt_with_metadata_return_bytes(self, data: str | bytes) -> bytes:
|
|
||||||
"""encrypt with metadata, but returns data in bytes"""
|
|
||||||
return json.dumps(self.__encrypt_with_metadata(data)).encode('utf-8')
|
|
||||||
|
|
||||||
def decrypt_with_metadata(self, encrypted_package: str | bytes | PackageData, password: str | None = None) -> str:
|
|
||||||
"""Decrypt data that may include metadata"""
|
|
||||||
try:
|
|
||||||
# Try to parse as JSON (password-based encryption)
|
|
||||||
if isinstance(encrypted_package, bytes):
|
|
||||||
package_data = cast(PackageData, json.loads(encrypted_package.decode('utf-8')))
|
|
||||||
elif isinstance(encrypted_package, str):
|
|
||||||
package_data = cast(PackageData, json.loads(str(encrypted_package)))
|
|
||||||
else:
|
|
||||||
package_data = encrypted_package
|
|
||||||
|
|
||||||
encrypted_data = base64.urlsafe_b64decode(package_data['encrypted_data'])
|
|
||||||
salt = base64.urlsafe_b64decode(package_data['salt'])
|
|
||||||
pwd = password or self.password
|
|
||||||
key = self.__derive_key_from_password(pwd, salt)
|
|
||||||
if package_data['key_hash'] != hashlib.sha256(key).hexdigest():
|
|
||||||
raise ValueError("Key hash is not matching, possible invalid password")
|
|
||||||
cipher_suite = Fernet(key)
|
|
||||||
decrypted_data = cipher_suite.decrypt(encrypted_data)
|
|
||||||
|
|
||||||
except (json.JSONDecodeError, KeyError, UnicodeDecodeError) as e:
|
|
||||||
raise ValueError(f"Invalid encrypted package format {e}") from e
|
|
||||||
|
|
||||||
return decrypted_data.decode('utf-8')
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def encrypt_data(data: str | bytes, password: str) -> str:
|
|
||||||
"""
|
|
||||||
Static method to encrypt some data
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
data {str | bytes} -- _description_
|
|
||||||
password {str} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str -- _description_
|
|
||||||
"""
|
|
||||||
encryptor = SymmetricEncryption(password)
|
|
||||||
return encryptor.encrypt_with_metadata_return_str(data)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def decrypt_data(data: str | bytes | PackageData, password: str) -> str:
|
|
||||||
"""
|
|
||||||
Static method to decrypt some data
|
|
||||||
|
|
||||||
Arguments:
|
|
||||||
data {str | bytes | PackageData} -- _description_
|
|
||||||
password {str} -- _description_
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str -- _description_
|
|
||||||
"""
|
|
||||||
decryptor = SymmetricEncryption(password)
|
|
||||||
return decryptor.decrypt_with_metadata(data, password=password)
|
|
||||||
|
|
||||||
# __END__
|
# __END__
|
||||||
|
|||||||
@@ -1,3 +0,0 @@
|
|||||||
"""
|
|
||||||
Unit tests for encryption_handling module
|
|
||||||
"""
|
|
||||||
@@ -1,665 +0,0 @@
|
|||||||
"""
|
|
||||||
PyTest: encryption_handling/symmetric_encryption
|
|
||||||
"""
|
|
||||||
# pylint: disable=redefined-outer-name
|
|
||||||
# ^ Disabled because pytest fixtures intentionally redefine names
|
|
||||||
|
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import base64
|
|
||||||
import hashlib
|
|
||||||
import pytest
|
|
||||||
from corelibs.encryption_handling.symmetric_encryption import (
|
|
||||||
SymmetricEncryption
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TestSymmetricEncryptionInitialization:
|
|
||||||
"""Tests for SymmetricEncryption initialization"""
|
|
||||||
|
|
||||||
def test_valid_password_initialization(self):
|
|
||||||
"""Test initialization with a valid password"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
assert encryptor.password == "test_password"
|
|
||||||
assert encryptor.password_hash == hashlib.sha256("test_password".encode('utf-8')).hexdigest()
|
|
||||||
|
|
||||||
def test_empty_password_raises_error(self):
|
|
||||||
"""Test that empty password raises ValueError"""
|
|
||||||
with pytest.raises(ValueError, match="A password must be set"):
|
|
||||||
SymmetricEncryption("")
|
|
||||||
|
|
||||||
def test_password_hash_is_consistent(self):
|
|
||||||
"""Test that password hash is consistently generated"""
|
|
||||||
encryptor1 = SymmetricEncryption("test_password")
|
|
||||||
encryptor2 = SymmetricEncryption("test_password")
|
|
||||||
assert encryptor1.password_hash == encryptor2.password_hash
|
|
||||||
|
|
||||||
def test_different_passwords_different_hashes(self):
|
|
||||||
"""Test that different passwords produce different hashes"""
|
|
||||||
encryptor1 = SymmetricEncryption("password1")
|
|
||||||
encryptor2 = SymmetricEncryption("password2")
|
|
||||||
assert encryptor1.password_hash != encryptor2.password_hash
|
|
||||||
|
|
||||||
|
|
||||||
class TestEncryptWithMetadataReturnDict:
|
|
||||||
"""Tests for encrypt_with_metadata_return_dict method"""
|
|
||||||
|
|
||||||
def test_encrypt_string_returns_package_data(self):
|
|
||||||
"""Test encrypting a string returns PackageData dict"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_dict("test data")
|
|
||||||
|
|
||||||
assert isinstance(result, dict)
|
|
||||||
assert 'encrypted_data' in result
|
|
||||||
assert 'salt' in result
|
|
||||||
assert 'key_hash' in result
|
|
||||||
|
|
||||||
def test_encrypt_bytes_returns_package_data(self):
|
|
||||||
"""Test encrypting bytes returns PackageData dict"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_dict(b"test data")
|
|
||||||
|
|
||||||
assert isinstance(result, dict)
|
|
||||||
assert 'encrypted_data' in result
|
|
||||||
assert 'salt' in result
|
|
||||||
assert 'key_hash' in result
|
|
||||||
|
|
||||||
def test_encrypted_data_is_base64_encoded(self):
|
|
||||||
"""Test that encrypted_data is base64 encoded"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_dict("test data")
|
|
||||||
|
|
||||||
# Should not raise exception when decoding
|
|
||||||
base64.urlsafe_b64decode(result['encrypted_data'])
|
|
||||||
|
|
||||||
def test_salt_is_base64_encoded(self):
|
|
||||||
"""Test that salt is base64 encoded"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_dict("test data")
|
|
||||||
|
|
||||||
# Should not raise exception when decoding
|
|
||||||
salt = base64.urlsafe_b64decode(result['salt'])
|
|
||||||
# Salt should be 16 bytes
|
|
||||||
assert len(salt) == 16
|
|
||||||
|
|
||||||
def test_key_hash_is_valid_hex(self):
|
|
||||||
"""Test that key_hash is a valid hex string"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_dict("test data")
|
|
||||||
|
|
||||||
# Should be 64 characters (SHA256 hex)
|
|
||||||
assert len(result['key_hash']) == 64
|
|
||||||
# Should only contain hex characters
|
|
||||||
int(result['key_hash'], 16)
|
|
||||||
|
|
||||||
def test_different_salts_for_each_encryption(self):
|
|
||||||
"""Test that each encryption uses a different salt"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result1 = encryptor.encrypt_with_metadata_return_dict("test data")
|
|
||||||
result2 = encryptor.encrypt_with_metadata_return_dict("test data")
|
|
||||||
|
|
||||||
assert result1['salt'] != result2['salt']
|
|
||||||
assert result1['encrypted_data'] != result2['encrypted_data']
|
|
||||||
|
|
||||||
|
|
||||||
class TestEncryptWithMetadataReturnStr:
|
|
||||||
"""Tests for encrypt_with_metadata_return_str method"""
|
|
||||||
|
|
||||||
def test_returns_json_string(self):
|
|
||||||
"""Test that method returns a valid JSON string"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
|
|
||||||
assert isinstance(result, str)
|
|
||||||
# Should be valid JSON
|
|
||||||
parsed = json.loads(result)
|
|
||||||
assert 'encrypted_data' in parsed
|
|
||||||
assert 'salt' in parsed
|
|
||||||
assert 'key_hash' in parsed
|
|
||||||
|
|
||||||
def test_json_string_parseable(self):
|
|
||||||
"""Test that returned JSON string can be parsed back"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
|
|
||||||
parsed = json.loads(result)
|
|
||||||
assert isinstance(parsed, dict)
|
|
||||||
|
|
||||||
|
|
||||||
class TestEncryptWithMetadataReturnBytes:
|
|
||||||
"""Tests for encrypt_with_metadata_return_bytes method"""
|
|
||||||
|
|
||||||
def test_returns_bytes(self):
|
|
||||||
"""Test that method returns bytes"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_bytes("test data")
|
|
||||||
|
|
||||||
assert isinstance(result, bytes)
|
|
||||||
|
|
||||||
def test_bytes_contains_valid_json(self):
|
|
||||||
"""Test that returned bytes contain valid JSON"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata_return_bytes("test data")
|
|
||||||
|
|
||||||
# Should be valid JSON when decoded
|
|
||||||
parsed = json.loads(result.decode('utf-8'))
|
|
||||||
assert 'encrypted_data' in parsed
|
|
||||||
assert 'salt' in parsed
|
|
||||||
assert 'key_hash' in parsed
|
|
||||||
|
|
||||||
|
|
||||||
class TestEncryptWithMetadata:
|
|
||||||
"""Tests for encrypt_with_metadata method with different return types"""
|
|
||||||
|
|
||||||
def test_return_as_str(self):
|
|
||||||
"""Test encrypt_with_metadata with return_as='str'"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata("test data", return_as='str')
|
|
||||||
|
|
||||||
assert isinstance(result, str)
|
|
||||||
json.loads(result) # Should be valid JSON
|
|
||||||
|
|
||||||
def test_return_as_json(self):
|
|
||||||
"""Test encrypt_with_metadata with return_as='json'"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata("test data", return_as='json')
|
|
||||||
|
|
||||||
assert isinstance(result, str)
|
|
||||||
json.loads(result) # Should be valid JSON
|
|
||||||
|
|
||||||
def test_return_as_bytes(self):
|
|
||||||
"""Test encrypt_with_metadata with return_as='bytes'"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata("test data", return_as='bytes')
|
|
||||||
|
|
||||||
assert isinstance(result, bytes)
|
|
||||||
|
|
||||||
def test_return_as_dict(self):
|
|
||||||
"""Test encrypt_with_metadata with return_as='dict'"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata("test data", return_as='dict')
|
|
||||||
|
|
||||||
assert isinstance(result, dict)
|
|
||||||
assert 'encrypted_data' in result
|
|
||||||
|
|
||||||
def test_default_return_type(self):
|
|
||||||
"""Test encrypt_with_metadata default return type"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata("test data")
|
|
||||||
|
|
||||||
# Default should be 'str'
|
|
||||||
assert isinstance(result, str)
|
|
||||||
|
|
||||||
def test_invalid_return_type_defaults_to_str(self):
|
|
||||||
"""Test that invalid return_as defaults to str"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata("test data", return_as='invalid')
|
|
||||||
|
|
||||||
assert isinstance(result, str)
|
|
||||||
|
|
||||||
|
|
||||||
class TestDecryptWithMetadata:
|
|
||||||
"""Tests for decrypt_with_metadata method"""
|
|
||||||
|
|
||||||
def test_decrypt_string_package(self):
|
|
||||||
"""Test decrypting a string JSON package"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_decrypt_bytes_package(self):
|
|
||||||
"""Test decrypting a bytes JSON package"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_bytes("test data")
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_decrypt_dict_package(self):
|
|
||||||
"""Test decrypting a dict PackageData"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_dict("test data")
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_decrypt_with_different_password_fails(self):
|
|
||||||
"""Test that decrypting with wrong password fails"""
|
|
||||||
encryptor = SymmetricEncryption("password1")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
|
|
||||||
decryptor = SymmetricEncryption("password2")
|
|
||||||
with pytest.raises(ValueError, match="Key hash is not matching"):
|
|
||||||
decryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
def test_decrypt_with_explicit_password(self):
|
|
||||||
"""Test decrypting with explicitly provided password"""
|
|
||||||
encryptor = SymmetricEncryption("password1")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
|
|
||||||
# Decrypt with different password parameter
|
|
||||||
decryptor = SymmetricEncryption("password1")
|
|
||||||
decrypted = decryptor.decrypt_with_metadata(encrypted, password="password1")
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_decrypt_invalid_json_raises_error(self):
|
|
||||||
"""Test that invalid JSON raises ValueError"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
|
|
||||||
with pytest.raises(ValueError, match="Invalid encrypted package format"):
|
|
||||||
encryptor.decrypt_with_metadata("not valid json")
|
|
||||||
|
|
||||||
def test_decrypt_missing_fields_raises_error(self):
|
|
||||||
"""Test that missing required fields raises ValueError"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
invalid_package = json.dumps({"encrypted_data": "test"})
|
|
||||||
|
|
||||||
with pytest.raises(ValueError, match="Invalid encrypted package format"):
|
|
||||||
encryptor.decrypt_with_metadata(invalid_package)
|
|
||||||
|
|
||||||
def test_decrypt_unicode_data(self):
|
|
||||||
"""Test encrypting and decrypting unicode data"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
unicode_data = "Hello 世界 🌍"
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(unicode_data)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == unicode_data
|
|
||||||
|
|
||||||
def test_decrypt_empty_string(self):
|
|
||||||
"""Test encrypting and decrypting empty string"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("")
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == ""
|
|
||||||
|
|
||||||
def test_decrypt_long_data(self):
|
|
||||||
"""Test encrypting and decrypting long data"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
long_data = "A" * 10000
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(long_data)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == long_data
|
|
||||||
|
|
||||||
|
|
||||||
class TestStaticMethods:
|
|
||||||
"""Tests for static methods encrypt_data and decrypt_data"""
|
|
||||||
|
|
||||||
def test_encrypt_data_static_method(self):
|
|
||||||
"""Test static encrypt_data method"""
|
|
||||||
encrypted = SymmetricEncryption.encrypt_data("test data", "test_password")
|
|
||||||
|
|
||||||
assert isinstance(encrypted, str)
|
|
||||||
# Should be valid JSON
|
|
||||||
parsed = json.loads(encrypted)
|
|
||||||
assert 'encrypted_data' in parsed
|
|
||||||
assert 'salt' in parsed
|
|
||||||
assert 'key_hash' in parsed
|
|
||||||
|
|
||||||
def test_decrypt_data_static_method(self):
|
|
||||||
"""Test static decrypt_data method"""
|
|
||||||
encrypted = SymmetricEncryption.encrypt_data("test data", "test_password")
|
|
||||||
decrypted = SymmetricEncryption.decrypt_data(encrypted, "test_password")
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_static_methods_roundtrip(self):
|
|
||||||
"""Test complete roundtrip using static methods"""
|
|
||||||
original = "test data with special chars: !@#$%^&*()"
|
|
||||||
encrypted = SymmetricEncryption.encrypt_data(original, "test_password")
|
|
||||||
decrypted = SymmetricEncryption.decrypt_data(encrypted, "test_password")
|
|
||||||
|
|
||||||
assert decrypted == original
|
|
||||||
|
|
||||||
def test_static_decrypt_with_bytes(self):
|
|
||||||
"""Test static decrypt_data with bytes input"""
|
|
||||||
encrypted = SymmetricEncryption.encrypt_data("test data", "test_password")
|
|
||||||
encrypted_bytes = encrypted.encode('utf-8')
|
|
||||||
decrypted = SymmetricEncryption.decrypt_data(encrypted_bytes, "test_password")
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_static_decrypt_with_dict(self):
|
|
||||||
"""Test static decrypt_data with PackageData dict"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted_dict = encryptor.encrypt_with_metadata_return_dict("test data")
|
|
||||||
decrypted = SymmetricEncryption.decrypt_data(encrypted_dict, "test_password")
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_static_encrypt_bytes_data(self):
|
|
||||||
"""Test static encrypt_data with bytes input"""
|
|
||||||
encrypted = SymmetricEncryption.encrypt_data(b"test data", "test_password")
|
|
||||||
decrypted = SymmetricEncryption.decrypt_data(encrypted, "test_password")
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
|
|
||||||
class TestEncryptionSecurity:
|
|
||||||
"""Security-related tests for encryption"""
|
|
||||||
|
|
||||||
def test_same_data_different_encryption(self):
|
|
||||||
"""Test that same data produces different encrypted outputs due to salt"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted1 = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
encrypted2 = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
|
|
||||||
assert encrypted1 != encrypted2
|
|
||||||
|
|
||||||
def test_password_not_recoverable_from_hash(self):
|
|
||||||
"""Test that password hash is one-way"""
|
|
||||||
encryptor = SymmetricEncryption("secret_password")
|
|
||||||
# The password_hash should be SHA256 hex (64 chars)
|
|
||||||
assert len(encryptor.password_hash) == 64
|
|
||||||
# Password should not be easily derivable from hash
|
|
||||||
assert "secret_password" not in encryptor.password_hash
|
|
||||||
|
|
||||||
def test_encrypted_data_not_plaintext(self):
|
|
||||||
"""Test that encrypted data doesn't contain plaintext"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
plaintext = "very_secret_data_12345"
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(plaintext)
|
|
||||||
|
|
||||||
# Plaintext should not appear in encrypted output
|
|
||||||
assert plaintext not in encrypted
|
|
||||||
|
|
||||||
def test_modified_encrypted_data_fails_decryption(self):
|
|
||||||
"""Test that modified encrypted data fails to decrypt"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
|
|
||||||
# Modify the encrypted data
|
|
||||||
encrypted_dict = json.loads(encrypted)
|
|
||||||
encrypted_dict['encrypted_data'] = encrypted_dict['encrypted_data'][:-5] + "AAAAA"
|
|
||||||
modified_encrypted = json.dumps(encrypted_dict)
|
|
||||||
|
|
||||||
# Should fail to decrypt
|
|
||||||
with pytest.raises(Exception): # Fernet will raise an exception
|
|
||||||
encryptor.decrypt_with_metadata(modified_encrypted)
|
|
||||||
|
|
||||||
def test_modified_salt_fails_decryption(self):
|
|
||||||
"""Test that modified salt fails to decrypt"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
|
|
||||||
# Modify the salt
|
|
||||||
encrypted_dict = json.loads(encrypted)
|
|
||||||
original_salt = base64.urlsafe_b64decode(encrypted_dict['salt'])
|
|
||||||
modified_salt = bytes([b ^ 1 for b in original_salt])
|
|
||||||
encrypted_dict['salt'] = base64.urlsafe_b64encode(modified_salt).decode('utf-8')
|
|
||||||
modified_encrypted = json.dumps(encrypted_dict)
|
|
||||||
|
|
||||||
# Should fail to decrypt due to key hash mismatch
|
|
||||||
with pytest.raises(ValueError, match="Key hash is not matching"):
|
|
||||||
encryptor.decrypt_with_metadata(modified_encrypted)
|
|
||||||
|
|
||||||
|
|
||||||
class TestEdgeCases:
|
|
||||||
"""Edge case tests"""
|
|
||||||
|
|
||||||
def test_very_long_password(self):
|
|
||||||
"""Test with very long password"""
|
|
||||||
long_password = "a" * 1000
|
|
||||||
encryptor = SymmetricEncryption(long_password)
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_special_characters_in_data(self):
|
|
||||||
"""Test encryption of data with special characters"""
|
|
||||||
special_data = "!@#$%^&*()_+-=[]{}|;':\",./<>?\n\t\r"
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(special_data)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == special_data
|
|
||||||
|
|
||||||
def test_binary_data_utf8_bytes(self):
|
|
||||||
"""Test encryption of UTF-8 encoded bytes"""
|
|
||||||
# Test with UTF-8 encoded bytes
|
|
||||||
utf8_bytes = "Hello 世界 🌍".encode('utf-8')
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(utf8_bytes)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "Hello 世界 🌍"
|
|
||||||
|
|
||||||
def test_binary_data_with_base64_encoding(self):
|
|
||||||
"""Test encryption of arbitrary binary data using base64 encoding"""
|
|
||||||
# For arbitrary binary data, encode to base64 first
|
|
||||||
binary_data = bytes(range(256))
|
|
||||||
base64_encoded = base64.b64encode(binary_data).decode('utf-8')
|
|
||||||
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(base64_encoded)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
# Decode back to binary
|
|
||||||
decoded_binary = base64.b64decode(decrypted)
|
|
||||||
assert decoded_binary == binary_data
|
|
||||||
|
|
||||||
def test_binary_data_image_simulation(self):
|
|
||||||
"""Test encryption of simulated binary image data"""
|
|
||||||
# Simulate image binary data (random bytes)
|
|
||||||
image_data = os.urandom(1024) # 1KB of random binary data
|
|
||||||
base64_encoded = base64.b64encode(image_data).decode('utf-8')
|
|
||||||
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(base64_encoded)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
# Verify round-trip
|
|
||||||
decoded_data = base64.b64decode(decrypted)
|
|
||||||
assert decoded_data == image_data
|
|
||||||
|
|
||||||
def test_binary_data_with_null_bytes(self):
|
|
||||||
"""Test encryption of data containing null bytes"""
|
|
||||||
# Create data with null bytes
|
|
||||||
data_with_nulls = "text\x00with\x00nulls\x00bytes"
|
|
||||||
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(data_with_nulls)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == data_with_nulls
|
|
||||||
|
|
||||||
def test_binary_data_bytes_input(self):
|
|
||||||
"""Test encryption with bytes input directly"""
|
|
||||||
# UTF-8 compatible bytes
|
|
||||||
byte_data = b"Binary data test"
|
|
||||||
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(byte_data)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "Binary data test"
|
|
||||||
|
|
||||||
def test_binary_data_large_file_simulation(self):
|
|
||||||
"""Test encryption of large binary data (simulated file)"""
|
|
||||||
# Simulate a larger binary file (10KB)
|
|
||||||
large_data = os.urandom(10240)
|
|
||||||
base64_encoded = base64.b64encode(large_data).decode('utf-8')
|
|
||||||
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(base64_encoded)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
# Verify integrity
|
|
||||||
decoded_data = base64.b64decode(decrypted)
|
|
||||||
assert len(decoded_data) == 10240
|
|
||||||
assert decoded_data == large_data
|
|
||||||
|
|
||||||
def test_binary_data_json_with_base64(self):
|
|
||||||
"""Test encryption of JSON containing base64 encoded binary data"""
|
|
||||||
binary_data = os.urandom(256)
|
|
||||||
json_data = json.dumps({
|
|
||||||
"filename": "test.bin",
|
|
||||||
"data": base64.b64encode(binary_data).decode('utf-8'),
|
|
||||||
"size": len(binary_data)
|
|
||||||
})
|
|
||||||
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(json_data)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
# Parse and verify
|
|
||||||
parsed = json.loads(decrypted)
|
|
||||||
assert parsed["filename"] == "test.bin"
|
|
||||||
assert parsed["size"] == 256
|
|
||||||
decoded_binary = base64.b64decode(parsed["data"])
|
|
||||||
assert decoded_binary == binary_data
|
|
||||||
|
|
||||||
def test_numeric_password(self):
|
|
||||||
"""Test with numeric string password"""
|
|
||||||
encryptor = SymmetricEncryption("12345")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_unicode_password(self):
|
|
||||||
"""Test with unicode password"""
|
|
||||||
encryptor = SymmetricEncryption("パスワード123")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
|
|
||||||
class TestIntegration:
|
|
||||||
"""Integration tests"""
|
|
||||||
|
|
||||||
def test_multiple_encrypt_decrypt_cycles(self):
|
|
||||||
"""Test multiple encryption/decryption cycles"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
original = "test data"
|
|
||||||
|
|
||||||
# Encrypt and decrypt multiple times
|
|
||||||
for _ in range(5):
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(original)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
assert decrypted == original
|
|
||||||
|
|
||||||
def test_different_return_types_interoperability(self):
|
|
||||||
"""Test that different return types can be decrypted"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
original = "test data"
|
|
||||||
|
|
||||||
# Encrypt with different return types
|
|
||||||
encrypted_str = encryptor.encrypt_with_metadata_return_str(original)
|
|
||||||
encrypted_bytes = encryptor.encrypt_with_metadata_return_bytes(original)
|
|
||||||
encrypted_dict = encryptor.encrypt_with_metadata_return_dict(original)
|
|
||||||
|
|
||||||
# All should decrypt to the same value
|
|
||||||
assert encryptor.decrypt_with_metadata(encrypted_str) == original
|
|
||||||
assert encryptor.decrypt_with_metadata(encrypted_bytes) == original
|
|
||||||
assert encryptor.decrypt_with_metadata(encrypted_dict) == original
|
|
||||||
|
|
||||||
def test_cross_instance_encryption_decryption(self):
|
|
||||||
"""Test that different instances with same password can decrypt"""
|
|
||||||
encryptor1 = SymmetricEncryption("test_password")
|
|
||||||
encryptor2 = SymmetricEncryption("test_password")
|
|
||||||
|
|
||||||
encrypted = encryptor1.encrypt_with_metadata_return_str("test data")
|
|
||||||
decrypted = encryptor2.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
def test_static_and_instance_methods_compatible(self):
|
|
||||||
"""Test that static and instance methods are compatible"""
|
|
||||||
# Encrypt with static method
|
|
||||||
encrypted = SymmetricEncryption.encrypt_data("test data", "test_password")
|
|
||||||
|
|
||||||
# Decrypt with instance method
|
|
||||||
decryptor = SymmetricEncryption("test_password")
|
|
||||||
decrypted = decryptor.decrypt_with_metadata(encrypted)
|
|
||||||
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
# And vice versa
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted2 = encryptor.encrypt_with_metadata_return_str("test data 2")
|
|
||||||
decrypted2 = SymmetricEncryption.decrypt_data(encrypted2, "test_password")
|
|
||||||
|
|
||||||
assert decrypted2 == "test data 2"
|
|
||||||
|
|
||||||
|
|
||||||
# Parametrized tests
|
|
||||||
@pytest.mark.parametrize("data", [
|
|
||||||
"simple text",
|
|
||||||
"text with spaces and punctuation!",
|
|
||||||
"123456789",
|
|
||||||
"unicode: こんにちは",
|
|
||||||
"emoji: 🔐🔑",
|
|
||||||
"",
|
|
||||||
"a" * 1000, # Long string
|
|
||||||
])
|
|
||||||
def test_encrypt_decrypt_various_data(data: str):
|
|
||||||
"""Parametrized test for various data types"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str(data)
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
assert decrypted == data
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("password", [
|
|
||||||
"simple",
|
|
||||||
"with spaces",
|
|
||||||
"special!@#$%",
|
|
||||||
"unicode世界",
|
|
||||||
"123456",
|
|
||||||
"a" * 100, # Long password
|
|
||||||
])
|
|
||||||
def test_various_passwords(password: str):
|
|
||||||
"""Parametrized test for various passwords"""
|
|
||||||
encryptor = SymmetricEncryption(password)
|
|
||||||
encrypted = encryptor.encrypt_with_metadata_return_str("test data")
|
|
||||||
decrypted = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
assert decrypted == "test data"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("return_type,expected_type", [
|
|
||||||
("str", str),
|
|
||||||
("json", str),
|
|
||||||
("bytes", bytes),
|
|
||||||
("dict", dict),
|
|
||||||
])
|
|
||||||
def test_return_types_parametrized(return_type: str, expected_type: type):
|
|
||||||
"""Parametrized test for different return types"""
|
|
||||||
encryptor = SymmetricEncryption("test_password")
|
|
||||||
result = encryptor.encrypt_with_metadata("test data", return_as=return_type)
|
|
||||||
assert isinstance(result, expected_type)
|
|
||||||
|
|
||||||
|
|
||||||
# Fixtures
|
|
||||||
@pytest.fixture
|
|
||||||
def encryptor() -> SymmetricEncryption:
|
|
||||||
"""Fixture providing a basic encryptor instance"""
|
|
||||||
return SymmetricEncryption("test_password")
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def sample_encrypted_data(encryptor: SymmetricEncryption) -> str:
|
|
||||||
"""Fixture providing sample encrypted data"""
|
|
||||||
return encryptor.encrypt_with_metadata_return_str("sample data")
|
|
||||||
|
|
||||||
|
|
||||||
def test_with_encryptor_fixture(encryptor: SymmetricEncryption) -> None:
|
|
||||||
"""Test using encryptor fixture"""
|
|
||||||
encrypted: str = encryptor.encrypt_with_metadata_return_str("test")
|
|
||||||
decrypted: str = encryptor.decrypt_with_metadata(encrypted)
|
|
||||||
assert decrypted == "test"
|
|
||||||
|
|
||||||
|
|
||||||
def test_with_encrypted_data_fixture(encryptor: SymmetricEncryption, sample_encrypted_data: str) -> None:
|
|
||||||
"""Test using encrypted data fixture"""
|
|
||||||
decrypted: str = encryptor.decrypt_with_metadata(sample_encrypted_data)
|
|
||||||
assert decrypted == "sample data"
|
|
||||||
|
|
||||||
# __END__
|
|
||||||
Reference in New Issue
Block a user