🩹 fixed bearer token authentication

This commit is contained in:
Marc Koch 2025-03-24 13:08:35 +01:00
parent 9dc0f99cf1
commit 878af4465a
Signed by: marc.koch
GPG key ID: 12406554CFB028B9
2 changed files with 21 additions and 22 deletions

View file

@ -1,5 +1,4 @@
username = "apiuser" token = 'topsecret_token'
password = "topsecret"
domain = "lists.example.org" domain = "lists.example.org"

40
main.py
View file

@ -1,6 +1,6 @@
from typing import Annotated from typing import Annotated
from fastapi import Depends, FastAPI from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
import hsadmin import hsadmin
import config import config
@ -8,25 +8,20 @@ import config
app = FastAPI() app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") security = HTTPBearer()
@app.get("/")
def root():
return {"Hello": "World"};
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
if not config.username == form_data.username:
raise HTTPException(status_code=400, detail="Incorrect username or password")
if not config.password == form_data.password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": form_data.username, "token_type": "bearer"}
def check_token(token: str) -> bool:
"""Check if the token is valid"""
return token == config.token
@app.put("/list/{listname}") @app.put("/list/{listname}")
def create_list(token: Annotated[str, Depends(oauth2_scheme)], listname: str): def create_list(token: Annotated[HTTPAuthorizationCredentials, Depends(security)], listname: str):
# Authenticate
if not check_token(token.credentials):
raise HTTPException(status_code=401, detail="Invalid token")
# Create email
api = hsadmin.login() api = hsadmin.login()
if hsadmin.email_exists(api, config.domain, listname): if hsadmin.email_exists(api, config.domain, listname):
return {"Success": "false", "Message": "list already exists"} return {"Success": "false", "Message": "list already exists"}
@ -36,7 +31,13 @@ def create_list(token: Annotated[str, Depends(oauth2_scheme)], listname: str):
return {"Success": "false", "Message": result} return {"Success": "false", "Message": result}
@app.delete("/list/{listname}") @app.delete("/list/{listname}")
def delete_list(token: Annotated[str, Depends(oauth2_scheme)], listname: str): def delete_list(token: Annotated[HTTPAuthorizationCredentials, Depends(security)], listname: str):
# Authenticate
if not check_token(token.credentials):
raise HTTPException(status_code=401, detail="Invalid token")
# Delete email
api = hsadmin.login() api = hsadmin.login()
if not hsadmin.email_exists(api, config.domain, listname): if not hsadmin.email_exists(api, config.domain, listname):
return {"Success": "false", "Message": "list does not exist"} return {"Success": "false", "Message": "list does not exist"}
@ -44,4 +45,3 @@ def delete_list(token: Annotated[str, Depends(oauth2_scheme)], listname: str):
if result == True: if result == True:
return {"Success": "true"} return {"Success": "true"}
return {"Success": "false", "Message": result} return {"Success": "false", "Message": result}