from fastapi import FastAPI, Depends, Request, HTTPException, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse from sqlalchemy.orm import Session import uvicorn import database import models import crud import schemas app = FastAPI() convert_422_to_400 = True if convert_422_to_400: # Taken from there. # https://stackoverflow.com/questions/75958222/can-i-return-400-error-instead-of-422-error # https://stackoverflow.com/questions/71681068/how-to-customise-error-response-for-a-specific-route-in-fastapi @app.exception_handler(RequestValidationError) async def validation_exception_handler( request: Request, exc: RequestValidationError ): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"detail": exc.errors()}, ) # Dependency def get_db(): db = database.SessionLocal() try: yield db finally: db.close() @app.get("/") async def root(): return {"message": "Hello World"} @app.post("/movies/") async def create_movie(payload: schemas.MoviePayload, db: Session = Depends(get_db)): movie = crud.create_movie(db, **payload.dict()) out = {"message": f"Created {movie.title} XX", "id": movie.id} return out @app.put("/movies/{id_}") async def update_movie( id_: str, payload: schemas.MoviePayload, db: Session = Depends(get_db), request: Request = None, ) -> schemas.MovieObject: try: movie = crud.get_movie_by_id(db, id_) except LookupError: raise HTTPException(status_code=404, detail=f"No movie found with id {id_}") crud_params = payload.dict() movie = crud.update_movie(db, id_, **crud_params) return movie @app.patch("/movies/{id_}") async def patch_movie( id_: str, db: Session = Depends(get_db), request: Request = None ) -> schemas.MovieObject: try: movie = crud.get_movie_by_id(db, id_) except LookupError: raise HTTPException(status_code=404, detail=f"No movie found with id {id_}") try: # Bypass for dev data = await request.json() except: data = {} crud_params = dict( genres=data.get("genres", ["Unknown"]), description=data.get("description", ""), title=data.get("title", ""), vote_average=data.get("vote_average"), vote_count=data.get("vote_count"), ) movie = crud.update_movie(db, id_, **crud_params) return movie @app.get("/movies/{id_}") async def get_movie(id_: str, db: Session = Depends(get_db)) -> schemas.MovieObject: try: movie = crud.get_movie_by_id(db, id_) except LookupError: raise HTTPException(status_code=404, detail=f"No movie found with id {id_}") else: return movie @app.delete("/movies/{id_}", status_code=status.HTTP_204_NO_CONTENT) async def delete_movie(id_: str, db: Session = Depends(get_db)) -> None: try: movie = crud.delete_movie_by_id(db, id_) except LookupError: raise HTTPException(status_code=404, detail=f"No movie found with id {id_}") @app.get("/movies/") async def list_movie( db: Session = Depends(get_db), pagenum: int | None = None, pagesize: int | None = None, ) -> schemas.PaginatedMovies | schemas.MovieObjectsOut: paginate_params = {} paginate_data = {} pagination_params = {"pagenum": pagenum, "pagesize": pagesize} if any(v for v in pagination_params.values() if v is not None): missing = [name for (name, value) in pagination_params.items() if not value] if missing: raise HTTPException(status_code=404, detail=f"Missing {missing}") # Here we do a "x + 1 - 1 = x" trick to check if there will be more pages # eg we want from 10 to 15, we ask for 10 to 16, if we have *stricly* more # than 5 element we can now that there will be one more page paginate_params = dict(offset=(pagenum - 1) * pagesize, limit=pagesize + 1) movies = crud.get_all_movies(db, **paginate_params) if paginate_params: has_more_content = movies.count() > pagesize paginate_data = { "next_page": pagenum + 1 if has_more_content else None, "previous_page": pagenum - 1 if pagenum > 1 else None, } movies = movies.limit(pagesize) count = movies.count() payload = {"movies": movies, "count": count} return {**payload, **paginate_data} if __name__ == "__main__": database.create_db() uvicorn.run(app, host="127.0.0.1", port=5000)