Source code for nesta.core.routines.crunchbase.crunchbase_non_org_collect_task

'''
Non-organisation collection
===========================

Luigi routine to collect non-organisation Crunchbase data exports and load the data into MySQL.

Organizations, category_groups, org_parents and organization_descriptions should have
already been processed; this task picks up all other files to be imported.
'''

import boto3
import logging
import luigi

from nesta.packages.crunchbase.crunchbase_collect import get_csv_list
from nesta.core.luigihacks import autobatch, misctools
from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.routines.crunchbase.crunchbase_org_collect_task import OrgCollectTask


# Module-level S3 handles for the intermediate bucket.
# NOTE: the listing below issues S3 API calls as soon as this module is
# imported, so DONE_KEYS is a snapshot of the bucket taken at import time.
S3 = boto3.resource('s3')
_BUCKET = S3.Bucket("nesta-production-intermediate")
# Every object key currently in the bucket; used to flag batch jobs whose
# output key already exists.
DONE_KEYS = {s3_object.key for s3_object in _BUCKET.objects.all()}


class NonOrgCollectTask(autobatch.AutoBatchTask):
    '''Download tar file of csvs and load them into the MySQL server.

    One batch job is prepared per non-organisation Crunchbase table.
    ``self.test`` is not declared on this class; presumably it is inherited
    from AutoBatchTask (TODO confirm).

    Args:
        date (datetime): Datetime used to label the outputs
        _routine_id (str): String used to label the AWS task
        db_config_path (str): The output database configuration
        insert_batch_size (int): Batch size passed to each batch job (as
            ``batch_size``) and to the upstream OrgCollectTask.
            Defaults to 500.
    '''
    date = luigi.DateParameter()
    _routine_id = luigi.Parameter()
    db_config_path = luigi.Parameter()
    insert_batch_size = luigi.IntParameter(default=500)

    def _db_name(self):
        '''Return the target database name: 'dev' in test mode, else 'production'.'''
        return 'dev' if self.test else 'production'

    def requires(self):
        '''Depend on the organisation collection task for the same date.'''
        yield OrgCollectTask(date=self.date,
                             _routine_id=self._routine_id,
                             test=self.test,
                             insert_batch_size=self.insert_batch_size,
                             db_config_env='MYSQLDB')

    def output(self):
        '''Points to the output database engine'''
        db_config = misctools.get_config(self.db_config_path, "mysqldb")
        db_config["database"] = self._db_name()
        db_config["table"] = "Crunchbase <dummy>"  # Note, not a real table
        update_id = "CrunchbaseCollectNonOrgData_{}".format(self.date)
        return MySqlTarget(update_id=update_id, **db_config)

    def prepare(self):
        '''Prepare the batch job parameters.

        Returns:
            list of dict: one parameter dict per table, in the order listed
            below.

        Raises:
            ValueError: if any required table is absent from the export.
                The message lists the missing tables.
        '''
        tables = ['acquisitions', 'degrees', 'funding_rounds', 'funds',
                  'investment_partners', 'investments', 'investors', 'ipos',
                  'jobs', 'people']

        logging.info('Retrieving list of csvs in Crunchbase export')
        all_csvs = get_csv_list()
        logging.info(all_csvs)
        # Report exactly which tables are absent, rather than just failing.
        missing = [table for table in tables if table not in all_csvs]
        if missing:
            raise ValueError("Crunchbase export is missing one or more "
                             f"required tables: {missing}")

        db_name = self._db_name()
        job_params = []
        for table in tables:
            key = f"{table}_{db_name}"
            params = {"table": table,
                      "config": "mysqldb.config",
                      "db_name": db_name,
                      "batch_size": self.insert_batch_size,
                      "outinfo": f"s3://nesta-production-intermediate/{key}",
                      "test": self.test,
                      # True when the output key already existed in the
                      # bucket listing taken at module import time.
                      "done": key in DONE_KEYS}
            job_params.append(params)
            logging.info(params)
        return job_params

    def combine(self, job_params):
        '''Touch the checkpoint'''
        self.output().touch()