Crunchbase (private sector companies)

NB: The Crunchbase pipeline may not work until this issue has been resolved.

Pipeline for collecting and processing Crunchbase data, principally for the HealthMosaic platform.

Root task (HealthMosaic)

Luigi routine to collect all data from the Crunchbase data dump, load it into MySQL, pipe it to Elasticsearch, label organisations as health-related, assign MeSH terms and deduplicate.

class RootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

A dummy root task, which collects the database configurations and executes the central task.

Parameters:
  • date (datetime) – Date used to label the outputs
  • db_config_path (str) – Path to the MySQL database configuration
  • production (bool) – Whether to run in testing mode (False, the default) or production mode (True).
date = <luigi.parameter.DateParameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
production = <luigi.parameter.BoolParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
requires()[source]

Collects the database configurations and executes the central task.

Get organisations

Luigi routine to collect organisations from Crunchbase data exports and load the data into MySQL.

class OrgCollectTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Download tar file of Organization csvs and load them into the MySQL server.

Parameters:
  • _routine_id (str) – String used to label the AWS task
  • db_config_path (str) – The output database configuration
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
db_config_env = <luigi.parameter.Parameter object>
output()[source]

Points to the output database engine

run()[source]

Collect and process organizations, categories and long descriptions.
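The role of insert_batch_size can be illustrated with a minimal sketch of a batched load. This is not the pipeline's own code: sqlite3 stands in for MySQL so that the example is self-contained, and the table layout, column names and the helpers batched and load_organizations are hypothetical.

```python
import csv
import io
import sqlite3
from itertools import islice


def batched(rows, batch_size):
    """Yield successive lists of at most `batch_size` rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def load_organizations(csv_text, connection, insert_batch_size=500):
    """Insert CSV rows into a hypothetical `organizations` table in batches."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = ((row["id"], row["name"]) for row in reader)
    n_inserted = 0
    for batch in batched(rows, insert_batch_size):
        connection.executemany(
            "INSERT INTO organizations (id, name) VALUES (?, ?)", batch
        )
        connection.commit()  # one commit per batch keeps transactions small
        n_inserted += len(batch)
    return n_inserted


# Assumption: sqlite3 is only a stand-in; the real pipeline writes to MySQL.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT)")
sample_csv = "id,name\norg-1,Alpha Health\norg-2,Beta Labs\norg-3,Gamma Bio\n"
inserted = load_organizations(sample_csv, connection, insert_batch_size=2)
```

With three rows and a batch size of two, the rows are written in two batches. A smaller batch size lowers memory use per transaction at the cost of more round trips to the database.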

Non-organisation collection

Luigi routine to collect non-organisation Crunchbase data exports and load the data into MySQL.

Organizations, category_groups, org_parents and organization_descriptions should have already been processed; this task picks up all other files to be imported.

class NonOrgCollectTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.autobatch.AutoBatchTask

Download tar file of csvs and load them into the MySQL server.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_path (str) – The output database configuration
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

output()[source]

Points to the output database engine

prepare()[source]

Prepare the batch job parameters

combine(job_params)[source]

Touch the checkpoint

Geocoding

Luigi routines to geocode the Organization, FundingRound, Investor, Ipo and People tables.

class OrgGeocodeTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.batchgeocode.GeocodeBatchTask

date = <luigi.parameter.DateParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
output()[source]

Points to the output database engine

requires()[source]

Collects the database configurations and executes the central task.

combine(job_params)[source]

Touch the checkpoint

class FundingRoundGeocodeTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.batchgeocode.GeocodeBatchTask

date = <luigi.parameter.DateParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
output()[source]

Points to the output database engine

requires()[source]

Collects the database configurations and executes the central task.

combine(job_params)[source]

Touch the checkpoint

class InvestorGeocodeTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.batchgeocode.GeocodeBatchTask

date = <luigi.parameter.DateParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
output()[source]

Points to the output database engine

requires()[source]

Collects the database configurations and executes the central task.

combine(job_params)[source]

Touch the checkpoint

class IpoGeocodeTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.batchgeocode.GeocodeBatchTask

date = <luigi.parameter.DateParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
output()[source]

Points to the output database engine

requires()[source]

Collects the database configurations and executes the central task.

combine(job_params)[source]

Touch the checkpoint

class PeopleGeocodeTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.batchgeocode.GeocodeBatchTask

date = <luigi.parameter.DateParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
output()[source]

Points to the output database engine

requires()[source]

Collects the database configurations and executes the central task.

combine(job_params)[source]

Touch the checkpoint

Organisation health labeling

Luigi routine to determine whether Crunchbase organisations are involved in health and apply a label to the data in MySQL.

class HealthLabelTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Apply health labels to the organisation data in MySQL.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • test (bool) – True if in test mode
  • insert_batch_size (int) – Number of rows to insert into the db in a batch
  • db_config_env (str) – The output database environment variable
  • bucket (str) – S3 bucket where the models are stored
  • vectoriser_key (str) – S3 key for the vectoriser model
  • classifier_key (str) – S3 key for the classifier model
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
db_config_env = <luigi.parameter.Parameter object>
bucket = <luigi.parameter.Parameter object>
vectoriser_key = <luigi.parameter.Parameter object>
classifier_key = <luigi.parameter.Parameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

output()[source]

Points to the output database engine

run()[source]

Apply health labels using model.

Merge in parent organisations

This task picks up the missed org_parents table from the Crunchbase data dump and combines it with the organizations table.

class ParentIdCollectTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Download tar file of csvs and append parent_ids to the organizations table.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_env (str) – The output database environment variable
  • db_config_path (str) – The output database configuration
  • insert_batch_size (int) – Number of rows to insert into the db in a batch
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_env = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

output()[source]

Points to the output database engine

run()[source]

The task run method, to be overridden in a subclass.

See Task.run
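As a rough illustration of appending parent_ids to the organizations table, the sketch below joins two in-memory row lists. It is an assumption-laden example rather than the task's implementation: the field names id and parent_id and the helper append_parent_ids are hypothetical, and the real task operates on MySQL tables.

```python
def append_parent_ids(organizations, org_parents):
    """Return copies of the organisation rows with a `parent_id` field added.

    Organisations without an entry in `org_parents` get `parent_id=None`.
    """
    # Hypothetical field names: each org_parents row maps an org id to its parent.
    parent_lookup = {row["id"]: row["parent_id"] for row in org_parents}
    return [
        {**org, "parent_id": parent_lookup.get(org["id"])}
        for org in organizations
    ]


organizations = [
    {"id": "org-1", "name": "Alpha Health"},
    {"id": "org-2", "name": "Alpha Health UK"},
]
org_parents = [{"id": "org-2", "parent_id": "org-1"}]
merged = append_parent_ids(organizations, org_parents)
```

The input rows are left unmodified, so the merge can be repeated safely if the task is rerun.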

Apply mesh terms

Collects and combines MeSH terms from S3 and descriptions from MySQL.

class DescriptionMeshTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Collects and combines MeSH terms from S3 and descriptions from MySQL.

Parameters:
  • date (str) – Date used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_path (str) – Path to the MySQL database configuration
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_env = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
requires()[source]

Collects the configurations and executes the previous task.

output()[source]

Points to the output database engine

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

Pipe data to Elasticsearch

Luigi routine to load the Crunchbase data from MySQL into Elasticsearch.

Only organizations, categories and locations are copied. The data is flattened and stored in a single index.
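One way to picture the flattening step is the sketch below, which merges an organisation, its categories and its location into a single flat document. The field names, the org_ and location_ prefixes and the helper flatten_organization are all hypothetical; the actual index mapping is not described here.

```python
def flatten_organization(org, categories, location):
    """Combine one organisation with its categories and location in one flat dict."""
    # Prefix keys so fields from different tables cannot collide (assumed scheme).
    doc = {f"org_{key}": value for key, value in org.items()}
    doc["category_names"] = sorted(cat["name"] for cat in categories)
    doc.update({f"location_{key}": value for key, value in location.items()})
    return doc


doc = flatten_organization(
    org={"id": "org-1", "name": "Alpha Health"},
    categories=[{"name": "health care"}, {"name": "biotech"}],
    location={"country": "GB", "city": "London"},
)
```

Because every document has the same flat shape, all of them can live in one index without nested objects.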

class CrunchbaseSql2EsTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.autobatch.AutoBatchTask

Load the Crunchbase data from MySQL into Elasticsearch.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_env (str) – The output database environment variable
  • process_batch_size (int) – Number of rows to process in a batch
  • insert_batch_size (int) – Number of rows to insert into the db in a batch
  • intermediate_bucket (str) – S3 bucket where the list of ids for each batch is written
date = <luigi.parameter.DateParameter object>
db_config_env = <luigi.parameter.Parameter object>
process_batch_size = <luigi.parameter.IntParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
intermediate_bucket = <luigi.parameter.Parameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

output()[source]

Points to the output database engine

prepare()[source]

Returns a list of dicts, where each dict holds the inputs for one batchable job. Each dict must contain at least the following keys:

  • done (bool): indicating whether the job has already been finished.
  • outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and by the combine method
Returns: list of dict
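The shape of the list returned by prepare can be sketched as follows. This is a minimal illustration, not the task's code: the helper prepare_job_params, the batch_ids key and the S3 path layout are hypothetical, and only the done and outinfo keys come from the description above.

```python
def prepare_job_params(ids, done_outputs, process_batch_size, intermediate_bucket):
    """Return one dict per batch, each with the required `done` and `outinfo` keys."""
    job_params = []
    for batch_number, start in enumerate(range(0, len(ids), process_batch_size)):
        batch_ids = ids[start:start + process_batch_size]
        # Hypothetical output location for this batch.
        outinfo = f"s3://{intermediate_bucket}/batch-{batch_number}.json"
        job_params.append({
            "batch_ids": batch_ids,           # hypothetical extra key
            "done": outinfo in done_outputs,  # True if this batch already ran
            "outinfo": outinfo,
        })
    return job_params


job_params = prepare_job_params(
    ids=["a", "b", "c", "d", "e"],
    done_outputs={"s3://example-bucket/batch-0.json"},
    process_batch_size=2,
    intermediate_bucket="example-bucket",
)
```

Here five ids and a process_batch_size of two yield three job dicts, the first of which is marked as done and could therefore be skipped.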
combine(job_params)[source]

Touch the checkpoint

Novelty score (lolvelty)

Apply a “lolvelty” score to Crunchbase data (in Elasticsearch). Note: this is a slow procedure that is applied on a document-by-document basis.

class CrunchbaseLolveltyRootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

Apply Lolvelty score to Crunchbase data.

Parameters:
  • production (bool) – Running in full production mode?
  • index (str) – Elasticsearch index to append Lolvelty score to.
  • date (datetime) – Date for timestamping this routine.
production = <luigi.parameter.BoolParameter object>
index = <luigi.parameter.Parameter object>
date = <luigi.parameter.DateParameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires