"""
Estimate novelty (lolvelty)
---------------------------
Estimate the novelty of each article via the :obj:`lolvelty` algorithm.
This is performed on a document-by-document basis and is regrettably
very slow since it is computationally very expensive for the Elasticsearch
server.
"""
from nesta.core.luigihacks.estask import ElasticsearchTask
from nesta.core.luigihacks.misctools import find_filepath_from_pathstub as f3p
import luigi
import logging
from datetime import datetime as dt
from nesta.core.orms.orm_utils import setup_es, get_es_ids
from nesta.core.orms.arxiv_orm import Article
from nesta.core.luigihacks.parameter import DictParameterPlus
from nesta.core.routines.arxiv.arxiv_es_task import ArxivESTask
class ArxivElasticsearchTask(ElasticsearchTask):
    """Elasticsearch task for arXiv articles, fed by :obj:`ArxivESTask`.

    Args:
        date (datetime): Date forwarded to the upstream :obj:`ArxivESTask`.
        drop_and_recreate (bool): Forwarded to the upstream
            :obj:`ArxivESTask`. Note that :obj:`done_ids` always calls
            :obj:`setup_es` with ``drop_and_recreate=False``.
        grid_task_kwargs (dict): Forwarded to the upstream
            :obj:`ArxivESTask`.
    """
    # NOTE: the default is evaluated once, when this module is imported.
    date = luigi.DateParameter(default=dt.today())
    drop_and_recreate = luigi.BoolParameter(default=False)
    grid_task_kwargs = DictParameterPlus(default={})

    def done_ids(self):
        """Look up the documents which already hold a novelty score.

        The Elasticsearch index is queried for every document in which the
        field ``metric_novelty_article`` exists.

        Returns:
            The ids found by :obj:`get_es_ids` for that query.
        """
        es_mode = 'dev' if self.test else 'prod'
        # Only a connection and its config are needed here, so the index
        # is neither dropped nor is its version incremented.
        es, es_config = setup_es(es_mode, self.test,
                                 drop_and_recreate=False,
                                 dataset=self.dataset,
                                 increment_version=False)
        field = "metric_novelty_article"
        exists_query = {"query": {"exists": {"field": field}}}
        return get_es_ids(es, es_config, size=10000, query=exists_query)

    def requires(self):
        """Yield the :obj:`ArxivESTask` which this task depends on."""
        yield ArxivESTask(routine_id=self.routine_id,
                          date=self.date,
                          grid_task_kwargs=self.grid_task_kwargs,
                          process_batch_size=10000,
                          drop_and_recreate=self.drop_and_recreate,
                          dataset='arxiv',
                          id_field=Article.id,
                          entity_type='article',
                          db_config_env='MYSQLDB',
                          test=self.test,
                          intermediate_bucket=('nesta-production'
                                               '-intermediate'),
                          batchable=f3p('batchables/arxiv/'
                                        'arxiv_elasticsearch'),
                          env_files=[f3p('nesta/'),
                                     f3p('config/'
                                         'mysqldb.config'),
                                     f3p('schema_transformations/'
                                         'arxiv.json'),
                                     f3p('config/'
                                         'elasticsearch.config')],
                          job_def='py36_amzn1_image',
                          job_name=self.routine_id,
                          job_queue='HighPriority',
                          region_name='eu-west-2',
                          memory=2048,
                          poll_time=10,
                          max_live_jobs=100)
class _ArxivElasticsearchTask(ArxivElasticsearchTask):
    """:obj:`ArxivElasticsearchTask` with its ``requires`` overridden
    so that no upstream task is yielded.
    """

    def requires(self):
        """Override the parent's ``requires``; nothing is returned."""
        return None
class ArxivLolveltyRootTask(luigi.WrapperTask):
    """Wrapper task which launches the lolvelty batchable for arXiv.

    Args:
        production (bool): If True, use the ``arxiv_v3`` index and run
            with ``test=False``; otherwise use ``arxiv_dev`` and run with
            ``test=True``.
        date (datetime): Date which is embedded in the routine id.
    """
    production = luigi.BoolParameter(default=False)
    # NOTE: the default is evaluated once, when this module is imported.
    date = luigi.DateParameter(default=dt.now())

    def requires(self):
        """Build the single :obj:`_ArxivElasticsearchTask` to be run.

        As a side effect, the root logger's level is set to ``INFO``.
        """
        logging.getLogger().setLevel(logging.INFO)
        # Settings handed on to the task under the `kwargs` argument:
        # the field to be scored and the field the score is written to.
        kwargs = {'score_field': 'metric_novelty_article',
                  'fields': ['textBody_abstract_article']}
        test = not self.production
        routine_id = f"ArxivLolveltyTask-{self.date}-{test}"
        index = 'arxiv_v3' if self.production else 'arxiv_dev'
        return _ArxivElasticsearchTask(routine_id=routine_id,
                                       test=test,
                                       index=index,
                                       dataset='arxiv',
                                       entity_type='article',
                                       kwargs=kwargs,
                                       batchable=f3p("batchables/novelty"
                                                     "/lolvelty"),
                                       env_files=[f3p("nesta/"),
                                                  f3p("config/mysqldb.config"),
                                                  f3p("config/"
                                                      "elasticsearch.config")],
                                       job_def="py36_amzn1_image",
                                       job_name=routine_id,
                                       job_queue="HighPriority",
                                       region_name="eu-west-2",
                                       poll_time=10,
                                       memory=1024,
                                       max_live_jobs=30)