Make Mac class prevent further updates or final after finalisation. Keep cached values for hashlib API.
This commit is contained in:
@@ -77,6 +77,8 @@ Stateful classes that can be used for processing the data in separate chunks:
|
||||
- update(ct_chunk[, into]) -> plaintext_chunk
|
||||
- final(mac) -> raises ValueError on failure
|
||||
|
||||
The object releases its state and becomes unusable after final has been called.
|
||||
|
||||
### Message Authentication Code
|
||||
|
||||
No encryption, but prevents changes to the data without the correct key.
|
||||
@@ -88,8 +90,10 @@ No encryption, but prevents changes to the data without the correct key.
|
||||
- verify(mac) -> raises ValueError on failure
|
||||
- digest() -> bytes
|
||||
- hexdigest() -> str
|
||||
- reset()
|
||||
- clone() -> Mac
|
||||
|
||||
The `Mac` class follows the Python hashlib API for compatibility with code expecting hash objects. Finalizing does not alter the state, so further updates appending to the already input data can be issued even after calling the other methods that calculate the MAC.
|
||||
The `Mac` class follows the Python hashlib API for compatibility with code expecting hash objects. After calling `final()`, `digest()`, or `hexdigest()`, the Mac object becomes unusable for further `update()` operations. However, `digest()` and `hexdigest()` cache their results and can be called multiple times. Use `reset()` to clear the state and start over, or `clone()` to create a copy before finalizing.
|
||||
|
||||
### Keystream generation
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
import errno
|
||||
import secrets
|
||||
from typing import Literal
|
||||
|
||||
from ._loader import ffi
|
||||
from ._loader import lib as _lib
|
||||
@@ -433,15 +434,22 @@ def mac(
|
||||
|
||||
|
||||
class Mac:
|
||||
"""AEGIS-128L MAC state wrapper.
|
||||
"""MAC calculation and verification with incremental updates.
|
||||
|
||||
Example:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
...
|
||||
mac = a.final()
|
||||
|
||||
Hashlib compatible interface:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
bytes_mac = a.digest()
|
||||
hex_mac = a.hexdigest()
|
||||
"""
|
||||
|
||||
__slots__ = ("_proxy", "_maclen")
|
||||
__slots__ = ("_proxy", "_maclen", "_cached_digest")
|
||||
|
||||
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
|
||||
"""Create a MAC with the given key, nonce, and tag length.
|
||||
@@ -459,10 +467,12 @@ class Mac:
|
||||
self._maclen = maclen
|
||||
self._proxy = new_aligned_struct("aegis128l_mac_state", ALIGNMENT)
|
||||
_lib.aegis128l_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
|
||||
self._cached_digest: None | Literal[False] | bytes = None
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset back to the original state, prior to any updates."""
|
||||
_lib.aegis128l_mac_reset(self._proxy.ptr)
|
||||
self._cached_digest = None
|
||||
|
||||
def clone(self) -> "Mac":
|
||||
"""Return a clone of current MAC state."""
|
||||
@@ -470,6 +480,7 @@ class Mac:
|
||||
clone._maclen = self._maclen
|
||||
clone._proxy = new_aligned_struct("aegis128l_mac_state", ALIGNMENT)
|
||||
_lib.aegis128l_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
|
||||
clone._cached_digest = self._cached_digest
|
||||
return clone
|
||||
|
||||
__deepcopy__ = clone
|
||||
@@ -479,6 +490,8 @@ class Mac:
|
||||
|
||||
Repeated calls to update() are equivalent to a single call with the concatenated data.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis128l_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -488,8 +501,8 @@ class Mac:
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Calculate and return the MAC tag for the currently input data.
|
||||
|
||||
Unlike the C library, this method does not alter the current state,
|
||||
allowing for multiple calls and further updates on the same object.
|
||||
This method can only be called once. After calling it, the MAC becomes unusable
|
||||
for further updates or calls to final().
|
||||
|
||||
Args:
|
||||
into: Optional buffer to write the tag into (default: bytearray created).
|
||||
@@ -499,8 +512,12 @@ class Mac:
|
||||
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
RuntimeError: If finalization fails in the C library.
|
||||
RuntimeError: If finalization fails in the C library or if already finalized.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError(
|
||||
"The MAC can only be calculated once. Use reset() to start over, or clone() before finalizing to continue."
|
||||
)
|
||||
maclen = self._maclen
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
@@ -515,14 +532,27 @@ class Mac:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"mac final failed: {err_name}")
|
||||
self._cached_digest = False
|
||||
return out if into is None else memoryview(out)[:maclen] # type: ignore
|
||||
|
||||
def digest(self) -> bytes:
|
||||
"""Calculate and return the MAC tag as bytes."""
|
||||
return bytes(self.final())
|
||||
"""Calculate and return the MAC tag as bytes.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
Can be called after final() to get the cached digest.
|
||||
"""
|
||||
if self._cached_digest:
|
||||
return self._cached_digest
|
||||
self._cached_digest = bytes(self.final()) # Overrides the False set by final()
|
||||
return self._cached_digest
|
||||
|
||||
def hexdigest(self) -> str:
|
||||
"""Calculate and return the MAC tag as a hex string."""
|
||||
"""Calculate and return the MAC tag as a hex string.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
"""
|
||||
return self.digest().hex()
|
||||
|
||||
def verify(self, mac: Buffer):
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
import errno
|
||||
import secrets
|
||||
from typing import Literal
|
||||
|
||||
from ._loader import ffi
|
||||
from ._loader import lib as _lib
|
||||
@@ -433,15 +434,22 @@ def mac(
|
||||
|
||||
|
||||
class Mac:
|
||||
"""AEGIS-128X2 MAC state wrapper.
|
||||
"""MAC calculation and verification with incremental updates.
|
||||
|
||||
Example:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
...
|
||||
mac = a.final()
|
||||
|
||||
Hashlib compatible interface:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
bytes_mac = a.digest()
|
||||
hex_mac = a.hexdigest()
|
||||
"""
|
||||
|
||||
__slots__ = ("_proxy", "_maclen")
|
||||
__slots__ = ("_proxy", "_maclen", "_cached_digest")
|
||||
|
||||
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
|
||||
"""Create a MAC with the given key, nonce, and tag length.
|
||||
@@ -459,10 +467,12 @@ class Mac:
|
||||
self._maclen = maclen
|
||||
self._proxy = new_aligned_struct("aegis128x2_mac_state", ALIGNMENT)
|
||||
_lib.aegis128x2_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
|
||||
self._cached_digest: None | Literal[False] | bytes = None
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset back to the original state, prior to any updates."""
|
||||
_lib.aegis128x2_mac_reset(self._proxy.ptr)
|
||||
self._cached_digest = None
|
||||
|
||||
def clone(self) -> "Mac":
|
||||
"""Return a clone of current MAC state."""
|
||||
@@ -470,6 +480,7 @@ class Mac:
|
||||
clone._maclen = self._maclen
|
||||
clone._proxy = new_aligned_struct("aegis128x2_mac_state", ALIGNMENT)
|
||||
_lib.aegis128x2_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
|
||||
clone._cached_digest = self._cached_digest
|
||||
return clone
|
||||
|
||||
__deepcopy__ = clone
|
||||
@@ -479,6 +490,8 @@ class Mac:
|
||||
|
||||
Repeated calls to update() are equivalent to a single call with the concatenated data.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis128x2_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -488,8 +501,8 @@ class Mac:
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Calculate and return the MAC tag for the currently input data.
|
||||
|
||||
Unlike the C library, this method does not alter the current state,
|
||||
allowing for multiple calls and further updates on the same object.
|
||||
This method can only be called once. After calling it, the MAC becomes unusable
|
||||
for further updates or calls to final().
|
||||
|
||||
Args:
|
||||
into: Optional buffer to write the tag into (default: bytearray created).
|
||||
@@ -499,8 +512,12 @@ class Mac:
|
||||
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
RuntimeError: If finalization fails in the C library.
|
||||
RuntimeError: If finalization fails in the C library or if already finalized.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError(
|
||||
"The MAC can only be calculated once. Use reset() to start over, or clone() before finalizing to continue."
|
||||
)
|
||||
maclen = self._maclen
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
@@ -515,14 +532,27 @@ class Mac:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"mac final failed: {err_name}")
|
||||
self._cached_digest = False
|
||||
return out if into is None else memoryview(out)[:maclen] # type: ignore
|
||||
|
||||
def digest(self) -> bytes:
|
||||
"""Calculate and return the MAC tag as bytes."""
|
||||
return bytes(self.final())
|
||||
"""Calculate and return the MAC tag as bytes.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
Can be called after final() to get the cached digest.
|
||||
"""
|
||||
if self._cached_digest:
|
||||
return self._cached_digest
|
||||
self._cached_digest = bytes(self.final()) # Overrides the False set by final()
|
||||
return self._cached_digest
|
||||
|
||||
def hexdigest(self) -> str:
|
||||
"""Calculate and return the MAC tag as a hex string."""
|
||||
"""Calculate and return the MAC tag as a hex string.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
"""
|
||||
return self.digest().hex()
|
||||
|
||||
def verify(self, mac: Buffer):
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
import errno
|
||||
import secrets
|
||||
from typing import Literal
|
||||
|
||||
from ._loader import ffi
|
||||
from ._loader import lib as _lib
|
||||
@@ -433,15 +434,22 @@ def mac(
|
||||
|
||||
|
||||
class Mac:
|
||||
"""AEGIS-128X4 MAC state wrapper.
|
||||
"""MAC calculation and verification with incremental updates.
|
||||
|
||||
Example:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
...
|
||||
mac = a.final()
|
||||
|
||||
Hashlib compatible interface:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
bytes_mac = a.digest()
|
||||
hex_mac = a.hexdigest()
|
||||
"""
|
||||
|
||||
__slots__ = ("_proxy", "_maclen")
|
||||
__slots__ = ("_proxy", "_maclen", "_cached_digest")
|
||||
|
||||
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
|
||||
"""Create a MAC with the given key, nonce, and tag length.
|
||||
@@ -459,10 +467,12 @@ class Mac:
|
||||
self._maclen = maclen
|
||||
self._proxy = new_aligned_struct("aegis128x4_mac_state", ALIGNMENT)
|
||||
_lib.aegis128x4_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
|
||||
self._cached_digest: None | Literal[False] | bytes = None
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset back to the original state, prior to any updates."""
|
||||
_lib.aegis128x4_mac_reset(self._proxy.ptr)
|
||||
self._cached_digest = None
|
||||
|
||||
def clone(self) -> "Mac":
|
||||
"""Return a clone of current MAC state."""
|
||||
@@ -470,6 +480,7 @@ class Mac:
|
||||
clone._maclen = self._maclen
|
||||
clone._proxy = new_aligned_struct("aegis128x4_mac_state", ALIGNMENT)
|
||||
_lib.aegis128x4_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
|
||||
clone._cached_digest = self._cached_digest
|
||||
return clone
|
||||
|
||||
__deepcopy__ = clone
|
||||
@@ -479,6 +490,8 @@ class Mac:
|
||||
|
||||
Repeated calls to update() are equivalent to a single call with the concatenated data.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis128x4_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -488,8 +501,8 @@ class Mac:
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Calculate and return the MAC tag for the currently input data.
|
||||
|
||||
Unlike the C library, this method does not alter the current state,
|
||||
allowing for multiple calls and further updates on the same object.
|
||||
This method can only be called once. After calling it, the MAC becomes unusable
|
||||
for further updates or calls to final().
|
||||
|
||||
Args:
|
||||
into: Optional buffer to write the tag into (default: bytearray created).
|
||||
@@ -499,8 +512,12 @@ class Mac:
|
||||
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
RuntimeError: If finalization fails in the C library.
|
||||
RuntimeError: If finalization fails in the C library or if already finalized.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError(
|
||||
"The MAC can only be calculated once. Use reset() to start over, or clone() before finalizing to continue."
|
||||
)
|
||||
maclen = self._maclen
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
@@ -515,14 +532,27 @@ class Mac:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"mac final failed: {err_name}")
|
||||
self._cached_digest = False
|
||||
return out if into is None else memoryview(out)[:maclen] # type: ignore
|
||||
|
||||
def digest(self) -> bytes:
|
||||
"""Calculate and return the MAC tag as bytes."""
|
||||
return bytes(self.final())
|
||||
"""Calculate and return the MAC tag as bytes.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
Can be called after final() to get the cached digest.
|
||||
"""
|
||||
if self._cached_digest:
|
||||
return self._cached_digest
|
||||
self._cached_digest = bytes(self.final()) # Overrides the False set by final()
|
||||
return self._cached_digest
|
||||
|
||||
def hexdigest(self) -> str:
|
||||
"""Calculate and return the MAC tag as a hex string."""
|
||||
"""Calculate and return the MAC tag as a hex string.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
"""
|
||||
return self.digest().hex()
|
||||
|
||||
def verify(self, mac: Buffer):
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
import errno
|
||||
import secrets
|
||||
from typing import Literal
|
||||
|
||||
from ._loader import ffi
|
||||
from ._loader import lib as _lib
|
||||
@@ -433,15 +434,22 @@ def mac(
|
||||
|
||||
|
||||
class Mac:
|
||||
"""AEGIS-256 MAC state wrapper.
|
||||
"""MAC calculation and verification with incremental updates.
|
||||
|
||||
Example:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
...
|
||||
mac = a.final()
|
||||
|
||||
Hashlib compatible interface:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
bytes_mac = a.digest()
|
||||
hex_mac = a.hexdigest()
|
||||
"""
|
||||
|
||||
__slots__ = ("_proxy", "_maclen")
|
||||
__slots__ = ("_proxy", "_maclen", "_cached_digest")
|
||||
|
||||
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
|
||||
"""Create a MAC with the given key, nonce, and tag length.
|
||||
@@ -459,10 +467,12 @@ class Mac:
|
||||
self._maclen = maclen
|
||||
self._proxy = new_aligned_struct("aegis256_mac_state", ALIGNMENT)
|
||||
_lib.aegis256_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
|
||||
self._cached_digest: None | Literal[False] | bytes = None
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset back to the original state, prior to any updates."""
|
||||
_lib.aegis256_mac_reset(self._proxy.ptr)
|
||||
self._cached_digest = None
|
||||
|
||||
def clone(self) -> "Mac":
|
||||
"""Return a clone of current MAC state."""
|
||||
@@ -470,6 +480,7 @@ class Mac:
|
||||
clone._maclen = self._maclen
|
||||
clone._proxy = new_aligned_struct("aegis256_mac_state", ALIGNMENT)
|
||||
_lib.aegis256_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
|
||||
clone._cached_digest = self._cached_digest
|
||||
return clone
|
||||
|
||||
__deepcopy__ = clone
|
||||
@@ -479,6 +490,8 @@ class Mac:
|
||||
|
||||
Repeated calls to update() are equivalent to a single call with the concatenated data.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis256_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -488,8 +501,8 @@ class Mac:
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Calculate and return the MAC tag for the currently input data.
|
||||
|
||||
Unlike the C library, this method does not alter the current state,
|
||||
allowing for multiple calls and further updates on the same object.
|
||||
This method can only be called once. After calling it, the MAC becomes unusable
|
||||
for further updates or calls to final().
|
||||
|
||||
Args:
|
||||
into: Optional buffer to write the tag into (default: bytearray created).
|
||||
@@ -499,8 +512,12 @@ class Mac:
|
||||
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
RuntimeError: If finalization fails in the C library.
|
||||
RuntimeError: If finalization fails in the C library or if already finalized.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError(
|
||||
"The MAC can only be calculated once. Use reset() to start over, or clone() before finalizing to continue."
|
||||
)
|
||||
maclen = self._maclen
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
@@ -515,14 +532,27 @@ class Mac:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"mac final failed: {err_name}")
|
||||
self._cached_digest = False
|
||||
return out if into is None else memoryview(out)[:maclen] # type: ignore
|
||||
|
||||
def digest(self) -> bytes:
|
||||
"""Calculate and return the MAC tag as bytes."""
|
||||
return bytes(self.final())
|
||||
"""Calculate and return the MAC tag as bytes.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
Can be called after final() to get the cached digest.
|
||||
"""
|
||||
if self._cached_digest:
|
||||
return self._cached_digest
|
||||
self._cached_digest = bytes(self.final()) # Overrides the False set by final()
|
||||
return self._cached_digest
|
||||
|
||||
def hexdigest(self) -> str:
|
||||
"""Calculate and return the MAC tag as a hex string."""
|
||||
"""Calculate and return the MAC tag as a hex string.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
"""
|
||||
return self.digest().hex()
|
||||
|
||||
def verify(self, mac: Buffer):
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
import errno
|
||||
import secrets
|
||||
from typing import Literal
|
||||
|
||||
from ._loader import ffi
|
||||
from ._loader import lib as _lib
|
||||
@@ -433,15 +434,22 @@ def mac(
|
||||
|
||||
|
||||
class Mac:
|
||||
"""AEGIS-256X2 MAC state wrapper.
|
||||
"""MAC calculation and verification with incremental updates.
|
||||
|
||||
Example:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
...
|
||||
mac = a.final()
|
||||
|
||||
Hashlib compatible interface:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
bytes_mac = a.digest()
|
||||
hex_mac = a.hexdigest()
|
||||
"""
|
||||
|
||||
__slots__ = ("_proxy", "_maclen")
|
||||
__slots__ = ("_proxy", "_maclen", "_cached_digest")
|
||||
|
||||
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
|
||||
"""Create a MAC with the given key, nonce, and tag length.
|
||||
@@ -459,10 +467,12 @@ class Mac:
|
||||
self._maclen = maclen
|
||||
self._proxy = new_aligned_struct("aegis256x2_mac_state", ALIGNMENT)
|
||||
_lib.aegis256x2_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
|
||||
self._cached_digest: None | Literal[False] | bytes = None
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset back to the original state, prior to any updates."""
|
||||
_lib.aegis256x2_mac_reset(self._proxy.ptr)
|
||||
self._cached_digest = None
|
||||
|
||||
def clone(self) -> "Mac":
|
||||
"""Return a clone of current MAC state."""
|
||||
@@ -470,6 +480,7 @@ class Mac:
|
||||
clone._maclen = self._maclen
|
||||
clone._proxy = new_aligned_struct("aegis256x2_mac_state", ALIGNMENT)
|
||||
_lib.aegis256x2_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
|
||||
clone._cached_digest = self._cached_digest
|
||||
return clone
|
||||
|
||||
__deepcopy__ = clone
|
||||
@@ -479,6 +490,8 @@ class Mac:
|
||||
|
||||
Repeated calls to update() are equivalent to a single call with the concatenated data.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis256x2_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -488,8 +501,8 @@ class Mac:
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Calculate and return the MAC tag for the currently input data.
|
||||
|
||||
Unlike the C library, this method does not alter the current state,
|
||||
allowing for multiple calls and further updates on the same object.
|
||||
This method can only be called once. After calling it, the MAC becomes unusable
|
||||
for further updates or calls to final().
|
||||
|
||||
Args:
|
||||
into: Optional buffer to write the tag into (default: bytearray created).
|
||||
@@ -499,8 +512,12 @@ class Mac:
|
||||
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
RuntimeError: If finalization fails in the C library.
|
||||
RuntimeError: If finalization fails in the C library or if already finalized.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError(
|
||||
"The MAC can only be calculated once. Use reset() to start over, or clone() before finalizing to continue."
|
||||
)
|
||||
maclen = self._maclen
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
@@ -515,14 +532,27 @@ class Mac:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"mac final failed: {err_name}")
|
||||
self._cached_digest = False
|
||||
return out if into is None else memoryview(out)[:maclen] # type: ignore
|
||||
|
||||
def digest(self) -> bytes:
|
||||
"""Calculate and return the MAC tag as bytes."""
|
||||
return bytes(self.final())
|
||||
"""Calculate and return the MAC tag as bytes.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
Can be called after final() to get the cached digest.
|
||||
"""
|
||||
if self._cached_digest:
|
||||
return self._cached_digest
|
||||
self._cached_digest = bytes(self.final()) # Overrides the False set by final()
|
||||
return self._cached_digest
|
||||
|
||||
def hexdigest(self) -> str:
|
||||
"""Calculate and return the MAC tag as a hex string."""
|
||||
"""Calculate and return the MAC tag as a hex string.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
"""
|
||||
return self.digest().hex()
|
||||
|
||||
def verify(self, mac: Buffer):
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
import errno
|
||||
import secrets
|
||||
from typing import Literal
|
||||
|
||||
from ._loader import ffi
|
||||
from ._loader import lib as _lib
|
||||
@@ -433,15 +434,22 @@ def mac(
|
||||
|
||||
|
||||
class Mac:
|
||||
"""AEGIS-256X4 MAC state wrapper.
|
||||
"""MAC calculation and verification with incremental updates.
|
||||
|
||||
Example:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
...
|
||||
mac = a.final()
|
||||
|
||||
Hashlib compatible interface:
|
||||
a = Mac(key, nonce)
|
||||
a.update(data)
|
||||
bytes_mac = a.digest()
|
||||
hex_mac = a.hexdigest()
|
||||
"""
|
||||
|
||||
__slots__ = ("_proxy", "_maclen")
|
||||
__slots__ = ("_proxy", "_maclen", "_cached_digest")
|
||||
|
||||
def __init__(self, key: Buffer, nonce: Buffer, maclen: int = MACBYTES) -> None:
|
||||
"""Create a MAC with the given key, nonce, and tag length.
|
||||
@@ -459,10 +467,12 @@ class Mac:
|
||||
self._maclen = maclen
|
||||
self._proxy = new_aligned_struct("aegis256x4_mac_state", ALIGNMENT)
|
||||
_lib.aegis256x4_mac_init(self._proxy.ptr, _ptr(key), _ptr(nonce))
|
||||
self._cached_digest: None | Literal[False] | bytes = None
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset back to the original state, prior to any updates."""
|
||||
_lib.aegis256x4_mac_reset(self._proxy.ptr)
|
||||
self._cached_digest = None
|
||||
|
||||
def clone(self) -> "Mac":
|
||||
"""Return a clone of current MAC state."""
|
||||
@@ -470,6 +480,7 @@ class Mac:
|
||||
clone._maclen = self._maclen
|
||||
clone._proxy = new_aligned_struct("aegis256x4_mac_state", ALIGNMENT)
|
||||
_lib.aegis256x4_mac_state_clone(clone._proxy.ptr, self._proxy.ptr)
|
||||
clone._cached_digest = self._cached_digest
|
||||
return clone
|
||||
|
||||
__deepcopy__ = clone
|
||||
@@ -479,6 +490,8 @@ class Mac:
|
||||
|
||||
Repeated calls to update() are equivalent to a single call with the concatenated data.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis256x4_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -488,8 +501,8 @@ class Mac:
|
||||
def final(self, into: Buffer | None = None) -> bytearray | memoryview:
|
||||
"""Calculate and return the MAC tag for the currently input data.
|
||||
|
||||
Unlike the C library, this method does not alter the current state,
|
||||
allowing for multiple calls and further updates on the same object.
|
||||
This method can only be called once. After calling it, the MAC becomes unusable
|
||||
for further updates or calls to final().
|
||||
|
||||
Args:
|
||||
into: Optional buffer to write the tag into (default: bytearray created).
|
||||
@@ -499,8 +512,12 @@ class Mac:
|
||||
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
RuntimeError: If finalization fails in the C library.
|
||||
RuntimeError: If finalization fails in the C library or if already finalized.
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError(
|
||||
"The MAC can only be calculated once. Use reset() to start over, or clone() before finalizing to continue."
|
||||
)
|
||||
maclen = self._maclen
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
@@ -515,14 +532,27 @@ class Mac:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"mac final failed: {err_name}")
|
||||
self._cached_digest = False
|
||||
return out if into is None else memoryview(out)[:maclen] # type: ignore
|
||||
|
||||
def digest(self) -> bytes:
|
||||
"""Calculate and return the MAC tag as bytes."""
|
||||
return bytes(self.final())
|
||||
"""Calculate and return the MAC tag as bytes.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
Can be called after final() to get the cached digest.
|
||||
"""
|
||||
if self._cached_digest:
|
||||
return self._cached_digest
|
||||
self._cached_digest = bytes(self.final()) # Overrides the False set by final()
|
||||
return self._cached_digest
|
||||
|
||||
def hexdigest(self) -> str:
|
||||
"""Calculate and return the MAC tag as a hex string."""
|
||||
"""Calculate and return the MAC tag as a hex string.
|
||||
|
||||
After calling this method, the MAC becomes unusable for further updates.
|
||||
The result is cached and subsequent calls return the same value.
|
||||
"""
|
||||
return self.digest().hex()
|
||||
|
||||
def verify(self, mac: Buffer):
|
||||
|
||||
@@ -57,14 +57,11 @@ def bench_mac(ciph) -> None:
|
||||
buf = bytearray(MSG_LEN)
|
||||
buf[:] = _random_bytes(len(buf))
|
||||
|
||||
mac0 = ciph.Mac(key, nonce, maclen=ciph.MACBYTES_LONG)
|
||||
mac_out = bytearray(ciph.MACBYTES_LONG)
|
||||
|
||||
t0 = time.perf_counter()
|
||||
for _ in range(ITERATIONS):
|
||||
mac = mac0.clone()
|
||||
mac.update(buf)
|
||||
mac.final(into=mac_out)
|
||||
m = ciph.mac(key, nonce, buf, maclen=ciph.MACBYTES_LONG, into=mac_out)
|
||||
t1 = time.perf_counter()
|
||||
|
||||
_ = mac_out[0]
|
||||
|
||||
@@ -7,6 +7,9 @@ from pyaegis import aegis128l, aegis128x2, aegis128x4, aegis256, aegis256x2, aeg
|
||||
|
||||
from .util import random_split_bytes
|
||||
|
||||
# All AEGIS algorithm modules
|
||||
ALL_ALGORITHMS = [aegis128l, aegis128x2, aegis128x4, aegis256, aegis256x2, aegis256x4]
|
||||
|
||||
|
||||
def load_mac_test_vectors():
|
||||
"""Load MAC test vectors from JSON file."""
|
||||
@@ -100,3 +103,167 @@ def test_mac_class(vector):
|
||||
assert computed_tag256 == expected_tag256, (
|
||||
f"256-bit MAC mismatch for {vector['name']}"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("vector", load_mac_test_vectors(), ids=get_test_id)
|
||||
def test_mac_class_with_digest(vector):
|
||||
"""Test MAC computation using digest() and hexdigest() instead of final()."""
|
||||
alg = get_algorithm_module(vector["name"])
|
||||
|
||||
key = bytes.fromhex(vector["key"])
|
||||
nonce = bytes.fromhex(vector["nonce"])
|
||||
data = bytes.fromhex(vector["data"])
|
||||
|
||||
# Test 128-bit MAC if present
|
||||
if "tag128" in vector:
|
||||
expected_tag128 = bytes.fromhex(vector["tag128"])
|
||||
|
||||
# Test with digest()
|
||||
mac_state = alg.Mac(key, nonce, maclen=16)
|
||||
for chunk in random_split_bytes(data):
|
||||
mac_state.update(chunk)
|
||||
computed_tag128 = mac_state.digest()
|
||||
assert computed_tag128 == expected_tag128, (
|
||||
f"128-bit MAC mismatch for {vector['name']} using digest()"
|
||||
)
|
||||
|
||||
# Test that digest() can be called multiple times
|
||||
computed_tag128_again = mac_state.digest()
|
||||
assert computed_tag128 == computed_tag128_again, (
|
||||
"digest() should return the same value on repeated calls"
|
||||
)
|
||||
|
||||
# Test hexdigest()
|
||||
mac_state2 = alg.Mac(key, nonce, maclen=16)
|
||||
for chunk in random_split_bytes(data):
|
||||
mac_state2.update(chunk)
|
||||
hex_tag = mac_state2.hexdigest()
|
||||
assert hex_tag == expected_tag128.hex(), (
|
||||
f"128-bit MAC hexdigest mismatch for {vector['name']}"
|
||||
)
|
||||
|
||||
# Test that hexdigest() can be called multiple times
|
||||
hex_tag_again = mac_state2.hexdigest()
|
||||
assert hex_tag == hex_tag_again, (
|
||||
"hexdigest() should return the same value on repeated calls"
|
||||
)
|
||||
|
||||
# Test 256-bit MAC if present
|
||||
if "tag256" in vector:
|
||||
expected_tag256 = bytes.fromhex(vector["tag256"])
|
||||
|
||||
# Test with digest()
|
||||
mac_state = alg.Mac(key, nonce, maclen=32)
|
||||
for chunk in random_split_bytes(data):
|
||||
mac_state.update(chunk)
|
||||
computed_tag256 = mac_state.digest()
|
||||
assert computed_tag256 == expected_tag256, (
|
||||
f"256-bit MAC mismatch for {vector['name']} using digest()"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("vector", load_mac_test_vectors(), ids=get_test_id)
|
||||
def test_mac_clone(vector):
|
||||
"""Test that cloning a Mac state works correctly."""
|
||||
alg = get_algorithm_module(vector["name"])
|
||||
|
||||
key = bytes.fromhex(vector["key"])
|
||||
nonce = bytes.fromhex(vector["nonce"])
|
||||
data = bytes.fromhex(vector["data"])
|
||||
|
||||
# Test 128-bit MAC if present
|
||||
if "tag128" in vector:
|
||||
expected_tag128 = bytes.fromhex(vector["tag128"])
|
||||
|
||||
mac_state = alg.Mac(key, nonce, maclen=16)
|
||||
for chunk in random_split_bytes(data):
|
||||
mac_state.update(chunk)
|
||||
|
||||
# Clone the state
|
||||
cloned_state = mac_state.clone()
|
||||
|
||||
# Both should produce the same tag
|
||||
tag1 = mac_state.final()
|
||||
tag2 = cloned_state.final()
|
||||
|
||||
assert tag1 == expected_tag128
|
||||
assert tag2 == expected_tag128
|
||||
assert tag1 == tag2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("vector", load_mac_test_vectors(), ids=get_test_id)
|
||||
def test_mac_reset(vector):
|
||||
"""Test that resetting a Mac state works correctly."""
|
||||
alg = get_algorithm_module(vector["name"])
|
||||
|
||||
key = bytes.fromhex(vector["key"])
|
||||
nonce = bytes.fromhex(vector["nonce"])
|
||||
data = bytes.fromhex(vector["data"])
|
||||
|
||||
# Test 128-bit MAC if present
|
||||
if "tag128" in vector:
|
||||
expected_tag128 = bytes.fromhex(vector["tag128"])
|
||||
|
||||
mac_state = alg.Mac(key, nonce, maclen=16)
|
||||
for chunk in random_split_bytes(data):
|
||||
mac_state.update(chunk)
|
||||
tag1 = mac_state.final()
|
||||
assert tag1 == expected_tag128
|
||||
|
||||
# Reset and compute again
|
||||
mac_state.reset()
|
||||
for chunk in random_split_bytes(data):
|
||||
mac_state.update(chunk)
|
||||
tag2 = mac_state.final()
|
||||
|
||||
assert tag2 == expected_tag128
|
||||
assert tag1 == tag2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("alg", ALL_ALGORITHMS, ids=lambda x: x.__name__.split(".")[-1])
|
||||
def test_mac_reset_after_digest(alg):
|
||||
"""Test that reset() clears the cached digest and allows reuse."""
|
||||
key = alg.random_key()
|
||||
nonce = alg.random_nonce()
|
||||
|
||||
mac_state = alg.Mac(key, nonce)
|
||||
mac_state.update(b"Hello, world!")
|
||||
tag1 = mac_state.digest()
|
||||
|
||||
# After digest(), update should fail
|
||||
with pytest.raises(RuntimeError):
|
||||
mac_state.update(b"More data")
|
||||
|
||||
# Reset should clear the cached digest
|
||||
mac_state.reset()
|
||||
|
||||
# Now we should be able to update again
|
||||
mac_state.update(b"Different data")
|
||||
tag2 = mac_state.digest()
|
||||
|
||||
# Tags should be different since we used different data
|
||||
assert tag1 != tag2
|
||||
|
||||
|
||||
@pytest.mark.parametrize("alg", ALL_ALGORITHMS, ids=lambda x: x.__name__.split(".")[-1])
|
||||
def test_mac_clone_preserves_cached_digest(alg):
|
||||
"""Test that cloning preserves the cached digest state."""
|
||||
key = alg.random_key()
|
||||
nonce = alg.random_nonce()
|
||||
|
||||
mac_state = alg.Mac(key, nonce)
|
||||
mac_state.update(b"Hello, world!")
|
||||
tag1 = mac_state.digest()
|
||||
|
||||
# Clone after digest
|
||||
cloned_state = mac_state.clone()
|
||||
|
||||
# Both should return the same cached tag
|
||||
tag2 = cloned_state.digest()
|
||||
assert tag1 == tag2
|
||||
|
||||
# Both should be unable to update
|
||||
with pytest.raises(RuntimeError):
|
||||
mac_state.update(b"More data")
|
||||
with pytest.raises(RuntimeError):
|
||||
cloned_state.update(b"More data")
|
||||
|
||||
@@ -6,7 +6,98 @@ after calling final(), preventing accidental misuse.
|
||||
|
||||
import pytest
|
||||
|
||||
from pyaegis import aegis256x4
|
||||
from pyaegis import aegis128l, aegis128x2, aegis128x4, aegis256, aegis256x2, aegis256x4
|
||||
|
||||
# All AEGIS algorithm modules
|
||||
ALL_ALGORITHMS = [aegis128l, aegis128x2, aegis128x4, aegis256, aegis256x2, aegis256x4]
|
||||
|
||||
|
||||
class TestMacFinalization:
|
||||
"""Test that Mac becomes unusable after final()."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"alg", ALL_ALGORITHMS, ids=lambda x: x.__name__.split(".")[-1]
|
||||
)
|
||||
def test_update_after_final_raises(self, alg):
|
||||
"""Test that calling update() after final() raises RuntimeError."""
|
||||
key = alg.random_key()
|
||||
nonce = alg.random_nonce()
|
||||
|
||||
mac = alg.Mac(key, nonce)
|
||||
mac.update(b"Hello, world!")
|
||||
mac.final()
|
||||
|
||||
# Attempting to update after final should raise RuntimeError
|
||||
with pytest.raises(RuntimeError, match="Cannot update after final\\(\\)"):
|
||||
mac.update(b"More data")
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"alg", ALL_ALGORITHMS, ids=lambda x: x.__name__.split(".")[-1]
|
||||
)
|
||||
def test_final_after_final_raises(self, alg):
|
||||
"""Test that calling final() after final() raises RuntimeError."""
|
||||
key = alg.random_key()
|
||||
nonce = alg.random_nonce()
|
||||
|
||||
mac = alg.Mac(key, nonce)
|
||||
mac.update(b"Hello, world!")
|
||||
mac.final()
|
||||
|
||||
# Attempting to call final again should raise RuntimeError
|
||||
with pytest.raises(RuntimeError, match="The MAC can only be calculated once"):
|
||||
mac.final()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"alg", ALL_ALGORITHMS, ids=lambda x: x.__name__.split(".")[-1]
|
||||
)
|
||||
def test_digest_after_final_raises(self, alg):
|
||||
"""Test that digest() and hexdigest() raise after final()."""
|
||||
key = alg.random_key()
|
||||
nonce = alg.random_nonce()
|
||||
|
||||
mac = alg.Mac(key, nonce)
|
||||
mac.update(b"Hello, world!")
|
||||
mac.final()
|
||||
|
||||
# digest() should raise after final()
|
||||
with pytest.raises(RuntimeError, match="The MAC can only be calculated once"):
|
||||
mac.digest()
|
||||
|
||||
# hexdigest() should also raise after final()
|
||||
with pytest.raises(RuntimeError, match="The MAC can only be calculated once"):
|
||||
mac.hexdigest()
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"alg", ALL_ALGORITHMS, ids=lambda x: x.__name__.split(".")[-1]
|
||||
)
|
||||
def test_update_after_digest_raises(self, alg):
|
||||
"""Test that calling update() after digest() raises RuntimeError."""
|
||||
key = alg.random_key()
|
||||
nonce = alg.random_nonce()
|
||||
|
||||
mac = alg.Mac(key, nonce)
|
||||
mac.update(b"Hello, world!")
|
||||
mac.digest()
|
||||
|
||||
# Attempting to update after digest should raise RuntimeError
|
||||
with pytest.raises(RuntimeError, match="Cannot update after final\\(\\)"):
|
||||
mac.update(b"More data")
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"alg", ALL_ALGORITHMS, ids=lambda x: x.__name__.split(".")[-1]
|
||||
)
|
||||
def test_final_after_digest_raises(self, alg):
|
||||
"""Test that calling final() after digest() raises RuntimeError."""
|
||||
key = alg.random_key()
|
||||
nonce = alg.random_nonce()
|
||||
|
||||
mac = alg.Mac(key, nonce)
|
||||
mac.update(b"Hello, world!")
|
||||
mac.digest()
|
||||
|
||||
# Attempting to call final after digest should raise RuntimeError
|
||||
with pytest.raises(RuntimeError, match="The MAC can only be calculated once"):
|
||||
mac.final()
|
||||
|
||||
|
||||
class TestEncryptorFinalization:
|
||||
|
||||
Reference in New Issue
Block a user