diff --git a/config-sample.py b/config-sample.py index e978954..29bb45e 100644 --- a/config-sample.py +++ b/config-sample.py @@ -1,5 +1,4 @@ -username = "apiuser" -password = "topsecret" +token = 'topsecret_token' domain = "lists.example.org" diff --git a/main.py b/main.py index 4441411..7f6bb3c 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,6 @@ from typing import Annotated -from fastapi import Depends, FastAPI -from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from fastapi import Depends, FastAPI, HTTPException +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer import hsadmin import config @@ -8,25 +8,20 @@ import config app = FastAPI() -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") - -@app.get("/") -def root(): - return {"Hello": "World"}; - -@app.post("/token") -async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]): - - if not config.username == form_data.username: - raise HTTPException(status_code=400, detail="Incorrect username or password") - if not config.password == form_data.password: - raise HTTPException(status_code=400, detail="Incorrect username or password") - - return {"access_token": form_data.username, "token_type": "bearer"} +security = HTTPBearer() +def check_token(token: str) -> bool: + """Check if the token is valid""" + return token == config.token @app.put("/list/{listname}") -def create_list(token: Annotated[str, Depends(oauth2_scheme)], listname: str): +def create_list(token: Annotated[HTTPAuthorizationCredentials, Depends(security)], listname: str): + + # Authenticate + if not check_token(token.credentials): + raise HTTPException(status_code=401, detail="Invalid token") + + # Create email api = hsadmin.login() if hsadmin.email_exists(api, config.domain, listname): return {"Success": "false", "Message": "list already exists"} @@ -36,7 +31,13 @@ def create_list(token: Annotated[str, Depends(oauth2_scheme)], listname: str): return {"Success": "false", "Message": result} @app.delete("/list/{listname}") -def delete_list(token: Annotated[str, Depends(oauth2_scheme)], listname: str): +def delete_list(token: Annotated[HTTPAuthorizationCredentials, Depends(security)], listname: str): + + # Authenticate + if not check_token(token.credentials): + raise HTTPException(status_code=401, detail="Invalid token") + + # Delete email api = hsadmin.login() if not hsadmin.email_exists(api, config.domain, listname): return {"Success": "false", "Message": "list does not exist"} @@ -44,4 +45,3 @@ def delete_list(token: Annotated[str, Depends(oauth2_scheme)], listname: str): if result == True: return {"Success": "true"} return {"Success": "false", "Message": result} -