Source code for nesta.core.routines.health_data.nih_data.nih_abstracts_mesh_task

'''
Assign MeSH terms to abstracts
==============================

Assign MeSH terms to projects which have abstracts (note: not all projects have abstracts).
'''

import boto3
import datetime
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
import logging
import luigi
import re

from nesta.core.orms.orm_utils import setup_es
from nesta.core.orms.orm_utils import get_es_ids
from nesta.core.routines.health_data.nih_data.nih_process_task import ProcessTask
from nesta.core.luigihacks import autobatch, misctools
from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.luigihacks.misctools import find_filepath_from_pathstub as f3p

PATTERN = r'(\d+)-(\d+)\.out\.txt$'  # Match first & last idx
def split_mesh_file_key(key):
    '''Extracts the first and last document ids from the s3 key of a meshed file.'''
    match = re.search(PATTERN, key)
    if match is None or match.groups() is None:
        raise ValueError("Could not extract start "
                         f"and end doc_ids from {key}")
    return match.groups()
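
# Illustrative example (the filename below is hypothetical): a meshed output
# key such as 'nih_abstracts_processed/22-07-2019/nih_100001-110000.out.txt'
# would give split_mesh_file_key(key) == ('100001', '110000'), i.e. the first
# and last document ids covered by that file.
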
def subset_keys(es, es_config, keys):
    '''Filters the mesh file keys, keeping only those covering at least one
    document id which is present in the Elasticsearch index.'''
    all_idxs = get_es_ids(es, es_config)
    _keys = set()
    for key in keys:
        if key in _keys:
            continue
        first_idx, last_idx = split_mesh_file_key(key)
        for idx in all_idxs:
            if (int(idx) >= int(first_idx)
                    and int(idx) <= int(last_idx)):
                _keys.add(key)
                break
    return _keys
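
# Illustrative example (the ids are hypothetical): if the Elasticsearch index
# only contains documents '100123' and '205000', subset_keys() keeps a key
# covering the range 100001-110000 (it contains '100123') and drops a key
# covering 300001-310000, since no indexed document falls in that range.
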

class AbstractsMeshTask(autobatch.AutoBatchTask):
    '''Collects and combines MeSH terms from s3, abstracts from MySQL
    and projects in Elasticsearch.

    Args:
        date (str): Date used to label the outputs
        _routine_id (str): String used to label the AWS task
        db_config_path (str): Path to the MySQL database configuration
        drop_and_recreate (bool): If True, drop and recreate the Elasticsearch
            index (passed through to the previous task)
    '''
    date = luigi.DateParameter()
    _routine_id = luigi.Parameter()
    db_config_path = luigi.Parameter()
    drop_and_recreate = luigi.BoolParameter(default=False)

    def requires(self):
        '''Collects the configurations and executes the previous task.'''
        logging.getLogger().setLevel(logging.INFO)
        yield ProcessTask(date=self.date,
                          drop_and_recreate=self.drop_and_recreate,
                          _routine_id=self._routine_id,
                          db_config_path=self.db_config_path,
                          batchable=f3p("batchables/health_data/nih_process_data"),
                          env_files=[f3p("nesta/"),
                                     f3p("config/mysqldb.config"),
                                     f3p("config/elasticsearch.config"),
                                     f3p("nih.json")],
                          job_def=self.job_def,
                          job_name="ProcessTask-%s" % self._routine_id,
                          job_queue=self.job_queue,
                          region_name=self.region_name,
                          poll_time=10,
                          test=self.test,
                          memory=2048,
                          max_live_jobs=2)

    def output(self):
        '''Points to the MySQL database target.'''
        update_id = "NihAbstractMeshData-%s" % self._routine_id
        db_config = misctools.get_config("mysqldb.config", "mysqldb")
        db_config["database"] = "production" if not self.test else "dev"
        db_config["table"] = "NIH abstracts mesh DUMMY"  # Note, not a real table
        return MySqlTarget(update_id=update_id, **db_config)
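
    # Note: the table above is a dummy; MySqlTarget is used here purely as a
    # luigi checkpoint keyed on update_id, since the meshed data are written
    # to Elasticsearch by the batch jobs (see prepare) rather than to MySQL.
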

    @staticmethod
    def get_abstract_file_keys(bucket, key_prefix):
        '''Retrieves keys of meshed files from s3.

        Args:
            bucket (str): s3 bucket
            key_prefix (str): prefix to identify the files, i.e. the folder
                and the start of the filename

        Returns:
            (set of str): keys of the files
        '''
        s3 = boto3.resource('s3')
        s3bucket = s3.Bucket(bucket)
        return {o.key for o in s3bucket.objects.filter(Prefix=key_prefix)}

    def done_check(self, es_client, index, doc_type, key):
        '''Checks Elasticsearch for mesh terms in the first and last
        documents in the batch.

        Args:
            es_client (object): instantiated Elasticsearch client
            index (str): name of the index
            doc_type (str): name of the document type
            key (str): filepath in s3

        Returns:
            (bool): True if both documents contain mesh terms, otherwise False
        '''
        for idx in split_mesh_file_key(key):
            try:
                res = es_client.get(index=index,
                                    doc_type=doc_type,
                                    id=idx)
            except NotFoundError:
                logging.warning(f"{idx} not found")
                raise
            if res['_source'].get('terms_mesh_abstract') is None:
                return False
        return True

    def prepare(self):
        '''Identifies the meshed files on s3 and prepares one batch job per file.'''
        # mysql setup
        db = 'production' if not self.test else 'dev'

        # elasticsearch setup
        es_mode = 'dev' if self.test else 'prod'
        es, es_config = setup_es(es_mode, self.test,
                                 drop_and_recreate=False,
                                 dataset='nih',
                                 aliases='health_scanner')

        # s3 setup and file key collection
        bucket = 'innovation-mapping-general'
        key_prefix = 'nih_abstracts_processed/22-07-2019/nih_'
        keys = self.get_abstract_file_keys(bucket, key_prefix)
        logging.info(f"Found keys: {keys}")

        # In test mode, manually filter keys for those which
        # contain our data
        if self.test:
            keys = subset_keys(es, es_config, keys)

        job_params = []
        for key in keys:
            done = ((not self.test)
                    and self.done_check(es,
                                        index=es_config['index'],
                                        doc_type=es_config['type'],
                                        key=key))
            params = {'s3_key': key,
                      's3_bucket': bucket,
                      'dupe_file': ("nih_abstracts/24-05-19/"
                                    "duplicate_mapping.json"),
                      'config': "mysqldb.config",
                      'db': db,
                      'outinfo': es_config,
                      'done': done,
                      'entity_type': 'paper'}
            logging.info(params)
            job_params.append(params)
        return job_params

    def combine(self, job_params):
        '''Touches the output checkpoint.'''
        self.output().touch()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)

    date = datetime.date(year=2018, month=12, day=25)
    process = AbstractsMeshTask(test=True,
                                batchable='',
                                job_def='',
                                job_name='',
                                job_queue='',
                                region_name='',
                                db_config_path='MYSQLDB',
                                date=date,
                                _routine_id='')
    process.prepare()
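
# Note: running this module directly only exercises prepare() in test mode
# (dev database and dev Elasticsearch config), which builds and logs the batch
# job parameters; actual batch submission is handled by the parent
# AutoBatchTask when the task is scheduled through luigi.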