Source code for nesta.core.routines.crunchbase.crunchbase_root_task

'''
Root task (HealthMosaic)
========================


Luigi routine to collect all data from the Crunchbase data dump and load it to MySQL, pipe to Elasticsearch, label projects as being health-related, assign mesh terms and deduplicate.
'''

import luigi
import datetime
import logging

from nesta.core.routines.crunchbase.crunchbase_elasticsearch_task import CrunchbaseSql2EsTask
from nesta.core.luigihacks.misctools import find_filepath_from_pathstub as f3p


class RootTask(luigi.WrapperTask):
    '''Wrapper task whose only requirement is :obj:`CrunchbaseSql2EsTask`.

    The task itself does no work: it gathers the command-line parameters
    below, fixes the remaining batch/job settings and hands everything to
    the Crunchbase MySQL-to-Elasticsearch task.

    Args:
        date (datetime): Date used to label the outputs.
        drop_and_recreate (bool): Passed through unchanged to
            :obj:`CrunchbaseSql2EsTask` (default: False).
        production (bool): Flag indicating whether running in testing
            mode (False, default), or production mode (True). The
            required task receives ``test=not production``.
        insert_batch_size (int): Passed through unchanged to
            :obj:`CrunchbaseSql2EsTask` (default: 500).
    '''
    # NOTE: the default is computed once, when this class body is executed,
    # not each time the task is instantiated.
    date = luigi.DateParameter(default=datetime.date.today())
    drop_and_recreate = luigi.BoolParameter(default=False)
    production = luigi.BoolParameter(default=False)
    insert_batch_size = luigi.IntParameter(default=500)

    def requires(self):
        '''Yield the single :obj:`CrunchbaseSql2EsTask` this wrapper needs.

        Also sets the root logger's level to INFO as a side effect.
        '''
        # The identifier combines the run date with the production flag.
        routine_id = f"{self.date}-{self.production}"
        logging.getLogger().setLevel(logging.INFO)

        # Path lookups are kept in the same order as they are passed below.
        batchable_path = f3p(
            "core/batchables/crunchbase/crunchbase_elasticsearch")
        environment_files = [
            f3p("nesta/"),
            f3p("config/mysqldb.config"),
            f3p("schema_transformations/crunchbase_organisation_members.json"),
            f3p("config/elasticsearch.config"),
        ]

        yield CrunchbaseSql2EsTask(
            date=self.date,
            _routine_id=routine_id,
            test=not self.production,
            drop_and_recreate=self.drop_and_recreate,
            db_config_env="MYSQLDB",
            insert_batch_size=self.insert_batch_size,
            process_batch_size=50000,
            intermediate_bucket='nesta-production-intermediate',
            batchable=batchable_path,
            env_files=environment_files,
            job_def="py36_amzn1_image",
            job_name=f"CrunchBaseElasticsearchTask-{routine_id}",
            job_queue="HighPriority",
            region_name="eu-west-2",
            poll_time=10,
            memory=2048,
            max_live_jobs=100,
        )