Merge branch 'feature_34_api_endpoint'

This commit is contained in:
Colin Goutte 2023-08-26 23:47:01 +02:00
commit 6c60ffcfd2
10 changed files with 498 additions and 58 deletions

View File

@ -1,3 +1,5 @@
coverage_opt=--cov --cov-report=term-missing:skip-covered --durations=10
clean:
pipenv --rm
@ -12,7 +14,7 @@ run_dev:
git ls-files | entr -r pipenv run python dev.py
tdd:
git ls-files | entr make test opt='--lf --ff'
git ls-files | entr make test opt=$(opt)
git ls-files | entr make functionnal_tests
watch_db:
@ -20,7 +22,7 @@ watch_db:
test:
pipenv run pytest $(opt) utests
pipenv run pytest $(coverage_opt) $(opt) utests
functionnal_tests:
pipenv run python -m pytest functionnal_test.py
@ -43,7 +45,11 @@ venv_test:
make -f MakefileVenv test
db_clean:
rm sql_app.db
rm sql_app.db && touch sql_app.db
db_fill:
pipenv run python database.py
db_reset: db_clean db_fill

View File

@ -14,6 +14,7 @@ pytest = "*"
selenium = "*"
httpx = "*"
pdbpp = "*"
pytest-cov = "*"
[requires]
python_version = "3.11"

49
crud.py
View File

@ -1,12 +1,28 @@
import sqlalchemy
from sqlalchemy.orm import Session
import models
# import schemas
def create_movie(db: Session, name: str = ""):
db_movie = models.Movie(name=name)
def create_movie(
db: Session,
*,
title: str,
genres: list[str],
description: str = "",
vote_average: float | None = None,
vote_count: int | None = None,
release_date: str | None = None,
):
db_movie = models.Movie(
title=title,
genres=str(genres),
description=description,
vote_average=vote_average,
vote_count=vote_count,
release_date=release_date,
)
db.add(db_movie)
db.commit()
db.refresh(db_movie)
@ -28,5 +44,28 @@ def get_movie_by_id(db: Session, id_: str = ""):
id_ = int(id_)
except ValueError:
pass
db_movie = db.query(models.Movie).filter(models.Movie.id == id_)
return db_movie.one()
try:
db_movie = db.query(models.Movie).filter(models.Movie.id == id_).one()
except sqlalchemy.exc.NoResultFound:
raise LookupError
return db_movie
def delete_movie_by_id(db: Session, id_: str = ""):
movie = get_movie_by_id(db, id_)
db.delete(movie)
db.commit()
def update_movie(db: Session, id_: str, **payload):
movie = get_movie_by_id(db, id_)
for name, value in payload.items():
try:
movie.__mapper__.attrs[name]
except KeyError:
raise ValueError(f"Bad attribute {name}")
setattr(movie, name, value)
db.add(movie)
db.commit()
return movie

View File

@ -24,10 +24,13 @@ def fill_db():
import crud
import random
for _ in range(10):
def _genres():
random.choice([["Comedy"], ["Comedy", "Drama"], []])
for _ in range(3):
name = f"fill_db_{random.randint(1, 1000):03}"
out = crud.create_movie(SessionLocal(), name=name)
print(out.name)
out = crud.create_movie(SessionLocal(), title=name, genres=_genres())
print(out.title)
if __name__ == "__main__":

96
dev.py
View File

