Add helper function for calculating the incremental update output buffer size. Add bytes_in and bytes_out counters on the incremental classes. Avoid ERANGE errors caused by too small output buffer.

This commit is contained in:
Leo Vasanko
2025-11-05 15:13:46 -06:00
parent 02310675b7
commit 438627e0db
6 changed files with 450 additions and 96 deletions

View File

@@ -21,15 +21,21 @@ TAILBYTES_MAX = _lib.aegis128l_tailbytes_max()
ALIGNMENT = 32
def _ptr(buf):
"""Return an ffi pointer for a Python buffer, or NULL for None.
def calc_update_output_size(bytes_in: int, bytes_next: int) -> int:
"""Calculate the number of bytes output by the next incremental update.
Args:
buf: Any object supporting the Python buffer protocol, or None.
bytes_in: Total number of bytes passed to update so far.
bytes_next: Length of the next input chunk.
Returns:
An ffi pointer obtained with ffi.from_buffer, or ffi.NULL when buf is None.
Number of bytes that the next update will write.
"""
remainder = bytes_in % ALIGNMENT
return ((remainder + int(bytes_next)) // ALIGNMENT) * ALIGNMENT
def _ptr(buf):
return ffi.NULL if buf is None else ffi.from_buffer(buf)
@@ -559,7 +565,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -589,6 +595,22 @@ class Encryptor:
_ptr(key),
)
self._st = st
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total plaintext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total ciphertext bytes produced so far.
Includes update() and final()/final_detached() output, also MAC tag.
"""
return self._bytes_out
def update(self, message: Buffer, into: Buffer | None = None) -> memoryview:
"""Encrypt a chunk of the message.
@@ -606,9 +628,13 @@ class Encryptor:
RuntimeError: If the C update call fails.
"""
message = memoryview(message)
out = memoryview(into if into is not None else bytearray(message.nbytes))
if out.nbytes < message.nbytes:
raise TypeError("into length must be >= len(message)")
# Compute exact number of bytes this update can emit (multiple of ALIGNMENT)
expected_out = calc_update_output_size(self._bytes_in, message.nbytes)
out = memoryview(into if into is not None else bytearray(expected_out))
if out.nbytes < expected_out:
raise TypeError(
"into length must be >= expected output size for this update"
)
written = ffi.new("size_t *")
rc = _lib.aegis128l_state_encrypt_update(
self._st,
@@ -621,8 +647,14 @@ class Encryptor:
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt update failed: {err_name}")
return out[: int(written[0])]
raise RuntimeError(
f"state encrypt update failed: {err_name} written {written[0]}"
)
w = int(written[0])
# Advance counters for this update call
self._bytes_in += message.nbytes
self._bytes_out += w
return out[:w]
def final(self, into: Buffer | None = None, maclen: int = ABYTES_MIN) -> memoryview:
"""Finalize encryption, writing any remaining bytes and the tag.
@@ -657,7 +689,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt final failed: {err_name}")
return out[: int(written[0])]
w = int(written[0])
self._bytes_out += w
return out[:w]
def final_detached(
self,
@@ -700,7 +734,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt detached final failed: {err_name}")
return out[: int(written[0])], mac
w = int(written[0])
self._bytes_out += w + maclen
return out[:w], mac
class Decryptor:
@@ -710,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -740,6 +776,22 @@ class Decryptor:
_ptr(key),
)
self._st = st
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total ciphertext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total plaintext bytes produced so far.
Includes bytes written by update() and by final().
"""
return self._bytes_out
def update(self, ct: Buffer, into: Buffer | None = None) -> memoryview:
"""Process a chunk of ciphertext.
@@ -758,10 +810,12 @@ class Decryptor:
RuntimeError: If the C update call fails.
"""
ct = memoryview(ct)
out = into if into is not None else bytearray(ct.nbytes)
produced = calc_update_output_size(self._bytes_in, ct.nbytes)
requirement = produced + ALIGNMENT # libaegis requires larger than actual w
out = into if into is not None else bytearray(requirement)
out = memoryview(out)
if out.nbytes < ct.nbytes:
raise TypeError("into length must be >= len(ciphertext)")
if out.nbytes < requirement:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis128l_state_decrypt_detached_update(
self._st,
@@ -776,6 +830,8 @@ class Decryptor:
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state decrypt update failed: {err_name}")
w = int(written[0])
self._bytes_in += ct.nbytes
self._bytes_out += w
return out[:w]
def final(self, mac: Buffer, into: Buffer | None = None) -> memoryview:
@@ -812,6 +868,7 @@ class Decryptor:
if rc != 0:
raise ValueError("authentication failed")
w = int(written[0])
self._bytes_out += w
return out[:w]
@@ -841,6 +898,8 @@ __all__ = [
"ABYTES_MAX",
"TAILBYTES_MAX",
"ALIGNMENT",
# helpers
"calc_update_output_size",
# one-shot functions
"encrypt_detached",
"decrypt_detached",

View File

@@ -21,15 +21,21 @@ TAILBYTES_MAX = _lib.aegis128x2_tailbytes_max()
ALIGNMENT = 64
def _ptr(buf):
"""Return an ffi pointer for a Python buffer, or NULL for None.
def calc_update_output_size(bytes_in: int, bytes_next: int) -> int:
"""Calculate the number of bytes output by the next incremental update.
Args:
buf: Any object supporting the Python buffer protocol, or None.
bytes_in: Total number of bytes passed to update so far.
bytes_next: Length of the next input chunk.
Returns:
An ffi pointer obtained with ffi.from_buffer, or ffi.NULL when buf is None.
Number of bytes that the next update will write.
"""
remainder = bytes_in % ALIGNMENT
return ((remainder + int(bytes_next)) // ALIGNMENT) * ALIGNMENT
def _ptr(buf):
return ffi.NULL if buf is None else ffi.from_buffer(buf)
@@ -559,7 +565,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -589,6 +595,22 @@ class Encryptor:
_ptr(key),
)
self._st = st
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total plaintext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total ciphertext bytes produced so far.
Includes update() and final()/final_detached() output, also MAC tag.
"""
return self._bytes_out
def update(self, message: Buffer, into: Buffer | None = None) -> memoryview:
"""Encrypt a chunk of the message.
@@ -606,9 +628,13 @@ class Encryptor:
RuntimeError: If the C update call fails.
"""
message = memoryview(message)
out = memoryview(into if into is not None else bytearray(message.nbytes))
if out.nbytes < message.nbytes:
raise TypeError("into length must be >= len(message)")
# Compute exact number of bytes this update can emit (multiple of ALIGNMENT)
expected_out = calc_update_output_size(self._bytes_in, message.nbytes)
out = memoryview(into if into is not None else bytearray(expected_out))
if out.nbytes < expected_out:
raise TypeError(
"into length must be >= expected output size for this update"
)
written = ffi.new("size_t *")
rc = _lib.aegis128x2_state_encrypt_update(
self._st,
@@ -621,8 +647,14 @@ class Encryptor:
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt update failed: {err_name}")
return out[: int(written[0])]
raise RuntimeError(
f"state encrypt update failed: {err_name} written {written[0]}"
)
w = int(written[0])
# Advance counters for this update call
self._bytes_in += message.nbytes
self._bytes_out += w
return out[:w]
def final(self, into: Buffer | None = None, maclen: int = ABYTES_MIN) -> memoryview:
"""Finalize encryption, writing any remaining bytes and the tag.
@@ -657,7 +689,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt final failed: {err_name}")
return out[: int(written[0])]
w = int(written[0])
self._bytes_out += w
return out[:w]
def final_detached(
self,
@@ -700,7 +734,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt detached final failed: {err_name}")
return out[: int(written[0])], mac
w = int(written[0])
self._bytes_out += w + maclen
return out[:w], mac
class Decryptor:
@@ -710,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -740,6 +776,22 @@ class Decryptor:
_ptr(key),
)
self._st = st
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total ciphertext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total plaintext bytes produced so far.
Includes bytes written by update() and by final().
"""
return self._bytes_out
def update(self, ct: Buffer, into: Buffer | None = None) -> memoryview:
"""Process a chunk of ciphertext.
@@ -758,10 +810,12 @@ class Decryptor:
RuntimeError: If the C update call fails.
"""
ct = memoryview(ct)
out = into if into is not None else bytearray(ct.nbytes)
produced = calc_update_output_size(self._bytes_in, ct.nbytes)
requirement = produced + ALIGNMENT # libaegis requires larger than actual w
out = into if into is not None else bytearray(requirement)
out = memoryview(out)
if out.nbytes < ct.nbytes:
raise TypeError("into length must be >= len(ciphertext)")
if out.nbytes < requirement:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis128x2_state_decrypt_detached_update(
self._st,
@@ -776,6 +830,8 @@ class Decryptor:
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state decrypt update failed: {err_name}")
w = int(written[0])
self._bytes_in += ct.nbytes
self._bytes_out += w
return out[:w]
def final(self, mac: Buffer, into: Buffer | None = None) -> memoryview:
@@ -812,6 +868,7 @@ class Decryptor:
if rc != 0:
raise ValueError("authentication failed")
w = int(written[0])
self._bytes_out += w
return out[:w]
@@ -841,6 +898,8 @@ __all__ = [
"ABYTES_MAX",
"TAILBYTES_MAX",
"ALIGNMENT",
# helpers
"calc_update_output_size",
# one-shot functions
"encrypt_detached",
"decrypt_detached",

View File

@@ -21,15 +21,21 @@ TAILBYTES_MAX = _lib.aegis128x4_tailbytes_max()
ALIGNMENT = 64
def _ptr(buf):
"""Return an ffi pointer for a Python buffer, or NULL for None.
def calc_update_output_size(bytes_in: int, bytes_next: int) -> int:
"""Calculate the number of bytes output by the next incremental update.
Args:
buf: Any object supporting the Python buffer protocol, or None.
bytes_in: Total number of bytes passed to update so far.
bytes_next: Length of the next input chunk.
Returns:
An ffi pointer obtained with ffi.from_buffer, or ffi.NULL when buf is None.
Number of bytes that the next update will write.
"""
remainder = bytes_in % ALIGNMENT
return ((remainder + int(bytes_next)) // ALIGNMENT) * ALIGNMENT
def _ptr(buf):
return ffi.NULL if buf is None else ffi.from_buffer(buf)
@@ -559,7 +565,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -589,6 +595,22 @@ class Encryptor:
_ptr(key),
)
self._st = st
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total plaintext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total ciphertext bytes produced so far.
Includes update() and final()/final_detached() output, also MAC tag.
"""
return self._bytes_out
def update(self, message: Buffer, into: Buffer | None = None) -> memoryview:
"""Encrypt a chunk of the message.
@@ -606,9 +628,13 @@ class Encryptor:
RuntimeError: If the C update call fails.
"""
message = memoryview(message)
out = memoryview(into if into is not None else bytearray(message.nbytes))
if out.nbytes < message.nbytes:
raise TypeError("into length must be >= len(message)")
# Compute exact number of bytes this update can emit (multiple of ALIGNMENT)
expected_out = calc_update_output_size(self._bytes_in, message.nbytes)
out = memoryview(into if into is not None else bytearray(expected_out))
if out.nbytes < expected_out:
raise TypeError(
"into length must be >= expected output size for this update"
)
written = ffi.new("size_t *")
rc = _lib.aegis128x4_state_encrypt_update(
self._st,
@@ -621,8 +647,14 @@ class Encryptor:
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt update failed: {err_name}")
return out[: int(written[0])]
raise RuntimeError(
f"state encrypt update failed: {err_name} written {written[0]}"
)
w = int(written[0])
# Advance counters for this update call
self._bytes_in += message.nbytes
self._bytes_out += w
return out[:w]
def final(self, into: Buffer | None = None, maclen: int = ABYTES_MIN) -> memoryview:
"""Finalize encryption, writing any remaining bytes and the tag.
@@ -657,7 +689,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt final failed: {err_name}")
return out[: int(written[0])]
w = int(written[0])
self._bytes_out += w
return out[:w]
def final_detached(
self,
@@ -700,7 +734,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt detached final failed: {err_name}")
return out[: int(written[0])], mac
w = int(written[0])
self._bytes_out += w + maclen
return out[:w], mac
class Decryptor:
@@ -710,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -740,6 +776,22 @@ class Decryptor:
_ptr(key),
)
self._st = st
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total ciphertext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total plaintext bytes produced so far.
Includes bytes written by update() and by final().
"""
return self._bytes_out
def update(self, ct: Buffer, into: Buffer | None = None) -> memoryview:
"""Process a chunk of ciphertext.
@@ -758,10 +810,12 @@ class Decryptor:
RuntimeError: If the C update call fails.
"""
ct = memoryview(ct)
out = into if into is not None else bytearray(ct.nbytes)
produced = calc_update_output_size(self._bytes_in, ct.nbytes)
requirement = produced + ALIGNMENT # libaegis requires larger than actual w
out = into if into is not None else bytearray(requirement)
out = memoryview(out)
if out.nbytes < ct.nbytes:
raise TypeError("into length must be >= len(ciphertext)")
if out.nbytes < requirement:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis128x4_state_decrypt_detached_update(
self._st,
@@ -776,6 +830,8 @@ class Decryptor:
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state decrypt update failed: {err_name}")
w = int(written[0])
self._bytes_in += ct.nbytes
self._bytes_out += w
return out[:w]
def final(self, mac: Buffer, into: Buffer | None = None) -> memoryview:
@@ -812,6 +868,7 @@ class Decryptor:
if rc != 0:
raise ValueError("authentication failed")
w = int(written[0])
self._bytes_out += w
return out[:w]
@@ -841,6 +898,8 @@ __all__ = [
"ABYTES_MAX",
"TAILBYTES_MAX",
"ALIGNMENT",
# helpers
"calc_update_output_size",
# one-shot functions
"encrypt_detached",
"decrypt_detached",

View File

@@ -21,15 +21,21 @@ TAILBYTES_MAX = _lib.aegis256_tailbytes_max()
ALIGNMENT = 16
def _ptr(buf):
"""Return an ffi pointer for a Python buffer, or NULL for None.
def calc_update_output_size(bytes_in: int, bytes_next: int) -> int:
"""Calculate the number of bytes output by the next incremental update.
Args:
buf: Any object supporting the Python buffer protocol, or None.
bytes_in: Total number of bytes passed to update so far.
bytes_next: Length of the next input chunk.
Returns:
An ffi pointer obtained with ffi.from_buffer, or ffi.NULL when buf is None.
Number of bytes that the next update will write.
"""
remainder = bytes_in % ALIGNMENT
return ((remainder + int(bytes_next)) // ALIGNMENT) * ALIGNMENT
def _ptr(buf):
return ffi.NULL if buf is None else ffi.from_buffer(buf)
@@ -559,7 +565,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -589,6 +595,22 @@ class Encryptor:
_ptr(key),
)
self._st = st
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total plaintext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total ciphertext bytes produced so far.
Includes update() and final()/final_detached() output, also MAC tag.
"""
return self._bytes_out
def update(self, message: Buffer, into: Buffer | None = None) -> memoryview:
"""Encrypt a chunk of the message.
@@ -606,9 +628,13 @@ class Encryptor:
RuntimeError: If the C update call fails.
"""
message = memoryview(message)
out = memoryview(into if into is not None else bytearray(message.nbytes))
if out.nbytes < message.nbytes:
raise TypeError("into length must be >= len(message)")
# Compute exact number of bytes this update can emit (multiple of ALIGNMENT)
expected_out = calc_update_output_size(self._bytes_in, message.nbytes)
out = memoryview(into if into is not None else bytearray(expected_out))
if out.nbytes < expected_out:
raise TypeError(
"into length must be >= expected output size for this update"
)
written = ffi.new("size_t *")
rc = _lib.aegis256_state_encrypt_update(
self._st,
@@ -621,8 +647,14 @@ class Encryptor:
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt update failed: {err_name}")
return out[: int(written[0])]
raise RuntimeError(
f"state encrypt update failed: {err_name} written {written[0]}"
)
w = int(written[0])
# Advance counters for this update call
self._bytes_in += message.nbytes
self._bytes_out += w
return out[:w]
def final(self, into: Buffer | None = None, maclen: int = ABYTES_MIN) -> memoryview:
"""Finalize encryption, writing any remaining bytes and the tag.
@@ -657,7 +689,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt final failed: {err_name}")
return out[: int(written[0])]
w = int(written[0])
self._bytes_out += w
return out[:w]
def final_detached(
self,
@@ -700,7 +734,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt detached final failed: {err_name}")
return out[: int(written[0])], mac
w = int(written[0])
self._bytes_out += w + maclen
return out[:w], mac
class Decryptor:
@@ -710,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -740,6 +776,22 @@ class Decryptor:
_ptr(key),
)
self._st = st
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total ciphertext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total plaintext bytes produced so far.
Includes bytes written by update() and by final().
"""
return self._bytes_out
def update(self, ct: Buffer, into: Buffer | None = None) -> memoryview:
"""Process a chunk of ciphertext.
@@ -758,10 +810,12 @@ class Decryptor:
RuntimeError: If the C update call fails.
"""
ct = memoryview(ct)
out = into if into is not None else bytearray(ct.nbytes)
produced = calc_update_output_size(self._bytes_in, ct.nbytes)
requirement = produced + ALIGNMENT # libaegis requires larger than actual w
out = into if into is not None else bytearray(requirement)
out = memoryview(out)
if out.nbytes < ct.nbytes:
raise TypeError("into length must be >= len(ciphertext)")
if out.nbytes < requirement:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis256_state_decrypt_detached_update(
self._st,
@@ -776,6 +830,8 @@ class Decryptor:
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state decrypt update failed: {err_name}")
w = int(written[0])
self._bytes_in += ct.nbytes
self._bytes_out += w
return out[:w]
def final(self, mac: Buffer, into: Buffer | None = None) -> memoryview:
@@ -812,6 +868,7 @@ class Decryptor:
if rc != 0:
raise ValueError("authentication failed")
w = int(written[0])
self._bytes_out += w
return out[:w]
@@ -841,6 +898,8 @@ __all__ = [
"ABYTES_MAX",
"TAILBYTES_MAX",
"ALIGNMENT",
# helpers
"calc_update_output_size",
# one-shot functions
"encrypt_detached",
"decrypt_detached",

View File

@@ -21,15 +21,21 @@ TAILBYTES_MAX = _lib.aegis256x2_tailbytes_max()
ALIGNMENT = 32
def _ptr(buf):
"""Return an ffi pointer for a Python buffer, or NULL for None.
def calc_update_output_size(bytes_in: int, bytes_next: int) -> int:
"""Calculate the number of bytes output by the next incremental update.
Args:
buf: Any object supporting the Python buffer protocol, or None.
bytes_in: Total number of bytes passed to update so far.
bytes_next: Length of the next input chunk.
Returns:
An ffi pointer obtained with ffi.from_buffer, or ffi.NULL when buf is None.
Number of bytes that the next update will write.
"""
remainder = bytes_in % ALIGNMENT
return ((remainder + int(bytes_next)) // ALIGNMENT) * ALIGNMENT
def _ptr(buf):
return ffi.NULL if buf is None else ffi.from_buffer(buf)
@@ -559,7 +565,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -589,6 +595,22 @@ class Encryptor:
_ptr(key),
)
self._st = st
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total plaintext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total ciphertext bytes produced so far.
Includes update() and final()/final_detached() output, also MAC tag.
"""
return self._bytes_out
def update(self, message: Buffer, into: Buffer | None = None) -> memoryview:
"""Encrypt a chunk of the message.
@@ -606,9 +628,13 @@ class Encryptor:
RuntimeError: If the C update call fails.
"""
message = memoryview(message)
out = memoryview(into if into is not None else bytearray(message.nbytes))
if out.nbytes < message.nbytes:
raise TypeError("into length must be >= len(message)")
# Compute exact number of bytes this update can emit (multiple of ALIGNMENT)
expected_out = calc_update_output_size(self._bytes_in, message.nbytes)
out = memoryview(into if into is not None else bytearray(expected_out))
if out.nbytes < expected_out:
raise TypeError(
"into length must be >= expected output size for this update"
)
written = ffi.new("size_t *")
rc = _lib.aegis256x2_state_encrypt_update(
self._st,
@@ -621,8 +647,14 @@ class Encryptor:
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt update failed: {err_name}")
return out[: int(written[0])]
raise RuntimeError(
f"state encrypt update failed: {err_name} written {written[0]}"
)
w = int(written[0])
# Advance counters for this update call
self._bytes_in += message.nbytes
self._bytes_out += w
return out[:w]
def final(self, into: Buffer | None = None, maclen: int = ABYTES_MIN) -> memoryview:
"""Finalize encryption, writing any remaining bytes and the tag.
@@ -657,7 +689,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt final failed: {err_name}")
return out[: int(written[0])]
w = int(written[0])
self._bytes_out += w
return out[:w]
def final_detached(
self,
@@ -700,7 +734,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt detached final failed: {err_name}")
return out[: int(written[0])], mac
w = int(written[0])
self._bytes_out += w + maclen
return out[:w], mac
class Decryptor:
@@ -710,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -740,6 +776,22 @@ class Decryptor:
_ptr(key),
)
self._st = st
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total ciphertext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total plaintext bytes produced so far.
Includes bytes written by update() and by final().
"""
return self._bytes_out
def update(self, ct: Buffer, into: Buffer | None = None) -> memoryview:
"""Process a chunk of ciphertext.
@@ -758,10 +810,12 @@ class Decryptor:
RuntimeError: If the C update call fails.
"""
ct = memoryview(ct)
out = into if into is not None else bytearray(ct.nbytes)
produced = calc_update_output_size(self._bytes_in, ct.nbytes)
requirement = produced + ALIGNMENT # libaegis requires larger than actual w
out = into if into is not None else bytearray(requirement)
out = memoryview(out)
if out.nbytes < ct.nbytes:
raise TypeError("into length must be >= len(ciphertext)")
if out.nbytes < requirement:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis256x2_state_decrypt_detached_update(
self._st,
@@ -776,6 +830,8 @@ class Decryptor:
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state decrypt update failed: {err_name}")
w = int(written[0])
self._bytes_in += ct.nbytes
self._bytes_out += w
return out[:w]
def final(self, mac: Buffer, into: Buffer | None = None) -> memoryview:
@@ -812,6 +868,7 @@ class Decryptor:
if rc != 0:
raise ValueError("authentication failed")
w = int(written[0])
self._bytes_out += w
return out[:w]
@@ -841,6 +898,8 @@ __all__ = [
"ABYTES_MAX",
"TAILBYTES_MAX",
"ALIGNMENT",
# helpers
"calc_update_output_size",
# one-shot functions
"encrypt_detached",
"decrypt_detached",

View File

@@ -21,15 +21,21 @@ TAILBYTES_MAX = _lib.aegis256x4_tailbytes_max()
ALIGNMENT = 64
def _ptr(buf):
"""Return an ffi pointer for a Python buffer, or NULL for None.
def calc_update_output_size(bytes_in: int, bytes_next: int) -> int:
"""Calculate the number of bytes output by the next incremental update.
Args:
buf: Any object supporting the Python buffer protocol, or None.
bytes_in: Total number of bytes passed to update so far.
bytes_next: Length of the next input chunk.
Returns:
An ffi pointer obtained with ffi.from_buffer, or ffi.NULL when buf is None.
Number of bytes that the next update will write.
"""
remainder = bytes_in % ALIGNMENT
return ((remainder + int(bytes_next)) // ALIGNMENT) * ALIGNMENT
def _ptr(buf):
return ffi.NULL if buf is None else ffi.from_buffer(buf)
@@ -559,7 +565,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -589,6 +595,22 @@ class Encryptor:
_ptr(key),
)
self._st = st
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total plaintext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total ciphertext bytes produced so far.
Includes update() and final()/final_detached() output, also MAC tag.
"""
return self._bytes_out
def update(self, message: Buffer, into: Buffer | None = None) -> memoryview:
"""Encrypt a chunk of the message.
@@ -606,9 +628,13 @@ class Encryptor:
RuntimeError: If the C update call fails.
"""
message = memoryview(message)
out = memoryview(into if into is not None else bytearray(message.nbytes))
if out.nbytes < message.nbytes:
raise TypeError("into length must be >= len(message)")
# Compute exact number of bytes this update can emit (multiple of ALIGNMENT)
expected_out = calc_update_output_size(self._bytes_in, message.nbytes)
out = memoryview(into if into is not None else bytearray(expected_out))
if out.nbytes < expected_out:
raise TypeError(
"into length must be >= expected output size for this update"
)
written = ffi.new("size_t *")
rc = _lib.aegis256x4_state_encrypt_update(
self._st,
@@ -621,8 +647,14 @@ class Encryptor:
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt update failed: {err_name}")
return out[: int(written[0])]
raise RuntimeError(
f"state encrypt update failed: {err_name} written {written[0]}"
)
w = int(written[0])
# Advance counters for this update call
self._bytes_in += message.nbytes
self._bytes_out += w
return out[:w]
def final(self, into: Buffer | None = None, maclen: int = ABYTES_MIN) -> memoryview:
"""Finalize encryption, writing any remaining bytes and the tag.
@@ -657,7 +689,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt final failed: {err_name}")
return out[: int(written[0])]
w = int(written[0])
self._bytes_out += w
return out[:w]
def final_detached(
self,
@@ -700,7 +734,9 @@ class Encryptor:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state encrypt detached final failed: {err_name}")
return out[: int(written[0])], mac
w = int(written[0])
self._bytes_out += w + maclen
return out[:w], mac
class Decryptor:
@@ -710,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st",)
__slots__ = ("_st", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -740,6 +776,22 @@ class Decryptor:
_ptr(key),
)
self._st = st
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@property
def bytes_in(self) -> int:
"""Total ciphertext bytes fed to update() so far."""
return self._bytes_in
@property
def bytes_out(self) -> int:
"""Total plaintext bytes produced so far.
Includes bytes written by update() and by final().
"""
return self._bytes_out
def update(self, ct: Buffer, into: Buffer | None = None) -> memoryview:
"""Process a chunk of ciphertext.
@@ -758,10 +810,12 @@ class Decryptor:
RuntimeError: If the C update call fails.
"""
ct = memoryview(ct)
out = into if into is not None else bytearray(ct.nbytes)
produced = calc_update_output_size(self._bytes_in, ct.nbytes)
requirement = produced + ALIGNMENT # libaegis requires larger than actual w
out = into if into is not None else bytearray(requirement)
out = memoryview(out)
if out.nbytes < ct.nbytes:
raise TypeError("into length must be >= len(ciphertext)")
if out.nbytes < requirement:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis256x4_state_decrypt_detached_update(
self._st,
@@ -776,6 +830,8 @@ class Decryptor:
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
raise RuntimeError(f"state decrypt update failed: {err_name}")
w = int(written[0])
self._bytes_in += ct.nbytes
self._bytes_out += w
return out[:w]
def final(self, mac: Buffer, into: Buffer | None = None) -> memoryview:
@@ -812,6 +868,7 @@ class Decryptor:
if rc != 0:
raise ValueError("authentication failed")
w = int(written[0])
self._bytes_out += w
return out[:w]
@@ -841,6 +898,8 @@ __all__ = [
"ABYTES_MAX",
"TAILBYTES_MAX",
"ALIGNMENT",
# helpers
"calc_update_output_size",
# one-shot functions
"encrypt_detached",
"decrypt_detached",