Build cleanup, pathname reorganisation, cdef to repository (although generated with gen_cdef.py).

This commit is contained in:
Leo Vasanko
2025-11-06 15:23:30 -06:00
parent c8fe16d21f
commit d43a22bc6c
14 changed files with 805 additions and 292 deletions

2
.gitignore vendored
View File

@@ -4,3 +4,5 @@
/dist
/pyaegis/build
__pycache__
!.gitignore
!.gitmodules

View File

@@ -1,33 +1,13 @@
"""Dynamic loader for libaegis using CFFI (ABI mode).
This module loads the shared library bundled with the package.
Exports:
- ffi: a cffi.FFI instance with the libaegis API declared
- lib: the loaded libaegis shared library (ffi.dlopen)
- libc: the C runtime (for posix_memalign/free on POSIX)
- alloc_aligned(size, alignment): return (void*) pointer with requested alignment
"""
"""Dynamic loader for libaegis using CFFI (ABI mode)."""
import os
import sys
from pathlib import Path
from typing import Any
from cffi import FFI
ffi = FFI()
# Load CFFI cdef declarations from text file
cdef_path = os.path.join(os.path.dirname(__file__), "build", "aegis_cdef.h")
with open(cdef_path, "r", encoding="utf-8") as f:
cdef_content = f.read()
ffi.cdef(cdef_content)
try:
# ctypes is only used to resolve libc reliably across platforms
import ctypes.util as _ctypes_util # type: ignore
except Exception: # pragma: no cover - very unlikely to happen
_ctypes_util = None # type: ignore[assignment]
__ALL__ = ["ffi", "lib"]
def _platform_lib_name() -> str:
@@ -39,52 +19,18 @@ def _platform_lib_name() -> str:
def _load_libaegis():
pkg_dir = os.path.dirname(__file__)
candidate = os.path.join(pkg_dir, "build", _platform_lib_name())
if os.path.exists(candidate):
pkg_dir = Path(__file__).parent
candidate = pkg_dir / "build" / _platform_lib_name()
if candidate.exists():
try:
return ffi.dlopen(candidate)
return ffi.dlopen(str(candidate))
except Exception as e:
raise OSError(f"Failed to load libaegis from {candidate}: {e}")
else:
raise OSError(f"Could not find libaegis at {candidate}")
def _load_libc():
# Use ctypes.util to find a usable libc name; fallback to None-dlopen on POSIX
if _ctypes_util is not None:
libc_name = _ctypes_util.find_library("c") # type: ignore[attr-defined]
if libc_name:
try:
return ffi.dlopen(libc_name)
except Exception:
pass
# Fallback: try process globals (works on many Unix platforms)
try:
return ffi.dlopen(None)
except Exception as e:
raise OSError(f"Unable to load libc for aligned allocation: {e}")
ffi = FFI()
ffi.cdef(Path(__file__).with_name("aegis_cdef.h").read_text(encoding="utf-8"))
lib: Any = _load_libaegis()
libc: Any = _load_libc()
# Initialize CPU feature selection (recommended by the library)
try:
lib.aegis_init()
except Exception:
# Non-fatal; functions will still work, maybe slower
pass
def alloc_aligned(size: int, alignment: int = 64):
"""Allocate aligned memory via posix_memalign(); returns a void* cdata.
The returned pointer must be freed with libc.free(). Attach a GC finalizer
at call sites using ffi.gc(ptr, libc.free) after casting to the target type.
"""
memptr = ffi.new("void **")
rc = libc.posix_memalign(memptr, alignment, size)
if rc != 0 or memptr[0] == ffi.NULL:
raise MemoryError(f"posix_memalign({alignment}, {size}) failed with rc={rc}")
return memptr[0]
lib.aegis_init()

View File

