Source code for nesta.core.routines.arxiv.arxiv_mag_task

"""
arXiv enriched with MAG (API)
=============================

Luigi routine to query the Microsoft Academic Graph for additional data and append it to
the exiting data in the database.
"""
from datetime import date
import luigi
import logging
import pprint

from nesta.core.routines.arxiv.arxiv_iterative_date_task import DateTask
from nesta.packages.arxiv.collect_arxiv import BatchedTitles, update_existing_articles
from nesta.packages.misc_utils.batches import BatchWriter
from nesta.packages.mag.query_mag_api import build_expr, query_mag_api, dedupe_entities, update_field_of_study_ids
from nesta.core.orms.arxiv_orm import Base, Article
from nesta.core.orms.mag_orm import FieldOfStudy
from nesta.core.orms.orm_utils import get_mysql_engine, db_session
from nesta.core.luigihacks import misctools
from nesta.core.luigihacks.mysqldb import MySqlTarget


[docs]class QueryMagTask(luigi.Task): """Query the MAG for additional data to append to the arxiv articles, primarily the fields of study. Args: date (datetime): Datetime used to label the outputs _routine_id (str): String used to label the AWS task db_config_env (str): environmental variable pointing to the db config file db_config_path (str): The output database configuration mag_config_path (str): Microsoft Academic Graph Api key configuration path insert_batch_size (int): number of records to insert into the database at once (not used in this task but passed down to others) articles_from_date (str): new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format (not used in this task but passed down to others) """ date = luigi.DateParameter() _routine_id = luigi.Parameter() test = luigi.BoolParameter(default=True) db_config_env = luigi.Parameter() db_config_path = luigi.Parameter() mag_config_path = luigi.Parameter() insert_batch_size = luigi.IntParameter(default=500) articles_from_date = luigi.Parameter()
[docs] def output(self): '''Points to the output database engine''' db_config = misctools.get_config(self.db_config_path, "mysqldb") db_config["database"] = 'dev' if self.test else 'production' db_config["table"] = "arXlive <dummy>" # Note, not a real table update_id = "ArxivQueryMag_{}".format(self.date) return MySqlTarget(update_id=update_id, **db_config)
[docs] def requires(self): yield DateTask(date=self.date, _routine_id=self._routine_id, db_config_path=self.db_config_path, db_config_env=self.db_config_env, test=self.test, articles_from_date=self.articles_from_date, insert_batch_size=self.insert_batch_size)
[docs] def run(self): pp = pprint.PrettyPrinter(indent=4, width=100) mag_config = misctools.get_config(self.mag_config_path, 'mag') mag_subscription_key = mag_config['subscription_key'] # database setup database = 'dev' if self.test else 'production' logging.warning(f"Using {database} database") self.engine = get_mysql_engine(self.db_config_env, 'mysqldb', database) Base.metadata.create_all(self.engine) with db_session(self.engine) as session: paper_fields = ["Id", "Ti", "F.FId", "CC", "AA.AuN", "AA.AuId", "AA.AfN", "AA.AfId", "AA.S"] author_mapping = {'AuN': 'author_name', 'AuId': 'author_id', 'AfN': 'author_affiliation', 'AfId': 'author_affiliation_id', 'S': 'author_order'} field_mapping = {'Id': 'mag_id', 'Ti': 'title', 'F': 'fields_of_study', 'AA': 'mag_authors', 'CC': 'citation_count', 'logprob': 'mag_match_prob'} logging.info("Querying database for articles without fields of study") arxiv_ids_to_process = {a.id for a in (session. query(Article) .filter(~Article.fields_of_study.any()) .all())} total_arxiv_ids_to_process = len(arxiv_ids_to_process) logging.info(f"{total_arxiv_ids_to_process} articles to process") all_articles_to_update = BatchWriter(self.insert_batch_size, update_existing_articles, self.engine) batched_titles = BatchedTitles(arxiv_ids_to_process, 10000, session) batch_field_of_study_ids = set() for count, expr in enumerate(build_expr(batched_titles, 'Ti'), 1): logging.debug(pp.pformat(expr)) expr_length = len(expr.split(',')) logging.info(f"Querying MAG for {expr_length} titles") total_arxiv_ids_to_process -= expr_length batch_data = query_mag_api(expr, paper_fields, mag_subscription_key) logging.debug(pp.pformat(batch_data)) returned_entities = batch_data['entities'] logging.info(f"{len(returned_entities)} entities returned from MAG (potentially including duplicates)") # dedupe response keeping the entity with the highest logprob deduped_mag_ids = dedupe_entities(returned_entities) logging.info(f"{len(deduped_mag_ids)} entities after deduplication") missing_articles = expr_length - len(deduped_mag_ids) if missing_articles != 0: logging.info(f"{missing_articles} titles not found in MAG") batch_article_data = [] for row in returned_entities: # exclude duplicate titles if row['Id'] not in deduped_mag_ids: continue # renaming and reformatting for code, description in field_mapping.items(): try: row[description] = row.pop(code) except KeyError: pass for author in row.get('mag_authors', []): for code, description in author_mapping.items(): try: author[description] = author.pop(code) except KeyError: pass if row.get('citation_count', None) is not None: row['citation_count_updated'] = date.today() # reformat fos_ids out of dictionaries try: row['fields_of_study'] = {f['FId'] for f in row.pop('fields_of_study')} except KeyError: row['fields_of_study'] = [] batch_field_of_study_ids.update(row['fields_of_study']) # get list of ids which share the same title try: matching_articles = batched_titles[row['title']] except KeyError: logging.warning(f"Returned title not found in original data: {row['title']}") continue # drop unnecessary fields for f in ['prob', 'title']: del row[f] # add each matching article for this title to the batch for article_id in matching_articles: batch_article_data.append({**row, 'id': article_id}) # check fields of study are in database batch_field_of_study_ids = {fos_id for article in batch_article_data for fos_id in article['fields_of_study']} logging.debug('Checking fields of study exist in db') found_fos_ids = {fos.id for fos in (session .query(FieldOfStudy) .filter(FieldOfStudy.id.in_(batch_field_of_study_ids)) .all())} missing_fos_ids = batch_field_of_study_ids - found_fos_ids if missing_fos_ids: # query mag for details if not found update_field_of_study_ids(mag_subscription_key, session, missing_fos_ids) # add this batch to the queue all_articles_to_update.extend(batch_article_data) logging.info(f"Batch {count} done. {total_arxiv_ids_to_process} articles left to process") if self.test and count == 2: logging.warning("Exiting after 2 batches in test mode") break # pick up any left over in the batch if all_articles_to_update: all_articles_to_update.write() # mark as done logging.warning("Task complete") self.output().touch()