"""Database bootstrap: engine/session setup plus a CSV-based movie loader."""
import ast
import copy
import csv
import json
import random
from typing import Optional

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./db_dev.sqlite3"  # to be configured
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"

# "check_same_thread" is an option of the sqlite3 driver only; passing it to
# another driver (e.g. the PostgreSQL URL above) would fail at connect time,
# so it is only supplied when the URL targets SQLite.
_CONNECT_ARGS = (
    {"check_same_thread": False}
    if SQLALCHEMY_DATABASE_URL.startswith("sqlite")
    else {}
)

engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args=_CONNECT_ARGS)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()


def create_db():
    """Create every table registered on ``models.Base.metadata``."""
    # Imported here rather than at module level; presumably ``models``
    # imports names from this module (circular import) -- TODO confirm.
    import models

    models.Base.metadata.create_all(bind=engine)


def _json_quotes(s):
    """Return *s* with every single quote replaced by a double quote.

    Kept as a fallback for ``_parse_genres``; it corrupts values that contain
    an apostrophe, so it must not be the primary parsing strategy.
    """
    return s.replace("'", '"')


def _parse_genres(raw):
    """Parse the raw ``genres`` cell into a Python object.

    The value is first read as a Python literal, which copes with
    single-quoted strings and with apostrophes inside names. If that fails
    (for instance on JSON-only tokens such as ``true``/``null``), the
    previous quote-swapping JSON path is used. An empty or missing value
    yields an empty list.
    """
    if not raw:
        return []
    try:
        return ast.literal_eval(raw)
    except (ValueError, TypeError, SyntaxError):
        return json.loads(_json_quotes(raw))


def adapt_movie_data(data_in: dict):
    """Return a copy of a CSV row adapted for movie creation.

    The input mapping is left untouched. In the returned copy:

    * ``"genres"`` is replaced by the list of the ``"name"`` values found in
      the parsed genres cell;
    * ``"description"`` is added as a copy of ``"overview"``.

    Raises:
        KeyError: if ``"genres"`` or ``"overview"`` is missing from
            *data_in*, or a parsed genre entry has no ``"name"`` key.
    """
    data_out = copy.deepcopy(data_in)
    data_out["genres"] = [
        genre["name"] for genre in _parse_genres(data_in["genres"])
    ]
    data_out["description"] = data_in["overview"]
    return data_out


def fill_db(
    db: Optional[Session] = None,
    movie_input_file: str = "input_data/movies_metadata.csv",
    sample_rate=100,
    page_size: int = 100,
):
    """Load movies from a CSV file into the database.

    Args:
        db: Session to use. When omitted, a new ``SessionLocal()`` is created
            for this call and closed before returning (a session passed in by
            the caller is never closed here).
        movie_input_file: Path of the CSV file, read as UTF-8.
        sample_rate: Percentage of rows to keep. Only values strictly between
            0 and 100 enable random sampling; any other value keeps every row.
        page_size: Number of sampled rows between intermediate commits.

    Raises:
        ValueError: if *page_size* is smaller than 1.
    """
    import crud

    if page_size < 1:
        raise ValueError("page_size must be a positive integer")

    # A session used as a default argument would be created once at import
    # time and shared by every call; create it per call instead.
    owns_session = db is None
    if owns_session:
        db = SessionLocal()

    sampling = 0 < sample_rate < 100
    threshold = sample_rate / 100

    try:
        # newline="" is what the csv module requires so that newlines
        # embedded in quoted fields are parsed correctly.
        with open(movie_input_file, newline="", encoding="utf-8") as csvfile:
            sampled_rows = (
                row
                for row in csv.DictReader(csvfile)
                if not sampling or random.random() < threshold
            )
            # ``count`` numbers the sampled rows, including those skipped
            # below for lacking a title.
            for count, movie_data in enumerate(sampled_rows, start=1):
                if count % page_size == 0:
                    db.commit()
                adapted_data = adapt_movie_data(movie_data)
                if not adapted_data["title"]:
                    print(count, "should be fixed")
                    print(adapted_data)
                    continue
                # NOTE(review): batch_mode=True presumably tells crud to skip
                # its own commit so commits happen per page -- confirm.
                crud.create_movie(db, batch_mode=True, **adapted_data)
        # Flush whatever is left from the last, possibly partial, page.
        db.commit()
    finally:
        if owns_session:
            db.close()


if __name__ == "__main__":
    create_db()
    fill_db(sample_rate=1)