icm/crud.py

80 lines
1.8 KiB
Python

import sqlalchemy
from sqlalchemy.orm import Session
import models
# import schemas
def create_movie(
db: Session,
batch_mode=False,
*,
title: str,
genres: list[str],
description: str = "",
vote_average: float | None = None,
vote_count: int | None = None,
release_date: str | None = None,
**dummy,
):
db_movie = models.Movie(
title=title,
genres=genres,
description=description,
vote_average=vote_average,
vote_count=vote_count,
release_date=release_date,
)
db.add(db_movie)
if not batch_mode:
db.commit()
db.refresh(db_movie)
return db_movie
def get_movie_by_name(db: Session, name: str = ""):
db_movie = db.query(models.Movie).filter(models.Movie.name == name)
return db_movie.all()
def get_all_movies(db: Session, offset: int | None = None, limit: int | None = None):
db_movie = db.query(models.Movie)
if offset is not None:
db_movie = db_movie.offset(offset)
if limit is not None:
db_movie = db_movie.limit(limit)
return db_movie
def get_movie_by_id(db: Session, id_: str = ""):
try:
id_ = int(id_)
except ValueError:
pass
try:
db_movie = db.query(models.Movie).filter(models.Movie.id == id_).one()
except sqlalchemy.exc.NoResultFound:
raise LookupError
return db_movie
def delete_movie_by_id(db: Session, id_: str = ""):
movie = get_movie_by_id(db, id_)
db.delete(movie)
db.commit()
def update_movie(db: Session, id_: str, **payload):
movie = get_movie_by_id(db, id_)
for name, value in payload.items():
try:
movie.__mapper__.attrs[name]
except KeyError:
raise ValueError(f"Bad attribute {name}")
setattr(movie, name, value)
db.add(movie)
db.commit()
return movie