# cista-storage/cista/auth.py
#
# 122 lines
# 3.8 KiB
# Python
# Raw Normal View History

import hmac
import re
from time import time
from unicodedata import normalize
import argon2
import msgspec
from html5tagger import Document
from sanic import BadRequest, Blueprint, Forbidden, html, json, redirect
# 2023-10-21 17:17:09 +01:00
from cista import config, session
# Shared Argon2 hasher, constructed with the library's default parameters.
# Used both to verify stored hashes and to create new ones.
_argon = argon2.PasswordHasher()
# Pattern for the legacy "droppy" hash layout: 64 lowercase hex digits,
# a literal "$", then 8 lowercase hex digits. login() uses the first group
# as the expected digest and the second group as the salt.
_droppyhash = re.compile(r'^([a-f0-9]{64})\$([a-f0-9]{8})$')
def _pwnorm(password):
return normalize('NFC', password).strip().encode()
def login(username: str, password: str):
    """Check credentials against the configured users and return the user.

    Both values are canonicalized with ``_pwnorm`` first. The stored hash is
    either a legacy droppy hash (matched by ``_droppyhash``) or an Argon2
    hash handled by ``_argon``.

    On success the user's ``lastSeen`` is set to the current Unix time, and
    the stored hash is replaced with a fresh one when it is a droppy hash or
    when Argon2 reports that it needs rehashing.

    Raises:
        ValueError: "Invalid username" if no such user is configured,
            "Account disabled" if the user has no hash, or
            "Invalid password" if verification fails.
    """
    name_bytes = _pwnorm(username)
    secret = _pwnorm(password)
    try:
        account = config.config.users[name_bytes.decode()]
    except KeyError:
        raise ValueError("Invalid username")
    # An empty hash means the account cannot be used for password login.
    if not account.hash:
        raise ValueError("Account disabled")
    legacy = _droppyhash.match(account.hash)
    if legacy is None:
        # Regular path: Argon2 verification.
        try:
            _argon.verify(account.hash, secret)
        except Exception:
            raise ValueError("Invalid password")
        stale = _argon.check_needs_rehash(account.hash)
    else:
        # Legacy path: HMAC-SHA256 keyed with password + salt + username over
        # an empty message, compared in constant time with the stored digest.
        expected, salt = legacy.groups()
        candidate = hmac.digest(secret + salt.encode() + name_bytes, b"", "sha256").hex()
        if not hmac.compare_digest(expected, candidate):
            raise ValueError("Invalid password")
        # Droppy hashes are weak, so always upgrade them after a good login.
        stale = True
    # Credentials accepted from here on.
    if stale:
        set_password(account, password)
    account.lastSeen = int(time())
    return account
def set_password(user: config.User, password: str):
    """Replace ``user.hash`` with a new hash of ``password``.

    The password is canonicalized with ``_pwnorm`` and hashed by the shared
    ``_argon`` hasher. Only the in-memory ``user`` object is modified here.
    """
    canonical = _pwnorm(password)
    user.hash = _argon.hash(canonical)
class LoginResponse(msgspec.Struct):
    """Struct with the fields ``user``, ``privileged`` and ``error``.

    Every field has a default, so the struct can be built with no arguments.
    NOTE(review): nothing in the visible part of this module references this
    class; presumably it describes a login reply -- confirm against callers.
    """

    # Defaults to an empty string.
    user: str = ""
    # Defaults to False.
    privileged: bool = False
    # Defaults to an empty string.
    error: str = ""
