Source code for nesta.core.routines.crunchbase.crunchbase_elasticsearch_task

'''
Pipe data to Elasticsearch
==========================

Luigi routine to load the Crunchbase data from MySQL into Elasticsearch.

Not all data is copied: only organizations, categories and locations. The data
is flattened and stored in a single index.
'''
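
# A minimal scheduling sketch, assuming standard luigi CLI conventions and
# hypothetical parameter values (the remaining AutoBatchTask batch parameters
# are omitted for brevity):
#
#   luigi --module nesta.core.routines.crunchbase.crunchbase_elasticsearch_task \
#         CrunchbaseSql2EsTask --date 2019-06-01 --db-config-env MYSQLDB \
#         --insert-batch-size 500 \
#         --intermediate-bucket nesta-production-intermediate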

import boto3
from elasticsearch.helpers import scan
import logging
import luigi
import os

from nesta.core.routines.crunchbase.crunchbase_mesh_task import DescriptionMeshTask
from nesta.packages.crunchbase.crunchbase_collect import all_org_ids
from nesta.packages.misc_utils.batches import split_batches, put_s3_batch
from nesta.core.luigihacks import autobatch
from nesta.core.luigihacks.misctools import get_config
from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.orms.orm_utils import get_mysql_engine
from nesta.core.orms.orm_utils import setup_es


S3 = boto3.resource('s3')
_BUCKET = S3.Bucket("nesta-production-intermediate")
DONE_KEYS = set(obj.key for obj in _BUCKET.objects.all())

class CrunchbaseSql2EsTask(autobatch.AutoBatchTask):
    '''Pipe Crunchbase data from MySQL into Elasticsearch.

    Args:
        date (datetime): Datetime used to label the outputs
        _routine_id (str): String used to label the AWS task
        db_config_env (str): The output database environment variable
        process_batch_size (int): Number of rows to process in a batch
        insert_batch_size (int): Number of rows to insert into the db in a batch
        intermediate_bucket (str): S3 bucket where the list of ids for each batch are written
        drop_and_recreate (bool): If True, drop and recreate the Elasticsearch index before loading
    '''
    date = luigi.DateParameter()
    _routine_id = luigi.Parameter()
    db_config_env = luigi.Parameter()
    process_batch_size = luigi.IntParameter(default=10000)
    insert_batch_size = luigi.IntParameter()
    intermediate_bucket = luigi.Parameter()
    drop_and_recreate = luigi.BoolParameter(default=False)

    def requires(self):
        yield DescriptionMeshTask(date=self.date,
                                  _routine_id=self._routine_id,
                                  test=self.test,
                                  insert_batch_size=self.insert_batch_size,
                                  # db_config_path is set in output()
                                  db_config_path=self.db_config_path,
                                  db_config_env=self.db_config_env)

    def output(self):
        '''Points to the output database engine'''
        self.db_config_path = os.environ[self.db_config_env]
        db_config = get_config(self.db_config_path, "mysqldb")
        db_config["database"] = 'dev' if self.test else 'production'
        db_config["table"] = "Crunchbase to Elasticsearch <dummy>"  # Note, not a real table
        update_id = "CrunchbaseToElasticsearch_{}".format(self.date)
        return MySqlTarget(update_id=update_id, **db_config)

    def prepare(self):
        if self.test:
            self.process_batch_size = 1000
            logging.warning("Batch size restricted to "
                            f"{self.process_batch_size}"
                            " while in test mode")

        # MySQL setup
        self.database = 'dev' if self.test else 'production'
        engine = get_mysql_engine(self.db_config_env, 'mysqldb', self.database)

        # Elasticsearch setup
        es_mode = 'dev' if self.test else 'prod'
        es, es_config = setup_es(es_mode, self.test, self.drop_and_recreate,
                                 dataset='crunchbase',
                                 aliases='health_scanner')

        # Get set of existing ids from elasticsearch via scroll
        scanner = scan(es, query={"_source": False},
                       index=es_config['index'],
                       doc_type=es_config['type'])
        existing_ids = {s['_id'] for s in scanner}
        logging.info(f"Collected {len(existing_ids)} existing in "
                     "Elasticsearch")

        # Get set of all organisations from mysql
        all_orgs = list(all_org_ids(engine))
        logging.info(f"{len(all_orgs)} organisations in MySQL")

        # Remove previously processed
        orgs_to_process = [org for org in all_orgs
                           if org not in existing_ids]
        logging.info(f"{len(orgs_to_process)} to be processed")

        job_params = []
        for count, batch in enumerate(split_batches(orgs_to_process,
                                                    self.process_batch_size),
                                      1):
            logging.info(f"Processing batch {count} with size {len(batch)}")

            # write batch of ids to s3
            batch_file = put_s3_batch(batch, self.intermediate_bucket,
                                      'crunchbase_to_es')
            params = {"batch_file": batch_file,
                      "config": 'mysqldb.config',
                      "db_name": self.database,
                      "bucket": self.intermediate_bucket,
                      "done": False,
                      'outinfo': es_config['host'],
                      'out_port': es_config['port'],
                      'out_index': es_config['index'],
                      'out_type': es_config['type'],
                      'aws_auth_region': es_config['region'],
                      'entity_type': 'company',
                      "test": self.test}
            logging.info(params)
            job_params.append(params)

            if self.test and count > 1:
                logging.warning("Breaking after 2 batches while in "
                                "test mode.")
                break

        logging.warning("Batch preparation completed, "
                        f"with {len(job_params)} batches")
        return job_params

    def combine(self, job_params):
        '''Touch the checkpoint'''
        self.output().touch()
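
# A quick post-run sanity check, sketched under the assumption that the index
# was set up exactly as in prepare() above (illustrative only, not part of
# the routine):
#
#   es, es_config = setup_es('dev', True, False,
#                            dataset='crunchbase', aliases='health_scanner')
#   n_docs = es.count(index=es_config['index'])['count']
#   print(f"{n_docs} documents currently in the index")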