Source code for core.batchables.health_data.nih_collect_data.run

"""
run.py (nih_collect_data)
-------------------------

Collect an NIH table from the official data dump,
based on the table name. The data is piped into
the MySQL database.
"""

from sqlalchemy.orm import sessionmaker

from nesta.core.orms.orm_utils import get_mysql_engine
from nesta.core.orms.orm_utils import get_class_by_tablename
from nesta.core.orms.orm_utils import try_until_allowed
from nesta.core.orms.orm_utils import exists

from nesta.core.orms.nih_orm import Base
from nesta.packages.health_data.collect_nih import iterrows
from nesta.core.luigihacks.s3 import parse_s3_path

import os
import boto3


def run():
    table_name = os.environ["BATCHPAR_table_name"]
    url = os.environ["BATCHPAR_url"]
    db_name = os.environ["BATCHPAR_db_name"]
    s3_path = os.environ["BATCHPAR_outinfo"]

    # Setup the database connectors
    engine = get_mysql_engine("BATCHPAR_config", "mysqldb", db_name)
    try_until_allowed(Base.metadata.create_all, engine)
    _class = get_class_by_tablename(Base, table_name)
    Session = try_until_allowed(sessionmaker, engine)
    session = try_until_allowed(Session)

    # Commit the data
    all_pks = set()
    objs = []
    pkey_cols = _class.__table__.primary_key.columns
    for row in iterrows(url):
        if len(row) == 0:
            continue
        # Skip rows which already exist in the database
        if session.query(exists(_class, **row)).scalar():
            continue
        # Deduplicate on primary key within this batch
        pk = tuple(row[pkey.name] for pkey in pkey_cols)
        if pk in all_pks:
            continue
        all_pks.add(pk)
        objs.append(_class(**row))
    session.bulk_save_objects(objs)
    session.commit()
    session.close()

    # Mark the task as done
    s3 = boto3.resource('s3')
    s3_obj = s3.Object(*parse_s3_path(s3_path))
    s3_obj.put(Body="")


if __name__ == "__main__":
    run()
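A minimal usage sketch: run() reads all of its parameters from BATCHPAR_* environment variables, which in production are injected by the Luigi batch wrapper before the container starts. The values below (table name, dump URL, database name, S3 marker key, and config path) are hypothetical placeholders for local testing, assuming that BATCHPAR_config points at a MySQL configuration file, as the first argument to get_mysql_engine suggests.

import os

# Hypothetical batch parameters; in production the Luigi task that
# schedules this batchable sets these before launch.
os.environ["BATCHPAR_table_name"] = "nih_projects"               # hypothetical table name
os.environ["BATCHPAR_url"] = "https://example.com/projects.zip"  # placeholder dump URL
os.environ["BATCHPAR_db_name"] = "dev"                           # hypothetical database name
os.environ["BATCHPAR_outinfo"] = "s3://my-bucket/nih-done"       # placeholder S3 marker key
os.environ["BATCHPAR_config"] = "/path/to/mysqldb.config"        # placeholder DB config path

run()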