# Blueprint named "auth"; the login and logout routes below are registered on it.
authbp = Blueprint("auth")
@authbp.get("/login")
async def login_page(request):
    """Render the HTML login page.

    The page always contains the login form. When ``session.get(request)``
    returns a truthy session, a logout form labelled with the session's
    username is added. A pending flash message, if any, is shown in an open
    ``<dialog>`` and its cookie is cleared on the response.

    Returns:
        The HTML response. If ``session.get`` returned exactly ``False``,
        ``session.delete`` is applied to the response as well.
    """
    doc = Document("Cista Login")
    with doc.div(id="login"):
        with doc.form(method="POST", autocomplete="on"):
            doc.h1("Login")
            doc.input(
                name="username",
                placeholder="Username",
                autocomplete="username",
                required=True,
            ).br
            doc.input(
                type="password",
                name="password",
                placeholder="Password",
                autocomplete="current-password",
                required=True,
            ).br
            doc.input(type="submit", value="Login")
        s = session.get(request)
        if s:
            name = s["username"]
            with doc.form(method="POST", action="/logout"):
                doc.input(type="submit", value=f"Logout {name}")
    # The notice may arrive under either cookie name: this handler has always
    # read "message", while logout_post stores its notice as "flash".
    # Previously only "message" was read and only "flash" was deleted, so the
    # logout notice was never displayed and a displayed notice was never
    # cleared.
    flash = request.cookies.message or request.cookies.flash
    if flash:
        doc.dialog(
            flash,
            id="flash",
            open=True,
            style="position: fixed; top: 0; left: 0; width: 100%; opacity: .8",
        )
    res = html(doc)
    if flash:
        # Clear both names so the notice is shown only once, whichever was set.
        res.cookies.delete_cookie("message")
        res.cookies.delete_cookie("flash")
    if s is False:
        # NOTE(review): presumably False marks a session cookie that was
        # present but rejected -- confirm against session.get.
        session.delete(res)
    return res
@authbp.post("/login")
async def login_post(request):
    """Handle a login submission sent as JSON or as form data.

    Credentials are read from the JSON body when the request's media type is
    ``application/json`` (with or without parameters such as a charset),
    otherwise from the form fields ``username`` and ``password``.

    Returns:
        A redirect to ``/`` with a "Logged in" flash message when the client
        accepts ``text/html``; otherwise a JSON document with the username
        and the user's ``privileged`` flag. A session is created on the
        response in both cases.

    Raises:
        BadRequest: if either credential is missing, empty, or not a string.
        Forbidden: if ``login`` rejects the credentials; the message is the
            text of the ``ValueError`` it raised.
    """
    try:
        # Compare only the media type: clients commonly send
        # "application/json; charset=utf-8", which an exact string comparison
        # would route to the form branch and reject.
        media_type = request.headers.content_type.split(";", 1)[0].strip().lower()
        if media_type == "application/json":
            payload = request.json
            # A missing body or a non-object payload raises TypeError here.
            username = payload["username"]
            password = payload["password"]
        else:
            username = request.form["username"][0]
            password = request.form["password"][0]
        # JSON may carry numbers, lists, etc.; login() requires strings.
        if not isinstance(username, str) or not isinstance(password, str):
            raise TypeError
        if not username or not password:
            raise KeyError
    except (KeyError, IndexError, TypeError):
        raise BadRequest(
            "Missing username or password", context={"redirect": "/login"}
        ) from None
    try:
        user = login(username, password)
    except ValueError as e:
        raise Forbidden(str(e), context={"redirect": "/login"}) from None
    if "text/html" in request.headers.accept:
        res = redirect("/")
        session.flash(res, "Logged in")
    else:
        res = json({"data": {"username": username, "privileged": user.privileged}})
    # NOTE(review): the session is created with the username as submitted,
    # whereas login() looks the account up under its NFC-normalized, stripped
    # form -- confirm session handling tolerates the difference.
    session.create(res, username)
    return res
@authbp.post("/logout")
async def logout_post(request):
    """End the current session and report the outcome.

    The outcome text is "Logged out" when ``request.ctx.session`` is truthy
    and "Not logged in" otherwise. Clients accepting ``text/html`` are
    redirected to ``/login`` with the text stored in a 5-second ``flash``
    cookie; other clients get it as JSON under ``message``.
    ``session.delete`` is applied to the response in both cases.
    """
    notice = "Logged out" if request.ctx.session else "Not logged in"
    wants_html = "text/html" in request.headers.accept
    if not wants_html:
        res = json({"message": notice})
    else:
        res = redirect("/login")
        res.cookies.add_cookie("flash", notice, max_age=5)
    session.delete(res)
    return res