Source code for nesta.core.routines.health_data.nih_data.nih_root_task

'''
Root Task (HealthMosaic)
========================

Luigi routine to collect NIH World RePORTER data
via the World ExPORTER data dump. The routine
transfers the data into the MySQL database before
processing and indexing the data in Elasticsearch.
'''
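# A typical command-line invocation of this routine (a sketch, not from the
# original source: the module path follows this file's location, but the
# exact scheduler and parameter flags depend on the local Luigi deployment):
#
#     luigi --module nesta.core.routines.health_data.nih_data.nih_root_task \
#           RootTask --production --local-scheduler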

import datetime
import logging

import luigi

from nesta.core.luigihacks.misctools import find_filepath_from_pathstub as f3p
from nesta.core.routines.health_data.nih_data.nih_dedupe_task import DedupeTask
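# Note: f3p (find_filepath_from_pathstub) resolves a path "stub" to the
# closest matching file or directory on the local filesystem, so batchables
# and config files can be located regardless of the working directory.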


class RootTask(luigi.WrapperTask):
    '''A dummy root task, which collects the database configurations
    and executes the central task.

    Args:
        date (datetime): Date used to label the outputs.
        db_config_path (str): Path to the MySQL database configuration.
        production (bool): Flag indicating whether running in testing
                           mode (False, default), or production mode (True).
        drop_and_recreate (bool): If in test mode, drop and recreate
                                  the Elasticsearch index before indexing.
    '''
    date = luigi.DateParameter(default=datetime.date.today())
    db_config_path = luigi.Parameter(default="mysqldb.config")
    production = luigi.BoolParameter(default=False)
    drop_and_recreate = luigi.BoolParameter(default=False)
    def requires(self):
        '''Collects the database configurations
        and executes the central task.'''
        _routine_id = "{}-{}".format(self.date, self.production)
        logging.getLogger().setLevel(logging.INFO)
        yield DedupeTask(date=self.date,
                         drop_and_recreate=self.drop_and_recreate,
                         routine_id=_routine_id,
                         db_config_path=self.db_config_path,
                         process_batch_size=5000,
                         intermediate_bucket='nesta-production-intermediate',
                         test=(not self.production),
                         batchable=f3p("batchables/health_data/nih_dedupe"),
                         env_files=[f3p("nesta/"),
                                    f3p("config/mysqldb.config"),
                                    f3p("config/elasticsearch.config"),
                                    f3p("nih.json")],
                         job_def="py36_amzn1_image",
                         job_name="NiHDedupeTask-%s" % _routine_id,
                         job_queue="HighPriority",
                         region_name="eu-west-2",
                         poll_time=10,
                         memory=1024,
                         max_live_jobs=20)
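
# A minimal sketch of running this task programmatically for local testing
# (an assumption: the original module has no __main__ block and is normally
# launched via the Luigi CLI, as sketched at the top of this file):
if __name__ == '__main__':
    # Test mode by default (production=False), using the in-process scheduler.
    luigi.build([RootTask()], local_scheduler=True)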