Base implem of create and restore db

This commit is contained in:
Colin Goutte 2023-08-25 21:20:09 +02:00
parent e9f5b70a33
commit 0a5857ead2
6 changed files with 94 additions and 16 deletions

View File

@ -15,6 +15,9 @@ tdd:
git ls-files | entr make test opt='--lf --ff'
git ls-files | entr make functionnal_tests
watch_db:
watch "sqlite3 sql_app.db 'select * from movies'"
test:
pipenv run pytest $(opt) utests
@ -38,3 +41,9 @@ venv_install:
venv_test:
make -f MakefileVenv test
db_clean:
rm sql_app.db
db_fill:
pipenv run python database.py

View File

@ -58,6 +58,14 @@ Functional testing requires *geckodriver* and *selenium* which is a driver to pr
Database
--------
For now we only use sqlite
One may use `make db_clean` and `make db_fill` to feed database.
Toolings
--------

View File

@ -15,7 +15,12 @@ def create_movie(db: Session, name: str = ""):
def get_movie_by_name(db: Session, name: str = ""):
db_movie = db.query(models.Movie).filter(models.Movie.name == name)
return db.movie.all()
return db_movie.all()
def get_all_movies(db: Session):
db_movie = db.query(models.Movie)
return db_movie.all()
def get_movie_by_id(db: Session, id_: str = ""):

View File

@ -11,3 +11,24 @@ engine = create_engine(
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def create_db():
import models
models.Base.metadata.create_all(bind=engine)
def fill_db():
create_db()
import crud
import random
for _ in range(10):
name = f"fill_db_{random.randint(1, 1000):03}"
out = crud.create_movie(SessionLocal(), name=name)
print(out.name)
if __name__ == "__main__":
fill_db()

23
dev.py
View File

@ -1,19 +1,19 @@
from fastapi import FastAPI, Depends, Request
from sqlalchemy.orm import Session
import uvicorn
from database import SessionLocal, engine
import database
import models
import crud
models.Base.metadata.create_all(bind=engine)
app = FastAPI()
# Dependency
def get_db():
db = SessionLocal()
db = database.SessionLocal()
try:
yield db
finally:
@ -30,7 +30,10 @@ async def create_movie(
name: str = "", db: Session = Depends(get_db), request: Request = None
):
out = {}
data = await request.json()
try: # Bypass for dev
data = await request.json()
except:
data = {}
name = name or data["name"]
movie = models.Movie()
movie.name = name
@ -49,5 +52,17 @@ async def get_movie(id_: str, db: Session = Depends(get_db)):
return out
@app.get("/movies/")
async def create_movie(db: Session = Depends(get_db)):
movies = crud.get_all_movies(db)
out = [
{k: v for (k, v) in movie.__dict__.items() if not k.startswith("_")}
for movie in movies
]
return out
if __name__ == "__main__":
database.create_db()
uvicorn.run(app, host="127.0.0.1", port=5000)

View File

@ -2,6 +2,7 @@ from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from sqlalchemy import MetaData
from database import Base
from dev import app, get_db
@ -11,6 +12,9 @@ import pytest
import crud
import contextlib
import random
import inspect
SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
engine = create_engine(
@ -24,10 +28,21 @@ TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engin
Base.metadata.create_all(bind=engine)
def clear_db():
# Make this a more generic functional tool for test
meta = MetaData()
with contextlib.closing(engine.connect()) as con:
trans = con.begin()
for table in reversed(meta.sorted_tables):
con.execute(table.delete())
trans.commit()
def override_get_db():
try:
db = TestingSessionLocal()
yield db
clear_db()
finally:
db.close()
@ -42,18 +57,22 @@ app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
def test_create_moviem_models():
import random
def rand_name():
import sys
name = f"rand_{random.randint(1, 1000)}"
caller = sys._getframe(1).f_code.co_name
name = f"{caller}_{random.randint(1, 1000)}"
return name
def test_create_moviem_models():
name = rand_name()
movie = Movie(name=name)
assert movie.name == name
def test_sample_crud():
import random
name = f"rand_{random.randint(1, 1000)}"
name = rand_name()
with db_context() as db:
movie = crud.create_movie(db, name=name)
@ -61,24 +80,25 @@ def test_sample_crud():
def test_list_movies():
clear_db()
response = client.get("/movies/")
assert response.json() == []
# assert response.json() == []
N = 10
names = []
with db_context() as db:
for _ in range(N):
name = f"rand_{random.randint(1, 1000)}"
name = rand_name()
names.append(name)
crud.create_movie(db, name=name)
movies = client.get("movies")
by_name = {}
movies_by_name = {m["name"]: m for m in movies.json()}
assert all(movies_by_name[name] for name in names)
def test_create_movie_api():
import random
name = f"rand_{random.randint(1, 1000)}"
response = client.post("/movies/", json={"name": name})
assert response.status_code == 200