162 lines
4.6 KiB
Python
162 lines
4.6 KiB
Python
import hmac
|
|
import re
|
|
from time import time
|
|
from unicodedata import normalize
|
|
|
|
import argon2
|
|
import msgspec
|
|
from html5tagger import Document
|
|
from sanic import Blueprint, html, json, redirect
|
|
from sanic.exceptions import BadRequest, Forbidden, Unauthorized
|
|
|
|
from cista import config, session
|
|
|
|
# Shared Argon2 hasher (library default parameters): login() uses it to verify
# stored hashes and check whether they need rehashing, set_password() to hash.
_argon = argon2.PasswordHasher()
|
|
# Legacy "Droppy" hash format: 64 lowercase hex digits, a literal "$", then
# 8 lowercase hex digits. login() treats the two groups as (digest, salt).
_droppyhash = re.compile(r"^([a-f0-9]{64})\$([a-f0-9]{8})$")
|
|
|
|
|
|
def _pwnorm(password):
|
|
return normalize("NFC", password).strip().encode()
|
|
|
|
|
|
def login(username: str, password: str):
    """Authenticate a user and return the matching entry of config.config.users.

    Both credentials are normalized with _pwnorm() before use. Stored hashes in
    the legacy Droppy format are checked with an HMAC-SHA256 digest; any other
    stored hash is handed to the Argon2 hasher. A successful login with a
    legacy hash, or with an Argon2 hash that the hasher reports as needing a
    rehash, re-hashes the password via set_password(). The user's ``lastSeen``
    is set to the current Unix time (whole seconds).

    Raises:
        ValueError: "Invalid username" when no such user exists,
            "Account disabled" when the user has no stored hash,
            "Invalid password" when verification fails.
    """
    name_bytes = _pwnorm(username)
    secret = _pwnorm(password)
    try:
        account = config.config.users[name_bytes.decode()]
    except KeyError:
        raise ValueError("Invalid username") from None

    if not account.hash:
        raise ValueError("Account disabled")

    legacy = _droppyhash.match(account.hash)
    if legacy is None:
        # Regular path: the stored value is treated as an Argon2 hash.
        try:
            _argon.verify(account.hash, secret)
        except Exception:
            raise ValueError("Invalid password") from None
        rehash = bool(_argon.check_needs_rehash(account.hash))
    else:
        # Legacy path: HMAC-SHA256 keyed by password + salt + username over an
        # empty message, compared in constant time against the stored digest.
        expected, salt = legacy.groups()
        key = secret + salt.encode() + name_bytes
        actual = hmac.digest(key, b"", "sha256").hex()
        if not hmac.compare_digest(expected, actual):
            raise ValueError("Invalid password")
        # Droppy hashes are weak, so always upgrade them after a good login.
        rehash = True

    # Login successful
    if rehash:
        set_password(account, password)
    account.lastSeen = int(time())
    return account
|
|
|
|
|
|
def set_password(user: config.User, password: str):
    """Store a fresh Argon2 hash of the normalized password in ``user.hash``."""
    normalized = _pwnorm(password)
    user.hash = _argon.hash(normalized)
|
|
|
|
|
|
class LoginResponse(msgspec.Struct):
    """Struct with the fields of a login result; every field has a default.

    NOTE(review): not referenced in the visible part of this module; confirm
    where it is used before changing field names or order.
    """

    # User name; empty string by default.
    user: str = ""
    # Whether the user is privileged; False by default.
    privileged: bool = False
    # Error text; empty string by default.
    error: str = ""
|
|
|
|
|
|
def verify(request, *, privileged=False):
    """Check that the request may proceed; return None when it may.

    Without ``privileged``, access is granted when the site is configured as
    public or when ``request.ctx.user`` is set. With ``privileged=True`` the
    user must be set and have its ``privileged`` flag on.

    Raises:
        Forbidden: a user is logged in but lacks the privileged flag.
        Unauthorized: no sufficient login is present.
    """
    if not privileged:
        # The user is only consulted when the site is not public.
        if config.config.public or request.ctx.user:
            return
    else:
        user = request.ctx.user
        if user:
            if user.privileged:
                return
            raise Forbidden("Access Forbidden: Only for privileged users")
    raise Unauthorized("Login required", "cookie")
