Wipe state structs automatically after use. Simplified aligned allocator and its use via a single handle.

This commit is contained in:
Leo Vasanko
2025-11-08 20:04:00 -06:00
parent 95563a43d1
commit 75cbc76845
7 changed files with 232 additions and 262 deletions

View File

@@ -441,7 +441,7 @@ class Mac:
mac = a.final()
"""
__slots__ = ("_st", "_owner", "_maclen")
__slots__ = ("_proxy", "_maclen")
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
"""Create a MAC with the given key, nonce, and tag length.
@@ -457,31 +457,29 @@ class Mac:
raise TypeError(f"nonce length must be {NONCEBYTES}")
self._maclen = maclen
st, owner = new_aligned_struct("aegis128l_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
_lib.aegis128l_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._st, clone._owner = new_aligned_struct("aegis128l_mac_state", ALIGNMENT)
_lib.aegis128l_mac_state_clone(clone._st, self._st)
return clone
clone = __deepcopy__
self._proxy = new_aligned_struct("aegis128l_mac_state", ALIGNMENT)
_lib.aegis128l_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
def reset(self) -> None:
"""Reset back to the original state, prior to any updates."""
_lib.aegis128l_mac_reset(self._st)
_lib.aegis128l_mac_reset(self._proxy.ptr)
def clone(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._proxy = new_aligned_struct("aegis128l_mac_state", ALIGNMENT)
_lib.aegis128l_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
return clone
__deepcopy__ = clone
def update(self, data: Buffer) -> None:
"""Update the MAC state with more data.
Repeated calls to update() are equivalent to a single call with the concatenated data.
"""
rc = _lib.aegis128l_mac_update(self._st, _ptr(data), len(data))
rc = _lib.aegis128l_mac_update(self._proxy.ptr, _ptr(data), len(data))
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -511,7 +509,8 @@ class Mac:
raise TypeError("into length must be at least maclen")
out = into
rc = _lib.aegis128l_mac_final(self.clone()._st, ffi.from_buffer(out), maclen)
clone = self.clone()
rc = _lib.aegis128l_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -543,7 +542,7 @@ class Mac:
raise TypeError("mac length must be 16 or 32")
cloned = self.clone()
rc = _lib.aegis128l_mac_verify(cloned._st, _ptr(mac), maclen)
rc = _lib.aegis128l_mac_verify(cloned._proxy.ptr, _ptr(mac), maclen)
if rc != 0:
raise ValueError("mac verification failed")
@@ -555,7 +554,7 @@ class Encryptor:
- final([into]) -> returns MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -581,16 +580,14 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis128l_state", ALIGNMENT)
self._state = new_aligned_struct("aegis128l_state", ALIGNMENT)
_lib.aegis128l_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -624,7 +621,7 @@ class Encryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(message)
out = into if into is not None else bytearray(expected_out)
@@ -635,7 +632,7 @@ class Encryptor:
)
written = ffi.new("size_t *")
rc = _lib.aegis128l_state_encrypt_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -666,14 +663,14 @@ class Encryptor:
Raises:
RuntimeError: If the C final call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
# Only the authentication tag is produced here; allocate exactly maclen
out = into if into is not None else bytearray(maclen)
written = ffi.new("size_t *")
rc = _lib.aegis128l_state_encrypt_final(
self._st,
self._state.ptr,
ffi.from_buffer(out),
len(out),
written,
@@ -688,8 +685,7 @@ class Encryptor:
# Only the tag bytes are returned when we allocate the buffer
assert w == maclen
self._bytes_out += w
self._st = None
self._owner = None
self._state = None
return out if into is None else memoryview(out)[:w] # type: ignore
@@ -700,7 +696,7 @@ class Decryptor:
- final(mac) -> verifies the MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -726,16 +722,14 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis128l_state", ALIGNMENT)
self._state = new_aligned_struct("aegis128l_state", ALIGNMENT)
_lib.aegis128l_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -764,7 +758,7 @@ class Decryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(ct)
out = into if into is not None else bytearray(expected_out)
@@ -773,7 +767,7 @@ class Decryptor:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis128l_state_decrypt_detached_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -801,18 +795,17 @@ class Decryptor:
ValueError: If authentication fails.
RuntimeError: If called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
if len(mac) != maclen:
raise TypeError(f"mac length must be {maclen}")
rc = _lib.aegis128l_state_decrypt_detached_final(
self._st, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
)
if rc != 0:
raise ValueError("authentication failed")
self._st = None
self._owner = None
self._state = None
def new_state():

View File

@@ -441,7 +441,7 @@ class Mac:
mac = a.final()
"""
__slots__ = ("_st", "_owner", "_maclen")
__slots__ = ("_proxy", "_maclen")
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
"""Create a MAC with the given key, nonce, and tag length.
@@ -457,31 +457,29 @@ class Mac:
raise TypeError(f"nonce length must be {NONCEBYTES}")
self._maclen = maclen
st, owner = new_aligned_struct("aegis128x2_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
_lib.aegis128x2_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._st, clone._owner = new_aligned_struct("aegis128x2_mac_state", ALIGNMENT)
_lib.aegis128x2_mac_state_clone(clone._st, self._st)
return clone
clone = __deepcopy__
self._proxy = new_aligned_struct("aegis128x2_mac_state", ALIGNMENT)
_lib.aegis128x2_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
def reset(self) -> None:
"""Reset back to the original state, prior to any updates."""
_lib.aegis128x2_mac_reset(self._st)
_lib.aegis128x2_mac_reset(self._proxy.ptr)
def clone(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._proxy = new_aligned_struct("aegis128x2_mac_state", ALIGNMENT)
_lib.aegis128x2_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
return clone
__deepcopy__ = clone
def update(self, data: Buffer) -> None:
"""Update the MAC state with more data.
Repeated calls to update() are equivalent to a single call with the concatenated data.
"""
rc = _lib.aegis128x2_mac_update(self._st, _ptr(data), len(data))
rc = _lib.aegis128x2_mac_update(self._proxy.ptr, _ptr(data), len(data))
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -511,7 +509,8 @@ class Mac:
raise TypeError("into length must be at least maclen")
out = into
rc = _lib.aegis128x2_mac_final(self.clone()._st, ffi.from_buffer(out), maclen)
clone = self.clone()
rc = _lib.aegis128x2_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -543,7 +542,7 @@ class Mac:
raise TypeError("mac length must be 16 or 32")
cloned = self.clone()
rc = _lib.aegis128x2_mac_verify(cloned._st, _ptr(mac), maclen)
rc = _lib.aegis128x2_mac_verify(cloned._proxy.ptr, _ptr(mac), maclen)
if rc != 0:
raise ValueError("mac verification failed")
@@ -555,7 +554,7 @@ class Encryptor:
- final([into]) -> returns MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -581,16 +580,14 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis128x2_state", ALIGNMENT)
self._state = new_aligned_struct("aegis128x2_state", ALIGNMENT)
_lib.aegis128x2_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -624,7 +621,7 @@ class Encryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(message)
out = into if into is not None else bytearray(expected_out)
@@ -635,7 +632,7 @@ class Encryptor:
)
written = ffi.new("size_t *")
rc = _lib.aegis128x2_state_encrypt_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -666,14 +663,14 @@ class Encryptor:
Raises:
RuntimeError: If the C final call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
# Only the authentication tag is produced here; allocate exactly maclen
out = into if into is not None else bytearray(maclen)
written = ffi.new("size_t *")
rc = _lib.aegis128x2_state_encrypt_final(
self._st,
self._state.ptr,
ffi.from_buffer(out),
len(out),
written,
@@ -688,8 +685,7 @@ class Encryptor:
# Only the tag bytes are returned when we allocate the buffer
assert w == maclen
self._bytes_out += w
self._st = None
self._owner = None
self._state = None
return out if into is None else memoryview(out)[:w] # type: ignore
@@ -700,7 +696,7 @@ class Decryptor:
- final(mac) -> verifies the MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -726,16 +722,14 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis128x2_state", ALIGNMENT)
self._state = new_aligned_struct("aegis128x2_state", ALIGNMENT)
_lib.aegis128x2_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -764,7 +758,7 @@ class Decryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(ct)
out = into if into is not None else bytearray(expected_out)
@@ -773,7 +767,7 @@ class Decryptor:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis128x2_state_decrypt_detached_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -801,18 +795,17 @@ class Decryptor:
ValueError: If authentication fails.
RuntimeError: If called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
if len(mac) != maclen:
raise TypeError(f"mac length must be {maclen}")
rc = _lib.aegis128x2_state_decrypt_detached_final(
self._st, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
)
if rc != 0:
raise ValueError("authentication failed")
self._st = None
self._owner = None
self._state = None
def new_state():

View File

@@ -441,7 +441,7 @@ class Mac:
mac = a.final()
"""
__slots__ = ("_st", "_owner", "_maclen")
__slots__ = ("_proxy", "_maclen")
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
"""Create a MAC with the given key, nonce, and tag length.
@@ -457,31 +457,29 @@ class Mac:
raise TypeError(f"nonce length must be {NONCEBYTES}")
self._maclen = maclen
st, owner = new_aligned_struct("aegis128x4_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
_lib.aegis128x4_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._st, clone._owner = new_aligned_struct("aegis128x4_mac_state", ALIGNMENT)
_lib.aegis128x4_mac_state_clone(clone._st, self._st)
return clone
clone = __deepcopy__
self._proxy = new_aligned_struct("aegis128x4_mac_state", ALIGNMENT)
_lib.aegis128x4_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
def reset(self) -> None:
"""Reset back to the original state, prior to any updates."""
_lib.aegis128x4_mac_reset(self._st)
_lib.aegis128x4_mac_reset(self._proxy.ptr)
def clone(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._proxy = new_aligned_struct("aegis128x4_mac_state", ALIGNMENT)
_lib.aegis128x4_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
return clone
__deepcopy__ = clone
def update(self, data: Buffer) -> None:
"""Update the MAC state with more data.
Repeated calls to update() are equivalent to a single call with the concatenated data.
"""
rc = _lib.aegis128x4_mac_update(self._st, _ptr(data), len(data))
rc = _lib.aegis128x4_mac_update(self._proxy.ptr, _ptr(data), len(data))
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -511,7 +509,8 @@ class Mac:
raise TypeError("into length must be at least maclen")
out = into
rc = _lib.aegis128x4_mac_final(self.clone()._st, ffi.from_buffer(out), maclen)
clone = self.clone()
rc = _lib.aegis128x4_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -543,7 +542,7 @@ class Mac:
raise TypeError("mac length must be 16 or 32")
cloned = self.clone()
rc = _lib.aegis128x4_mac_verify(cloned._st, _ptr(mac), maclen)
rc = _lib.aegis128x4_mac_verify(cloned._proxy.ptr, _ptr(mac), maclen)
if rc != 0:
raise ValueError("mac verification failed")
@@ -555,7 +554,7 @@ class Encryptor:
- final([into]) -> returns MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -581,16 +580,14 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis128x4_state", ALIGNMENT)
self._state = new_aligned_struct("aegis128x4_state", ALIGNMENT)
_lib.aegis128x4_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -624,7 +621,7 @@ class Encryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(message)
out = into if into is not None else bytearray(expected_out)
@@ -635,7 +632,7 @@ class Encryptor:
)
written = ffi.new("size_t *")
rc = _lib.aegis128x4_state_encrypt_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -666,14 +663,14 @@ class Encryptor:
Raises:
RuntimeError: If the C final call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
# Only the authentication tag is produced here; allocate exactly maclen
out = into if into is not None else bytearray(maclen)
written = ffi.new("size_t *")
rc = _lib.aegis128x4_state_encrypt_final(
self._st,
self._state.ptr,
ffi.from_buffer(out),
len(out),
written,
@@ -688,8 +685,7 @@ class Encryptor:
# Only the tag bytes are returned when we allocate the buffer
assert w == maclen
self._bytes_out += w
self._st = None
self._owner = None
self._state = None
return out if into is None else memoryview(out)[:w] # type: ignore
@@ -700,7 +696,7 @@ class Decryptor:
- final(mac) -> verifies the MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -726,16 +722,14 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis128x4_state", ALIGNMENT)
self._state = new_aligned_struct("aegis128x4_state", ALIGNMENT)
_lib.aegis128x4_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -764,7 +758,7 @@ class Decryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(ct)
out = into if into is not None else bytearray(expected_out)
@@ -773,7 +767,7 @@ class Decryptor:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis128x4_state_decrypt_detached_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -801,18 +795,17 @@ class Decryptor:
ValueError: If authentication fails.
RuntimeError: If called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
if len(mac) != maclen:
raise TypeError(f"mac length must be {maclen}")
rc = _lib.aegis128x4_state_decrypt_detached_final(
self._st, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
)
if rc != 0:
raise ValueError("authentication failed")
self._st = None
self._owner = None
self._state = None
def new_state():

View File

@@ -441,7 +441,7 @@ class Mac:
mac = a.final()
"""
__slots__ = ("_st", "_owner", "_maclen")
__slots__ = ("_proxy", "_maclen")
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
"""Create a MAC with the given key, nonce, and tag length.
@@ -457,31 +457,29 @@ class Mac:
raise TypeError(f"nonce length must be {NONCEBYTES}")
self._maclen = maclen
st, owner = new_aligned_struct("aegis256_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
_lib.aegis256_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._st, clone._owner = new_aligned_struct("aegis256_mac_state", ALIGNMENT)
_lib.aegis256_mac_state_clone(clone._st, self._st)
return clone
clone = __deepcopy__
self._proxy = new_aligned_struct("aegis256_mac_state", ALIGNMENT)
_lib.aegis256_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
def reset(self) -> None:
"""Reset back to the original state, prior to any updates."""
_lib.aegis256_mac_reset(self._st)
_lib.aegis256_mac_reset(self._proxy.ptr)
def clone(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._proxy = new_aligned_struct("aegis256_mac_state", ALIGNMENT)
_lib.aegis256_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
return clone
__deepcopy__ = clone
def update(self, data: Buffer) -> None:
"""Update the MAC state with more data.
Repeated calls to update() are equivalent to a single call with the concatenated data.
"""
rc = _lib.aegis256_mac_update(self._st, _ptr(data), len(data))
rc = _lib.aegis256_mac_update(self._proxy.ptr, _ptr(data), len(data))
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -511,7 +509,8 @@ class Mac:
raise TypeError("into length must be at least maclen")
out = into
rc = _lib.aegis256_mac_final(self.clone()._st, ffi.from_buffer(out), maclen)
clone = self.clone()
rc = _lib.aegis256_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -543,7 +542,7 @@ class Mac:
raise TypeError("mac length must be 16 or 32")
cloned = self.clone()
rc = _lib.aegis256_mac_verify(cloned._st, _ptr(mac), maclen)
rc = _lib.aegis256_mac_verify(cloned._proxy.ptr, _ptr(mac), maclen)
if rc != 0:
raise ValueError("mac verification failed")
@@ -555,7 +554,7 @@ class Encryptor:
- final([into]) -> returns MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -581,16 +580,14 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis256_state", ALIGNMENT)
self._state = new_aligned_struct("aegis256_state", ALIGNMENT)
_lib.aegis256_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -624,7 +621,7 @@ class Encryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(message)
out = into if into is not None else bytearray(expected_out)
@@ -635,7 +632,7 @@ class Encryptor:
)
written = ffi.new("size_t *")
rc = _lib.aegis256_state_encrypt_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -666,14 +663,14 @@ class Encryptor:
Raises:
RuntimeError: If the C final call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
# Only the authentication tag is produced here; allocate exactly maclen
out = into if into is not None else bytearray(maclen)
written = ffi.new("size_t *")
rc = _lib.aegis256_state_encrypt_final(
self._st,
self._state.ptr,
ffi.from_buffer(out),
len(out),
written,
@@ -688,8 +685,7 @@ class Encryptor:
# Only the tag bytes are returned when we allocate the buffer
assert w == maclen
self._bytes_out += w
self._st = None
self._owner = None
self._state = None
return out if into is None else memoryview(out)[:w] # type: ignore
@@ -700,7 +696,7 @@ class Decryptor:
- final(mac) -> verifies the MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -726,16 +722,14 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis256_state", ALIGNMENT)
self._state = new_aligned_struct("aegis256_state", ALIGNMENT)
_lib.aegis256_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -764,7 +758,7 @@ class Decryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(ct)
out = into if into is not None else bytearray(expected_out)
@@ -773,7 +767,7 @@ class Decryptor:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis256_state_decrypt_detached_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -801,18 +795,17 @@ class Decryptor:
ValueError: If authentication fails.
RuntimeError: If called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
if len(mac) != maclen:
raise TypeError(f"mac length must be {maclen}")
rc = _lib.aegis256_state_decrypt_detached_final(
self._st, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
)
if rc != 0:
raise ValueError("authentication failed")
self._st = None
self._owner = None
self._state = None
def new_state():

View File

@@ -441,7 +441,7 @@ class Mac:
mac = a.final()
"""
__slots__ = ("_st", "_owner", "_maclen")
__slots__ = ("_proxy", "_maclen")
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
"""Create a MAC with the given key, nonce, and tag length.
@@ -457,31 +457,29 @@ class Mac:
raise TypeError(f"nonce length must be {NONCEBYTES}")
self._maclen = maclen
st, owner = new_aligned_struct("aegis256x2_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
_lib.aegis256x2_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._st, clone._owner = new_aligned_struct("aegis256x2_mac_state", ALIGNMENT)
_lib.aegis256x2_mac_state_clone(clone._st, self._st)
return clone
clone = __deepcopy__
self._proxy = new_aligned_struct("aegis256x2_mac_state", ALIGNMENT)
_lib.aegis256x2_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
def reset(self) -> None:
"""Reset back to the original state, prior to any updates."""
_lib.aegis256x2_mac_reset(self._st)
_lib.aegis256x2_mac_reset(self._proxy.ptr)
def clone(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._proxy = new_aligned_struct("aegis256x2_mac_state", ALIGNMENT)
_lib.aegis256x2_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
return clone
__deepcopy__ = clone
def update(self, data: Buffer) -> None:
"""Update the MAC state with more data.
Repeated calls to update() are equivalent to a single call with the concatenated data.
"""
rc = _lib.aegis256x2_mac_update(self._st, _ptr(data), len(data))
rc = _lib.aegis256x2_mac_update(self._proxy.ptr, _ptr(data), len(data))
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -511,7 +509,8 @@ class Mac:
raise TypeError("into length must be at least maclen")
out = into
rc = _lib.aegis256x2_mac_final(self.clone()._st, ffi.from_buffer(out), maclen)
clone = self.clone()
rc = _lib.aegis256x2_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -543,7 +542,7 @@ class Mac:
raise TypeError("mac length must be 16 or 32")
cloned = self.clone()
rc = _lib.aegis256x2_mac_verify(cloned._st, _ptr(mac), maclen)
rc = _lib.aegis256x2_mac_verify(cloned._proxy.ptr, _ptr(mac), maclen)
if rc != 0:
raise ValueError("mac verification failed")
@@ -555,7 +554,7 @@ class Encryptor:
- final([into]) -> returns MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -581,16 +580,14 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis256x2_state", ALIGNMENT)
self._state = new_aligned_struct("aegis256x2_state", ALIGNMENT)
_lib.aegis256x2_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -624,7 +621,7 @@ class Encryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(message)
out = into if into is not None else bytearray(expected_out)
@@ -635,7 +632,7 @@ class Encryptor:
)
written = ffi.new("size_t *")
rc = _lib.aegis256x2_state_encrypt_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -666,14 +663,14 @@ class Encryptor:
Raises:
RuntimeError: If the C final call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
# Only the authentication tag is produced here; allocate exactly maclen
out = into if into is not None else bytearray(maclen)
written = ffi.new("size_t *")
rc = _lib.aegis256x2_state_encrypt_final(
self._st,
self._state.ptr,
ffi.from_buffer(out),
len(out),
written,
@@ -688,8 +685,7 @@ class Encryptor:
# Only the tag bytes are returned when we allocate the buffer
assert w == maclen
self._bytes_out += w
self._st = None
self._owner = None
self._state = None
return out if into is None else memoryview(out)[:w] # type: ignore
@@ -700,7 +696,7 @@ class Decryptor:
- final(mac) -> verifies the MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -726,16 +722,14 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis256x2_state", ALIGNMENT)
self._state = new_aligned_struct("aegis256x2_state", ALIGNMENT)
_lib.aegis256x2_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -764,7 +758,7 @@ class Decryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(ct)
out = into if into is not None else bytearray(expected_out)
@@ -773,7 +767,7 @@ class Decryptor:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis256x2_state_decrypt_detached_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -801,18 +795,17 @@ class Decryptor:
ValueError: If authentication fails.
RuntimeError: If called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
if len(mac) != maclen:
raise TypeError(f"mac length must be {maclen}")
rc = _lib.aegis256x2_state_decrypt_detached_final(
self._st, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
)
if rc != 0:
raise ValueError("authentication failed")
self._st = None
self._owner = None
self._state = None
def new_state():

View File

@@ -441,7 +441,7 @@ class Mac:
mac = a.final()
"""
__slots__ = ("_st", "_owner", "_maclen")
__slots__ = ("_proxy", "_maclen")
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
"""Create a MAC with the given key, nonce, and tag length.
@@ -457,31 +457,29 @@ class Mac:
raise TypeError(f"nonce length must be {NONCEBYTES}")
self._maclen = maclen
st, owner = new_aligned_struct("aegis256x4_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
_lib.aegis256x4_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._st, clone._owner = new_aligned_struct("aegis256x4_mac_state", ALIGNMENT)
_lib.aegis256x4_mac_state_clone(clone._st, self._st)
return clone
clone = __deepcopy__
self._proxy = new_aligned_struct("aegis256x4_mac_state", ALIGNMENT)
_lib.aegis256x4_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
def reset(self) -> None:
"""Reset back to the original state, prior to any updates."""
_lib.aegis256x4_mac_reset(self._st)
_lib.aegis256x4_mac_reset(self._proxy.ptr)
def clone(self) -> "Mac":
"""Return a clone of current MAC state."""
clone = object.__new__(Mac)
clone._maclen = self._maclen
clone._proxy = new_aligned_struct("aegis256x4_mac_state", ALIGNMENT)
_lib.aegis256x4_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
return clone
__deepcopy__ = clone
def update(self, data: Buffer) -> None:
"""Update the MAC state with more data.
Repeated calls to update() are equivalent to a single call with the concatenated data.
"""
rc = _lib.aegis256x4_mac_update(self._st, _ptr(data), len(data))
rc = _lib.aegis256x4_mac_update(self._proxy.ptr, _ptr(data), len(data))
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -511,7 +509,8 @@ class Mac:
raise TypeError("into length must be at least maclen")
out = into
rc = _lib.aegis256x4_mac_final(self.clone()._st, ffi.from_buffer(out), maclen)
clone = self.clone()
rc = _lib.aegis256x4_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
if rc != 0:
err_num = ffi.errno
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
@@ -543,7 +542,7 @@ class Mac:
raise TypeError("mac length must be 16 or 32")
cloned = self.clone()
rc = _lib.aegis256x4_mac_verify(cloned._st, _ptr(mac), maclen)
rc = _lib.aegis256x4_mac_verify(cloned._proxy.ptr, _ptr(mac), maclen)
if rc != 0:
raise ValueError("mac verification failed")
@@ -555,7 +554,7 @@ class Encryptor:
- final([into]) -> returns MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -581,16 +580,14 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis256x4_state", ALIGNMENT)
self._state = new_aligned_struct("aegis256x4_state", ALIGNMENT)
_lib.aegis256x4_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -624,7 +621,7 @@ class Encryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(message)
out = into if into is not None else bytearray(expected_out)
@@ -635,7 +632,7 @@ class Encryptor:
)
written = ffi.new("size_t *")
rc = _lib.aegis256x4_state_encrypt_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -666,14 +663,14 @@ class Encryptor:
Raises:
RuntimeError: If the C final call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
# Only the authentication tag is produced here; allocate exactly maclen
out = into if into is not None else bytearray(maclen)
written = ffi.new("size_t *")
rc = _lib.aegis256x4_state_encrypt_final(
self._st,
self._state.ptr,
ffi.from_buffer(out),
len(out),
written,
@@ -688,8 +685,7 @@ class Encryptor:
# Only the tag bytes are returned when we allocate the buffer
assert w == maclen
self._bytes_out += w
self._st = None
self._owner = None
self._state = None
return out if into is None else memoryview(out)[:w] # type: ignore
@@ -700,7 +696,7 @@ class Decryptor:
- final(mac) -> verifies the MAC tag
"""
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out", "_maclen")
__slots__ = ("_state", "_bytes_in", "_bytes_out", "_maclen")
def __init__(
self,
@@ -726,16 +722,14 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if len(nonce) != NONCEBYTES:
raise TypeError(f"nonce length must be {NONCEBYTES}")
st, owner = new_aligned_struct("aegis256x4_state", ALIGNMENT)
self._state = new_aligned_struct("aegis256x4_state", ALIGNMENT)
_lib.aegis256x4_state_init(
st,
self._state.ptr,
_ptr(ad) if ad is not None else ffi.NULL,
0 if ad is None else len(ad),
_ptr(nonce),
_ptr(key),
)
self._st = st
self._owner = owner
self._bytes_in = 0
self._bytes_out = 0
self._maclen = maclen
@@ -764,7 +758,7 @@ class Decryptor:
TypeError: If destination buffer is too small.
RuntimeError: If the C update call fails or if called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call update() after final()")
expected_out = len(ct)
out = into if into is not None else bytearray(expected_out)
@@ -773,7 +767,7 @@ class Decryptor:
raise TypeError("into length must be >= required capacity for this update")
written = ffi.new("size_t *")
rc = _lib.aegis256x4_state_decrypt_detached_update(
self._st,
self._state.ptr,
ffi.from_buffer(out_mv),
len(out_mv),
written,
@@ -801,18 +795,17 @@ class Decryptor:
ValueError: If authentication fails.
RuntimeError: If called after final().
"""
if self._st is None:
if self._state is None:
raise RuntimeError("Cannot call final() after final()")
maclen = self._maclen
if len(mac) != maclen:
raise TypeError(f"mac length must be {maclen}")
rc = _lib.aegis256x4_state_decrypt_detached_final(
self._st, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
)
if rc != 0:
raise ValueError("authentication failed")
self._st = None
self._owner = None
self._state = None
def new_state():

View File

@@ -4,8 +4,6 @@ Currently provides Python-side aligned allocation helpers that avoid relying
on libc/posix_memalign. Memory is owned by Python; C code only borrows it.
"""
from __future__ import annotations
from typing import Protocol
from ._loader import ffi
@@ -13,7 +11,7 @@ from ._loader import ffi
__all__ = ["new_aligned_struct", "aligned_address", "Buffer", "nonce_increment", "wipe"]
try:
from collections.abc import Buffer as _Buffer
from collections.abc import Buffer as _Buffer # type: ignore[misc]
class Buffer(_Buffer, Protocol): # type: ignore[misc]
def __len__(self) -> int: ...
@@ -29,22 +27,36 @@ def aligned_address(obj) -> int:
return int(ffi.cast("uintptr_t", ffi.addressof(obj, 0)))
def new_aligned_struct(ctype: str, alignment: int) -> tuple[object, object]:
"""Allocate memory for one instance of ``ctype`` with requested alignment.
class StructHolder:
"""Proxy object for aligned struct allocation.
This allocates a Python-owned unsigned char[] buffer large enough to find
an aligned start address. Returns (ptr, owner) where ptr is a ``ctype *``
and owner is the buffer object keeping the memory alive.
Exposes the aligned pointer as a property and wipes the buffer on deletion.
"""
if alignment & (alignment - 1): # Not power of two
raise ValueError("alignment must be a power of two")
def __init__(self, ptr: object, view: memoryview):
self._ptr = ptr
self._view = view # Keep memoryview slice and its bytearray alive
@property
def ptr(self) -> object:
"""The aligned pointer to the struct."""
return self._ptr
def __del__(self):
wipe(self._view)
del self._ptr, self._view
def new_aligned_struct(ctype: str, alignment: int) -> StructHolder:
"""Allocate memory for one instance of ``ctype`` with requested alignment."""
# Allocate backing storage with extra space for alignment
size = ffi.sizeof(ctype)
base = ffi.new("unsigned char[]", size + alignment - 1)
addr = aligned_address(base)
offset = (-addr) & (alignment - 1)
aligned_uc = ffi.addressof(base, offset)
ptr = ffi.cast(f"{ctype} *", aligned_uc)
return ptr, base
view = memoryview(bytearray(size + alignment - 1))
# Compute alignment offset from the base address
offset = (-aligned_address(ffi.from_buffer(view))) & (alignment - 1)
# Slice the memoryview to the aligned region (keeps bytearray alive)
view = view[offset : offset + size]
return StructHolder(ffi.from_buffer(f"{ctype} *", view), view)
def nonce_increment(nonce: Buffer) -> None: