457 lines
15 KiB
Python
457 lines
15 KiB
Python
from fastapi.testclient import TestClient
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.pool import StaticPool
|
|
from sqlalchemy import MetaData
|
|
|
|
from database import Base
|
|
from dev import app, get_db
|
|
from models import Movie
|
|
|
|
import pytest
|
|
import crud
|
|
import contextlib
|
|
|
|
import random
|
|
import inspect
|
|
import unittest
|
|
|
|
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
|
|
|
|
engine = create_engine(
|
|
SQLALCHEMY_DATABASE_URL,
|
|
connect_args={"check_same_thread": False},
|
|
poolclass=StaticPool,
|
|
)
|
|
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
|
|
def override_get_db():
|
|
try:
|
|
db = TestingSessionLocal()
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
app.dependency_overrides[get_db] = override_get_db
|
|
|
|
client = TestClient(app)
|
|
|
|
|
|
def rand_name():
|
|
import sys
|
|
|
|
caller = sys._getframe(1).f_code.co_name
|
|
name = f"{caller}_{random.randint(1, 1000)}"
|
|
return name
|
|
|
|
|
|
class BaseCrud(unittest.TestCase):
|
|
def setUp(self):
|
|
name = f"rand_{random.randint(1, 1000)}"
|
|
self.create_payload = {
|
|
"title": name,
|
|
}
|
|
|
|
def test_get_delete_movie_404_if_not_found(self):
|
|
response = client.get("/movies/-1")
|
|
assert response.status_code == 404
|
|
response_delete = client.delete("/movies/-1")
|
|
assert response_delete.status_code == 404
|
|
|
|
def test_create_movie_api(self):
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
|
|
assert response.status_code == 200
|
|
movie_id = response.json()["id"]
|
|
assert f"Created {self.create_payload['title']}" in response.json()["message"]
|
|
response = client.get(f"/movies/{movie_id}")
|
|
assert response.json()["title"] == self.create_payload["title"]
|
|
|
|
def test_delete_movie(self):
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
movie_id = response.json()["id"]
|
|
created_movie_path = f"/movies/{movie_id}"
|
|
|
|
response_delete = client.delete(created_movie_path)
|
|
|
|
response_missing = client.get(created_movie_path)
|
|
|
|
assert response_delete.status_code == 204
|
|
assert response_missing.status_code == 404
|
|
|
|
def test_update_movie_api(self):
|
|
self.create_payload["genres"] = ["anime"]
|
|
|
|
self.create_payload["description"] = legacy_description = "will be deleted"
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
movie_id = response.json()["id"]
|
|
created_movie_path = f"/movies/{movie_id}"
|
|
|
|
new_name = self.create_payload["title"].replace("rand", "update")
|
|
new_genres = ["Drama", "war"]
|
|
response_get = client.get(f"/movies/{movie_id}")
|
|
|
|
assert response_get.json()["title"] != new_name
|
|
assert response_get.json()["genres"] != new_genres
|
|
|
|
response_update = client.put(
|
|
created_movie_path, json={"title": new_name, "genres": new_genres}
|
|
)
|
|
assert response_update.status_code == 200
|
|
|
|
response_get = client.get(f"/movies/{movie_id}")
|
|
assert response_get.json()["title"] == new_name
|
|
assert response_get.json()["genres"] == new_genres
|
|
assert response_get.json()["description"] == ""
|
|
|
|
def test_list_movies(self):
|
|
response = client.get("/movies/")
|
|
assert response.status_code == 200
|
|
primary_count = response.json()["count"]
|
|
# assert response.json() == []
|
|
|
|
N = 10
|
|
names = []
|
|
for _ in range(N):
|
|
name = rand_name()
|
|
|
|
names.append(name)
|
|
self.create_payload["title"] = name
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
assert response.status_code == 200
|
|
|
|
response = client.get("/movies/").json()
|
|
|
|
movies = response["movies"]
|
|
count = response["count"]
|
|
movies_by_title = {m["title"]: m for m in movies}
|
|
found = list(movies_by_title[title] for title in names)
|
|
assert all(movies_by_title[title] for title in names)
|
|
|
|
assert count == primary_count + N
|
|
|
|
def test_list_movies_payload_format(self):
|
|
response = client.get("/movies/")
|
|
assert response.status_code == 200
|
|
# assert response.json() == []
|
|
primary_count = response.json()["count"]
|
|
|
|
N = 10
|
|
names = []
|
|
for _ in range(N):
|
|
name = rand_name()
|
|
|
|
names.append(name)
|
|
self.create_payload["title"] = name
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
assert response.status_code == 200
|
|
|
|
movies = client.get("/movies/").json()
|
|
|
|
assert isinstance(movies["count"], int)
|
|
assert isinstance(movies["movies"], list)
|
|
assert movies["count"] == primary_count + N
|
|
|
|
def test_list_pagination_limits(self):
|
|
response = client.get("/movies/")
|
|
nb_movies = response.json()["count"]
|
|
|
|
for _ in range(3):
|
|
self.create_payload["title"] = rand_name()
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
|
|
response = client.get("/movies/")
|
|
nb_movies = response.json()["count"]
|
|
|
|
pagenum = 1
|
|
pagesize = nb_movies - 1
|
|
|
|
# Test page 1 has no previous ?
|
|
current_movies = client.get(
|
|
f"/movies/?pagenum={pagenum}&pagesize={pagesize}"
|
|
).json()
|
|
assert current_movies.get("previous_page") is None
|
|
assert current_movies["next_page"]
|
|
|
|
current_movies = client.get(
|
|
f"/movies/?pagenum={pagenum + 1 }&pagesize={pagesize}"
|
|
).json()
|
|
assert current_movies.get("next_page") is None
|
|
assert current_movies["previous_page"]
|
|
|
|
# test last page has no next
|
|
|
|
def test_list_movies_pagination_back_forth(self):
|
|
response = client.get("/movies/")
|
|
nb_movies = response.json()["count"]
|
|
|
|
for _ in range(3):
|
|
self.create_payload["title"] = rand_name()
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
|
|
response = client.get("/movies/")
|
|
nb_movies = response.json()["count"]
|
|
|
|
pagenum = 1
|
|
pagesize = 2
|
|
|
|
first, *_, last = client.get("/movies/").json()["movies"]
|
|
|
|
while current_movies := client.get(
|
|
f"/movies/?pagenum={pagenum}&pagesize={pagesize}"
|
|
).json():
|
|
next_page_num = current_movies.get("next_page")
|
|
assert next_page_num != pagenum
|
|
|
|
if next_page_num is None:
|
|
assert current_movies["movies"][-1] == last
|
|
break
|
|
else:
|
|
assert next_page_num == pagenum + 1
|
|
pagenum = next_page_num
|
|
|
|
def test_list_movies_pagination(self):
|
|
response = client.get("/movies/")
|
|
assert response.status_code == 200
|
|
# assert response.json() == []
|
|
primary_count = response.json()["count"]
|
|
|
|
N = 10
|
|
names = []
|
|
for _ in range(N):
|
|
name = rand_name()
|
|
|
|
names.append(name)
|
|
self.create_payload["title"] = name
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
assert response.status_code == 200
|
|
|
|
pagenum = 3
|
|
pagesize = 5
|
|
|
|
sliced_movies = client.get("/movies/").json()["movies"][
|
|
(pagenum - 1) * pagesize : pagenum * pagesize
|
|
]
|
|
|
|
sliced_titles = [m["title"] for m in sliced_movies]
|
|
|
|
movies_paginate = client.get(
|
|
f"/movies/?pagenum={pagenum}&pagesize={pagesize}"
|
|
).json()["movies"]
|
|
|
|
paginate_titles = [m["title"] for m in movies_paginate]
|
|
|
|
assert sliced_titles == paginate_titles
|
|
|
|
def test_list_movies_pagination(self):
|
|
response = client.get("/movies/")
|
|
assert response.status_code == 200
|
|
# assert response.json() == []
|
|
primary_count = response.json()["count"]
|
|
|
|
N = 10
|
|
names = []
|
|
for _ in range(N):
|
|
name = rand_name()
|
|
|
|
names.append(name)
|
|
self.create_payload["title"] = name
|
|
response = client.post("/movies/", json=self.create_payload)
|
|
assert response.status_code == 200
|
|
|
|
pagenum = 3
|
|
pagesize = 5
|
|
|
|
sliced_movies = client.get("/movies/").json()["movies"][
|
|
(pagenum - 1) * pagesize : pagenum * pagesize
|
|
]
|
|
|
|
sliced_titles = [m["title"] for m in sliced_movies]
|
|
|
|
movies_paginate = client.get(
|
|
f"/movies/?pagenum={pagenum}&pagesize={pagesize}"
|
|
).json()["movies"]
|
|
|
|
paginate_titles = [m["title"] for m in movies_paginate]
|
|
|
|
assert sliced_titles == paginate_titles
|
|
|
|
|
|
class ApiTestCase(unittest.TestCase):
|
|
def test_payload_content_in_and_out_loopback(self):
|
|
be_the_fun_in_de_funes = {
|
|
"id": 1,
|
|
"title": "La Grande Vadrouille",
|
|
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
|
|
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
|
|
"South. From there they will be able to escape to England. First, they must avoid German troops -"
|
|
"and the consequences of their own blunders.",
|
|
"genres": ["Comedy", "War"],
|
|
"release_date": "1966-12-07",
|
|
"vote_average": 7.7,
|
|
"vote_count": 1123,
|
|
}
|
|
|
|
domain_keys = sorted(
|
|
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
|
|
) # Make it deterministic
|
|
non_primtive = ["genres", "release_date"]
|
|
|
|
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
|
|
# FIXME
|
|
response = client.post("/movies/", json=payload)
|
|
|
|
assert response.status_code == 200
|
|
movie_id = response.json()["id"]
|
|
|
|
loopback_fetch = client.get(f"/movies/{movie_id}")
|
|
assert loopback_fetch.status_code == 200
|
|
loopback_payload = loopback_fetch.json()
|
|
# check for keys
|
|
for attribute_name in domain_keys:
|
|
with self.subTest(attribute_name=attribute_name):
|
|
assert attribute_name in loopback_payload
|
|
if attribute_name not in non_primtive:
|
|
assert (
|
|
loopback_payload[attribute_name]
|
|
== be_the_fun_in_de_funes[attribute_name]
|
|
)
|
|
|
|
def test_payload_content_bad_format_status_code(self):
|
|
be_the_fun_in_de_funes = {
|
|
"id": 1,
|
|
"title": "La Grande Vadrouille",
|
|
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
|
|
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
|
|
"South. From there they will be able to escape to England. First, they must avoid German troops -"
|
|
"and the consequences of their own blunders.",
|
|
"genres": ["Comedy", "War"],
|
|
"release_date": "1966-12-07",
|
|
"vote_average": 7.7,
|
|
"vote_count": 1123,
|
|
}
|
|
|
|
domain_keys = sorted(
|
|
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
|
|
) # Make it deterministic
|
|
|
|
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
|
|
|
|
missing_key = "title"
|
|
|
|
payload.pop(missing_key)
|
|
|
|
response = client.post("/movies/", json=payload)
|
|
|
|
assert 400 <= response.status_code < 500
|
|
|
|
assert missing_key in response.text
|
|
|
|
def test_payload_content_bad_format_detail(self):
|
|
be_the_fun_in_de_funes = {
|
|
"id": 1,
|
|
"title": "La Grande Vadrouille",
|
|
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
|
|
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
|
|
"South. From there they will be able to escape to England. First, they must avoid German troops -"
|
|
"and the consequences of their own blunders.",
|
|
"genres": ["Comedy", "War"],
|
|
"release_date": "1966-12-07",
|
|
"vote_average": 7.7,
|
|
"vote_count": 1123,
|
|
}
|
|
|
|
domain_keys = sorted(
|
|
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
|
|
) # Make it deterministic
|
|
|
|
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
|
|
|
|
missing_key = "title"
|
|
|
|
payload.pop(missing_key)
|
|
|
|
response = client.post("/movies/", json=payload)
|
|
|
|
assert 400 <= response.status_code < 500
|
|
|
|
assert response.status_code == 400
|
|
|
|
def test_payload_content_in_and_out_loopback(self):
|
|
be_the_fun_in_de_funes = {
|
|
"id": 1,
|
|
"title": "La Grande Vadrouille",
|
|
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
|
|
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
|
|
"South. From there they will be able to escape to England. First, they must avoid German troops -"
|
|
"and the consequences of their own blunders.",
|
|
"genres": ["Comedy", "War"],
|
|
"release_date": "1966-12-07",
|
|
"vote_average": 7.7,
|
|
"vote_count": 1123,
|
|
}
|
|
|
|
domain_keys = sorted(
|
|
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
|
|
) # Make it deterministic
|
|
non_primtive = ["genres", "release_date"]
|
|
|
|
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
|
|
# FIXME
|
|
response = client.post("/movies/", json=payload)
|
|
|
|
assert response.status_code == 200
|
|
movie_id = response.json()["id"]
|
|
|
|
loopback_fetch = client.get(f"/movies/{movie_id}")
|
|
assert loopback_fetch.status_code == 200
|
|
loopback_payload = loopback_fetch.json()
|
|
# check for keys
|
|
for attribute_name in domain_keys:
|
|
with self.subTest(attribute_name=attribute_name):
|
|
assert attribute_name in loopback_payload
|
|
if attribute_name not in non_primtive:
|
|
assert (
|
|
loopback_payload[attribute_name]
|
|
== be_the_fun_in_de_funes[attribute_name]
|
|
)
|
|
|
|
def test_payload_content_in_and_out_loopback_non_primitive(self):
|
|
be_the_fun_in_de_funes = {
|
|
"id": 1,
|
|
"title": "La Grande Vadrouille",
|
|
"description": "During World War II, two French civilians and a downed English Bomber Crew set "
|
|
"out from Paris to cross the demarcation line between Nazi-occupied Northern France and the "
|
|
"South. From there they will be able to escape to England. First, they must avoid German troops -"
|
|
"and the consequences of their own blunders.",
|
|
"genres": ["Comedy", "War"],
|
|
"release_date": "1966-12-07",
|
|
"vote_average": 7.7,
|
|
"vote_count": 1123,
|
|
}
|
|
|
|
domain_keys = sorted(
|
|
{k for k in be_the_fun_in_de_funes if k not in ["id"]}
|
|
) # Make it deterministic
|
|
|
|
payload = {k: be_the_fun_in_de_funes[k] for k in domain_keys}
|
|
response = client.post("/movies/", json=payload)
|
|
|
|
assert response.status_code == 200
|
|
movie_id = response.json()["id"]
|
|
|
|
loopback_fetch = client.get(f"/movies/{movie_id}")
|
|
assert loopback_fetch.status_code == 200
|
|
loopback_payload = loopback_fetch.json()
|
|
# check for keys
|
|
for attribute_name in domain_keys:
|
|
assert (
|
|
loopback_payload[attribute_name]
|
|
== be_the_fun_in_de_funes[attribute_name]
|
|
)
|