Source code for nesta.core.routines.cordis.collect_cordis_task

"""
H2020 and FP7 Data Collection
=============================

Collection of H2020 and FP7 projects, organisations, publications and topics
from the unofficial API.
"""

from nesta.core.luigihacks.autobatch import AutoBatchTask
from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.luigihacks.misctools import get_config
from nesta.core.luigihacks.luigi_logging import set_log_level
from nesta.core.orms.orm_utils import db_session
from nesta.core.orms.orm_utils import get_mysql_engine
from nesta.core.orms.cordis_orm import Base
from nesta.core.orms.cordis_orm import Project
from nesta.packages.misc_utils.batches import split_batches
from nesta.packages.misc_utils.batches import put_s3_batch
from nesta.packages.cordis.cordis_api import get_framework_ids
from nesta.core.luigihacks.misctools import find_filepath_from_pathstub as f3p

import luigi
from datetime import datetime as dt

S3BUCKET = "nesta-production-intermediate"


class CordisCollectTask(AutoBatchTask):
    '''Collect CORDIS projects (FP7 and H2020) by RCN, in batches.'''
    process_batch_size = luigi.IntParameter(default=500)
    intermediate_bucket = luigi.Parameter(default=S3BUCKET)
    db_config_path = luigi.Parameter(default=f3p('config/mysqldb.config'))
    db_config_env = luigi.Parameter(default='MYSQLDB')
    routine_id = luigi.Parameter()
    def output(self):
        '''Points to the output database engine'''
        db_conf = get_config(self.db_config_path, "mysqldb")
        db_conf["database"] = 'dev' if self.test else 'production'
        db_conf["table"] = "CordisCollect <dummy>"  # not a real table
        update_id = self.job_name
        return MySqlTarget(update_id=update_id, **db_conf)
    def prepare(self):
        '''Prepare the batch job parameters'''
        if self.test:
            self.process_batch_size = 100

        # MySQL setup
        database = 'dev' if self.test else 'production'
        engine = get_mysql_engine(self.db_config_env, 'mysqldb', database)

        # Subtract off all done ids
        Base.metadata.create_all(engine)
        with db_session(engine) as session:
            result = session.query(Project.rcn).all()
            done_rcn = {r[0] for r in result}

        # Get all possible ids (or "RCN" in Cordis-speak)
        nrows = 1000 if self.test else None
        all_rcn = set(get_framework_ids('fp7', nrows=nrows)
                      + get_framework_ids('h2020', nrows=nrows))
        all_rcn = all_rcn - done_rcn

        # Generate the job params
        batches = split_batches(all_rcn, self.process_batch_size)
        params = [{"batch_file": put_s3_batch(batch,
                                              self.intermediate_bucket,
                                              self.routine_id),
                   "config": 'mysqldb.config',
                   "db_name": database,
                   "bucket": self.intermediate_bucket,
                   "outinfo": 'dummy',
                   "done": False,
                   "test": self.test}
                  for batch in batches]
        return params
    def combine(self, job_params):
        '''Touch the checkpoint to mark the task as done'''
        self.output().touch()
class RootTask(luigi.WrapperTask):
    production = luigi.BoolParameter(default=False)
    date = luigi.DateParameter(default=dt.now())
    set_log_level(True)
    def requires(self):
        batchable = f3p("batchables/cordis/cordis_api")
        env_files = [f3p("nesta"),
                     f3p("config/mysqldb.config")]
        routine_id = f'Cordis-{self.date}-{self.production}'
        return CordisCollectTask(routine_id=routine_id,
                                 test=not self.production,
                                 batchable=batchable,
                                 env_files=env_files,
                                 job_def="py36_amzn1_image",
                                 job_name=f"Collect-{routine_id}",
                                 job_queue="HighPriority",
                                 region_name="eu-west-2",
                                 poll_time=10,
                                 memory=2048,
                                 max_live_jobs=20)
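

# Usage sketch (not part of the original module). The module path below is
# taken from this file's location; the example date and scheduler flags are
# assumptions about a typical luigi setup, and the batch jobs themselves still
# need the AWS credentials, job definition and queue referenced above:
#
#   python -m luigi --module nesta.core.routines.cordis.collect_cordis_task \
#       RootTask --date 2020-01-01 --local-scheduler
#
# or programmatically, via luigi's standard build API:
#
#   import luigi
#   luigi.build([RootTask(production=False)], local_scheduler=True)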