Merge branch 'refactor_pydantic'
This commit is contained in:
commit
a08c18d51d
6
Makefile
6
Makefile
|
@ -14,9 +14,13 @@ run_dev:
|
|||
git ls-files | entr -r pipenv run python dev.py
|
||||
|
||||
tdd:
|
||||
git ls-files | entr make test opt=$(opt)
|
||||
git ls-files | entr make test opt='$(opt)'
|
||||
git ls-files | entr make functionnal_tests
|
||||
|
||||
refactor_tdd:
|
||||
make tdd opt="--pdb --ff --lf --ff -x"
|
||||
|
||||
|
||||
watch_db:
|
||||
watch "sqlite3 sql_app.db 'select * from movies'"
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ def fill_db():
|
|||
import random
|
||||
|
||||
def _genres():
|
||||
random.choice([["Comedy"], ["Comedy", "Drama"], []])
|
||||
return random.choice(["Comedy"], ["Comedy", "Drama"], [])
|
||||
|
||||
for _ in range(3):
|
||||
name = f"fill_db_{random.randint(1, 1000):03}"
|
||||
|
|
65
dev.py
65
dev.py
|
@ -44,39 +44,35 @@ async def root():
|
|||
return {"message": "Hello World"}
|
||||
|
||||
|
||||
@app.post("/pydantic_movies/")
|
||||
@app.post("/movies/")
|
||||
async def create_movie(payload: schemas.MoviePayload, db: Session = Depends(get_db)):
|
||||
movie = crud.create_movie(db, **payload.dict())
|
||||
out = {"message": f"Created {movie.title} XX", "id": movie.id}
|
||||
return out
|
||||
|
||||
|
||||
@app.post("/movies/")
|
||||
async def create_movie(
|
||||
name: str = "", db: Session = Depends(get_db), request: Request = None
|
||||
):
|
||||
out = {}
|
||||
try: # Bypass for dev
|
||||
data = await request.json()
|
||||
except:
|
||||
data = {}
|
||||
crud_params = dict(
|
||||
genres=data.get("genres", ["Unknown"]),
|
||||
description=data.get("description", ""),
|
||||
title=data.get("title", ""),
|
||||
vote_average=data.get("vote_average"),
|
||||
vote_count=data.get("vote_count"),
|
||||
)
|
||||
|
||||
movie = crud.create_movie(db, **crud_params)
|
||||
out = {"message": f"Created {movie.title} XX", "id": movie.id}
|
||||
return out
|
||||
|
||||
|
||||
@app.put("/movies/{id_}")
|
||||
async def update_movie(
|
||||
id_: str,
|
||||
payload: schemas.MoviePayload,
|
||||
db: Session = Depends(get_db),
|
||||
request: Request = None,
|
||||
) -> schemas.MovieObject:
|
||||
try:
|
||||
movie = crud.get_movie_by_id(db, id_)
|
||||
except LookupError:
|
||||
raise HTTPException(status_code=404, detail=f"No movie found with id {id_}")
|
||||
|
||||
crud_params = payload.dict()
|
||||
movie = crud.update_movie(db, id_, **crud_params)
|
||||
|
||||
return movie
|
||||
|
||||
|
||||
@app.patch("/movies/{id_}")
|
||||
async def patch_movie(
|
||||
id_: str, db: Session = Depends(get_db), request: Request = None
|
||||
):
|
||||
) -> schemas.MovieObject:
|
||||
try:
|
||||
movie = crud.get_movie_by_id(db, id_)
|
||||
except LookupError:
|
||||
|
@ -96,24 +92,21 @@ async def update_movie(
|
|||
|
||||
movie = crud.update_movie(db, id_, **crud_params)
|
||||
|
||||
out = {k: v for (k, v) in movie.__dict__.items() if not k.startswith("_")}
|
||||
return out
|
||||
return movie
|
||||
|
||||
|
||||
@app.get("/movies/{id_}")
|
||||
async def get_movie(id_: str, db: Session = Depends(get_db)):
|
||||
async def get_movie(id_: str, db: Session = Depends(get_db)) -> schemas.MovieObject:
|
||||
try:
|
||||
movie = crud.get_movie_by_id(db, id_)
|
||||
|
||||
out = {k: v for (k, v) in movie.__dict__.items() if not k.startswith("_")}
|
||||
except LookupError:
|
||||
raise HTTPException(status_code=404, detail=f"No movie found with id {id_}")
|
||||
else:
|
||||
return out
|
||||
return movie
|
||||
|
||||
|
||||
@app.delete("/movies/{id_}", status_code=status.HTTP_204_NO_CONTENT)
|
||||
async def delete_movie(id_: str, db: Session = Depends(get_db)):
|
||||
async def delete_movie(id_: str, db: Session = Depends(get_db)) -> None:
|
||||
try:
|
||||
movie = crud.delete_movie_by_id(db, id_)
|
||||
except LookupError:
|
||||
|
@ -121,14 +114,8 @@ async def delete_movie(id_: str, db: Session = Depends(get_db)):
|
|||
|
||||
|
||||
@app.get("/movies/")
|
||||
async def list_movie(db: Session = Depends(get_db)):
|
||||
movies = crud.get_all_movies(db)
|
||||
|
||||
out = [
|
||||
{k: v for (k, v) in movie.__dict__.items() if not k.startswith("_")}
|
||||
for movie in movies
|
||||
]
|
||||
return out
|
||||
async def list_movie(db: Session = Depends(get_db)) -> list[schemas.MovieObject]:
|
||||
return crud.get_all_movies(db)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
14
schemas.py
14
schemas.py
|
@ -4,9 +4,13 @@ from pydantic import BaseModel
|
|||
class MoviePayload(BaseModel):
|
||||
title: str
|
||||
|
||||
vote_count: int
|
||||
vote_average: float
|
||||
vote_count: int | None = 0
|
||||
vote_average: float | None = None
|
||||
|
||||
genres: list[str]
|
||||
description: str
|
||||
release_date: str
|
||||
genres: list[str] = []
|
||||
description: str = ""
|
||||
release_date: str | None = None # Use custom formatted string validation
|
||||
|
||||
|
||||
class MovieObject(MoviePayload):
|
||||
id: int | str
|
||||
|
|
|
@ -51,6 +51,12 @@ def rand_name():
|
|||
|
||||
|
||||
class BaseCrud(unittest.TestCase):
|
||||
def setUp(self):
|
||||
name = f"rand_{random.randint(1, 1000)}"
|
||||
self.create_payload = {
|
||||
"title": name,
|
||||
}
|
||||
|
||||
def test_get_delete_movie_404_if_not_found(self):
|
||||
response = client.get("/movies/-1")
|
||||
assert response.status_code == 404
|
||||
|
@ -58,18 +64,16 @@ class BaseCrud(unittest.TestCase):
|
|||
assert response_delete.status_code == 404
|
||||
|
||||
def test_create_movie_api(self):
|
||||
name = f"rand_{random.randint(1, 1000)}"
|
||||
response = client.post("/movies/", json={"title": name})
|
||||
response = client.post("/movies/", json=self.create_payload)
|
||||
|
||||
assert response.status_code == 200
|
||||
movie_id = response.json()["id"]
|
||||
assert f"Created {name}" in response.json()["message"]
|
||||
assert f"Created {self.create_payload['title']}" in response.json()["message"]
|
||||
response = client.get(f"/movies/{movie_id}")
|
||||
assert response.json()["title"] == name
|
||||
assert response.json()["title"] == self.create_payload["title"]
|
||||
|
||||
def test_delete_movie(self):
|
||||
name = f"rand_{random.randint(1, 1000)}"
|
||||
response = client.post("/movies/", json={"title": name})
|
||||
response = client.post("/movies/", json=self.create_payload)
|
||||
movie_id = response.json()["id"]
|
||||
created_movie_path = f"/movies/{movie_id}"
|
||||
|
||||
|
@ -81,12 +85,14 @@ class BaseCrud(unittest.TestCase):
|
|||
assert response_missing.status_code == 404
|
||||
|
||||
def test_update_movie_api(self):
|
||||
name = f"rand_{random.randint(1, 1000)}"
|
||||
response = client.post("/movies/", json={"title": name, "genres": ["anime"]})
|
||||
self.create_payload["genres"] = ["anime"]
|
||||
|
||||
self.create_payload["description"] = legacy_description = "will be deleted"
|
||||
response = client.post("/movies/", json=self.create_payload)
|
||||
movie_id = response.json()["id"]
|
||||
created_movie_path = f"/movies/{movie_id}"
|
||||
|
||||
new_name = name.replace("rand", "update")
|
||||
new_name = self.create_payload["title"].replace("rand", "update")
|
||||
new_genres = ["Drama", "war"]
|
||||
response_get = client.get(f"/movies/{movie_id}")
|
||||
|
||||
|
@ -101,9 +107,11 @@ class BaseCrud(unittest.TestCase):
|
|||
response_get = client.get(f"/movies/{movie_id}")
|
||||
assert response_get.json()["title"] == new_name
|
||||
assert response_get.json()["genres"] == new_genres
|
||||
assert response_get.json()["description"] == ""
|
||||
|
||||
def test_list_movies(self):
|
||||
response = client.get("/movies/")
|
||||
assert response.status_code == 200
|
||||
# assert response.json() == []
|
||||
|
||||
N = 10
|
||||
|
@ -112,7 +120,8 @@ class BaseCrud(unittest.TestCase):
|
|||
name = rand_name()
|
||||
|
||||
names.append(name)
|
||||
response = client.post("/movies/", json={"title": name})
|
||||
self.create_payload["title"] = name
|
||||
response = client.post("/movies/", json=self.create_payload)
|
||||
assert response.status_code == 200
|
||||
|
||||
movies = client.get("/movies/")
|
||||
|
@ -185,7 +194,7 @@ class ApiTestCase(unittest.TestCase):
|
|||
|
||||
payload.pop(missing_key)
|
||||
|
||||
response = client.post("/pydantic_movies/", json=payload)
|
||||
response = client.post("/movies/", json=payload)
|
||||
|
||||
assert 400 <= response.status_code < 500
|
||||
|
||||
|
@ -215,13 +224,13 @@ class ApiTestCase(unittest.TestCase):
|
|||
|
||||
payload.pop(missing_key)
|
||||
|
||||
response = client.post("/pydantic_movies/", json=payload)
|
||||
response = client.post("/movies/", json=payload)
|
||||
|
||||
assert 400 <= response.status_code < 500
|
||||
|
||||
assert response.status_code == 400
|
||||
|
||||
def test_payload_content_in_and_out_loopback_pydantic(self):
|
||||
def test_payload_content_in_and_out_loopback(self):
|
||||
be_the_fun_in_de_funes = {
|
||||
"id": 1,
|
||||
"title": "La Grande Vadrouille",
|
||||
|
@ -242,7 +251,7 @@ class ApiTestCase(unittest.TestCase):
|
|||
|
||||
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
|
||||
# FIXME
|
||||
response = client.post("/pydantic_movies/", json=payload)
|
||||
response = client.post("/movies/", json=payload)
|
||||
|
||||
assert response.status_code == 200
|
||||
movie_id = response.json()["id"]
|
||||
|
|
Loading…
Reference in New Issue