Compare commits
6 Commits
pagination
...
backup
Author | SHA1 | Date |
---|---|---|
|
9911d0f973 | |
|
6a931228c3 | |
|
b985e7590a | |
|
0b09021d5c | |
|
97af8ea80f | |
|
0ae18d01d2 |
|
@@ -3,4 +3,4 @@ Nogo:
|
|||
|
||||
|
||||
Caveats:
|
||||
Issue in database / session management for tests, out of scope to understand no
|
||||
Issue in database / session management for tests, out of scope to understand now
|
||||
|
|
4
Makefile
4
Makefile
|
@@ -16,7 +16,8 @@ run_dev:
|
|||
git ls-files | entr -r pipenv run python dev.py
|
||||
|
||||
tdd:
|
||||
git ls-files | entr make test functionnal_tests opt='$(opt)'
|
||||
git ls-files | entr make test opt='$(opt)'
|
||||
git ls-files | entr make functionnal_tests
|
||||
|
||||
refactor_tdd:
|
||||
make tdd opt="--pdb --ff --lf --ff -x"
|
||||
|
@@ -28,6 +29,7 @@ watch_db:
|
|||
|
||||
test:
|
||||
pipenv run pytest $(coverage_opt) $(opt) utests
|
||||
make functional_tests
|
||||
|
||||
functionnal_tests:
|
||||
pipenv run python -m pytest functionnal_test.py
|
||||
|
|
26
crud.py
26
crud.py
|
@@ -37,14 +37,28 @@ def get_movie_by_name(db: Session, name: str = ""):
|
|||
return db_movie.all()
|
||||
|
||||
|
||||
def get_all_movies(db: Session, offset: int | None = None, limit: int | None = None):
|
||||
def get_all_movies(db: Session):
|
||||
db_movie = db.query(models.Movie)
|
||||
if offset is not None:
|
||||
db_movie = db_movie.offset(offset)
|
||||
if limit is not None:
|
||||
db_movie = db_movie.limit(limit)
|
||||
return db_movie.all()
|
||||
|
||||
return db_movie
|
||||
|
||||
def search_movie(db: Session, term: str = "", *criterions):
|
||||
db_movies = db.query(models.Movie)
|
||||
exp = f"%{term}%"
|
||||
# Use regex instead ? still \W is hard to escape
|
||||
if term:
|
||||
db_movies = db_movies.where(
|
||||
sqlalchemy.or_(
|
||||
models.Movie.title.like(exp),
|
||||
models.Movie.description.like(exp),
|
||||
)
|
||||
)
|
||||
# would try the following approach for dynamic search
|
||||
|
||||
for colnamename, opname, value in criterions:
|
||||
db_movies = db_movies.where(getattr(getattr(models.Movie, name), opname)(value))
|
||||
|
||||
return db_movies
|
||||
|
||||
|
||||
def get_movie_by_id(db: Session, id_: str = ""):
|
||||
|
|
37
dev.py
37
dev.py
|
@@ -114,40 +114,11 @@ async def delete_movie(id_: str, db: Session = Depends(get_db)) -> None:
|
|||
|
||||
|
||||
@app.get("/movies/")
|
||||
async def list_movie(
|
||||
db: Session = Depends(get_db),
|
||||
pagenum: int | None = None,
|
||||
pagesize: int | None = None,
|
||||
) -> schemas.PaginatedMovies | schemas.MovieObjectsOut:
|
||||
paginate_params = {}
|
||||
paginate_data = {}
|
||||
async def list_movie(db: Session = Depends(get_db)) -> schemas.MovieObjectsOut:
|
||||
movies = crud.get_all_movies(db)
|
||||
count = len(movies)
|
||||
|
||||
pagination_params = {"pagenum": pagenum, "pagesize": pagesize}
|
||||
if any(v for v in pagination_params.values() if v is not None):
|
||||
missing = [name for (name, value) in pagination_params.items() if not value]
|
||||
if missing:
|
||||
raise HTTPException(status_code=404, detail=f"Missing {missing}")
|
||||
|
||||
# Here we do a "x + 1 - 1 = x" trick to check if there will be more pages
|
||||
# eg we want from 10 to 15, we ask for 10 to 16, if we have *strictly* more
|
||||
# than 5 elements we can know that there will be one more page
|
||||
paginate_params = dict(offset=(pagenum - 1) * pagesize, limit=pagesize + 1)
|
||||
|
||||
movies = crud.get_all_movies(db, **paginate_params)
|
||||
|
||||
if paginate_params:
|
||||
has_more_content = movies.count() > pagesize
|
||||
paginate_data = {
|
||||
"next_page": pagenum + 1 if has_more_content else None,
|
||||
"previous_page": pagenum - 1 if pagenum > 1 else None,
|
||||
}
|
||||
|
||||
movies = movies.limit(pagesize)
|
||||
|
||||
count = movies.count()
|
||||
|
||||
payload = {"movies": movies, "count": count}
|
||||
return {**payload, **paginate_data}
|
||||
return {"movies": movies, "count": count}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@@ -16,15 +16,6 @@ class MovieObject(MoviePayload):
|
|||
id: int | str
|
||||
|
||||
|
||||
class Paginated(BaseModel):
|
||||
next_page: str | int | None
|
||||
previous_page: str | int | None
|
||||
|
||||
|
||||
class MovieObjectsOut(BaseModel):
|
||||
movies: list[MovieObject]
|
||||
count: int
|
||||
|
||||
|
||||
class PaginatedMovies(MovieObjectsOut, Paginated):
|
||||
pass
|
||||
|
|
|
@@ -157,130 +157,6 @@ class BaseCrud(unittest.TestCase):
|
|||
assert isinstance(movies["movies"], list)
|
||||
assert movies["count"] == primary_count + N
|
||||
|
||||
def test_list_pagination_limits(self):
|
||||
response = client.get("/movies/")
|
||||
nb_movies = response.json()["count"]
|
||||
|
||||
for _ in range(3):
|
||||
self.create_payload["title"] = rand_name()
|
||||
response = client.post("/movies/", json=self.create_payload)
|
||||
|
||||
response = client.get("/movies/")
|
||||
nb_movies = response.json()["count"]
|
||||
|
||||
pagenum = 1
|
||||
pagesize = nb_movies - 1
|
||||
|
||||
# Test page 1 has no previous ?
|
||||
current_movies = client.get(
|
||||
f"/movies/?pagenum={pagenum}&pagesize={pagesize}"
|
||||
).json()
|
||||
assert current_movies.get("previous_page") is None
|
||||
assert current_movies["next_page"]
|
||||
|
||||
current_movies = client.get(
|
||||
f"/movies/?pagenum={pagenum + 1 }&pagesize={pagesize}"
|
||||
).json()
|
||||
assert current_movies.get("next_page") is None
|
||||
assert current_movies["previous_page"]
|
||||
|
||||
# test last page has no next
|
||||
|
||||
def test_list_movies_pagination_back_forth(self):
|
||||
response = client.get("/movies/")
|
||||
nb_movies = response.json()["count"]
|
||||
|
||||
for _ in range(3):
|
||||
self.create_payload["title"] = rand_name()
|
||||
response = client.post("/movies/", json=self.create_payload)
|
||||
|
||||
response = client.get("/movies/")
|
||||
nb_movies = response.json()["count"]
|
||||
|
||||
pagenum = 1
|
||||
pagesize = 2
|
||||
|
||||
first, *_, last = client.get("/movies/").json()["movies"]
|
||||
|
||||
while current_movies := client.get(
|
||||
f"/movies/?pagenum={pagenum}&pagesize={pagesize}"
|
||||
).json():
|
||||
next_page_num = current_movies.get("next_page")
|
||||
assert next_page_num != pagenum
|
||||
|
||||
if next_page_num is None:
|
||||
assert current_movies["movies"][-1] == last
|
||||
break
|
||||
else:
|
||||
assert next_page_num == pagenum + 1
|
||||
pagenum = next_page_num
|
||||
|
||||
def test_list_movies_pagination(self):
|
||||
response = client.get("/movies/")
|
||||
assert response.status_code == 200
|
||||
# assert response.json() == []
|
||||
primary_count = response.json()["count"]
|
||||
|
||||
N = 10
|
||||
names = []
|
||||
for _ in range(N):
|
||||
name = rand_name()
|
||||
|
||||
names.append(name)
|
||||
self.create_payload["title"] = name
|
||||
response = client.post("/movies/", json=self.create_payload)
|
||||
assert response.status_code == 200
|
||||
|
||||
pagenum = 3
|
||||
pagesize = 5
|
||||
|
||||
sliced_movies = client.get("/movies/").json()["movies"][
|
||||
(pagenum - 1) * pagesize : pagenum * pagesize
|
||||
]
|
||||
|
||||
sliced_titles = [m["title"] for m in sliced_movies]
|
||||
|
||||
movies_paginate = client.get(
|
||||
f"/movies/?pagenum={pagenum}&pagesize={pagesize}"
|
||||
).json()["movies"]
|
||||
|
||||
paginate_titles = [m["title"] for m in movies_paginate]
|
||||
|
||||
assert sliced_titles == paginate_titles
|
||||
|
||||
def test_list_movies_pagination(self):
|
||||
response = client.get("/movies/")
|
||||
assert response.status_code == 200
|
||||
# assert response.json() == []
|
||||
primary_count = response.json()["count"]
|
||||
|
||||
N = 10
|
||||
names = []
|
||||
for _ in range(N):
|
||||
name = rand_name()
|
||||
|
||||
names.append(name)
|
||||
self.create_payload["title"] = name
|
||||
response = client.post("/movies/", json=self.create_payload)
|
||||
assert response.status_code == 200
|
||||
|
||||
pagenum = 3
|
||||
pagesize = 5
|
||||
|
||||
sliced_movies = client.get("/movies/").json()["movies"][
|
||||
(pagenum - 1) * pagesize : pagenum * pagesize
|
||||
]
|
||||
|
||||
sliced_titles = [m["title"] for m in sliced_movies]
|
||||
|
||||
movies_paginate = client.get(
|
||||
f"/movies/?pagenum={pagenum}&pagesize={pagesize}"
|
||||
).json()["movies"]
|
||||
|
||||
paginate_titles = [m["title"] for m in movies_paginate]
|
||||
|
||||
assert sliced_titles == paginate_titles
|
||||
|
||||
|
||||
class ApiTestCase(unittest.TestCase):
|
||||
def test_payload_content_in_and_out_loopback(self):
|
||||
|
|
|
@@ -9,6 +9,9 @@ from dev import app, get_db
|
|||
from models import Movie
|
||||
|
||||
import pytest
|
||||
import unittest
|
||||
|
||||
|
||||
import crud
|
||||
import contextlib
|
||||
|
||||
|
@@ -66,132 +69,182 @@ def rand_name():
|
|||
return name
|
||||
|
||||
|
||||
def test_create_moviem_models():
|
||||
name = rand_name()
|
||||
movie = Movie(title=name)
|
||||
assert movie.title == name
|
||||
class CrudModelsTestCase(unittest.TestCase):
|
||||
def test_create_moviem_models(self):
|
||||
name = rand_name()
|
||||
movie = Movie(title=name)
|
||||
assert movie.title == name
|
||||
|
||||
def test_sample_crud(self):
|
||||
name = rand_name()
|
||||
|
||||
def test_sample_crud():
|
||||
name = rand_name()
|
||||
|
||||
with db_context() as db:
|
||||
movie = crud.create_movie(db, title=name, genres=["Yes", "No"])
|
||||
assert movie.title == name
|
||||
|
||||
|
||||
def test_genre_custom_type_serialize_value():
|
||||
name = rand_name()
|
||||
|
||||
genres = ["Yes", "No"]
|
||||
with db_context() as db:
|
||||
movie = crud.create_movie(db, title=name, genres=genres)
|
||||
assert movie.genres == genres
|
||||
|
||||
|
||||
def test_genre_custom_type_serialize_type():
|
||||
name = rand_name()
|
||||
genres = ["Yes", "No"]
|
||||
csv_genres = ",".join(genres)
|
||||
|
||||
try:
|
||||
with db_context() as db:
|
||||
movie = crud.create_movie(db, title=name, genres=csv_genres)
|
||||
except (ValueError, exc.StatementError) as error:
|
||||
assert "tuple" in str(error)
|
||||
else:
|
||||
raise RuntimeError("Exception should have been raised")
|
||||
movie = crud.create_movie(db, title=name, genres=["Yes", "No"])
|
||||
assert movie.title == name
|
||||
|
||||
def test_genre_custom_type_serialize_value(self):
|
||||
name = rand_name()
|
||||
|
||||
def test_list_movies():
|
||||
clear_db()
|
||||
response = client.get("/movies/")
|
||||
# assert response.json() == []
|
||||
|
||||
N = 10
|
||||
names = []
|
||||
with db_context() as db:
|
||||
for _ in range(N):
|
||||
name = rand_name()
|
||||
|
||||
names.append(name)
|
||||
crud.create_movie(db, title=name, genres=["Animated", "Paropaganda"])
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
assert all(movies_by_title[name] for name in names)
|
||||
|
||||
|
||||
def test_search_movies():
|
||||
clear_db()
|
||||
response = client.get("/movies/")
|
||||
# assert response.json() == []
|
||||
|
||||
radix = rand_name()
|
||||
|
||||
name = radix + "test_search"
|
||||
|
||||
desc = radix + "test_desription"
|
||||
|
||||
with db_context() as db:
|
||||
movie_name = crud.create_movie(
|
||||
db, title=name, genres=["Animated", "Paropaganda"]
|
||||
)
|
||||
movie_desc = crud.create_movie(
|
||||
db, title=radix, description=desc, genres=["Animated", "Paropaganda"]
|
||||
)
|
||||
|
||||
for term, target in zip((name, desc), (movie_name, movie_desc)):
|
||||
genres = ["Yes", "No"]
|
||||
with db_context() as db:
|
||||
movie = crud.create_movie(db, title=name, genres=genres)
|
||||
assert movie.genres == genres
|
||||
|
||||
def test_genre_custom_type_serialize_type(self):
|
||||
name = rand_name()
|
||||
genres = ["Yes", "No"]
|
||||
csv_genres = ",".join(genres)
|
||||
|
||||
try:
|
||||
with db_context() as db:
|
||||
movie = crud.create_movie(db, title=name, genres=csv_genres)
|
||||
except (ValueError, exc.StatementError) as error:
|
||||
assert "tuple" in str(error)
|
||||
else:
|
||||
raise RuntimeError("Exception should have been raised")
|
||||
|
||||
def test_list_movies(self):
|
||||
clear_db()
|
||||
response = client.get("/movies/")
|
||||
# assert response.json() == []
|
||||
|
||||
N = 10
|
||||
names = []
|
||||
with db_context() as db:
|
||||
for _ in range(N):
|
||||
name = rand_name()
|
||||
|
||||
names.append(name)
|
||||
crud.create_movie(db, title=name, genres=["Animated", "Paropaganda"])
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
assert all(movies_by_title[name] for name in names)
|
||||
|
||||
def test_search_movies_exact(self):
|
||||
clear_db()
|
||||
response = client.get("/movies/")
|
||||
# assert response.json() == []
|
||||
|
||||
radix = rand_name()
|
||||
|
||||
title = radix + "test_search_title"
|
||||
|
||||
desc = radix + "test_search_desription"
|
||||
|
||||
with db_context() as db:
|
||||
movie_title = crud.create_movie(
|
||||
db, title=title, genres=["Animated", "Paropaganda"]
|
||||
)
|
||||
|
||||
movie_desc = crud.create_movie(
|
||||
db, title=radix, description=desc, genres=["Animated", "Paropaganda"]
|
||||
)
|
||||
movie_desc_id = movie_desc.id
|
||||
|
||||
found = crud.search_movie(db, title).all()
|
||||
assert len(found) == 1
|
||||
|
||||
assert found[0] == movie_title
|
||||
|
||||
found = crud.search_movie(db, desc).all()
|
||||
assert len(found) == 1
|
||||
assert target == found[0]
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
assert all(movies_by_title[name] for name in names)
|
||||
assert found[0] == movie_desc
|
||||
|
||||
def test_search_movies_token(self):
|
||||
clear_db()
|
||||
response = client.get("/movies/")
|
||||
# assert response.json() == []
|
||||
|
||||
def test_sample_import_toy_story():
|
||||
clear_db()
|
||||
radix = rand_name()
|
||||
|
||||
movie_title = "Toy Story"
|
||||
file_path = "input_data/movies_metadata_short.csv"
|
||||
title = radix + "test_search_title titletoken"
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
desc = radix + "test_search_desription desctoken"
|
||||
|
||||
assert movie_title not in movies_by_title, "The movie should not be pre existing"
|
||||
with db_context() as db:
|
||||
movie_title = crud.create_movie(
|
||||
db, title=title, genres=["Animated", "Paropaganda"]
|
||||
)
|
||||
|
||||
with db_context() as db:
|
||||
fill_db(db, file_path)
|
||||
movie_desc = crud.create_movie(
|
||||
db, title=radix, description=desc, genres=["Animated", "Paropaganda"]
|
||||
)
|
||||
movie_desc_id = movie_desc.id
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
found = crud.search_movie(db, "titletoken").all()
|
||||
assert len(found) == 1
|
||||
|
||||
toy_story = movies_by_title["Toy Story"]
|
||||
assert found[0] == movie_title
|
||||
|
||||
assert "Andy" in toy_story["description"]
|
||||
found = crud.search_movie(db, "desctoken").all()
|
||||
assert len(found) == 1
|
||||
|
||||
assert found[0] == movie_desc
|
||||
|
||||
def test_title_is_taken_form_original_title_is_missing():
|
||||
"""
|
||||
t0113002,en,Midnight Man
|
||||
19763 Midnight Man (among others) has an unescaped \n that makes import fail
|
||||
def test_search_movies_token_period(self):
|
||||
return
|
||||
clear_db()
|
||||
response = client.get("/movies/")
|
||||
# assert response.json() == []
|
||||
|
||||
in the csv the movie 'Avalanche Sharks' @ line 35587
|
||||
has no title, we fix this here to get quicker but we need a better solution
|
||||
"""
|
||||
movie_title = "Midnight Man"
|
||||
radix = rand_name()
|
||||
|
||||
file_path = "utests/movie_error_missing_title.csv"
|
||||
file_path = "input_data/movies_metadata.csv"
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
title = radix + "test_search_title title3tokenperiod."
|
||||
|
||||
assert movie_title not in movies_by_title, "The movie should not be pre existing"
|
||||
with db_context() as db:
|
||||
fill_db(db, file_path, sample_rate=1)
|
||||
with db_context() as db:
|
||||
movie_title = crud.create_movie(
|
||||
db, title=title, genres=["Animated", "Paropaganda"]
|
||||
)
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
found = crud.search_movie(db, "title3tokenpreriod").all()
|
||||
assert len(found) == 1
|
||||
assert found[0] == movie_title
|
||||
|
||||
def test_sample_import_toy_story(self):
|
||||
clear_db()
|
||||
|
||||
movie_title = "Toy Story"
|
||||
file_path = "input_data/movies_metadata_short.csv"
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
|
||||
assert (
|
||||
movie_title not in movies_by_title
|
||||
), "The movie should not be pre existing"
|
||||
|
||||
with db_context() as db:
|
||||
fill_db(db, file_path)
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
|
||||
toy_story = movies_by_title["Toy Story"]
|
||||
|
||||
assert "Andy" in toy_story["description"]
|
||||
|
||||
def test_title_is_taken_form_original_title_is_missing(self):
|
||||
"""
|
||||
t0113002,en,Midnight Man
|
||||
19763 Midnight Man (among others) has an unescaped \n that makes import fail
|
||||
|
||||
in the csv the movie 'Avalanche Sharks' @ line 35587
|
||||
has no title, we fix this here to get quicker but we need a better solution
|
||||
"""
|
||||
movie_title = "Midnight Man"
|
||||
|
||||
file_path = "utests/movie_error_missing_title.csv"
|
||||
file_path = "input_data/movies_metadata.csv"
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
|
||||
assert (
|
||||
movie_title not in movies_by_title
|
||||
), "The movie should not be pre existing"
|
||||
with db_context() as db:
|
||||
fill_db(db, file_path, sample_rate=1)
|
||||
|
||||
movies = client.get("movies").json()["movies"]
|
||||
movies_by_title = {m["title"]: m for m in movies}
|
||||
|
|
Loading…
Reference in New Issue