NiH (health research)¶
Data collection and processing pipeline for NIH data, principally for the HealthMosaic platform.
Root Task (HealthMosaic)¶
Luigi routine to collect NIH World RePORTER data via the World ExPORTER data dump. The routine transfers the data into the MySQL database, then processes it and indexes it in Elasticsearch.
-
class
RootTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
A dummy root task, which collects the database configurations and executes the central task.
Parameters: - date (datetime) – Date used to label the outputs
- db_config_path (str) – Path to the MySQL database configuration
- production (bool) – Flag indicating whether running in testing mode (False, default), or production mode (True).
-
date
= <luigi.parameter.DateParameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
production
= <luigi.parameter.BoolParameter object>¶
-
drop_and_recreate
= <luigi.parameter.BoolParameter object>¶
Data collection¶
Luigi routine to collect NIH World RePORTER data via the World ExPORTER data dump.
-
class
CollectTask
(*args, **kwargs)[source]¶ Bases:
nesta.core.luigihacks.autobatch.AutoBatchTask
Scrape CSVs from the World ExPORTER site and dump the data in the MySQL server.
Parameters: - date (datetime) – Datetime used to label the outputs
- _routine_id (str) – String used to label the AWS task
- db_config_path (str) – The output database configuration
-
date
= <luigi.parameter.DateParameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
Pipe to Elasticsearch¶
Transfers the data from the MySQL database to Elasticsearch.
-
class
ProcessTask
(*args, **kwargs)[source]¶ Bases:
nesta.core.luigihacks.autobatch.AutoBatchTask
Transfers the data from the MySQL database to Elasticsearch in batches.
Parameters: - date (datetime) – Date used to label the outputs
- _routine_id (str) – String used to label the AWS task
- db_config_path (str) – Path to the MySQL database configuration
-
date
= <luigi.parameter.DateParameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
drop_and_recreate
= <luigi.parameter.BoolParameter object>¶
-
batch_limits
(query, batch_size)[source]¶ Determines first and last ids for a batch.
Parameters: - query (object) – orm query object
- batch_size (int) – rows of data in a batch
Returns: first (int), last (int) application_ids
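The batching idea described above can be illustrated with a minimal sketch. It assumes the ids are available as a plain iterable of integers standing in for the ORM query; the function name `sketch_batch_limits` is hypothetical and this is not the method's actual implementation.

```python
from typing import Iterable, Iterator, Tuple


def sketch_batch_limits(ids: Iterable[int], batch_size: int) -> Iterator[Tuple[int, int]]:
    """Yield the (first, last) id of each consecutive batch of `batch_size` ids."""
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    # Assumption: a sorted list stands in for an ORM query ordered by application_id.
    ordered = sorted(ids)
    for start in range(0, len(ordered), batch_size):
        chunk = ordered[start:start + batch_size]
        yield chunk[0], chunk[-1]


limits = list(sketch_batch_limits([101, 102, 105, 109, 120, 133, 150], batch_size=3))
```

The final batch may hold fewer than `batch_size` rows, in which case its first and last ids can coincide.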
-
prepare
()[source]¶ You should implement a method which returns a list of dict, where each dict corresponds to inputs to the batchable. Each row of the output must at least contain the following keys:
- done (bool): indicating whether the job has already been finished.
- outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and for combine method
Returns: list of dict
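As a rough illustration of the contract described for `prepare`, the sketch below builds one job dict per batch. Only the `done` and `outinfo` keys come from the documentation; the helper name `build_job_params`, the `start_index` and `end_index` keys, and the output path layout are assumptions made for the example.

```python
from typing import Dict, Iterable, List, Set, Tuple, Union

JobParams = Dict[str, Union[bool, int, str]]


def build_job_params(
    limits: Iterable[Tuple[int, int]],
    finished: Set[str],
    out_prefix: str,
) -> List[JobParams]:
    """Return one job dict per (first, last) batch, with the required keys."""
    jobs: List[JobParams] = []
    for first, last in limits:
        # Hypothetical output location; the real layout is not documented here.
        outinfo = f"{out_prefix}/{first}-{last}.json"
        jobs.append(
            {
                "start_index": first,          # assumed extra key
                "end_index": last,             # assumed extra key
                "done": outinfo in finished,   # required: has the job finished?
                "outinfo": outinfo,            # required: where the output lives
            }
        )
    return jobs


job_params = build_job_params(
    limits=[(1, 10), (11, 20)],
    finished={"out/1-10.json"},
    out_prefix="out",
)
```

Jobs flagged `done` can then be skipped by whatever submits the batches, so a rerun only processes unfinished work.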
-
combine
(job_params)[source]¶ You should implement a method which collects the outputs specified by the outinfo key of job_params, which is the output from the prepare method. This method should finally write to the luigi.Target output.
Parameters: job_params (list of dict) – The batchable job parameters, as returned from the prepare method.
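A minimal sketch of the `combine` pattern follows. It assumes each `outinfo` value is a path to a local JSON file holding a list of rows, and that the final target is also a local file; in the real pipeline the outputs and the `luigi.Target` may live elsewhere. The name `combine_outputs` is hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def combine_outputs(job_params: List[Dict], target_path: Path) -> int:
    """Concatenate the rows found at each job's outinfo and write them to target_path."""
    combined = []
    for job in job_params:
        with open(job["outinfo"]) as f:
            combined.extend(json.load(f))
    with open(target_path, "w") as f:
        json.dump(combined, f)
    return len(combined)


with tempfile.TemporaryDirectory() as tmp:
    # Write two fake batch outputs, then combine them into a single file.
    jobs = []
    for i, rows in enumerate([[{"id": 1}], [{"id": 2}, {"id": 3}]]):
        path = Path(tmp) / f"batch_{i}.json"
        path.write_text(json.dumps(rows))
        jobs.append({"done": True, "outinfo": str(path)})
    target = Path(tmp) / "combined.json"
    n_rows = combine_outputs(jobs, target)
    combined_rows = json.loads(target.read_text())
```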
-
class
ProcessRootTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
A dummy root task, which collects the database configurations and executes the central task.
Parameters: - date (datetime) – Date used to label the outputs
- db_config_path (str) – Path to the MySQL database configuration
- production (bool) – Flag indicating whether running in testing mode (False, default), or production mode (True).
-
date
= <luigi.parameter.DateParameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
production
= <luigi.parameter.BoolParameter object>¶
-
drop_and_recreate
= <luigi.parameter.BoolParameter object>¶
Assign MeSH terms to abstracts¶
Assign MeSH terms to projects which have abstracts (note: not all projects have abstracts).
-
class
AbstractsMeshTask
(*args, **kwargs)[source]¶ Bases:
nesta.core.luigihacks.autobatch.AutoBatchTask
Collects and combines MeSH terms from S3, abstracts from MySQL and projects in Elasticsearch.
Parameters: - date (datetime) – Date used to label the outputs
- _routine_id (str) – String used to label the AWS task
- db_config_path (str) – Path to the MySQL database configuration
-
date
= <luigi.parameter.DateParameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
drop_and_recreate
= <luigi.parameter.BoolParameter object>¶
-
static
get_abstract_file_keys
(bucket, key_prefix)[source]¶ Retrieves keys to meshed files from S3.
Parameters: - bucket (str) – S3 bucket
- key_prefix (str) – prefix to identify the files, i.e. the folder and start of a filename
Returns: keys of the files
Return type: (set of str)
-
done_check
(es_client, index, doc_type, key)[source]¶ Checks Elasticsearch for MeSH terms in the first and last documents in the batch.
Parameters: - es_client (object) – instantiated Elasticsearch client
- index (str) – name of the index
- doc_type (str) – name of the document type
- key (str) – filepath in S3
Returns: True if both exist, otherwise False
Return type: (bool)
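The first-and-last check can be sketched without a live cluster. The example below assumes a lookup callable that returns a document dict (or `None` when the document is missing) and a hypothetical `mesh_terms` field name; neither the helper `both_ends_have_terms` nor that field name is taken from the actual code.

```python
from typing import Callable, Mapping, Optional

FetchDoc = Callable[[int], Optional[Mapping]]


def both_ends_have_terms(
    fetch_doc: FetchDoc,
    first_id: int,
    last_id: int,
    field: str = "mesh_terms",  # hypothetical field name
) -> bool:
    """Return True only if both the first and last documents carry a non-empty field."""
    for doc_id in (first_id, last_id):
        doc = fetch_doc(doc_id)
        if doc is None or not doc.get(field):
            return False
    return True


# A dict stands in for the Elasticsearch index in this sketch.
fake_index = {
    1: {"mesh_terms": ["Humans"]},
    2: {},
    3: {"mesh_terms": ["Neoplasms"]},
}
complete = both_ends_have_terms(fake_index.get, 1, 3)
incomplete = both_ends_have_terms(fake_index.get, 1, 2)
missing = both_ends_have_terms(fake_index.get, 1, 99)
```

Checking only the two ends is a cheap heuristic: it assumes a batch is written in order, so a populated last document implies the rest were processed.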
-
prepare
()[source]¶ You should implement a method which returns a list of dict, where each dict corresponds to inputs to the batchable. Each row of the output must at least contain the following keys:
- done (bool): indicating whether the job has already been finished.
- outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and for combine method
Returns: list of dict
-
combine
(job_params)[source]¶ You should implement a method which collects the outputs specified by the outinfo key of job_params, which is the output from the prepare method. This method should finally write to the luigi.Target output.
Parameters: job_params (list of dict) – The batchable job parameters, as returned from the prepare method.
Deduplication of near duplicates¶
Remove near duplicates of projects from the data. Numeric fields (such as funding) are aggregated together.
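To make the aggregation idea concrete, here is a small sketch. The similarity measure is an assumption: it compares titles with `difflib.SequenceMatcher` and a hypothetical threshold of 0.9, which is not necessarily how the pipeline detects near duplicates. The field names `title` and `funding` are also illustrative.

```python
from difflib import SequenceMatcher
from typing import Dict, List


def merge_near_duplicates(projects: List[Dict], threshold: float = 0.9) -> List[Dict]:
    """Fold each near-duplicate project into the first similar one, summing funding."""
    merged: List[Dict] = []
    for project in projects:
        for kept in merged:
            similarity = SequenceMatcher(
                None, kept["title"].lower(), project["title"].lower()
            ).ratio()
            if similarity >= threshold:
                # Near duplicate found: aggregate the numeric field and drop the copy.
                kept["funding"] += project["funding"]
                break
        else:
            # No similar project kept so far: keep a copy of this one.
            merged.append(dict(project))
    return merged


projects = [
    {"title": "Genomics of rare lung disease", "funding": 100},
    {"title": "Genomics of rare lung diseases", "funding": 50},
    {"title": "Malaria vaccine trial", "funding": 70},
]
deduped = merge_near_duplicates(projects)
```

This pairwise comparison is quadratic in the number of projects, so it only suits small illustrations; a production job would need a scalable similarity search.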
-
class
DedupeTask
(*args, **kwargs)[source]¶ Bases:
nesta.core.luigihacks.autobatch.AutoBatchTask
-
date
= <luigi.parameter.DateParameter object>¶
-
routine_id
= <luigi.parameter.Parameter object>¶
-
intermediate_bucket
= <luigi.parameter.Parameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
process_batch_size
= <luigi.parameter.IntParameter object>¶
-
drop_and_recreate
= <luigi.parameter.BoolParameter object>¶
-
requires
()[source]¶ The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
-
prepare
()[source]¶ You should implement a method which returns a list of dict, where each dict corresponds to inputs to the batchable. Each row of the output must at least contain the following keys:
- done (bool): indicating whether the job has already been finished.
- outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and for combine method
Returns: list of dict
-
class
NiHLolveltyRootTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
-
production
= <luigi.parameter.BoolParameter object>¶
-
index
= <luigi.parameter.Parameter object>¶
-
date
= <luigi.parameter.DateParameter object>¶
-
requires
()[source]¶ The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
-