Convert all input buffers to memoryview before use and use .nbytes, because len() doesn't work correctly with some buffers. Update docs with a Numpy example.
This commit is contained in:
12
README.md
12
README.md
@@ -243,6 +243,18 @@ For advanced use cases, the output buffer can be supplied with `into` kwarg. Any
|
||||
|
||||
A `TypeError` is raised if the buffer is too small. For convenience, the functions return a memoryview showing only the bytes actually written.
|
||||
|
||||
Foreign arrays can be used. This example fills a Numpy array with random integers.
|
||||
|
||||
```python
|
||||
import numpy as np
|
||||
from pyaegis import aegis128x4 as ciph
|
||||
key, nonce = ciph.random_key(), ciph.random_nonce()
|
||||
arr = np.empty(10, dtype=np.uint64) # Uninitialised integer array
|
||||
ciph.stream(key, nonce, into=arr) # Fill with random bytes
|
||||
print(arr)
|
||||
```
|
||||
|
||||
|
||||
In-place operations are supported when the input and the output point to the same location in memory. When using attached MAC tag, the input buffer needs to be sliced to correct length:
|
||||
|
||||
```python
|
||||
|
||||
@@ -74,21 +74,30 @@ def encrypt_detached(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if ct_into is not None:
|
||||
ct_into = memoryview(ct_into)
|
||||
if mac_into is not None:
|
||||
mac_into = memoryview(mac_into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
if ct_into is None:
|
||||
c = bytearray(len(message))
|
||||
c = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(ct_into) < len(message):
|
||||
raise TypeError("ct_into length must be at least len(message)")
|
||||
if ct_into.nbytes < message.nbytes:
|
||||
raise TypeError("ct_into length must be at least message.nbytes")
|
||||
c = ct_into
|
||||
if mac_into is None:
|
||||
mac = bytearray(maclen)
|
||||
else:
|
||||
if len(mac_into) < maclen:
|
||||
if mac_into.nbytes < maclen:
|
||||
raise TypeError("mac_into length must be at least maclen")
|
||||
mac = mac_into
|
||||
|
||||
@@ -97,9 +106,9 @@ def encrypt_detached(
|
||||
ffi.from_buffer(mac),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -108,7 +117,7 @@ def encrypt_detached(
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt detached failed: {err_name}")
|
||||
return (
|
||||
c if ct_into is None else memoryview(c)[: len(message)],
|
||||
c if ct_into is None else memoryview(c)[: message.nbytes],
|
||||
mac if mac_into is None else memoryview(mac)[:maclen],
|
||||
) # type: ignore
|
||||
|
||||
@@ -139,34 +148,42 @@ def decrypt_detached(
|
||||
TypeError: If lengths are invalid.
|
||||
ValueError: If authentication fails.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
mac = memoryview(mac)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
maclen = len(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128l_decrypt_detached(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(mac),
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
if rc != 0:
|
||||
raise ValueError("authentication failed")
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt(
|
||||
@@ -197,24 +214,31 @@ def encrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message) + maclen)
|
||||
out = bytearray(message.nbytes + maclen)
|
||||
else:
|
||||
if len(into) < len(message) + maclen:
|
||||
raise TypeError("into length must be at least len(message)+maclen")
|
||||
if into.nbytes < message.nbytes + maclen:
|
||||
raise TypeError("into length must be at least message.nbytes + maclen")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128l_encrypt(
|
||||
ffi.from_buffer(out),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -222,7 +246,7 @@ def encrypt(
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt failed: {err_name}")
|
||||
return out if into is None else memoryview(out)[: len(message) + maclen] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes + maclen] # type: ignore
|
||||
|
||||
|
||||
def decrypt(
|
||||
@@ -253,29 +277,36 @@ def decrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if len(ct) < maclen:
|
||||
if ct.nbytes < maclen:
|
||||
raise TypeError("ciphertext too short for tag")
|
||||
expected_out = len(ct) - maclen
|
||||
expected_out = ct.nbytes - maclen
|
||||
if into is None:
|
||||
out = bytearray(expected_out)
|
||||
else:
|
||||
if len(into) < expected_out:
|
||||
if into.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be at least len(ciphertext_with_tag)-maclen"
|
||||
"into length must be at least ct.nbytes - maclen"
|
||||
)
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128l_decrypt(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -305,25 +336,30 @@ def stream(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid or neither length nor into provided.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
if nonce is not None:
|
||||
nonce = memoryview(nonce)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if nonce is not None and len(nonce) != NONCEBYTES:
|
||||
if nonce is not None and nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
if length is None:
|
||||
raise TypeError("provide either into or length")
|
||||
out = bytearray(length)
|
||||
else:
|
||||
if length is not None and len(into) < length:
|
||||
if length is not None and into.nbytes < length:
|
||||
raise TypeError("into length must be at least length")
|
||||
out = into
|
||||
_lib.aegis128l_stream(
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: length or len(out)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: length or memoryview(out).nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt_unauthenticated(
|
||||
@@ -347,24 +383,29 @@ def encrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message))
|
||||
out = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(into) < len(message):
|
||||
raise TypeError("into length must be at least len(message)")
|
||||
if into.nbytes < message.nbytes:
|
||||
raise TypeError("into length must be at least message.nbytes")
|
||||
out = into
|
||||
_lib.aegis128l_encrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(message)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes] # type: ignore
|
||||
|
||||
|
||||
def decrypt_unauthenticated(
|
||||
@@ -388,24 +429,29 @@ def decrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
_lib.aegis128l_decrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
# This is missing from C API but convenient to have here
|
||||
@@ -428,6 +474,11 @@ def mac(
|
||||
Returns:
|
||||
MAC bytes as bytearray if into not provided, memoryview of into otherwise
|
||||
"""
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
data = memoryview(data)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
mac_state = Mac(key, nonce, maclen)
|
||||
mac_state.update(data)
|
||||
return mac_state.final(into)
|
||||
@@ -459,9 +510,11 @@ class Mac:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
self._maclen = maclen
|
||||
@@ -492,7 +545,8 @@ class Mac:
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis128l_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
data = memoryview(data)
|
||||
rc = _lib.aegis128l_mac_update(self._proxy.ptr, _ptr(data), data.nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -522,12 +576,13 @@ class Mac:
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
else:
|
||||
if len(into) < maclen:
|
||||
into = memoryview(into)
|
||||
if into.nbytes < maclen:
|
||||
raise TypeError("into length must be at least maclen")
|
||||
out = into
|
||||
|
||||
clone = self.clone()
|
||||
rc = _lib.aegis128l_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
|
||||
rc = _lib.aegis128l_mac_final(clone._proxy.ptr, ffi.from_buffer(out), memoryview(out).nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -567,7 +622,8 @@ class Mac:
|
||||
TypeError: If tag length is invalid.
|
||||
ValueError: If verification fails.
|
||||
"""
|
||||
maclen = len(mac)
|
||||
mac = memoryview(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
|
||||
@@ -606,15 +662,19 @@ class Encryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis128l_state", ALIGNMENT)
|
||||
_lib.aegis128l_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -638,10 +698,13 @@ class Encryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(message)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = message.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be >= expected output size for this update"
|
||||
)
|
||||
@@ -649,10 +712,10 @@ class Encryptor:
|
||||
rc = _lib.aegis128l_state_encrypt_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -680,12 +743,14 @@ class Encryptor:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
# Only the authentication tag is produced here; allocate exactly maclen
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
out = into if into is not None else bytearray(maclen)
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis128l_state_encrypt_final(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
written,
|
||||
maclen,
|
||||
)
|
||||
@@ -730,15 +795,19 @@ class Decryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis128l_state", ALIGNMENT)
|
||||
_lib.aegis128l_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -760,26 +829,29 @@ class Decryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(ct)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = ct.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError("into length must be >= required capacity for this update")
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis128l_state_decrypt_detached_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, ct.nbytes={ct.nbytes}"
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
@@ -796,7 +868,8 @@ class Decryptor:
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
if len(mac) != maclen:
|
||||
mac = memoryview(mac)
|
||||
if mac.nbytes != maclen:
|
||||
raise TypeError(f"mac length must be {maclen}")
|
||||
rc = _lib.aegis128l_state_decrypt_detached_final(
|
||||
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
|
||||
|
||||
@@ -74,21 +74,30 @@ def encrypt_detached(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if ct_into is not None:
|
||||
ct_into = memoryview(ct_into)
|
||||
if mac_into is not None:
|
||||
mac_into = memoryview(mac_into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
if ct_into is None:
|
||||
c = bytearray(len(message))
|
||||
c = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(ct_into) < len(message):
|
||||
raise TypeError("ct_into length must be at least len(message)")
|
||||
if ct_into.nbytes < message.nbytes:
|
||||
raise TypeError("ct_into length must be at least message.nbytes")
|
||||
c = ct_into
|
||||
if mac_into is None:
|
||||
mac = bytearray(maclen)
|
||||
else:
|
||||
if len(mac_into) < maclen:
|
||||
if mac_into.nbytes < maclen:
|
||||
raise TypeError("mac_into length must be at least maclen")
|
||||
mac = mac_into
|
||||
|
||||
@@ -97,9 +106,9 @@ def encrypt_detached(
|
||||
ffi.from_buffer(mac),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -108,7 +117,7 @@ def encrypt_detached(
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt detached failed: {err_name}")
|
||||
return (
|
||||
c if ct_into is None else memoryview(c)[: len(message)],
|
||||
c if ct_into is None else memoryview(c)[: message.nbytes],
|
||||
mac if mac_into is None else memoryview(mac)[:maclen],
|
||||
) # type: ignore
|
||||
|
||||
@@ -139,34 +148,42 @@ def decrypt_detached(
|
||||
TypeError: If lengths are invalid.
|
||||
ValueError: If authentication fails.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
mac = memoryview(mac)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
maclen = len(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128x2_decrypt_detached(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(mac),
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
if rc != 0:
|
||||
raise ValueError("authentication failed")
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt(
|
||||
@@ -197,24 +214,31 @@ def encrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message) + maclen)
|
||||
out = bytearray(message.nbytes + maclen)
|
||||
else:
|
||||
if len(into) < len(message) + maclen:
|
||||
raise TypeError("into length must be at least len(message)+maclen")
|
||||
if into.nbytes < message.nbytes + maclen:
|
||||
raise TypeError("into length must be at least message.nbytes + maclen")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128x2_encrypt(
|
||||
ffi.from_buffer(out),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -222,7 +246,7 @@ def encrypt(
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt failed: {err_name}")
|
||||
return out if into is None else memoryview(out)[: len(message) + maclen] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes + maclen] # type: ignore
|
||||
|
||||
|
||||
def decrypt(
|
||||
@@ -253,29 +277,36 @@ def decrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if len(ct) < maclen:
|
||||
if ct.nbytes < maclen:
|
||||
raise TypeError("ciphertext too short for tag")
|
||||
expected_out = len(ct) - maclen
|
||||
expected_out = ct.nbytes - maclen
|
||||
if into is None:
|
||||
out = bytearray(expected_out)
|
||||
else:
|
||||
if len(into) < expected_out:
|
||||
if into.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be at least len(ciphertext_with_tag)-maclen"
|
||||
"into length must be at least ct.nbytes - maclen"
|
||||
)
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128x2_decrypt(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -305,25 +336,30 @@ def stream(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid or neither length nor into provided.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
if nonce is not None:
|
||||
nonce = memoryview(nonce)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if nonce is not None and len(nonce) != NONCEBYTES:
|
||||
if nonce is not None and nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
if length is None:
|
||||
raise TypeError("provide either into or length")
|
||||
out = bytearray(length)
|
||||
else:
|
||||
if length is not None and len(into) < length:
|
||||
if length is not None and into.nbytes < length:
|
||||
raise TypeError("into length must be at least length")
|
||||
out = into
|
||||
_lib.aegis128x2_stream(
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: length or len(out)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: length or memoryview(out).nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt_unauthenticated(
|
||||
@@ -347,24 +383,29 @@ def encrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message))
|
||||
out = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(into) < len(message):
|
||||
raise TypeError("into length must be at least len(message)")
|
||||
if into.nbytes < message.nbytes:
|
||||
raise TypeError("into length must be at least message.nbytes")
|
||||
out = into
|
||||
_lib.aegis128x2_encrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(message)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes] # type: ignore
|
||||
|
||||
|
||||
def decrypt_unauthenticated(
|
||||
@@ -388,24 +429,29 @@ def decrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
_lib.aegis128x2_decrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
# This is missing from C API but convenient to have here
|
||||
@@ -428,6 +474,11 @@ def mac(
|
||||
Returns:
|
||||
MAC bytes as bytearray if into not provided, memoryview of into otherwise
|
||||
"""
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
data = memoryview(data)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
mac_state = Mac(key, nonce, maclen)
|
||||
mac_state.update(data)
|
||||
return mac_state.final(into)
|
||||
@@ -459,9 +510,11 @@ class Mac:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
self._maclen = maclen
|
||||
@@ -492,7 +545,8 @@ class Mac:
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis128x2_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
data = memoryview(data)
|
||||
rc = _lib.aegis128x2_mac_update(self._proxy.ptr, _ptr(data), data.nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -522,12 +576,13 @@ class Mac:
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
else:
|
||||
if len(into) < maclen:
|
||||
into = memoryview(into)
|
||||
if into.nbytes < maclen:
|
||||
raise TypeError("into length must be at least maclen")
|
||||
out = into
|
||||
|
||||
clone = self.clone()
|
||||
rc = _lib.aegis128x2_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
|
||||
rc = _lib.aegis128x2_mac_final(clone._proxy.ptr, ffi.from_buffer(out), memoryview(out).nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -567,7 +622,8 @@ class Mac:
|
||||
TypeError: If tag length is invalid.
|
||||
ValueError: If verification fails.
|
||||
"""
|
||||
maclen = len(mac)
|
||||
mac = memoryview(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
|
||||
@@ -606,15 +662,19 @@ class Encryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis128x2_state", ALIGNMENT)
|
||||
_lib.aegis128x2_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -638,10 +698,13 @@ class Encryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(message)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = message.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be >= expected output size for this update"
|
||||
)
|
||||
@@ -649,10 +712,10 @@ class Encryptor:
|
||||
rc = _lib.aegis128x2_state_encrypt_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -680,12 +743,14 @@ class Encryptor:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
# Only the authentication tag is produced here; allocate exactly maclen
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
out = into if into is not None else bytearray(maclen)
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis128x2_state_encrypt_final(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
written,
|
||||
maclen,
|
||||
)
|
||||
@@ -730,15 +795,19 @@ class Decryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis128x2_state", ALIGNMENT)
|
||||
_lib.aegis128x2_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -760,26 +829,29 @@ class Decryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(ct)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = ct.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError("into length must be >= required capacity for this update")
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis128x2_state_decrypt_detached_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, ct.nbytes={ct.nbytes}"
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
@@ -796,7 +868,8 @@ class Decryptor:
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
if len(mac) != maclen:
|
||||
mac = memoryview(mac)
|
||||
if mac.nbytes != maclen:
|
||||
raise TypeError(f"mac length must be {maclen}")
|
||||
rc = _lib.aegis128x2_state_decrypt_detached_final(
|
||||
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
|
||||
|
||||
@@ -74,21 +74,30 @@ def encrypt_detached(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if ct_into is not None:
|
||||
ct_into = memoryview(ct_into)
|
||||
if mac_into is not None:
|
||||
mac_into = memoryview(mac_into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
if ct_into is None:
|
||||
c = bytearray(len(message))
|
||||
c = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(ct_into) < len(message):
|
||||
raise TypeError("ct_into length must be at least len(message)")
|
||||
if ct_into.nbytes < message.nbytes:
|
||||
raise TypeError("ct_into length must be at least message.nbytes")
|
||||
c = ct_into
|
||||
if mac_into is None:
|
||||
mac = bytearray(maclen)
|
||||
else:
|
||||
if len(mac_into) < maclen:
|
||||
if mac_into.nbytes < maclen:
|
||||
raise TypeError("mac_into length must be at least maclen")
|
||||
mac = mac_into
|
||||
|
||||
@@ -97,9 +106,9 @@ def encrypt_detached(
|
||||
ffi.from_buffer(mac),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -108,7 +117,7 @@ def encrypt_detached(
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt detached failed: {err_name}")
|
||||
return (
|
||||
c if ct_into is None else memoryview(c)[: len(message)],
|
||||
c if ct_into is None else memoryview(c)[: message.nbytes],
|
||||
mac if mac_into is None else memoryview(mac)[:maclen],
|
||||
) # type: ignore
|
||||
|
||||
@@ -139,34 +148,42 @@ def decrypt_detached(
|
||||
TypeError: If lengths are invalid.
|
||||
ValueError: If authentication fails.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
mac = memoryview(mac)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
maclen = len(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128x4_decrypt_detached(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(mac),
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
if rc != 0:
|
||||
raise ValueError("authentication failed")
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt(
|
||||
@@ -197,24 +214,31 @@ def encrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message) + maclen)
|
||||
out = bytearray(message.nbytes + maclen)
|
||||
else:
|
||||
if len(into) < len(message) + maclen:
|
||||
raise TypeError("into length must be at least len(message)+maclen")
|
||||
if into.nbytes < message.nbytes + maclen:
|
||||
raise TypeError("into length must be at least message.nbytes + maclen")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128x4_encrypt(
|
||||
ffi.from_buffer(out),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -222,7 +246,7 @@ def encrypt(
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt failed: {err_name}")
|
||||
return out if into is None else memoryview(out)[: len(message) + maclen] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes + maclen] # type: ignore
|
||||
|
||||
|
||||
def decrypt(
|
||||
@@ -253,29 +277,36 @@ def decrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if len(ct) < maclen:
|
||||
if ct.nbytes < maclen:
|
||||
raise TypeError("ciphertext too short for tag")
|
||||
expected_out = len(ct) - maclen
|
||||
expected_out = ct.nbytes - maclen
|
||||
if into is None:
|
||||
out = bytearray(expected_out)
|
||||
else:
|
||||
if len(into) < expected_out:
|
||||
if into.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be at least len(ciphertext_with_tag)-maclen"
|
||||
"into length must be at least ct.nbytes - maclen"
|
||||
)
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis128x4_decrypt(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -305,25 +336,30 @@ def stream(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid or neither length nor into provided.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
if nonce is not None:
|
||||
nonce = memoryview(nonce)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if nonce is not None and len(nonce) != NONCEBYTES:
|
||||
if nonce is not None and nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
if length is None:
|
||||
raise TypeError("provide either into or length")
|
||||
out = bytearray(length)
|
||||
else:
|
||||
if length is not None and len(into) < length:
|
||||
if length is not None and into.nbytes < length:
|
||||
raise TypeError("into length must be at least length")
|
||||
out = into
|
||||
_lib.aegis128x4_stream(
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: length or len(out)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: length or memoryview(out).nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt_unauthenticated(
|
||||
@@ -347,24 +383,29 @@ def encrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message))
|
||||
out = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(into) < len(message):
|
||||
raise TypeError("into length must be at least len(message)")
|
||||
if into.nbytes < message.nbytes:
|
||||
raise TypeError("into length must be at least message.nbytes")
|
||||
out = into
|
||||
_lib.aegis128x4_encrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(message)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes] # type: ignore
|
||||
|
||||
|
||||
def decrypt_unauthenticated(
|
||||
@@ -388,24 +429,29 @@ def decrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
_lib.aegis128x4_decrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
# This is missing from C API but convenient to have here
|
||||
@@ -428,6 +474,11 @@ def mac(
|
||||
Returns:
|
||||
MAC bytes as bytearray if into not provided, memoryview of into otherwise
|
||||
"""
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
data = memoryview(data)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
mac_state = Mac(key, nonce, maclen)
|
||||
mac_state.update(data)
|
||||
return mac_state.final(into)
|
||||
@@ -459,9 +510,11 @@ class Mac:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
self._maclen = maclen
|
||||
@@ -492,7 +545,8 @@ class Mac:
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis128x4_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
data = memoryview(data)
|
||||
rc = _lib.aegis128x4_mac_update(self._proxy.ptr, _ptr(data), data.nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -522,12 +576,13 @@ class Mac:
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
else:
|
||||
if len(into) < maclen:
|
||||
into = memoryview(into)
|
||||
if into.nbytes < maclen:
|
||||
raise TypeError("into length must be at least maclen")
|
||||
out = into
|
||||
|
||||
clone = self.clone()
|
||||
rc = _lib.aegis128x4_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
|
||||
rc = _lib.aegis128x4_mac_final(clone._proxy.ptr, ffi.from_buffer(out), memoryview(out).nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -567,7 +622,8 @@ class Mac:
|
||||
TypeError: If tag length is invalid.
|
||||
ValueError: If verification fails.
|
||||
"""
|
||||
maclen = len(mac)
|
||||
mac = memoryview(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
|
||||
@@ -606,15 +662,19 @@ class Encryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis128x4_state", ALIGNMENT)
|
||||
_lib.aegis128x4_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -638,10 +698,13 @@ class Encryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(message)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = message.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be >= expected output size for this update"
|
||||
)
|
||||
@@ -649,10 +712,10 @@ class Encryptor:
|
||||
rc = _lib.aegis128x4_state_encrypt_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -680,12 +743,14 @@ class Encryptor:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
# Only the authentication tag is produced here; allocate exactly maclen
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
out = into if into is not None else bytearray(maclen)
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis128x4_state_encrypt_final(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
written,
|
||||
maclen,
|
||||
)
|
||||
@@ -730,15 +795,19 @@ class Decryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis128x4_state", ALIGNMENT)
|
||||
_lib.aegis128x4_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -760,26 +829,29 @@ class Decryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(ct)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = ct.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError("into length must be >= required capacity for this update")
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis128x4_state_decrypt_detached_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, ct.nbytes={ct.nbytes}"
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
@@ -796,7 +868,8 @@ class Decryptor:
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
if len(mac) != maclen:
|
||||
mac = memoryview(mac)
|
||||
if mac.nbytes != maclen:
|
||||
raise TypeError(f"mac length must be {maclen}")
|
||||
rc = _lib.aegis128x4_state_decrypt_detached_final(
|
||||
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
|
||||
|
||||
@@ -74,21 +74,30 @@ def encrypt_detached(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if ct_into is not None:
|
||||
ct_into = memoryview(ct_into)
|
||||
if mac_into is not None:
|
||||
mac_into = memoryview(mac_into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
if ct_into is None:
|
||||
c = bytearray(len(message))
|
||||
c = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(ct_into) < len(message):
|
||||
raise TypeError("ct_into length must be at least len(message)")
|
||||
if ct_into.nbytes < message.nbytes:
|
||||
raise TypeError("ct_into length must be at least message.nbytes")
|
||||
c = ct_into
|
||||
if mac_into is None:
|
||||
mac = bytearray(maclen)
|
||||
else:
|
||||
if len(mac_into) < maclen:
|
||||
if mac_into.nbytes < maclen:
|
||||
raise TypeError("mac_into length must be at least maclen")
|
||||
mac = mac_into
|
||||
|
||||
@@ -97,9 +106,9 @@ def encrypt_detached(
|
||||
ffi.from_buffer(mac),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -108,7 +117,7 @@ def encrypt_detached(
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt detached failed: {err_name}")
|
||||
return (
|
||||
c if ct_into is None else memoryview(c)[: len(message)],
|
||||
c if ct_into is None else memoryview(c)[: message.nbytes],
|
||||
mac if mac_into is None else memoryview(mac)[:maclen],
|
||||
) # type: ignore
|
||||
|
||||
@@ -139,34 +148,42 @@ def decrypt_detached(
|
||||
TypeError: If lengths are invalid.
|
||||
ValueError: If authentication fails.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
mac = memoryview(mac)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
maclen = len(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256_decrypt_detached(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(mac),
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
if rc != 0:
|
||||
raise ValueError("authentication failed")
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt(
|
||||
@@ -197,24 +214,31 @@ def encrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message) + maclen)
|
||||
out = bytearray(message.nbytes + maclen)
|
||||
else:
|
||||
if len(into) < len(message) + maclen:
|
||||
raise TypeError("into length must be at least len(message)+maclen")
|
||||
if into.nbytes < message.nbytes + maclen:
|
||||
raise TypeError("into length must be at least message.nbytes + maclen")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256_encrypt(
|
||||
ffi.from_buffer(out),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -222,7 +246,7 @@ def encrypt(
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt failed: {err_name}")
|
||||
return out if into is None else memoryview(out)[: len(message) + maclen] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes + maclen] # type: ignore
|
||||
|
||||
|
||||
def decrypt(
|
||||
@@ -253,29 +277,36 @@ def decrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if len(ct) < maclen:
|
||||
if ct.nbytes < maclen:
|
||||
raise TypeError("ciphertext too short for tag")
|
||||
expected_out = len(ct) - maclen
|
||||
expected_out = ct.nbytes - maclen
|
||||
if into is None:
|
||||
out = bytearray(expected_out)
|
||||
else:
|
||||
if len(into) < expected_out:
|
||||
if into.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be at least len(ciphertext_with_tag)-maclen"
|
||||
"into length must be at least ct.nbytes - maclen"
|
||||
)
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256_decrypt(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -305,25 +336,30 @@ def stream(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid or neither length nor into provided.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
if nonce is not None:
|
||||
nonce = memoryview(nonce)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if nonce is not None and len(nonce) != NONCEBYTES:
|
||||
if nonce is not None and nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
if length is None:
|
||||
raise TypeError("provide either into or length")
|
||||
out = bytearray(length)
|
||||
else:
|
||||
if length is not None and len(into) < length:
|
||||
if length is not None and into.nbytes < length:
|
||||
raise TypeError("into length must be at least length")
|
||||
out = into
|
||||
_lib.aegis256_stream(
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: length or len(out)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: length or memoryview(out).nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt_unauthenticated(
|
||||
@@ -347,24 +383,29 @@ def encrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message))
|
||||
out = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(into) < len(message):
|
||||
raise TypeError("into length must be at least len(message)")
|
||||
if into.nbytes < message.nbytes:
|
||||
raise TypeError("into length must be at least message.nbytes")
|
||||
out = into
|
||||
_lib.aegis256_encrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(message)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes] # type: ignore
|
||||
|
||||
|
||||
def decrypt_unauthenticated(
|
||||
@@ -388,24 +429,29 @@ def decrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
_lib.aegis256_decrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
# This is missing from C API but convenient to have here
|
||||
@@ -428,6 +474,11 @@ def mac(
|
||||
Returns:
|
||||
MAC bytes as bytearray if into not provided, memoryview of into otherwise
|
||||
"""
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
data = memoryview(data)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
mac_state = Mac(key, nonce, maclen)
|
||||
mac_state.update(data)
|
||||
return mac_state.final(into)
|
||||
@@ -459,9 +510,11 @@ class Mac:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
self._maclen = maclen
|
||||
@@ -492,7 +545,8 @@ class Mac:
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis256_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
data = memoryview(data)
|
||||
rc = _lib.aegis256_mac_update(self._proxy.ptr, _ptr(data), data.nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -522,12 +576,13 @@ class Mac:
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
else:
|
||||
if len(into) < maclen:
|
||||
into = memoryview(into)
|
||||
if into.nbytes < maclen:
|
||||
raise TypeError("into length must be at least maclen")
|
||||
out = into
|
||||
|
||||
clone = self.clone()
|
||||
rc = _lib.aegis256_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
|
||||
rc = _lib.aegis256_mac_final(clone._proxy.ptr, ffi.from_buffer(out), memoryview(out).nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -567,7 +622,8 @@ class Mac:
|
||||
TypeError: If tag length is invalid.
|
||||
ValueError: If verification fails.
|
||||
"""
|
||||
maclen = len(mac)
|
||||
mac = memoryview(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
|
||||
@@ -606,15 +662,19 @@ class Encryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis256_state", ALIGNMENT)
|
||||
_lib.aegis256_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -638,10 +698,13 @@ class Encryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(message)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = message.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be >= expected output size for this update"
|
||||
)
|
||||
@@ -649,10 +712,10 @@ class Encryptor:
|
||||
rc = _lib.aegis256_state_encrypt_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -680,12 +743,14 @@ class Encryptor:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
# Only the authentication tag is produced here; allocate exactly maclen
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
out = into if into is not None else bytearray(maclen)
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis256_state_encrypt_final(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
written,
|
||||
maclen,
|
||||
)
|
||||
@@ -730,15 +795,19 @@ class Decryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis256_state", ALIGNMENT)
|
||||
_lib.aegis256_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -760,26 +829,29 @@ class Decryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(ct)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = ct.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError("into length must be >= required capacity for this update")
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis256_state_decrypt_detached_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, ct.nbytes={ct.nbytes}"
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
@@ -796,7 +868,8 @@ class Decryptor:
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
if len(mac) != maclen:
|
||||
mac = memoryview(mac)
|
||||
if mac.nbytes != maclen:
|
||||
raise TypeError(f"mac length must be {maclen}")
|
||||
rc = _lib.aegis256_state_decrypt_detached_final(
|
||||
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
|
||||
|
||||
@@ -74,21 +74,30 @@ def encrypt_detached(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if ct_into is not None:
|
||||
ct_into = memoryview(ct_into)
|
||||
if mac_into is not None:
|
||||
mac_into = memoryview(mac_into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
if ct_into is None:
|
||||
c = bytearray(len(message))
|
||||
c = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(ct_into) < len(message):
|
||||
raise TypeError("ct_into length must be at least len(message)")
|
||||
if ct_into.nbytes < message.nbytes:
|
||||
raise TypeError("ct_into length must be at least message.nbytes")
|
||||
c = ct_into
|
||||
if mac_into is None:
|
||||
mac = bytearray(maclen)
|
||||
else:
|
||||
if len(mac_into) < maclen:
|
||||
if mac_into.nbytes < maclen:
|
||||
raise TypeError("mac_into length must be at least maclen")
|
||||
mac = mac_into
|
||||
|
||||
@@ -97,9 +106,9 @@ def encrypt_detached(
|
||||
ffi.from_buffer(mac),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -108,7 +117,7 @@ def encrypt_detached(
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt detached failed: {err_name}")
|
||||
return (
|
||||
c if ct_into is None else memoryview(c)[: len(message)],
|
||||
c if ct_into is None else memoryview(c)[: message.nbytes],
|
||||
mac if mac_into is None else memoryview(mac)[:maclen],
|
||||
) # type: ignore
|
||||
|
||||
@@ -139,34 +148,42 @@ def decrypt_detached(
|
||||
TypeError: If lengths are invalid.
|
||||
ValueError: If authentication fails.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
mac = memoryview(mac)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
maclen = len(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256x2_decrypt_detached(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(mac),
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
if rc != 0:
|
||||
raise ValueError("authentication failed")
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt(
|
||||
@@ -197,24 +214,31 @@ def encrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message) + maclen)
|
||||
out = bytearray(message.nbytes + maclen)
|
||||
else:
|
||||
if len(into) < len(message) + maclen:
|
||||
raise TypeError("into length must be at least len(message)+maclen")
|
||||
if into.nbytes < message.nbytes + maclen:
|
||||
raise TypeError("into length must be at least message.nbytes + maclen")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256x2_encrypt(
|
||||
ffi.from_buffer(out),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -222,7 +246,7 @@ def encrypt(
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt failed: {err_name}")
|
||||
return out if into is None else memoryview(out)[: len(message) + maclen] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes + maclen] # type: ignore
|
||||
|
||||
|
||||
def decrypt(
|
||||
@@ -253,29 +277,36 @@ def decrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if len(ct) < maclen:
|
||||
if ct.nbytes < maclen:
|
||||
raise TypeError("ciphertext too short for tag")
|
||||
expected_out = len(ct) - maclen
|
||||
expected_out = ct.nbytes - maclen
|
||||
if into is None:
|
||||
out = bytearray(expected_out)
|
||||
else:
|
||||
if len(into) < expected_out:
|
||||
if into.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be at least len(ciphertext_with_tag)-maclen"
|
||||
"into length must be at least ct.nbytes - maclen"
|
||||
)
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256x2_decrypt(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -305,25 +336,30 @@ def stream(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid or neither length nor into provided.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
if nonce is not None:
|
||||
nonce = memoryview(nonce)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if nonce is not None and len(nonce) != NONCEBYTES:
|
||||
if nonce is not None and nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
if length is None:
|
||||
raise TypeError("provide either into or length")
|
||||
out = bytearray(length)
|
||||
else:
|
||||
if length is not None and len(into) < length:
|
||||
if length is not None and into.nbytes < length:
|
||||
raise TypeError("into length must be at least length")
|
||||
out = into
|
||||
_lib.aegis256x2_stream(
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: length or len(out)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: length or memoryview(out).nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt_unauthenticated(
|
||||
@@ -347,24 +383,29 @@ def encrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message))
|
||||
out = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(into) < len(message):
|
||||
raise TypeError("into length must be at least len(message)")
|
||||
if into.nbytes < message.nbytes:
|
||||
raise TypeError("into length must be at least message.nbytes")
|
||||
out = into
|
||||
_lib.aegis256x2_encrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(message)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes] # type: ignore
|
||||
|
||||
|
||||
def decrypt_unauthenticated(
|
||||
@@ -388,24 +429,29 @@ def decrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
_lib.aegis256x2_decrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
# This is missing from C API but convenient to have here
|
||||
@@ -428,6 +474,11 @@ def mac(
|
||||
Returns:
|
||||
MAC bytes as bytearray if into not provided, memoryview of into otherwise
|
||||
"""
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
data = memoryview(data)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
mac_state = Mac(key, nonce, maclen)
|
||||
mac_state.update(data)
|
||||
return mac_state.final(into)
|
||||
@@ -459,9 +510,11 @@ class Mac:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
self._maclen = maclen
|
||||
@@ -492,7 +545,8 @@ class Mac:
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis256x2_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
data = memoryview(data)
|
||||
rc = _lib.aegis256x2_mac_update(self._proxy.ptr, _ptr(data), data.nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -522,12 +576,13 @@ class Mac:
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
else:
|
||||
if len(into) < maclen:
|
||||
into = memoryview(into)
|
||||
if into.nbytes < maclen:
|
||||
raise TypeError("into length must be at least maclen")
|
||||
out = into
|
||||
|
||||
clone = self.clone()
|
||||
rc = _lib.aegis256x2_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
|
||||
rc = _lib.aegis256x2_mac_final(clone._proxy.ptr, ffi.from_buffer(out), memoryview(out).nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -567,7 +622,8 @@ class Mac:
|
||||
TypeError: If tag length is invalid.
|
||||
ValueError: If verification fails.
|
||||
"""
|
||||
maclen = len(mac)
|
||||
mac = memoryview(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
|
||||
@@ -606,15 +662,19 @@ class Encryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis256x2_state", ALIGNMENT)
|
||||
_lib.aegis256x2_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -638,10 +698,13 @@ class Encryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(message)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = message.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be >= expected output size for this update"
|
||||
)
|
||||
@@ -649,10 +712,10 @@ class Encryptor:
|
||||
rc = _lib.aegis256x2_state_encrypt_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -680,12 +743,14 @@ class Encryptor:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
# Only the authentication tag is produced here; allocate exactly maclen
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
out = into if into is not None else bytearray(maclen)
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis256x2_state_encrypt_final(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
written,
|
||||
maclen,
|
||||
)
|
||||
@@ -730,15 +795,19 @@ class Decryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis256x2_state", ALIGNMENT)
|
||||
_lib.aegis256x2_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -760,26 +829,29 @@ class Decryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(ct)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = ct.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError("into length must be >= required capacity for this update")
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis256x2_state_decrypt_detached_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, ct.nbytes={ct.nbytes}"
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
@@ -796,7 +868,8 @@ class Decryptor:
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
if len(mac) != maclen:
|
||||
mac = memoryview(mac)
|
||||
if mac.nbytes != maclen:
|
||||
raise TypeError(f"mac length must be {maclen}")
|
||||
rc = _lib.aegis256x2_state_decrypt_detached_final(
|
||||
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
|
||||
|
||||
@@ -74,21 +74,30 @@ def encrypt_detached(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if ct_into is not None:
|
||||
ct_into = memoryview(ct_into)
|
||||
if mac_into is not None:
|
||||
mac_into = memoryview(mac_into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
if ct_into is None:
|
||||
c = bytearray(len(message))
|
||||
c = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(ct_into) < len(message):
|
||||
raise TypeError("ct_into length must be at least len(message)")
|
||||
if ct_into.nbytes < message.nbytes:
|
||||
raise TypeError("ct_into length must be at least message.nbytes")
|
||||
c = ct_into
|
||||
if mac_into is None:
|
||||
mac = bytearray(maclen)
|
||||
else:
|
||||
if len(mac_into) < maclen:
|
||||
if mac_into.nbytes < maclen:
|
||||
raise TypeError("mac_into length must be at least maclen")
|
||||
mac = mac_into
|
||||
|
||||
@@ -97,9 +106,9 @@ def encrypt_detached(
|
||||
ffi.from_buffer(mac),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -108,7 +117,7 @@ def encrypt_detached(
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt detached failed: {err_name}")
|
||||
return (
|
||||
c if ct_into is None else memoryview(c)[: len(message)],
|
||||
c if ct_into is None else memoryview(c)[: message.nbytes],
|
||||
mac if mac_into is None else memoryview(mac)[:maclen],
|
||||
) # type: ignore
|
||||
|
||||
@@ -139,34 +148,42 @@ def decrypt_detached(
|
||||
TypeError: If lengths are invalid.
|
||||
ValueError: If authentication fails.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
mac = memoryview(mac)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
maclen = len(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256x4_decrypt_detached(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(mac),
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
if rc != 0:
|
||||
raise ValueError("authentication failed")
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt(
|
||||
@@ -197,24 +214,31 @@ def encrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message) + maclen)
|
||||
out = bytearray(message.nbytes + maclen)
|
||||
else:
|
||||
if len(into) < len(message) + maclen:
|
||||
raise TypeError("into length must be at least len(message)+maclen")
|
||||
if into.nbytes < message.nbytes + maclen:
|
||||
raise TypeError("into length must be at least message.nbytes + maclen")
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256x4_encrypt(
|
||||
ffi.from_buffer(out),
|
||||
maclen,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -222,7 +246,7 @@ def encrypt(
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"encrypt failed: {err_name}")
|
||||
return out if into is None else memoryview(out)[: len(message) + maclen] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes + maclen] # type: ignore
|
||||
|
||||
|
||||
def decrypt(
|
||||
@@ -253,29 +277,36 @@ def decrypt(
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if len(ct) < maclen:
|
||||
if ct.nbytes < maclen:
|
||||
raise TypeError("ciphertext too short for tag")
|
||||
expected_out = len(ct) - maclen
|
||||
expected_out = ct.nbytes - maclen
|
||||
if into is None:
|
||||
out = bytearray(expected_out)
|
||||
else:
|
||||
if len(into) < expected_out:
|
||||
if into.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be at least len(ciphertext_with_tag)-maclen"
|
||||
"into length must be at least ct.nbytes - maclen"
|
||||
)
|
||||
out = into
|
||||
|
||||
rc = _lib.aegis256x4_decrypt(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
maclen,
|
||||
_ptr(ad),
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -305,25 +336,30 @@ def stream(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid or neither length nor into provided.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
if nonce is not None:
|
||||
nonce = memoryview(nonce)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if nonce is not None and len(nonce) != NONCEBYTES:
|
||||
if nonce is not None and nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
if length is None:
|
||||
raise TypeError("provide either into or length")
|
||||
out = bytearray(length)
|
||||
else:
|
||||
if length is not None and len(into) < length:
|
||||
if length is not None and into.nbytes < length:
|
||||
raise TypeError("into length must be at least length")
|
||||
out = into
|
||||
_lib.aegis256x4_stream(
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: length or len(out)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: length or memoryview(out).nbytes] # type: ignore
|
||||
|
||||
|
||||
def encrypt_unauthenticated(
|
||||
@@ -347,24 +383,29 @@ def encrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(message))
|
||||
out = bytearray(message.nbytes)
|
||||
else:
|
||||
if len(into) < len(message):
|
||||
raise TypeError("into length must be at least len(message)")
|
||||
if into.nbytes < message.nbytes:
|
||||
raise TypeError("into length must be at least message.nbytes")
|
||||
out = into
|
||||
_lib.aegis256x4_encrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(message)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: message.nbytes] # type: ignore
|
||||
|
||||
|
||||
def decrypt_unauthenticated(
|
||||
@@ -388,24 +429,29 @@ def decrypt_unauthenticated(
|
||||
Raises:
|
||||
TypeError: If lengths are invalid.
|
||||
"""
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
if into is None:
|
||||
out = bytearray(len(ct))
|
||||
out = bytearray(ct.nbytes)
|
||||
else:
|
||||
if len(into) < len(ct):
|
||||
raise TypeError("into length must be at least len(ciphertext)")
|
||||
if into.nbytes < ct.nbytes:
|
||||
raise TypeError("into length must be at least ct.nbytes")
|
||||
out = into
|
||||
_lib.aegis256x4_decrypt_unauthenticated(
|
||||
ffi.from_buffer(out),
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
return out if into is None else memoryview(out)[: len(ct)] # type: ignore
|
||||
return out if into is None else memoryview(out)[: ct.nbytes] # type: ignore
|
||||
|
||||
|
||||
# This is missing from C API but convenient to have here
|
||||
@@ -428,6 +474,11 @@ def mac(
|
||||
Returns:
|
||||
MAC bytes as bytearray if into not provided, memoryview of into otherwise
|
||||
"""
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
data = memoryview(data)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
mac_state = Mac(key, nonce, maclen)
|
||||
mac_state.update(data)
|
||||
return mac_state.final(into)
|
||||
@@ -459,9 +510,11 @@ class Mac:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
|
||||
self._maclen = maclen
|
||||
@@ -492,7 +545,8 @@ class Mac:
|
||||
"""
|
||||
if self._cached_digest is not None:
|
||||
raise RuntimeError("Cannot update after final()")
|
||||
rc = _lib.aegis256x4_mac_update(self._proxy.ptr, _ptr(data), len(data))
|
||||
data = memoryview(data)
|
||||
rc = _lib.aegis256x4_mac_update(self._proxy.ptr, _ptr(data), data.nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -522,12 +576,13 @@ class Mac:
|
||||
if into is None:
|
||||
out = bytearray(maclen)
|
||||
else:
|
||||
if len(into) < maclen:
|
||||
into = memoryview(into)
|
||||
if into.nbytes < maclen:
|
||||
raise TypeError("into length must be at least maclen")
|
||||
out = into
|
||||
|
||||
clone = self.clone()
|
||||
rc = _lib.aegis256x4_mac_final(clone._proxy.ptr, ffi.from_buffer(out), maclen)
|
||||
rc = _lib.aegis256x4_mac_final(clone._proxy.ptr, ffi.from_buffer(out), memoryview(out).nbytes)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
@@ -567,7 +622,8 @@ class Mac:
|
||||
TypeError: If tag length is invalid.
|
||||
ValueError: If verification fails.
|
||||
"""
|
||||
maclen = len(mac)
|
||||
mac = memoryview(mac)
|
||||
maclen = mac.nbytes
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("mac length must be 16 or 32")
|
||||
|
||||
@@ -606,15 +662,19 @@ class Encryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis256x4_state", ALIGNMENT)
|
||||
_lib.aegis256x4_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -638,10 +698,13 @@ class Encryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(message)
|
||||
message = memoryview(message)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = message.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError(
|
||||
"into length must be >= expected output size for this update"
|
||||
)
|
||||
@@ -649,10 +712,10 @@ class Encryptor:
|
||||
rc = _lib.aegis256x4_state_encrypt_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(message),
|
||||
len(message),
|
||||
message.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
@@ -680,12 +743,14 @@ class Encryptor:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
# Only the authentication tag is produced here; allocate exactly maclen
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
out = into if into is not None else bytearray(maclen)
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis256x4_state_encrypt_final(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out),
|
||||
len(out),
|
||||
memoryview(out).nbytes,
|
||||
written,
|
||||
maclen,
|
||||
)
|
||||
@@ -730,15 +795,19 @@ class Decryptor:
|
||||
"""
|
||||
if maclen not in (16, 32):
|
||||
raise TypeError("maclen must be 16 or 32")
|
||||
if len(key) != KEYBYTES:
|
||||
key = memoryview(key)
|
||||
nonce = memoryview(nonce)
|
||||
if ad is not None:
|
||||
ad = memoryview(ad)
|
||||
if key.nbytes != KEYBYTES:
|
||||
raise TypeError(f"key length must be {KEYBYTES}")
|
||||
if len(nonce) != NONCEBYTES:
|
||||
if nonce.nbytes != NONCEBYTES:
|
||||
raise TypeError(f"nonce length must be {NONCEBYTES}")
|
||||
self._state = new_aligned_struct("aegis256x4_state", ALIGNMENT)
|
||||
_lib.aegis256x4_state_init(
|
||||
self._state.ptr,
|
||||
_ptr(ad) if ad is not None else ffi.NULL,
|
||||
0 if ad is None else len(ad),
|
||||
0 if ad is None else ad.nbytes,
|
||||
_ptr(nonce),
|
||||
_ptr(key),
|
||||
)
|
||||
@@ -760,26 +829,29 @@ class Decryptor:
|
||||
"""
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call update() after final()")
|
||||
expected_out = len(ct)
|
||||
ct = memoryview(ct)
|
||||
if into is not None:
|
||||
into = memoryview(into)
|
||||
expected_out = ct.nbytes
|
||||
out = into if into is not None else bytearray(expected_out)
|
||||
out_mv = memoryview(out)
|
||||
if len(out_mv) < expected_out:
|
||||
if out_mv.nbytes < expected_out:
|
||||
raise TypeError("into length must be >= required capacity for this update")
|
||||
written = ffi.new("size_t *")
|
||||
rc = _lib.aegis256x4_state_decrypt_detached_update(
|
||||
self._state.ptr,
|
||||
ffi.from_buffer(out_mv),
|
||||
len(out_mv),
|
||||
out_mv.nbytes,
|
||||
written,
|
||||
_ptr(ct),
|
||||
len(ct),
|
||||
ct.nbytes,
|
||||
)
|
||||
if rc != 0:
|
||||
err_num = ffi.errno
|
||||
err_name = errno.errorcode.get(err_num, f"errno_{err_num}")
|
||||
raise RuntimeError(f"state decrypt update failed: {err_name}")
|
||||
w = int(written[0])
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, len(ct)={len(ct)}"
|
||||
assert w == expected_out, f"got {w}, expected {expected_out}, ct.nbytes={ct.nbytes}"
|
||||
return out if into is None else memoryview(out)[:w] # type: ignore
|
||||
|
||||
def final(self, mac: Buffer) -> None:
|
||||
@@ -796,7 +868,8 @@ class Decryptor:
|
||||
if self._state is None:
|
||||
raise RuntimeError("Cannot call final() after final()")
|
||||
maclen = self._maclen
|
||||
if len(mac) != maclen:
|
||||
mac = memoryview(mac)
|
||||
if mac.nbytes != maclen:
|
||||
raise TypeError(f"mac length must be {maclen}")
|
||||
rc = _lib.aegis256x4_state_decrypt_detached_final(
|
||||
self._state.ptr, ffi.NULL, 0, ffi.NULL, _ptr(mac), maclen
|
||||
|
||||
@@ -14,11 +14,10 @@ try:
|
||||
from collections.abc import Buffer as _Buffer # type: ignore[misc]
|
||||
|
||||
class Buffer(_Buffer, Protocol): # type: ignore[misc]
|
||||
def __len__(self) -> int: ...
|
||||
pass
|
||||
except ImportError:
|
||||
|
||||
class Buffer(Protocol):
|
||||
def __len__(self) -> int: ...
|
||||
def __buffer__(self, flags: int) -> memoryview: ...
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user