mirror of
https://github.com/markbeep/AudioBookRequest.git
synced 2026-02-19 13:08:52 -06:00
55 lines
1.4 KiB
Python
55 lines
1.4 KiB
Python
from typing import Annotated, Literal
|
|
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError
|
|
from fastapi import Depends, HTTPException
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
from sqlmodel import Session, select
|
|
|
|
from app.db import get_session
|
|
from app.models import User
|
|
|
|
# HTTP Basic auth scheme; used as a FastAPI dependency in get_user to obtain
# the username/password sent with the request.
security = HTTPBasic()
|
|
# Shared Argon2 hasher (constructed with the library's default parameters);
# used below to hash new passwords, verify logins and re-hash stale hashes.
ph = PasswordHasher()
|
|
|
|
|
|
def create_user(
    username: str,
    password: str,
    group: Literal["admin", "trusted", "untrusted"] = "untrusted",
) -> User:
    """Build a new ``User`` whose password is stored as an Argon2 hash.

    The plaintext password is hashed with the module-level ``PasswordHasher``
    and only the resulting hash is placed on the returned object. The user is
    merely constructed here; nothing is added to or committed on a database
    session.

    Args:
        username: Login name for the new user.
        password: Plaintext password to hash.
        group: Group assigned to the user; defaults to ``"untrusted"``.

    Returns:
        The newly constructed ``User`` instance.
    """
    return User(
        username=username,
        password=ph.hash(password),
        group=group,
    )
|
|
|
|
|
|
def _invalid_credentials() -> HTTPException:
    """Build the 401 error shared by every authentication failure."""
    return HTTPException(
        status_code=401,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Basic"},
    )


def get_user(
    session: Annotated[Session, Depends(get_session)],
    credentials: Annotated[HTTPBasicCredentials, Depends(security)],
) -> User:
    """Authenticate the request's HTTP Basic credentials and return the user.

    Looks the user up by username, verifies the supplied password against the
    stored Argon2 hash and, when the hasher reports the stored hash as needing
    a rehash, replaces it with a fresh hash and commits the change.

    Args:
        session: Database session (injected via ``get_session``).
        credentials: Username/password pair (injected via ``security``).

    Returns:
        The authenticated ``User``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when the
            username is unknown or the password does not match.
    """
    user = session.exec(
        select(User).where(User.username == credentials.username)
    ).one_or_none()

    # Unknown user and wrong password deliberately produce the same response
    # so the error body does not reveal which usernames exist.
    # NOTE(review): this path skips the Argon2 verification, so it returns
    # faster than the wrong-password path; consider a dummy verify here if
    # username enumeration via timing is a concern.
    if user is None:
        raise _invalid_credentials()

    try:
        ph.verify(user.password, credentials.password)
    except VerifyMismatchError:
        # `from None`: the hasher's exception is an implementation detail and
        # should not be chained onto the HTTP error.
        raise _invalid_credentials() from None

    # The plaintext password is only available during login, so this is the
    # moment to upgrade a hash the hasher reports as needing a rehash.
    if ph.check_needs_rehash(user.password):
        user.password = ph.hash(credentials.password)
        session.add(user)
        session.commit()

    return user
|