@ -1,4 +1,6 @@
from fastapi import FastAPI, Depends, Request
from fastapi import FastAPI, Depends, Request, HTTPException, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
@ -6,10 +8,27 @@ import uvicorn
import database
import models
import crud
import schemas
app = FastAPI()
convert_422_to_400 = True
if convert_422_to_400:
# Taken from there.
# https://stackoverflow.com/questions/75958222/can-i-return-400-error-instead-of-422-error
# https://stackoverflow.com/questions/71681068/how-to-customise-error-response-for-a-specific-route-in-fastapi
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(
request: Request, exc: RequestValidationError
):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={"detail": exc.errors()},
)
# Dependency
def get_db():
@ -25,6 +44,13 @@ async def root():
return {"message": "Hello World"}
@app.post("/pydantic_movies/")
async def create_movie(payload: schemas.MoviePayload, db: Session = Depends(get_db)):
movie = crud.create_movie(db, **payload.dict())
out = {"message": f"Created {movie.title} XX", "id": movie.id}
return out
@app.post("/movies/")
async def create_movie(
name: str = "", db: Session = Depends(get_db), request: Request = None
@ -34,26 +60,68 @@ async def create_movie(
data = await request.json()
except:
data = {}
name = name or data["name"]
movie = models.Movie()
movie.name = name
db.add(movie)
db.flush()
db.commit()
db.refresh(movie)
out = {"message": f"Created {movie.name} XX", "id": movie.id}
crud_params = dict(
genres=data.get("genres", ["Unknown"]),
description=data.get("description", ""),
title=data.get("title", ""),
vote_average=data.get("vote_average"),
vote_count=data.get("vote_count"),
)
movie = crud.create_movie(db, **crud_params)
out = {"message": f"Created {movie.title} XX", "id": movie.id}
return out
@app.put("/movies/{id_}")
async def update_movie(
id_: str, db: Session = Depends(get_db), request: Request = None
):
try:
movie = crud.get_movie_by_id(db, id_)
except LookupError:
raise HTTPException(status_code=404, detail=f"No movie found with id {id_}")
try: # Bypass for dev
data = await request.json()
except:
data = {}
crud_params = dict(
genres=data.get("genres", ["Unknown"]),
description=data.get("description", ""),
title=data.get("title", ""),
vote_average=data.get("vote_average"),
vote_count=data.get("vote_count"),
)
movie = crud.update_movie(db, id_, **crud_params)
out = {k: v for (k, v) in movie.__dict__.items() if not k.startswith("_")}
return out
@app.get("/movies/{id_}")
async def get_movie(id_: str, db: Session = Depends(get_db)):
movie = crud.get_movie_by_id(db, id_)
out = {k: v for (k, v) in movie.__dict__.items() if not k.startswith("_")}
return out
try:
movie = crud.get_movie_by_id(db, id_)
out = {k: v for (k, v) in movie.__dict__.items() if not k.startswith("_")}
except LookupError:
raise HTTPException(status_code=404, detail=f"No movie found with id {id_}")
else:
return out
@app.delete("/movies/{id_}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_movie(id_: str, db: Session = Depends(get_db)):
try:
movie = crud.delete_movie_by_id(db, id_)
except LookupError:
raise HTTPException(status_code=404, detail=f"No movie found with id {id_}")
@app.get("/movies/")
async def create_movie(db: Session = Depends(get_db)):
async def list_movie(db: Session = Depends(get_db)):
movies = crud.get_all_movies(db)
out = [

View File

@ -1,6 +1,20 @@
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy import Column, ForeignKey, Integer, String, Float, types
from database import Base
import sqlalchemy.types as types
class NaiveStringList(types.TypeDecorator):
impl = types.Unicode
sep = "\n"
def process_bind_param(self, value, dialect):
return self.sep.join(value)
def process_result_value(self, value, dialect):
return value.split(self.sep)
def copy(self, **kw):
return NaiveStringList(self.impl.length)
class Movie(Base):
@ -8,4 +22,13 @@ class Movie(Base):
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
title = Column(String, index=True)
vote_count = Column(Integer)
vote_average = Column(Float)
genres = Column(NaiveStringList) # LLw
# genres = Column(ARRAY(String, dimensions=1)) # String array dimention 1
description = Column(String)
release_date = Column(String)

12
schemas.py Normal file
View File

@ -0,0 +1,12 @@
from pydantic import BaseModel
class MoviePayload(BaseModel):
title: str
vote_count: int
vote_average: float
genres: list[str]
description: str
release_date: str

297
utests/test_api.py Normal file
View File

@ -0,0 +1,297 @@
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from sqlalchemy import MetaData
from database import Base
from dev import app, get_db
from models import Movie
import pytest
import crud
import contextlib
import random
import inspect
import unittest
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)
def override_get_db():
try:
db = TestingSessionLocal()
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
def rand_name():
import sys
caller = sys._getframe(1).f_code.co_name
name = f"{caller}_{random.randint(1, 1000)}"
return name
class BaseCrud(unittest.TestCase):
def test_get_delete_movie_404_if_not_found(self):
response = client.get("/movies/-1")
assert response.status_code == 404
response_delete = client.delete("/movies/-1")
assert response_delete.status_code == 404
def test_create_movie_api(self):
name = f"rand_{random.randint(1, 1000)}"
response = client.post("/movies/", json={"title": name})
assert response.status_code == 200
movie_id = response.json()["id"]
assert f"Created {name}" in response.json()["message"]
response = client.get(f"/movies/{movie_id}")
assert response.json()["title"] == name
def test_delete_movie(self):
name = f"rand_{random.randint(1, 1000)}"
response = client.post("/movies/", json={"title": name})
movie_id = response.json()["id"]
created_movie_path = f"/movies/{movie_id}"
response_delete = client.delete(created_movie_path)
response_missing = client.get(created_movie_path)
assert response_delete.status_code == 204
assert response_missing.status_code == 404
def test_update_movie_api(self):
name = f"rand_{random.randint(1, 1000)}"
response = client.post("/movies/", json={"title": name, "genres": ["anime"]})
movie_id = response.json()["id"]
created_movie_path = f"/movies/{movie_id}"
new_name = name.replace("rand", "update")
new_genres = ["Drama", "war"]
response_get = client.get(f"/movies/{movie_id}")
assert response_get.json()["title"] != new_name
assert response_get.json()["genres"] != new_genres
response_update = client.put(
created_movie_path, json={"title": new_name, "genres": new_genres}
)
assert response_update.status_code == 200
response_get = client.get(f"/movies/{movie_id}")
assert response_get.json()["title"] == new_name
assert response_get.json()["genres"] == new_genres
def test_list_movies(self):
response = client.get("/movies/")
# assert response.json() == []
N = 10
names = []
for _ in range(N):
name = rand_name()
names.append(name)
response = client.post("/movies/", json={"title": name})
assert response.status_code == 200
movies = client.get("/movies/")
movies_by_title = {m["title"]: m for m in movies.json()}
found = list(movies_by_title[title] for title in names)
assert all(movies_by_title[title] for title in names)
class ApiTestCase(unittest.TestCase):
def test_payload_content_in_and_out_loopback(self):
be_the_fun_in_de_funes = {
"id": 1,
"title": "La Grande Vadrouille",
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
"South. From there they will be able to escape to England. First, they must avoid German troops -"
"and the consequences of their own blunders.",
"genres": ["Comedy", "War"],
"release_date": "1966-12-07",
"vote_average": 7.7,
"vote_count": 1123,
}
domain_keys = sorted(
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
) # Make it deterministic
non_primtive = ["genres", "release_date"]
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
# FIXME
response = client.post("/movies/", json=payload)
assert response.status_code == 200
movie_id = response.json()["id"]
loopback_fetch = client.get(f"/movies/{movie_id}")
assert loopback_fetch.status_code == 200
loopback_payload = loopback_fetch.json()
# check for keys
for attribute_name in domain_keys:
with self.subTest(attribute_name=attribute_name):
assert attribute_name in loopback_payload
if attribute_name not in non_primtive:
assert (
loopback_payload[attribute_name]
== be_the_fun_in_de_funes[attribute_name]
)
def test_payload_content_bad_format_status_code(self):
be_the_fun_in_de_funes = {
"id": 1,
"title": "La Grande Vadrouille",
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
"South. From there they will be able to escape to England. First, they must avoid German troops -"
"and the consequences of their own blunders.",
"genres": ["Comedy", "War"],
"release_date": "1966-12-07",
"vote_average": 7.7,
"vote_count": 1123,
}
domain_keys = sorted(
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
) # Make it deterministic
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
missing_key = "title"
payload.pop(missing_key)
response = client.post("/pydantic_movies/", json=payload)
assert 400 <= response.status_code < 500
assert missing_key in response.text
def test_payload_content_bad_format_detail(self):
be_the_fun_in_de_funes = {
"id": 1,
"title": "La Grande Vadrouille",
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
"South. From there they will be able to escape to England. First, they must avoid German troops -"
"and the consequences of their own blunders.",
"genres": ["Comedy", "War"],
"release_date": "1966-12-07",
"vote_average": 7.7,
"vote_count": 1123,
}
domain_keys = sorted(
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
) # Make it deterministic
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
missing_key = "title"
payload.pop(missing_key)
response = client.post("/pydantic_movies/", json=payload)
assert 400 <= response.status_code < 500
assert response.status_code == 400
def test_payload_content_in_and_out_loopback_pydantic(self):
be_the_fun_in_de_funes = {
"id": 1,
"title": "La Grande Vadrouille",
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
"South. From there they will be able to escape to England. First, they must avoid German troops -"
"and the consequences of their own blunders.",
"genres": ["Comedy", "War"],
"release_date": "1966-12-07",
"vote_average": 7.7,
"vote_count": 1123,
}
domain_keys = sorted(
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
) # Make it deterministic
non_primtive = ["genres", "release_date"]
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
# FIXME
response = client.post("/pydantic_movies/", json=payload)
assert response.status_code == 200
movie_id = response.json()["id"]
loopback_fetch = client.get(f"/movies/{movie_id}")
assert loopback_fetch.status_code == 200
loopback_payload = loopback_fetch.json()
# check for keys
for attribute_name in domain_keys:
with self.subTest(attribute_name=attribute_name):
assert attribute_name in loopback_payload
if attribute_name not in non_primtive:
assert (
loopback_payload[attribute_name]
== be_the_fun_in_de_funes[attribute_name]
)
@unittest.expectedFailure
def test_payload_content_in_and_out_loopback_non_primitive(self):
be_the_fun_in_de_funes = {
"id": 1,
"title": "La Grande Vadrouille",
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
"South. From there they will be able to escape to England. First, they must avoid German troops -"
"and the consequences of their own blunders.",
"genres": ["Comedy", "War"],
"release_date": "1966-12-07",
"vote_average": 7.7,
"vote_count": 1123,
}
domain_keys = sorted(
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
) # Make it deterministic
non_primtive = ["genres", "release_date"]
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
response = client.post("/movies/", json=payload)
assert response.status_code == 200
movie_id = response.json()["id"]
loopback_fetch = client.get(f"/movies/{movie_id}")
assert loopback_fetch.status_code == 200
loopback_payload = loopback_fetch.json()
# check for keys
for attribute_name in domain_keys:
assert (
loopback_payload[attribute_name]
== be_the_fun_in_de_funes[attribute_name]
)

View File

@ -28,14 +28,7 @@ TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engin
Base.metadata.create_all(bind=engine)
def clear_db():
# Make this a more generic functional tool for test
meta = MetaData()
with contextlib.closing(engine.connect()) as con:
trans = con.begin()
for table in reversed(meta.sorted_tables):
con.execute(table.delete())
trans.commit()
client = TestClient(app)
def override_get_db():
@ -47,16 +40,24 @@ def override_get_db():
db.close()
app.dependency_overrides[get_db] = override_get_db
def clear_db():
# Make this a more generic functional tool for test
meta = MetaData()
with contextlib.closing(engine.connect()) as con:
trans = con.begin()
for table in reversed(meta.sorted_tables):
con.execute(table.delete())
trans.commit()
@contextlib.contextmanager
def db_context():
yield from override_get_db()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
def rand_name():
import sys
@ -67,16 +68,16 @@ def rand_name():
def test_create_moviem_models():
name = rand_name()
movie = Movie(name=name)
assert movie.name == name
movie = Movie(title=name)
assert movie.title == name
def test_sample_crud():
name = rand_name()
with db_context() as db:
movie = crud.create_movie(db, name=name)
assert movie.name == name
movie = crud.create_movie(db, title=name, genres=["Yes", "No"])
assert movie.title == name
def test_list_movies():
@ -91,18 +92,8 @@ def test_list_movies():
name = rand_name()
names.append(name)
crud.create_movie(db, name=name)
crud.create_movie(db, title=name, genres=["Animated", "Paropaganda"])
movies = client.get("movies")
movies_by_name = {m["name"]: m for m in movies.json()}
assert all(movies_by_name[name] for name in names)
def test_create_movie_api():
name = f"rand_{random.randint(1, 1000)}"
response = client.post("/movies/", json={"name": name})
assert response.status_code == 200
movie_id = response.json()["id"]
assert f"Created {name}" in response.json()["message"]
response = client.get(f"/movies/{movie_id}")
assert response.json()["name"] == name
movies_by_title = {m["title"]: m for m in movies.json()}
assert all(movies_by_title[name] for name in names)

0
utests/vadrouille.json Normal file
View File