NIH (health research)

Data collection and processing pipeline for NIH data, principally for the HealthMosaic platform.

Root Task (HealthMosaic)

Luigi routine to collect NIH World RePORTER data via the World ExPORTER data dump. The routine transfers the data into the MySQL database, then processes the data and indexes it in Elasticsearch.

class RootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

A dummy root task, which collects the database configurations and executes the central task.

Parameters:
  • date (datetime) – Date used to label the outputs
  • db_config_path (str) – Path to the MySQL database configuration
  • production (bool) – Flag indicating whether to run in testing mode (False, default) or production mode (True).
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
production = <luigi.parameter.BoolParameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
requires()[source]

Collects the database configurations and executes the central task.

Data collection

Luigi routine to collect NIH World RePORTER data via the World ExPORTER data dump.

exists(_class, **kwargs)[source]
class CollectTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.autobatch.AutoBatchTask

Scrape CSVs from the World ExPORTER site and dump the data in the MySQL server.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_path (str) – The output database configuration
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
output()[source]

Points to the output database engine

prepare()[source]

Prepare the batch job parameters

combine(job_params)[source]

Touch the checkpoint

Pipe to Elasticsearch

Transfers the data from the MySQL database to Elasticsearch.

class ProcessTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.autobatch.AutoBatchTask

A batch task which transfers the data from the MySQL database to Elasticsearch.

Parameters:
  • date (datetime) – Date used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_path (str) – Path to the MySQL database configuration
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
requires()[source]

Collects the database configurations and executes the central task.

output()[source]

Points to the input database target

batch_limits(query, batch_size)[source]

Determines first and last ids for a batch.

Parameters:
  • query (object) – orm query object
  • batch_size (int) – rows of data in a batch
Returns:

first (int), last (int) application_ids
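
The batching idea can be sketched without a database. This is only an illustration: a plain list of ids stands in for the ORM query, and the name iter_batch_limits is hypothetical rather than part of the package.

```python
def iter_batch_limits(ids, batch_size):
    """Yield (first, last) ids for consecutive batches of the sorted ids.

    Assumption: `ids` is a plain iterable of integers standing in for
    the result of the ORM query used by the real task.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    ordered = sorted(ids)
    for start in range(0, len(ordered), batch_size):
        batch = ordered[start:start + batch_size]
        # The first and last ids are enough to describe the batch
        yield batch[0], batch[-1]


limits = list(iter_batch_limits([5, 1, 9, 3, 7], batch_size=2))
print(limits)  # [(1, 3), (5, 7), (9, 9)]
```

Passing only the first and last id keeps the batch job parameters small, since each job can select its own rows by id range.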

prepare()[source]

You should implement a method which returns a list of dict, where each dict corresponds to inputs to the batchable. Each row of the output must at least contain the following keys:

  • done (bool): indicating whether the job has already been finished.
  • outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and for combine method
Returns: list of dict
combine(job_params)[source]

You should implement a method which collects the outputs specified by the outinfo key of job_params, which is the output from the prepare method. This method should finally write to the luigi.Target output.

Parameters: job_params (list of dict) – The batchable job parameters, as returned from the prepare method.
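
The prepare/combine contract described above can be sketched with local JSON files. This is a standalone illustration, not the AutoBatchTask implementation: the functions are free functions rather than methods, and the first_id and last_id keys, the file naming, and the use of local files for outinfo are all assumptions.

```python
import json
import tempfile
from pathlib import Path


def prepare(batches, out_dir):
    """Return one parameter dict per batch, including 'done' and 'outinfo'."""
    job_params = []
    for first, last in batches:
        outinfo = Path(out_dir) / f"batch_{first}_{last}.json"
        job_params.append({
            "first_id": first,          # hypothetical batch input
            "last_id": last,            # hypothetical batch input
            "done": outinfo.exists(),   # finished if the output already exists
            "outinfo": str(outinfo),    # where the batch job writes its output
        })
    return job_params


def combine(job_params, target_path):
    """Collect every output named by 'outinfo' and write one combined file."""
    combined = []
    for params in job_params:
        with open(params["outinfo"]) as f:
            combined.extend(json.load(f))
    with open(target_path, "w") as f:
        json.dump(combined, f)
    return combined


with tempfile.TemporaryDirectory() as tmp:
    params = prepare([(1, 3), (5, 7)], tmp)
    # Stand-in for the batch jobs: each one writes to its own 'outinfo'
    for p in params:
        with open(p["outinfo"], "w") as f:
            json.dump([p["first_id"], p["last_id"]], f)
    print(combine(params, Path(tmp) / "combined.json"))  # [1, 3, 5, 7]
```

Deriving done from the existence of the output means that calling prepare again after a partial run marks the finished batches as done.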
class ProcessRootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

A dummy root task, which collects the database configurations and executes the central task.

Parameters:
  • date (datetime) – Date used to label the outputs
  • db_config_path (str) – Path to the MySQL database configuration
  • production (bool) – Flag indicating whether to run in testing mode (False, default) or production mode (True).
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
production = <luigi.parameter.BoolParameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
requires()[source]

Collects the database configurations and executes the central task.

Assign MeSH terms to abstracts

Assign MeSH terms to projects which have abstracts (note: not all projects have abstracts).

split_mesh_file_key(key)[source]
subset_keys(es, es_config, keys)[source]
class AbstractsMeshTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.autobatch.AutoBatchTask

Collects and combines MeSH terms from S3, abstracts from MySQL and projects in Elasticsearch.

Parameters:
  • date (datetime) – Date used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_path (str) – Path to the MySQL database configuration
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
requires()[source]

Collects the configurations and executes the previous task.

output()[source]

Points to the input database target

static get_abstract_file_keys(bucket, key_prefix)[source]

Retrieves the keys of meshed files from S3.

Parameters:
  • bucket (str) – S3 bucket
  • key_prefix (str) – prefix to identify the files, i.e. the folder and start of a filename
Returns:

keys of the files

Return type:

(set of str)
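
A minimal sketch of listing keys by prefix, assuming boto3's list_objects_v2 paginator is used. The function name list_keys_with_prefix and the optional client argument are hypothetical, added so that the logic can be exercised without AWS access. The real method may be implemented differently.

```python
def list_keys_with_prefix(bucket, key_prefix, client=None):
    """Return the set of S3 keys in `bucket` that start with `key_prefix`.

    Assumption: `client` is a boto3 S3 client, or any object offering
    the same `get_paginator("list_objects_v2")` interface.
    """
    if client is None:
        import boto3  # imported lazily: only needed for real S3 access
        client = boto3.client("s3")
    paginator = client.get_paginator("list_objects_v2")
    keys = set()
    for page in paginator.paginate(Bucket=bucket, Prefix=key_prefix):
        # Pages with no matching objects have no "Contents" entry
        for obj in page.get("Contents", []):
            keys.add(obj["Key"])
    return keys
```

Using a paginator matters here because a single list_objects_v2 call returns at most 1000 keys.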

done_check(es_client, index, doc_type, key)[source]

Checks Elasticsearch for MeSH terms in the first and last documents in the batch.

Parameters:
  • es_client (object) – instantiated Elasticsearch client
  • index (str) – name of the index
  • doc_type (str) – name of the document type
  • key (str) – filepath in S3
Returns:

True if both exist, otherwise False

Return type:

(bool)
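
The first-and-last check can be sketched independently of Elasticsearch. Everything named here is an assumption made for illustration: the mesh_terms field name, the fetch_source callable (which would wrap a client lookup and return the document source, or None if the document is missing), and the explicit first_id and last_id arguments, which the real method presumably derives from the S3 key.

```python
def has_mesh_terms(source, field="mesh_terms"):
    """True if the document source exists and has a non-empty terms field."""
    return bool(source) and bool(source.get(field))


def batch_done(fetch_source, first_id, last_id, field="mesh_terms"):
    """True only if both the first and last documents carry MeSH terms.

    Assumption: `fetch_source(doc_id)` returns the document source as a
    dict, or None when the document does not exist.
    """
    return all(
        has_mesh_terms(fetch_source(doc_id), field)
        for doc_id in (first_id, last_id)
    )


# In-memory stand-in for the index
docs = {
    1: {"mesh_terms": ["Humans"]},
    2: {},
    3: {"mesh_terms": ["Neoplasms"]},
}
print(batch_done(docs.get, 1, 3))  # True
print(batch_done(docs.get, 1, 2))  # False
```

Checking only the two boundary documents is a cheap heuristic: it assumes a batch is written in order, so if both ends are present the middle is too.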

prepare()[source]

You should implement a method which returns a list of dict, where each dict corresponds to inputs to the batchable. Each row of the output must at least contain the following keys:

  • done (bool): indicating whether the job has already been finished.
  • outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and for combine method
Returns: list of dict
combine(job_params)[source]

You should implement a method which collects the outputs specified by the outinfo key of job_params, which is the output from the prepare method. This method should finally write to the luigi.Target output.

Parameters: job_params (list of dict) – The batchable job parameters, as returned from the prepare method.

Deduplication of near duplicates

Remove near duplicates of projects from the data. Numeric fields (such as funding) are aggregated together.
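
To illustrate what aggregating numeric fields across near duplicates could look like, here is a minimal sketch. The near-duplicate criterion is not described here, so the sketch substitutes a simple one (titles that match after lowercasing and stripping non-alphanumeric characters). The field names title and funding and both function names are hypothetical.

```python
from collections import defaultdict


def dedupe_key(title):
    """Hypothetical near-duplicate key: lowercase, alphanumerics only."""
    return "".join(ch for ch in title.lower() if ch.isalnum())


def merge_near_duplicates(projects, numeric_fields=("funding",)):
    """Keep one record per group of near duplicates, summing numeric fields."""
    groups = defaultdict(list)
    for project in projects:
        groups[dedupe_key(project["title"])].append(project)

    merged = []
    for members in groups.values():
        record = dict(members[0])  # keep the first record's other fields
        for field in numeric_fields:
            # Missing or null values count as zero
            record[field] = sum(m.get(field) or 0 for m in members)
        merged.append(record)
    return merged


projects = [
    {"title": "Gene Therapy Trial", "funding": 100},
    {"title": "gene therapy trial.", "funding": 50},
    {"title": "Vaccine study", "funding": 70},
]
print(merge_near_duplicates(projects))
# [{'title': 'Gene Therapy Trial', 'funding': 150}, {'title': 'Vaccine study', 'funding': 70}]
```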

class DedupeTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.autobatch.AutoBatchTask

date = <luigi.parameter.DateParameter object>
routine_id = <luigi.parameter.Parameter object>
intermediate_bucket = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
process_batch_size = <luigi.parameter.IntParameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
output()[source]

Points to the output database engine

requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

prepare()[source]

You should implement a method which returns a list of dict, where each dict corresponds to inputs to the batchable. Each row of the output must at least contain the following keys:

  • done (bool): indicating whether the job has already been finished.
  • outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and for combine method
Returns: list of dict
combine(job_params)[source]

Touch the checkpoint

class NiHLolveltyRootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

production = <luigi.parameter.BoolParameter object>
index = <luigi.parameter.Parameter object>
date = <luigi.parameter.DateParameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires