Simplify implementation: remove bytes_in and bytes_out counters from all classes.
This commit is contained in:
@@ -554,7 +554,7 @@ class Encryptor:
|
||||
- final([into]) -> returns MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -588,23 +588,8 @@ class Encryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total plaintext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total ciphertext bytes produced so far.
|
||||
|
||||
Includes update() and final() output.
|
||||
"""
|
||||
return self._bytes_out
|
||||
|
||||
def update(
|
||||
self, message: Buffer, into: Buffer | None = None
|
||||
) -> bytearray | memoryview:
|
||||
@@ -647,8 +632,6 @@ class Encryptor:
|
||||
)
|
||||
w = int(written[0])
|
||||
assert w == expected_out
|
||||
self._bytes_in += len(message)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
@@ -684,7 +667,6 @@ class Encryptor:
|
||||
if into is None:
|
||||
# Only the tag bytes are returned when we allocate the buffer
|
||||
assert w == maclen
|
||||
self._bytes_out += w
|
||||
self._state = None
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
@@ -696,7 +678,7 @@ class Decryptor:
|
||||
- final(mac) -> verifies the MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -730,20 +712,8 @@ class Decryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total ciphertext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total plaintext bytes produced so far."""
|
||||
return self._bytes_out
|
||||
|
||||
def update(self, ct: Buffer, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Process a chunk of ciphertext.
|
||||
|
||||
@@ -780,8 +750,6 @@ class Decryptor:
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
self._bytes_in += len(ct)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
|
||||
@@ -554,7 +554,7 @@ class Encryptor:
|
||||
- final([into]) -> returns MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -588,23 +588,8 @@ class Encryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total plaintext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total ciphertext bytes produced so far.
|
||||
|
||||
Includes update() and final() output.
|
||||
"""
|
||||
return self._bytes_out
|
||||
|
||||
def update(
|
||||
self, message: Buffer, into: Buffer | None = None
|
||||
) -> bytearray | memoryview:
|
||||
@@ -647,8 +632,6 @@ class Encryptor:
|
||||
)
|
||||
w = int(written[0])
|
||||
assert w == expected_out
|
||||
self._bytes_in += len(message)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
@@ -684,7 +667,6 @@ class Encryptor:
|
||||
if into is None:
|
||||
# Only the tag bytes are returned when we allocate the buffer
|
||||
assert w == maclen
|
||||
self._bytes_out += w
|
||||
self._state = None
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
@@ -696,7 +678,7 @@ class Decryptor:
|
||||
- final(mac) -> verifies the MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -730,20 +712,8 @@ class Decryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total ciphertext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total plaintext bytes produced so far."""
|
||||
return self._bytes_out
|
||||
|
||||
def update(self, ct: Buffer, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Process a chunk of ciphertext.
|
||||
|
||||
@@ -780,8 +750,6 @@ class Decryptor:
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
self._bytes_in += len(ct)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
|
||||
@@ -554,7 +554,7 @@ class Encryptor:
|
||||
- final([into]) -> returns MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -588,23 +588,8 @@ class Encryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total plaintext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total ciphertext bytes produced so far.
|
||||
|
||||
Includes update() and final() output.
|
||||
"""
|
||||
return self._bytes_out
|
||||
|
||||
def update(
|
||||
self, message: Buffer, into: Buffer | None = None
|
||||
) -> bytearray | memoryview:
|
||||
@@ -647,8 +632,6 @@ class Encryptor:
|
||||
)
|
||||
w = int(written[0])
|
||||
assert w == expected_out
|
||||
self._bytes_in += len(message)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
@@ -684,7 +667,6 @@ class Encryptor:
|
||||
if into is None:
|
||||
# Only the tag bytes are returned when we allocate the buffer
|
||||
assert w == maclen
|
||||
self._bytes_out += w
|
||||
self._state = None
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
@@ -696,7 +678,7 @@ class Decryptor:
|
||||
- final(mac) -> verifies the MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -730,20 +712,8 @@ class Decryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total ciphertext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total plaintext bytes produced so far."""
|
||||
return self._bytes_out
|
||||
|
||||
def update(self, ct: Buffer, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Process a chunk of ciphertext.
|
||||
|
||||
@@ -780,8 +750,6 @@ class Decryptor:
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
self._bytes_in += len(ct)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
|
||||
@@ -554,7 +554,7 @@ class Encryptor:
|
||||
- final([into]) -> returns MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -588,23 +588,8 @@ class Encryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total plaintext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total ciphertext bytes produced so far.
|
||||
|
||||
Includes update() and final() output.
|
||||
"""
|
||||
return self._bytes_out
|
||||
|
||||
def update(
|
||||
self, message: Buffer, into: Buffer | None = None
|
||||
) -> bytearray | memoryview:
|
||||
@@ -647,8 +632,6 @@ class Encryptor:
|
||||
)
|
||||
w = int(written[0])
|
||||
assert w == expected_out
|
||||
self._bytes_in += len(message)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
@@ -684,7 +667,6 @@ class Encryptor:
|
||||
if into is None:
|
||||
# Only the tag bytes are returned when we allocate the buffer
|
||||
assert w == maclen
|
||||
self._bytes_out += w
|
||||
self._state = None
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
@@ -696,7 +678,7 @@ class Decryptor:
|
||||
- final(mac) -> verifies the MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -730,20 +712,8 @@ class Decryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total ciphertext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total plaintext bytes produced so far."""
|
||||
return self._bytes_out
|
||||
|
||||
def update(self, ct: Buffer, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Process a chunk of ciphertext.
|
||||
|
||||
@@ -780,8 +750,6 @@ class Decryptor:
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
self._bytes_in += len(ct)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
|
||||
@@ -554,7 +554,7 @@ class Encryptor:
|
||||
- final([into]) -> returns MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -588,23 +588,8 @@ class Encryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total plaintext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total ciphertext bytes produced so far.
|
||||
|
||||
Includes update() and final() output.
|
||||
"""
|
||||
return self._bytes_out
|
||||
|
||||
def update(
|
||||
self, message: Buffer, into: Buffer | None = None
|
||||
) -> bytearray | memoryview:
|
||||
@@ -647,8 +632,6 @@ class Encryptor:
|
||||
)
|
||||
w = int(written[0])
|
||||
assert w == expected_out
|
||||
self._bytes_in += len(message)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
@@ -684,7 +667,6 @@ class Encryptor:
|
||||
if into is None:
|
||||
# Only the tag bytes are returned when we allocate the buffer
|
||||
assert w == maclen
|
||||
self._bytes_out += w
|
||||
self._state = None
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
@@ -696,7 +678,7 @@ class Decryptor:
|
||||
- final(mac) -> verifies the MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -730,20 +712,8 @@ class Decryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total ciphertext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total plaintext bytes produced so far."""
|
||||
return self._bytes_out
|
||||
|
||||
def update(self, ct: Buffer, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Process a chunk of ciphertext.
|
||||
|
||||
@@ -780,8 +750,6 @@ class Decryptor:
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
self._bytes_in += len(ct)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
|
||||
@@ -554,7 +554,7 @@ class Encryptor:
|
||||
- final([into]) -> returns MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -588,23 +588,8 @@ class Encryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total plaintext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total ciphertext bytes produced so far.
|
||||
|
||||
Includes update() and final() output.
|
||||
"""
|
||||
return self._bytes_out
|
||||
|
||||
def update(
|
||||
self, message: Buffer, into: Buffer | None = None
|
||||
) -> bytearray | memoryview:
|
||||
@@ -647,8 +632,6 @@ class Encryptor:
|
||||
)
|
||||
w = int(written[0])
|
||||
assert w == expected_out
|
||||
self._bytes_in += len(message)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
@@ -684,7 +667,6 @@ class Encryptor:
|
||||
if into is None:
|
||||
# Only the tag bytes are returned when we allocate the buffer
|
||||
assert w == maclen
|
||||
self._bytes_out += w
|
||||
self._state = None
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
@@ -696,7 +678,7 @@ class Decryptor:
|
||||
- final(mac) -> verifies the MAC tag
|
||||
"""
|
||||
|
||||
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
|
||||
__slots__ = ("_state", "_maclen")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -730,20 +712,8 @@ class Decryptor:
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
self._bytes_in = 0
|
||||
self._bytes_out = 0
|
||||
self._maclen = maclen
|
||||
|
||||
@property
|
||||
def bytes_in(self) -> int:
|
||||
"""Total ciphertext bytes fed to update() so far."""
|
||||
return self._bytes_in
|
||||
|
||||
@property
|
||||
def bytes_out(self) -> int:
|
||||
"""Total plaintext bytes produced so far."""
|
||||
return self._bytes_out
|
||||
|
||||
def update(self, ct: Buffer, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Process a chunk of ciphertext.
|
||||
|
||||
@@ -780,8 +750,6 @@ class Decryptor:
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
self._bytes_in += len(ct)
|
||||
self._bytes_out += w
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
|
||||
@@ -68,21 +68,6 @@ class TestEncryptorFinalization:
|
||||
):
|
||||
encryptor.final()
|
||||
|
||||
def test_properties_accessible_after_final(self):
|
||||
"""Test that properties like bytes_in and bytes_out are still accessible after final()."""
|
||||
key = aegis256x4.random_key()
|
||||
nonce = aegis256x4.random_nonce()
|
||||
|
||||
encryptor = aegis256x4.Encryptor(key, nonce)
|
||||
|
||||
message = b"Hello, world!"
|
||||
encryptor.update(message)
|
||||
tag = encryptor.final()
|
||||
|
||||
# Properties should still be accessible
|
||||
assert encryptor.bytes_in == len(message)
|
||||
assert encryptor.bytes_out == len(message) + len(tag)
|
||||
|
||||
def test_empty_encryption_finalization(self):
|
||||
"""Test that finalization works correctly with no update() calls."""
|
||||
key = aegis256x4.random_key()
|
||||
@@ -169,24 +154,6 @@ class TestDecryptorFinalization:
|
||||
):
|
||||
decryptor.final(tag)
|
||||
|
||||
def test_properties_accessible_after_final(self):
|
||||
"""Test that properties like bytes_in and bytes_out are still accessible after final()."""
|
||||
key = aegis256x4.random_key()
|
||||
nonce = aegis256x4.random_nonce()
|
||||
message = b"Hello, world!"
|
||||
|
||||
# Encrypt first
|
||||
ct, tag = aegis256x4.encrypt_detached(key, nonce, message)
|
||||
|
||||
# Decrypt
|
||||
decryptor = aegis256x4.Decryptor(key, nonce)
|
||||
decryptor.update(ct)
|
||||
decryptor.final(tag)
|
||||
|
||||
# Properties should still be accessible
|
||||
assert decryptor.bytes_in == len(ct)
|
||||
assert decryptor.bytes_out == len(message)
|
||||
|
||||
def test_empty_decryption_finalization(self):
|
||||
"""Test that finalization works correctly with no update() calls."""
|
||||
key = aegis256x4.random_key()
|
||||
|
||||
Reference in New Issue
Block a user