|
|
|
|
|
|
# Sanic blueprint named "auth"; the /login and /logout handlers below attach to it.
bp = Blueprint("auth")
|
|
|
|
|
|
@bp.get("/login")
async def login_page(request):
    """Render the HTML login page.

    The page always contains the username/password form. When session.get()
    returns a truthy session, a logout button labelled with the session's
    "username" entry is added. A one-shot notice cookie, if present, is shown
    in an open dialog and cleared on the response.
    """
    doc = Document("Cista Login")
    # NOTE(review): block nesting was reconstructed from a flattened source;
    # confirm the logout form belongs inside the #login container.
    with doc.div(id="login"):
        with doc.form(method="POST", autocomplete="on"):
            doc.h1("Login")
            doc.input(
                name="username",
                placeholder="Username",
                autocomplete="username",
                required=True,
            ).br
            doc.input(
                type="password",
                name="password",
                placeholder="Password",
                autocomplete="current-password",
                required=True,
            ).br
            doc.input(type="submit", value="Login")
        s = session.get(request)
        if s:
            name = s["username"]
            with doc.form(method="POST", action="/logout"):
                doc.input(type="submit", value=f"Logout {name}")

    # logout_post stores its notice in the "flash" cookie, while this page used
    # to read "message" and then clear "flash", so the logout notice was never
    # displayed and a displayed notice was never cleared. Read both names
    # ("message" is kept for compatibility) and clear whichever was present.
    shown = [name for name in ("flash", "message") if request.cookies.get(name)]
    flash = request.cookies.get(shown[0]) if shown else ""
    if flash:
        doc.dialog(
            flash,
            id="flash",
            open=True,
            style="position: fixed; top: 0; left: 0; width: 100%; opacity: .8",
        )
    res = html(doc)
    for cookie_name in shown:
        res.cookies.delete_cookie(cookie_name)
    # session.get() returned exactly False (not merely "no session") --
    # presumably an invalid session cookie; remove it. TODO confirm.
    if s is False:
        session.delete(res)
    return res
|
|
|
|
|
|
@bp.post("/login")
async def login_post(request):
    """Handle a login submission sent as JSON or as form data.

    Credentials are read from the JSON body when the request's media type is
    application/json (parameters such as a charset are ignored), otherwise
    from the form fields. On success a session is created on the response,
    which is a redirect to "/" with a flash notice for clients accepting
    text/html, or a JSON document with the username and privileged flag.

    Raises:
        BadRequest: username or password is missing, empty, or not a string.
        Forbidden: login() rejected the credentials; carries its message.
    """
    try:
        # Compare the media type only, so "application/json; charset=utf-8"
        # is still recognized as JSON.
        media_type = request.headers.content_type.split(";", 1)[0].strip().lower()
        if media_type == "application/json":
            body = request.json
            # A non-object body (null, list, string) raises TypeError here.
            username = body["username"]
            password = body["password"]
        else:
            username = request.form["username"][0]
            password = request.form["password"][0]
        # Non-string credentials would otherwise crash Unicode normalization
        # inside login() and surface as a server error.
        if not isinstance(username, str) or not isinstance(password, str):
            raise TypeError
        if not username or not password:
            raise KeyError
    except (KeyError, TypeError):
        raise BadRequest(
            "Missing username or password",
            context={"redirect": "/login"},
        ) from None
    try:
        user = login(username, password)
    except ValueError as e:
        raise Forbidden(str(e), context={"redirect": "/login"}) from e

    if "text/html" in request.headers.accept:
        res = redirect("/")
        session.flash(res, "Logged in")
    else:
        res = json({"data": {"username": username, "privileged": user.privileged}})
    session.create(res, username)
    return res
|
|
|
|
|
|
@bp.post("/logout")
async def logout_post(request):
    """Log the client out and report the outcome.

    Clients accepting text/html are redirected to /login with the outcome in a
    "flash" cookie (max_age=5); other clients get it as JSON under "message".
    session.delete() is applied to the response in both cases.
    """
    msg = "Not logged in" if not request.ctx.session else "Logged out"
    wants_html = "text/html" in request.headers.accept
    if not wants_html:
        res = json({"message": msg})
    else:
        res = redirect("/login")
        res.cookies.add_cookie("flash", msg, max_age=5)
    session.delete(res)
    return res
|