@@ -9,8 +9,9 @@ Error return codes from the C library raise ValueError.
import errno
from collections.abc import Buffer
from ._loader import alloc_aligned, ffi, libc
from ._loader import ffi
from ._loader import lib as _lib
from .util import new_aligned_struct
# Constants exposed as functions in C; mirror them as integers at module import time
KEYBYTES = _lib.aegis128l_keybytes()
@@ -445,7 +446,7 @@ class Mac:
mac2 = Mac(key, nonce); mac2.update(data); mac2.verify(tag)
"""
__slots__ = ("_st", "_nonce", "_key")
__slots__ = ("_st", "_owner")
def __init__(
self,
@@ -462,19 +463,19 @@ class Mac:
Raises:
TypeError: If key or nonce lengths are invalid.
"""
raw = alloc_aligned(ffi.sizeof("aegis128l_mac_state"), ALIGNMENT)
st = ffi.cast("aegis128l_mac_state *", raw)
self._st = ffi.gc(st, libc.free)
if _other is not None:
st, owner = new_aligned_struct("aegis128l_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
if _other is not None: # clone path
_lib.aegis128l_mac_state_clone(self._st, _other._st)
return
# Normal init
# Normal init path
nonce = memoryview(nonce)
key = memoryview(key)
if key.nbytes != KEYBYTES:
raise TypeError(f"key length must be {KEYBYTES=}")
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES=}")
raise TypeError(f"nonce length must be {NPUBBYTES}")
_lib.aegis128l_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
@@ -565,7 +566,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -584,9 +585,7 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis128l_state"), ALIGNMENT)
st = ffi.cast("aegis128l_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis128l_state", ALIGNMENT)
_lib.aegis128l_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -595,6 +594,7 @@ class Encryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -746,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -765,9 +765,7 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis128l_state"), ALIGNMENT)
st = ffi.cast("aegis128l_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis128l_state", ALIGNMENT)
_lib.aegis128l_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -776,6 +774,7 @@ class Decryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -873,21 +872,13 @@ class Decryptor:
def new_state():
"""Allocate and return a new aegis128l_state* with proper alignment.
The returned object is an ffi cdata pointer with automatic finalizer.
"""
# Allocate with required alignment using libc.posix_memalign
raw = alloc_aligned(ffi.sizeof("aegis128l_state"), ALIGNMENT)
ptr = ffi.cast("aegis128l_state *", raw)
return ffi.gc(ptr, libc.free)
"""Allocate and return a new aegis128l_state* with proper alignment."""
return new_aligned_struct("aegis128l_state", ALIGNMENT)
def new_mac_state():
"""Allocate and return a new aegis128l_mac_state* with proper alignment."""
raw = alloc_aligned(ffi.sizeof("aegis128l_mac_state"), ALIGNMENT)
ptr = ffi.cast("aegis128l_mac_state *", raw)
return ffi.gc(ptr, libc.free)
return new_aligned_struct("aegis128l_mac_state", ALIGNMENT)
__all__ = [

View File

@@ -9,8 +9,9 @@ Error return codes from the C library raise ValueError.
import errno
from collections.abc import Buffer
from ._loader import alloc_aligned, ffi, libc
from ._loader import ffi
from ._loader import lib as _lib
from .util import new_aligned_struct
# Constants exposed as functions in C; mirror them as integers at module import time
KEYBYTES = _lib.aegis128x2_keybytes()
@@ -445,7 +446,7 @@ class Mac:
mac2 = Mac(key, nonce); mac2.update(data); mac2.verify(tag)
"""
__slots__ = ("_st", "_nonce", "_key")
__slots__ = ("_st", "_owner")
def __init__(
self,
@@ -462,19 +463,19 @@ class Mac:
Raises:
TypeError: If key or nonce lengths are invalid.
"""
raw = alloc_aligned(ffi.sizeof("aegis128x2_mac_state"), ALIGNMENT)
st = ffi.cast("aegis128x2_mac_state *", raw)
self._st = ffi.gc(st, libc.free)
if _other is not None:
st, owner = new_aligned_struct("aegis128x2_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
if _other is not None: # clone path
_lib.aegis128x2_mac_state_clone(self._st, _other._st)
return
# Normal init
# Normal init path
nonce = memoryview(nonce)
key = memoryview(key)
if key.nbytes != KEYBYTES:
raise TypeError(f"key length must be {KEYBYTES=}")
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES=}")
raise TypeError(f"nonce length must be {NPUBBYTES}")
_lib.aegis128x2_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
@@ -565,7 +566,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -584,9 +585,7 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis128x2_state"), ALIGNMENT)
st = ffi.cast("aegis128x2_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis128x2_state", ALIGNMENT)
_lib.aegis128x2_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -595,6 +594,7 @@ class Encryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -746,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -765,9 +765,7 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis128x2_state"), ALIGNMENT)
st = ffi.cast("aegis128x2_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis128x2_state", ALIGNMENT)
_lib.aegis128x2_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -776,6 +774,7 @@ class Decryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -873,21 +872,13 @@ class Decryptor:
def new_state():
"""Allocate and return a new aegis128x2_state* with proper alignment.
The returned object is an ffi cdata pointer with automatic finalizer.
"""
# Allocate with required alignment using libc.posix_memalign
raw = alloc_aligned(ffi.sizeof("aegis128x2_state"), ALIGNMENT)
ptr = ffi.cast("aegis128x2_state *", raw)
return ffi.gc(ptr, libc.free)
"""Allocate and return a new aegis128x2_state* with proper alignment."""
return new_aligned_struct("aegis128x2_state", ALIGNMENT)
def new_mac_state():
"""Allocate and return a new aegis128x2_mac_state* with proper alignment."""
raw = alloc_aligned(ffi.sizeof("aegis128x2_mac_state"), ALIGNMENT)
ptr = ffi.cast("aegis128x2_mac_state *", raw)
return ffi.gc(ptr, libc.free)
return new_aligned_struct("aegis128x2_mac_state", ALIGNMENT)
__all__ = [

View File

@@ -9,8 +9,9 @@ Error return codes from the C library raise ValueError.
import errno
from collections.abc import Buffer
from ._loader import alloc_aligned, ffi, libc
from ._loader import ffi
from ._loader import lib as _lib
from .util import new_aligned_struct
# Constants exposed as functions in C; mirror them as integers at module import time
KEYBYTES = _lib.aegis128x4_keybytes()
@@ -445,7 +446,7 @@ class Mac:
mac2 = Mac(key, nonce); mac2.update(data); mac2.verify(tag)
"""
__slots__ = ("_st", "_nonce", "_key")
__slots__ = ("_st", "_owner")
def __init__(
self,
@@ -462,19 +463,19 @@ class Mac:
Raises:
TypeError: If key or nonce lengths are invalid.
"""
raw = alloc_aligned(ffi.sizeof("aegis128x4_mac_state"), ALIGNMENT)
st = ffi.cast("aegis128x4_mac_state *", raw)
self._st = ffi.gc(st, libc.free)
if _other is not None:
st, owner = new_aligned_struct("aegis128x4_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
if _other is not None: # clone path
_lib.aegis128x4_mac_state_clone(self._st, _other._st)
return
# Normal init
# Normal init path
nonce = memoryview(nonce)
key = memoryview(key)
if key.nbytes != KEYBYTES:
raise TypeError(f"key length must be {KEYBYTES=}")
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES=}")
raise TypeError(f"nonce length must be {NPUBBYTES}")
_lib.aegis128x4_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
@@ -565,7 +566,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -584,9 +585,7 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis128x4_state"), ALIGNMENT)
st = ffi.cast("aegis128x4_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis128x4_state", ALIGNMENT)
_lib.aegis128x4_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -595,6 +594,7 @@ class Encryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -746,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -765,9 +765,7 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis128x4_state"), ALIGNMENT)
st = ffi.cast("aegis128x4_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis128x4_state", ALIGNMENT)
_lib.aegis128x4_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -776,6 +774,7 @@ class Decryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -873,21 +872,13 @@ class Decryptor:
def new_state():
"""Allocate and return a new aegis128x4_state* with proper alignment.
The returned object is an ffi cdata pointer with automatic finalizer.
"""
# Allocate with required alignment using libc.posix_memalign
raw = alloc_aligned(ffi.sizeof("aegis128x4_state"), ALIGNMENT)
ptr = ffi.cast("aegis128x4_state *", raw)
return ffi.gc(ptr, libc.free)
"""Allocate and return a new aegis128x4_state* with proper alignment."""
return new_aligned_struct("aegis128x4_state", ALIGNMENT)
def new_mac_state():
"""Allocate and return a new aegis128x4_mac_state* with proper alignment."""
raw = alloc_aligned(ffi.sizeof("aegis128x4_mac_state"), ALIGNMENT)
ptr = ffi.cast("aegis128x4_mac_state *", raw)
return ffi.gc(ptr, libc.free)
return new_aligned_struct("aegis128x4_mac_state", ALIGNMENT)
__all__ = [

View File

@@ -9,8 +9,9 @@ Error return codes from the C library raise ValueError.
import errno
from collections.abc import Buffer
from ._loader import alloc_aligned, ffi, libc
from ._loader import ffi
from ._loader import lib as _lib
from .util import new_aligned_struct
# Constants exposed as functions in C; mirror them as integers at module import time
KEYBYTES = _lib.aegis256_keybytes()
@@ -445,7 +446,7 @@ class Mac:
mac2 = Mac(key, nonce); mac2.update(data); mac2.verify(tag)
"""
__slots__ = ("_st", "_nonce", "_key")
__slots__ = ("_st", "_owner")
def __init__(
self,
@@ -462,19 +463,19 @@ class Mac:
Raises:
TypeError: If key or nonce lengths are invalid.
"""
raw = alloc_aligned(ffi.sizeof("aegis256_mac_state"), ALIGNMENT)
st = ffi.cast("aegis256_mac_state *", raw)
self._st = ffi.gc(st, libc.free)
if _other is not None:
st, owner = new_aligned_struct("aegis256_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
if _other is not None: # clone path
_lib.aegis256_mac_state_clone(self._st, _other._st)
return
# Normal init
# Normal init path
nonce = memoryview(nonce)
key = memoryview(key)
if key.nbytes != KEYBYTES:
raise TypeError(f"key length must be {KEYBYTES=}")
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES=}")
raise TypeError(f"nonce length must be {NPUBBYTES}")
_lib.aegis256_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
@@ -565,7 +566,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -584,9 +585,7 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis256_state"), ALIGNMENT)
st = ffi.cast("aegis256_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis256_state", ALIGNMENT)
_lib.aegis256_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -595,6 +594,7 @@ class Encryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -746,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -765,9 +765,7 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis256_state"), ALIGNMENT)
st = ffi.cast("aegis256_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis256_state", ALIGNMENT)
_lib.aegis256_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -776,6 +774,7 @@ class Decryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -873,21 +872,13 @@ class Decryptor:
def new_state():
"""Allocate and return a new aegis256_state* with proper alignment.
The returned object is an ffi cdata pointer with automatic finalizer.
"""
# Allocate with required alignment using libc.posix_memalign
raw = alloc_aligned(ffi.sizeof("aegis256_state"), ALIGNMENT)
ptr = ffi.cast("aegis256_state *", raw)
return ffi.gc(ptr, libc.free)
"""Allocate and return a new aegis256_state* with proper alignment."""
return new_aligned_struct("aegis256_state", ALIGNMENT)
def new_mac_state():
"""Allocate and return a new aegis256_mac_state* with proper alignment."""
raw = alloc_aligned(ffi.sizeof("aegis256_mac_state"), ALIGNMENT)
ptr = ffi.cast("aegis256_mac_state *", raw)
return ffi.gc(ptr, libc.free)
return new_aligned_struct("aegis256_mac_state", ALIGNMENT)
__all__ = [

View File

@@ -9,8 +9,9 @@ Error return codes from the C library raise ValueError.
import errno
from collections.abc import Buffer
from ._loader import alloc_aligned, ffi, libc
from ._loader import ffi
from ._loader import lib as _lib
from .util import new_aligned_struct
# Constants exposed as functions in C; mirror them as integers at module import time
KEYBYTES = _lib.aegis256x2_keybytes()
@@ -445,7 +446,7 @@ class Mac:
mac2 = Mac(key, nonce); mac2.update(data); mac2.verify(tag)
"""
__slots__ = ("_st", "_nonce", "_key")
__slots__ = ("_st", "_owner")
def __init__(
self,
@@ -462,19 +463,19 @@ class Mac:
Raises:
TypeError: If key or nonce lengths are invalid.
"""
raw = alloc_aligned(ffi.sizeof("aegis256x2_mac_state"), ALIGNMENT)
st = ffi.cast("aegis256x2_mac_state *", raw)
self._st = ffi.gc(st, libc.free)
if _other is not None:
st, owner = new_aligned_struct("aegis256x2_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
if _other is not None: # clone path
_lib.aegis256x2_mac_state_clone(self._st, _other._st)
return
# Normal init
# Normal init path
nonce = memoryview(nonce)
key = memoryview(key)
if key.nbytes != KEYBYTES:
raise TypeError(f"key length must be {KEYBYTES=}")
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES=}")
raise TypeError(f"nonce length must be {NPUBBYTES}")
_lib.aegis256x2_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
@@ -565,7 +566,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -584,9 +585,7 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis256x2_state"), ALIGNMENT)
st = ffi.cast("aegis256x2_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis256x2_state", ALIGNMENT)
_lib.aegis256x2_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -595,6 +594,7 @@ class Encryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -746,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -765,9 +765,7 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis256x2_state"), ALIGNMENT)
st = ffi.cast("aegis256x2_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis256x2_state", ALIGNMENT)
_lib.aegis256x2_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -776,6 +774,7 @@ class Decryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -873,21 +872,13 @@ class Decryptor:
def new_state():
"""Allocate and return a new aegis256x2_state* with proper alignment.
The returned object is an ffi cdata pointer with automatic finalizer.
"""
# Allocate with required alignment using libc.posix_memalign
raw = alloc_aligned(ffi.sizeof("aegis256x2_state"), ALIGNMENT)
ptr = ffi.cast("aegis256x2_state *", raw)
return ffi.gc(ptr, libc.free)
"""Allocate and return a new aegis256x2_state* with proper alignment."""
return new_aligned_struct("aegis256x2_state", ALIGNMENT)
def new_mac_state():
"""Allocate and return a new aegis256x2_mac_state* with proper alignment."""
raw = alloc_aligned(ffi.sizeof("aegis256x2_mac_state"), ALIGNMENT)
ptr = ffi.cast("aegis256x2_mac_state *", raw)
return ffi.gc(ptr, libc.free)
return new_aligned_struct("aegis256x2_mac_state", ALIGNMENT)
__all__ = [

View File

@@ -9,8 +9,9 @@ Error return codes from the C library raise ValueError.
import errno
from collections.abc import Buffer
from ._loader import alloc_aligned, ffi, libc
from ._loader import ffi
from ._loader import lib as _lib
from .util import new_aligned_struct
# Constants exposed as functions in C; mirror them as integers at module import time
KEYBYTES = _lib.aegis256x4_keybytes()
@@ -445,7 +446,7 @@ class Mac:
mac2 = Mac(key, nonce); mac2.update(data); mac2.verify(tag)
"""
__slots__ = ("_st", "_nonce", "_key")
__slots__ = ("_st", "_owner")
def __init__(
self,
@@ -462,19 +463,19 @@ class Mac:
Raises:
TypeError: If key or nonce lengths are invalid.
"""
raw = alloc_aligned(ffi.sizeof("aegis256x4_mac_state"), ALIGNMENT)
st = ffi.cast("aegis256x4_mac_state *", raw)
self._st = ffi.gc(st, libc.free)
if _other is not None:
st, owner = new_aligned_struct("aegis256x4_mac_state", ALIGNMENT)
self._st = st
self._owner = owner
if _other is not None: # clone path
_lib.aegis256x4_mac_state_clone(self._st, _other._st)
return
# Normal init
# Normal init path
nonce = memoryview(nonce)
key = memoryview(key)
if key.nbytes != KEYBYTES:
raise TypeError(f"key length must be {KEYBYTES=}")
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES=}")
raise TypeError(f"nonce length must be {NPUBBYTES}")
_lib.aegis256x4_mac_init(self._st, _ptr(key), _ptr(nonce))
def __deepcopy__(self) -> "Mac":
@@ -565,7 +566,7 @@ class Encryptor:
- final_detached([ct_into], [mac_into], maclen=16) -> returns (tail_bytes, mac)
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental encryptor.
@@ -584,9 +585,7 @@ class Encryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis256x4_state"), ALIGNMENT)
st = ffi.cast("aegis256x4_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis256x4_state", ALIGNMENT)
_lib.aegis256x4_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -595,6 +594,7 @@ class Encryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total plaintext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -746,7 +746,7 @@ class Decryptor:
- final(mac[, into]) -> returns any remaining plaintext bytes
"""
__slots__ = ("_st", "_bytes_in", "_bytes_out")
__slots__ = ("_st", "_owner", "_bytes_in", "_bytes_out")
def __init__(self, key: Buffer, nonce: Buffer, ad: Buffer | None = None):
"""Create an incremental decryptor for detached tags.
@@ -765,9 +765,7 @@ class Decryptor:
raise TypeError(f"key length must be {KEYBYTES}")
if nonce.nbytes != NPUBBYTES:
raise TypeError(f"nonce length must be {NPUBBYTES}")
raw = alloc_aligned(ffi.sizeof("aegis256x4_state"), ALIGNMENT)
st = ffi.cast("aegis256x4_state *", raw)
st = ffi.gc(st, libc.free)
st, owner = new_aligned_struct("aegis256x4_state", ALIGNMENT)
_lib.aegis256x4_state_init(
st,
_ptr(ad) if ad is not None else ffi.NULL,
@@ -776,6 +774,7 @@ class Decryptor:
_ptr(key),
)
self._st = st
self._owner = owner
# Track total ciphertext bytes passed through update() so far
self._bytes_in = 0
self._bytes_out = 0
@@ -873,21 +872,13 @@ class Decryptor:
def new_state():
"""Allocate and return a new aegis256x4_state* with proper alignment.
The returned object is an ffi cdata pointer with automatic finalizer.
"""
# Allocate with required alignment using libc.posix_memalign
raw = alloc_aligned(ffi.sizeof("aegis256x4_state"), ALIGNMENT)
ptr = ffi.cast("aegis256x4_state *", raw)
return ffi.gc(ptr, libc.free)
"""Allocate and return a new aegis256x4_state* with proper alignment."""
return new_aligned_struct("aegis256x4_state", ALIGNMENT)
def new_mac_state():
"""Allocate and return a new aegis256x4_mac_state* with proper alignment."""
raw = alloc_aligned(ffi.sizeof("aegis256x4_mac_state"), ALIGNMENT)
ptr = ffi.cast("aegis256x4_mac_state *", raw)
return ffi.gc(ptr, libc.free)
return new_aligned_struct("aegis256x4_mac_state", ALIGNMENT)
__all__ = [

571
pyaegis/aegis_cdef.h Normal file
View File

@@ -0,0 +1,571 @@
typedef unsigned char uint8_t;
typedef unsigned long size_t;
/* aegis.h */
int aegis_init(void);
int aegis_verify_16(const uint8_t *x, const uint8_t *y) ;
int aegis_verify_32(const uint8_t *x, const uint8_t *y) ;
/* aegis128l.h */
typedef struct aegis128l_state { uint8_t opaque[256]; } aegis128l_state;
typedef struct aegis128l_mac_state { uint8_t opaque[384]; } aegis128l_mac_state;
size_t aegis128l_keybytes(void);
size_t aegis128l_npubbytes(void);
size_t aegis128l_abytes_min(void);
size_t aegis128l_abytes_max(void);
size_t aegis128l_tailbytes_max(void);
int aegis128l_encrypt_detached(uint8_t *c,
uint8_t *mac,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128l_decrypt_detached(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *mac,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
int aegis128l_encrypt(uint8_t *c,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128l_decrypt(uint8_t *m,
const uint8_t *c,
size_t clen,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
void aegis128l_state_init(aegis128l_state *st_,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128l_state_encrypt_update(aegis128l_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
const uint8_t *m,
size_t mlen);
int aegis128l_state_encrypt_detached_final(aegis128l_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
uint8_t *mac,
size_t maclen);
int aegis128l_state_encrypt_final(aegis128l_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
size_t maclen);
int aegis128l_state_decrypt_detached_update(aegis128l_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *c,
size_t clen) ;
int aegis128l_state_decrypt_detached_final(aegis128l_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *mac,
size_t maclen) ;
void aegis128l_stream(uint8_t *out, size_t len, const uint8_t *npub, const uint8_t *k);
void aegis128l_encrypt_unauthenticated(uint8_t *c,
const uint8_t *m,
size_t mlen,
const uint8_t *npub,
const uint8_t *k);
void aegis128l_decrypt_unauthenticated(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *npub,
const uint8_t *k);
void aegis128l_mac_init(aegis128l_mac_state *st_, const uint8_t *k, const uint8_t *npub);
int aegis128l_mac_update(aegis128l_mac_state *st_, const uint8_t *m, size_t mlen);
int aegis128l_mac_final(aegis128l_mac_state *st_, uint8_t *mac, size_t maclen);
int aegis128l_mac_verify(aegis128l_mac_state *st_, const uint8_t *mac, size_t maclen);
void aegis128l_mac_reset(aegis128l_mac_state *st_);
void aegis128l_mac_state_clone(aegis128l_mac_state *dst, const aegis128l_mac_state *src);
/* aegis128x2.h */
typedef struct aegis128x2_state { uint8_t opaque[448]; } aegis128x2_state;
typedef struct aegis128x2_mac_state { uint8_t opaque[704]; } aegis128x2_mac_state;
size_t aegis128x2_keybytes(void);
size_t aegis128x2_npubbytes(void);
size_t aegis128x2_abytes_min(void);
size_t aegis128x2_abytes_max(void);
size_t aegis128x2_tailbytes_max(void);
int aegis128x2_encrypt_detached(uint8_t *c,
uint8_t *mac,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128x2_decrypt_detached(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *mac,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
int aegis128x2_encrypt(uint8_t *c,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128x2_decrypt(uint8_t *m,
const uint8_t *c,
size_t clen,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
void aegis128x2_state_init(aegis128x2_state *st_,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128x2_state_encrypt_update(aegis128x2_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
const uint8_t *m,
size_t mlen);
int aegis128x2_state_encrypt_detached_final(aegis128x2_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
uint8_t *mac,
size_t maclen);
int aegis128x2_state_encrypt_final(aegis128x2_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
size_t maclen);
int aegis128x2_state_decrypt_detached_update(aegis128x2_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *c,
size_t clen) ;
int aegis128x2_state_decrypt_detached_final(aegis128x2_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *mac,
size_t maclen) ;
void aegis128x2_stream(uint8_t *out, size_t len, const uint8_t *npub, const uint8_t *k);
void aegis128x2_encrypt_unauthenticated(uint8_t *c,
const uint8_t *m,
size_t mlen,
const uint8_t *npub,
const uint8_t *k);
void aegis128x2_decrypt_unauthenticated(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *npub,
const uint8_t *k);
void aegis128x2_mac_init(aegis128x2_mac_state *st_, const uint8_t *k, const uint8_t *npub);
int aegis128x2_mac_update(aegis128x2_mac_state *st_, const uint8_t *m, size_t mlen);
int aegis128x2_mac_final(aegis128x2_mac_state *st_, uint8_t *mac, size_t maclen);
int aegis128x2_mac_verify(aegis128x2_mac_state *st_, const uint8_t *mac, size_t maclen);
void aegis128x2_mac_reset(aegis128x2_mac_state *st_);
void aegis128x2_mac_state_clone(aegis128x2_mac_state *dst, const aegis128x2_mac_state *src);
/* aegis128x4.h */
typedef struct aegis128x4_state { uint8_t opaque[832]; } aegis128x4_state;
typedef struct aegis128x4_mac_state { uint8_t opaque[1344]; } aegis128x4_mac_state;
size_t aegis128x4_keybytes(void);
size_t aegis128x4_npubbytes(void);
size_t aegis128x4_abytes_min(void);
size_t aegis128x4_abytes_max(void);
size_t aegis128x4_tailbytes_max(void);
int aegis128x4_encrypt_detached(uint8_t *c,
uint8_t *mac,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128x4_decrypt_detached(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *mac,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
int aegis128x4_encrypt(uint8_t *c,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128x4_decrypt(uint8_t *m,
const uint8_t *c,
size_t clen,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
void aegis128x4_state_init(aegis128x4_state *st_,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis128x4_state_encrypt_update(aegis128x4_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
const uint8_t *m,
size_t mlen);
int aegis128x4_state_encrypt_detached_final(aegis128x4_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
uint8_t *mac,
size_t maclen);
int aegis128x4_state_encrypt_final(aegis128x4_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
size_t maclen);
int aegis128x4_state_decrypt_detached_update(aegis128x4_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *c,
size_t clen) ;
int aegis128x4_state_decrypt_detached_final(aegis128x4_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *mac,
size_t maclen) ;
void aegis128x4_stream(uint8_t *out, size_t len, const uint8_t *npub, const uint8_t *k);
void aegis128x4_encrypt_unauthenticated(uint8_t *c,
const uint8_t *m,
size_t mlen,
const uint8_t *npub,
const uint8_t *k);
void aegis128x4_decrypt_unauthenticated(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *npub,
const uint8_t *k);
void aegis128x4_mac_init(aegis128x4_mac_state *st_, const uint8_t *k, const uint8_t *npub);
int aegis128x4_mac_update(aegis128x4_mac_state *st_, const uint8_t *m, size_t mlen);
int aegis128x4_mac_final(aegis128x4_mac_state *st_, uint8_t *mac, size_t maclen);
int aegis128x4_mac_verify(aegis128x4_mac_state *st_, const uint8_t *mac, size_t maclen);
void aegis128x4_mac_reset(aegis128x4_mac_state *st_);
void aegis128x4_mac_state_clone(aegis128x4_mac_state *dst, const aegis128x4_mac_state *src);
/* aegis256.h */
typedef struct aegis256_state { uint8_t opaque[192]; } aegis256_state;
typedef struct aegis256_mac_state { uint8_t opaque[288]; } aegis256_mac_state;
size_t aegis256_keybytes(void);
size_t aegis256_npubbytes(void);
size_t aegis256_abytes_min(void);
size_t aegis256_abytes_max(void);
size_t aegis256_tailbytes_max(void);
int aegis256_encrypt_detached(uint8_t *c,
uint8_t *mac,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256_decrypt_detached(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *mac,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
int aegis256_encrypt(uint8_t *c,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256_decrypt(uint8_t *m,
const uint8_t *c,
size_t clen,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
void aegis256_state_init(aegis256_state *st_,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256_state_encrypt_update(aegis256_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
const uint8_t *m,
size_t mlen);
int aegis256_state_encrypt_detached_final(aegis256_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
uint8_t *mac,
size_t maclen);
int aegis256_state_encrypt_final(aegis256_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
size_t maclen);
int aegis256_state_decrypt_detached_update(aegis256_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *c,
size_t clen) ;
int aegis256_state_decrypt_detached_final(aegis256_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *mac,
size_t maclen) ;
void aegis256_stream(uint8_t *out, size_t len, const uint8_t *npub, const uint8_t *k);
void aegis256_encrypt_unauthenticated(uint8_t *c,
const uint8_t *m,
size_t mlen,
const uint8_t *npub,
const uint8_t *k);
void aegis256_decrypt_unauthenticated(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *npub,
const uint8_t *k);
void aegis256_mac_init(aegis256_mac_state *st_, const uint8_t *k, const uint8_t *npub);
int aegis256_mac_update(aegis256_mac_state *st_, const uint8_t *m, size_t mlen);
int aegis256_mac_final(aegis256_mac_state *st_, uint8_t *mac, size_t maclen);
int aegis256_mac_verify(aegis256_mac_state *st_, const uint8_t *mac, size_t maclen);
void aegis256_mac_reset(aegis256_mac_state *st_);
void aegis256_mac_state_clone(aegis256_mac_state *dst, const aegis256_mac_state *src);
/* aegis256x2.h */
typedef struct aegis256x2_state { uint8_t opaque[320]; } aegis256x2_state;
typedef struct aegis256x2_mac_state { uint8_t opaque[512]; } aegis256x2_mac_state;
size_t aegis256x2_keybytes(void);
size_t aegis256x2_npubbytes(void);
size_t aegis256x2_abytes_min(void);
size_t aegis256x2_abytes_max(void);
size_t aegis256x2_tailbytes_max(void);
int aegis256x2_encrypt_detached(uint8_t *c,
uint8_t *mac,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256x2_decrypt_detached(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *mac,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
int aegis256x2_encrypt(uint8_t *c,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256x2_decrypt(uint8_t *m,
const uint8_t *c,
size_t clen,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
void aegis256x2_state_init(aegis256x2_state *st_,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256x2_state_encrypt_update(aegis256x2_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
const uint8_t *m,
size_t mlen);
int aegis256x2_state_encrypt_detached_final(aegis256x2_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
uint8_t *mac,
size_t maclen);
int aegis256x2_state_encrypt_final(aegis256x2_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
size_t maclen);
int aegis256x2_state_decrypt_detached_update(aegis256x2_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *c,
size_t clen) ;
int aegis256x2_state_decrypt_detached_final(aegis256x2_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *mac,
size_t maclen) ;
void aegis256x2_stream(uint8_t *out, size_t len, const uint8_t *npub, const uint8_t *k);
void aegis256x2_encrypt_unauthenticated(uint8_t *c,
const uint8_t *m,
size_t mlen,
const uint8_t *npub,
const uint8_t *k);
void aegis256x2_decrypt_unauthenticated(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *npub,
const uint8_t *k);
void aegis256x2_mac_init(aegis256x2_mac_state *st_, const uint8_t *k, const uint8_t *npub);
int aegis256x2_mac_update(aegis256x2_mac_state *st_, const uint8_t *m, size_t mlen);
int aegis256x2_mac_final(aegis256x2_mac_state *st_, uint8_t *mac, size_t maclen);
int aegis256x2_mac_verify(aegis256x2_mac_state *st_, const uint8_t *mac, size_t maclen);
void aegis256x2_mac_reset(aegis256x2_mac_state *st_);
void aegis256x2_mac_state_clone(aegis256x2_mac_state *dst, const aegis256x2_mac_state *src);
/* aegis256x4.h */
typedef struct aegis256x4_state { uint8_t opaque[576]; } aegis256x4_state;
typedef struct aegis256x4_mac_state { uint8_t opaque[960]; } aegis256x4_mac_state;
size_t aegis256x4_keybytes(void);
size_t aegis256x4_npubbytes(void);
size_t aegis256x4_abytes_min(void);
size_t aegis256x4_abytes_max(void);
size_t aegis256x4_tailbytes_max(void);
int aegis256x4_encrypt_detached(uint8_t *c,
uint8_t *mac,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256x4_decrypt_detached(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *mac,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
int aegis256x4_encrypt(uint8_t *c,
size_t maclen,
const uint8_t *m,
size_t mlen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256x4_decrypt(uint8_t *m,
const uint8_t *c,
size_t clen,
size_t maclen,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k) ;
void aegis256x4_state_init(aegis256x4_state *st_,
const uint8_t *ad,
size_t adlen,
const uint8_t *npub,
const uint8_t *k);
int aegis256x4_state_encrypt_update(aegis256x4_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
const uint8_t *m,
size_t mlen);
int aegis256x4_state_encrypt_detached_final(aegis256x4_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
uint8_t *mac,
size_t maclen);
int aegis256x4_state_encrypt_final(aegis256x4_state *st_,
uint8_t *c,
size_t clen_max,
size_t *written,
size_t maclen);
int aegis256x4_state_decrypt_detached_update(aegis256x4_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *c,
size_t clen) ;
int aegis256x4_state_decrypt_detached_final(aegis256x4_state *st_,
uint8_t *m,
size_t mlen_max,
size_t *written,
const uint8_t *mac,
size_t maclen) ;
void aegis256x4_stream(uint8_t *out, size_t len, const uint8_t *npub, const uint8_t *k);
void aegis256x4_encrypt_unauthenticated(uint8_t *c,
const uint8_t *m,
size_t mlen,
const uint8_t *npub,
const uint8_t *k);
void aegis256x4_decrypt_unauthenticated(uint8_t *m,
const uint8_t *c,
size_t clen,
const uint8_t *npub,
const uint8_t *k);
void aegis256x4_mac_init(aegis256x4_mac_state *st_, const uint8_t *k, const uint8_t *npub);
int aegis256x4_mac_update(aegis256x4_mac_state *st_, const uint8_t *m, size_t mlen);
int aegis256x4_mac_final(aegis256x4_mac_state *st_, uint8_t *mac, size_t maclen);
int aegis256x4_mac_verify(aegis256x4_mac_state *st_, const uint8_t *mac, size_t maclen);
void aegis256x4_mac_reset(aegis256x4_mac_state *st_);
void aegis256x4_mac_state_clone(aegis256x4_mac_state *dst, const aegis256x4_mac_state *src);

34
pyaegis/util.py Normal file
View File

@@ -0,0 +1,34 @@
"""Utility helpers for pyaegis.
Currently provides Python-side aligned allocation helpers that avoid relying
on libc/posix_memalign. Memory is owned by Python; C code only borrows it.
"""
from __future__ import annotations
from ._loader import ffi
__all__ = ["new_aligned_struct", "aligned_address"]
def aligned_address(obj) -> int:
"""Return the integer address of the start of a cffi array object."""
return int(ffi.cast("uintptr_t", ffi.addressof(obj, 0)))
def new_aligned_struct(ctype: str, alignment: int) -> tuple[object, object]:
"""Allocate memory for one instance of ``ctype`` with requested alignment.
This allocates a Python-owned unsigned char[] buffer large enough to find
an aligned start address. Returns (ptr, owner) where ptr is a ``ctype *``
and owner is the buffer object keeping the memory alive.
"""
if alignment & (alignment - 1): # Not power of two
raise ValueError("alignment must be a power of two")
size = ffi.sizeof(ctype)
base = ffi.new("unsigned char[]", size + alignment - 1)
addr = aligned_address(base)
offset = (-addr) & (alignment - 1)
aligned_uc = ffi.addressof(base, offset)
ptr = ffi.cast(f"{ctype} *", aligned_uc)
return ptr, base

View File

@@ -3,7 +3,7 @@ requires = ["hatchling", "cffi>=2.0.0"]
build-backend = "hatchling.build"
[project]
name = "aegis"
name = "pyaegis"
version = "0.1.0"
description = "Python bindings for libaegis (links to system library)"
requires-python = ">=3.12"
@@ -33,6 +33,4 @@ dev = [
path = "tools/build_hook.py"
[tool.hatch.build.targets.wheel]
# Ensure only the Python package is included by default; native artifacts
# will be added by the build hook once implemented.
packages = ["pyaegis"]

View File

@@ -10,8 +10,11 @@ from hatchling.builders.hooks.plugin.interface import BuildHookInterface
class BuildHook(BuildHookInterface):
"""Build dynamic library with Zig and include in wheel."""
PLUGIN_NAME = "pyaegis_build_hook"
def initialize(self, version: str, build_data: dict) -> None:
"""Build library with Zig and add it to the wheel."""
super().initialize(version, build_data)
if self.target_name != "wheel":
return
@@ -19,52 +22,79 @@ class BuildHook(BuildHookInterface):
raise RuntimeError("Zig compiler not found in PATH")
libaegis_dir = Path(self.root) / "libaegis"
self.app.display_info(f"[aegis] Using libaegis source at: {libaegis_dir}")
original_build_zig = libaegis_dir / "build.zig"
if not original_build_zig.exists():
raise RuntimeError(f"libaegis source not found at {libaegis_dir}")
# Prepare a temporary build directory (avoid touching original files)
build_dir = Path.cwd() / "libaegis-build"
build_dir.mkdir(exist_ok=True)
build_zig = build_dir / "build.zig"
build_zig.write_text(
original_build_zig.read_text(encoding="utf-8").replace(
".linkage = .static,", ".linkage = .dynamic,"
),
encoding="utf-8",
)
for res in "build.zig.zon", "src":
(build_dir / res).symlink_to(libaegis_dir / res)
self.app.display_info("[aegis] Building libaegis dynamic library with Zig...")
try:
subprocess.run(
["zig", "build", "-Drelease"],
check=True,
cwd=str(build_dir),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
build_dir = Path("libaegis-build")
build_dir.mkdir(exist_ok=True)
build_zig = build_dir / "build.zig"
build_zig.write_text(
original_build_zig.read_text(encoding="utf-8").replace(
".linkage = .static,", ".linkage = .dynamic,"
),
encoding="utf-8",
)
except subprocess.CalledProcessError as e:
output = e.stdout.decode(errors="replace") if e.stdout else ""
raise RuntimeError(f"Zig build failed:\n{output}") from e
for res in ("build.zig.zon", "src"):
(build_dir / res).symlink_to(libaegis_dir / res)
self.app.display_info(
"[aegis] Building libaegis dynamic library with Zig..."
)
try:
subprocess.run(
["zig", "build", "-Drelease"],
check=True,
cwd=str(build_dir),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as e:
output = e.stdout.decode(errors="replace") if e.stdout else ""
raise RuntimeError(f"Zig build failed:\n{output}") from e
lib_dir = build_dir / "zig-out" / "lib"
lib_dir = build_dir / "zig-out" / "lib"
dynamic_lib = None
for lib_file in lib_dir.iterdir():
if lib_file.name.startswith("libaegis") and lib_file.suffix in (
".so",
".dylib",
".dll",
):
dynamic_lib = lib_file
break
dynamic_lib = None
for lib_file in lib_dir.iterdir():
if lib_file.name.startswith("libaegis") and lib_file.suffix in (
".so",
".dylib",
".dll",
):
dynamic_lib = lib_file
break
if not dynamic_lib or not dynamic_lib.exists():
raise RuntimeError(f"Built dynamic library not found in {lib_dir}")
if not dynamic_lib or not dynamic_lib.exists():
raise RuntimeError(f"Built dynamic library not found in {lib_dir}")
# Copy the built dynamic library into the Python package tree so that it
# is naturally included as package data. Hatch will pick up anything
# under the listed packages ("pyaegis"), so a direct copy is simpler
# than relying on force_include. We still leave the original artifact
# in place in case other hooks/tools want to inspect it.
package_build_dir = Path(self.root) / "pyaegis" / "build"
package_build_dir.mkdir(parents=True, exist_ok=True)
self.app.display_info(
f"[aegis] Staging dynamic library to package... {package_build_dir} {Path.cwd()}"
)
dest_path = package_build_dir / dynamic_lib.name
try:
shutil.copy2(dynamic_lib, dest_path)
except Exception as e: # pragma: no cover - defensive
raise RuntimeError(
f"Failed to copy dynamic library to package: {e}"
) from e
finally:
shutil.rmtree(build_dir, ignore_errors=True)
# Retain force_include as a fallback for environments where an older
# Hatch might not automatically include non-.py files, or if wheels are
# built with custom exclusion rules.
if "force_include" not in build_data:
build_data["force_include"] = {}
dest_rel = str(Path("build") / dynamic_lib.name)
build_data["force_include"][str(dynamic_lib)] = dest_rel
self.app.display_info(f"[aegis] Added dynamic library to wheel: {dest_rel}")
build_data["force_include"][str(dest_path)] = str(
Path("pyaegis") / "build" / dynamic_lib.name
)
self.app.display_info(f"[aegis] Dynamic library staged at: {dest_path}")

View File

@@ -130,22 +130,13 @@ def generate_cdef(include_dir: pathlib.Path) -> str:
lines.append("")
# Add libc bits for aligned allocation
lines.extend(
[
"/* libc bits for aligned allocation on POSIX */",
"int posix_memalign(void **memptr, size_t alignment, size_t size);",
"void free(void *ptr);",
]
)
return "\n".join(lines)
def main() -> int:
# Find the include directory
root = pathlib.Path(__file__).resolve().parents[2]
include_dir = root / "src" / "include"
root = pathlib.Path(__file__).parent.parent
include_dir = root / "libaegis" / "src" / "include"
if not include_dir.exists():
print(f"Include directory not found: {include_dir}", file=sys.stderr)
@@ -153,8 +144,8 @@ def main() -> int:
cdef_string = generate_cdef(include_dir)
# Write to a file in the pyaegis/build subdirectory
output_dir = root / "python" / "pyaegis" / "build"
# Write to a file in the pyaegis directory
output_dir = root / "pyaegis"
output_dir.mkdir(exist_ok=True)
output_path = output_dir / "aegis_cdef.h"
output_path.write_text(cdef_string, encoding="utf-8")

View File

@@ -17,13 +17,8 @@ import re
import sys
# Template and target locations
ROOT = (
pathlib.Path(__file__).resolve().parents[2]
if (pathlib.Path(__file__).resolve().parents[0].name == "tools")
else pathlib.Path.cwd()
)
PY_DIR = ROOT / "python"
AEGIS_DIR = PY_DIR / "aegis"
ROOT = pathlib.Path(__file__).parent.parent
AEGIS_DIR = ROOT / "pyaegis"
TEMPLATE = AEGIS_DIR / "aegis256x4.py"
# Variants to generate (template excluded) and their ALIGNMENT values