Source code for core.batchables.health_data.nih_abstract_mesh_data.run

"""
run.py (nih_abstract_mesh_data)
-------------------------------

Retrieve NIH abstracts from MySQL, assign
pre-calculated MeSH terms to each abstract,
and pipe the data into Elasticsearch. Exact
abstract duplicates are removed at this stage.
"""

from ast import literal_eval
from elasticsearch.exceptions import NotFoundError
import logging
import os
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound

from nesta.core.luigihacks.elasticsearchplus import ElasticsearchPlus
from nesta.packages.health_data.process_mesh import retrieve_mesh_terms
from nesta.packages.health_data.process_mesh import format_mesh_terms
from nesta.packages.health_data.process_mesh import retrieve_duplicate_map
from nesta.packages.health_data.process_mesh import format_duplicate_map
from nesta.core.orms.orm_utils import get_mysql_engine
from nesta.core.orms.orm_utils import load_json_from_pathstub
from nesta.core.orms.orm_utils import get_es_ids

from nesta.core.orms.nih_orm import Abstracts

def clean_abstract(abstract):
    '''Removes multiple spaces, tabs and newlines.

    Args:
        abstract (str): text to be cleaned

    Returns:
        (str): cleaned text
    '''
    abstract = abstract.replace('\t', ' ')
    abstract = abstract.replace('\n', ' ')
    while '  ' in abstract:
        abstract = abstract.replace('  ', ' ')
    # strip() returns a new string, so the result must be reassigned
    abstract = abstract.strip()
    return abstract
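# A quick illustration of clean_abstract (not part of the original
# module): tabs and newlines become spaces, runs of spaces collapse to
# one, and surrounding whitespace is stripped.
#
#     >>> clean_abstract(' The\tquick\n\nbrown  fox. ')
#     'The quick brown fox.'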
def run():
    bucket = os.environ["BATCHPAR_s3_bucket"]
    abstract_file = os.environ["BATCHPAR_s3_key"]
    dupe_file = os.environ["BATCHPAR_dupe_file"]
    es_config = literal_eval(os.environ["BATCHPAR_outinfo"])
    db = os.environ["BATCHPAR_db"]
    entity_type = os.environ["BATCHPAR_entity_type"]

    # mysql setup
    engine = get_mysql_engine("BATCHPAR_config", "mysqldb", db)
    Session = sessionmaker(bind=engine)
    session = Session()

    # retrieve a batch of meshed terms
    mesh_terms = retrieve_mesh_terms(bucket, abstract_file)
    mesh_terms = format_mesh_terms(mesh_terms)
    logging.info(f'batch {abstract_file} contains '
                 f'{len(mesh_terms)} meshed abstracts')

    # retrieve duplicate map
    dupes = retrieve_duplicate_map(bucket, dupe_file)
    dupes = format_duplicate_map(dupes)

    # Set up elastic search connection
    field_null_mapping = load_json_from_pathstub("tier_1/"
                                                 "field_null_mappings/",
                                                 "health_scanner.json")
    es = ElasticsearchPlus(hosts=es_config['host'],
                           port=es_config['port'],
                           aws_auth_region=es_config['region'],
                           use_ssl=True,
                           entity_type=entity_type,
                           strans_kwargs=None,
                           field_null_mapping=field_null_mapping,
                           null_empty_str=True,
                           coordinates_as_floats=True,
                           country_detection=True,
                           listify_terms=True)

    all_es_ids = get_es_ids(es, es_config)

    docs = []
    for doc_id, terms in mesh_terms.items():
        if doc_id not in all_es_ids:
            continue
        try:
            _filter = Abstracts.application_id == doc_id
            abstract = (session.query(Abstracts)
                        .filter(_filter).one())
        except NoResultFound:
            logging.warning(f'{doc_id} not found in database')
            raise NoResultFound(doc_id)
        clean_abstract_text = clean_abstract(abstract.abstract_text)
        docs.append({'doc_id': doc_id,
                     'terms_mesh_abstract': terms,
                     'textBody_abstract_project': clean_abstract_text})

        duped_docs = dupes.get(doc_id, [])
        if len(duped_docs) > 0:
            logging.info(f'Found {len(duped_docs)} duplicates')
        for duped_doc in duped_docs:
            docs.append({'doc_id': duped_doc,
                         'terms_mesh_abstract': terms,
                         'textBody_abstract_project': clean_abstract_text,
                         'booleanFlag_duplicate_abstract': True})

    # output to elasticsearch
    logging.warning(f'Writing {len(docs)} documents to elasticsearch')
    for doc in docs:
        uid = doc.pop("doc_id")
        # Extract existing info
        existing = es.get(es_config['index'],
                          doc_type=es_config['type'],
                          id=uid)['_source']
        # Merge existing info into new doc
        doc = {**existing, **doc}
        es.index(index=es_config['index'],
                 doc_type=es_config['type'],
                 id=uid, body=doc)
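# A minimal sketch (not part of the original module) of the merge step
# above: `{**existing, **doc}` keeps every field already indexed in
# Elasticsearch and overwrites only the keys supplied by this batch.
#
#     >>> existing = {'title': 'Old title', 'terms_mesh_abstract': ['Old']}
#     >>> doc = {'terms_mesh_abstract': ['Humans', 'Neoplasms']}
#     >>> {**existing, **doc}
#     {'title': 'Old title', 'terms_mesh_abstract': ['Humans', 'Neoplasms']}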
if __name__ == '__main__':
    log_level = logging.INFO
    if "BATCHPAR_outinfo" not in os.environ:
        logging.getLogger('boto3').setLevel(logging.CRITICAL)
        logging.getLogger('botocore').setLevel(logging.CRITICAL)
        logging.getLogger('s3transfer').setLevel(logging.CRITICAL)
        logging.getLogger('urllib3').setLevel(logging.CRITICAL)
        log_level = logging.DEBUG
        pars = {"s3_key": ("nih_abstracts_processed/22-07-2019/"
                           "nih_abstracts_100065-2683329.out.txt"),
                "outinfo": ("{'host': 'https://search-health-scanner"
                            "-5cs7g52446h7qscocqmiky5dn4.eu-west"
                            "-2.es.amazonaws.com', 'port': '443',"
                            "'index': 'nih_dev', 'type': '_doc', "
                            "'region': 'eu-west-2'}"),
                "dupe_file": ("nih_abstracts/24-05-19/"
                              "duplicate_mapping.json"),
                "db": "dev",
                "config": (f"{os.environ['HOME']}"
                           "/nesta/nesta/core"
                           "/config/mysqldb.config"),
                "s3_bucket": "innovation-mapping-general",
                "entity_type": "paper"}
        for k, v in pars.items():
            os.environ[f'BATCHPAR_{k}'] = v

    log_stream_handler = logging.StreamHandler()
    logging.basicConfig(handlers=[log_stream_handler, ],
                        level=log_level,
                        format="%(asctime)s:%(levelname)s:%(message)s")
    run()