arXiv data (technical research)

Data collection and processing pipeline for arXiv data, principally for the arXlive platform. This pipeline orchestrates the collection of arXiv data, enrichment (via MAG and GRID), topic modelling, and novelty (lolvelty) measurement.

Root task (arXlive)

Luigi routine to collect all data from the arXiv API and load it to MySQL, pipe to Elasticsearch, perform topic modelling, generate plots and measure novelty.

class RootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

A dummy root task, which collects the database configurations and executes the central task.

Parameters:
  • date (datetime) – Date used to label the outputs
  • db_config_path (str) – Path to the MySQL database configuration
  • production (bool) – Flag indicating whether running in testing mode (False, default), or production mode (True).
  • drop_and_recreate (bool) – If in test mode, allows dropping the dev index from the ES database.
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
production = <luigi.parameter.BoolParameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
articles_from_date = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
debug = <luigi.parameter.BoolParameter object>
requires()[source]

Collects the database configurations and executes the central task.

class EsOnlyRootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

A dummy root task, which collects the database configurations and executes the central task.

Parameters:
  • date (datetime) – Date used to label the outputs
  • db_config_path (str) – Path to the MySQL database configuration
  • production (bool) – Flag indicating whether running in testing mode (False, default), or production mode (True).
  • drop_and_recreate (bool) – If in test mode, allows dropping the dev index from the ES database.
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
production = <luigi.parameter.BoolParameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
requires()[source]

Collects the database configurations and executes the central task.

Collection task

Luigi routine to collect new data from the arXiv API and load it to MySQL.

class CollectNewTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Collect new data from the arXiv API and dump the data in the MySQL server.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_env (str) – environment variable pointing to the db config file
  • db_config_path (str) – The output database configuration
  • insert_batch_size (int) – number of records to insert into the database at once
  • articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_env = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
articles_from_date = <luigi.parameter.Parameter object>
output()[source]

Points to the output database engine

run()[source]

The task run method, to be overridden in a subclass.

See Task.run
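
The articles_from_date and insert_batch_size parameters suggest that records are filtered by date and written in fixed-size batches. The following is a minimal sketch of that idea under stated assumptions: the helper names parse_articles_from_date and batched, and the sample records, are hypothetical and are not taken from the pipeline's code.

```python
from datetime import datetime
from itertools import islice


def parse_articles_from_date(value):
    """Parse a YYYY-MM-DD string into a date, raising ValueError if it does not fit."""
    return datetime.strptime(value, "%Y-%m-%d").date()


def batched(records, batch_size):
    """Yield successive lists of at most `batch_size` records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Hypothetical records standing in for parsed arXiv API results
records = [{"id": f"article-{i}"} for i in range(7)]
batches = list(batched(records, batch_size=3))
batch_sizes = [len(batch) for batch in batches]  # three batches: 3, 3 and 1 records
from_date = parse_articles_from_date("2019-03-01")
```

Each batch could then be passed to a single database insert, which keeps memory use bounded regardless of how many articles are retrieved.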

Date task

Luigi wrapper to identify the date of the last iterative data collection.

class DateTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

Collect new data from the arXiv API and dump the data in the MySQL server.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_env (str) – environment variable pointing to the db config file
  • db_config_path (str) – The output database configuration
  • insert_batch_size (int) – number of records to insert into the database at once
  • articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_path = <luigi.parameter.Parameter object>
db_config_env = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
articles_from_date = <luigi.parameter.Parameter object>
requires()[source]

Collects the last date of successful update from the database and launches the iterative data collection task.
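
As an illustration of choosing the starting date for an iterative collection, here is a minimal sketch. It assumes the dates of previous successful updates are already available as a list; the function name latest_update_date, the sample dates and the fallback date are all hypothetical.

```python
from datetime import date


def latest_update_date(update_dates, default):
    """Return the most recent date, or `default` if there are none."""
    return max(update_dates, default=default)


# Hypothetical dates of previous successful collections
previous_updates = [date(2019, 1, 7), date(2019, 2, 4), date(2019, 1, 21)]
fallback = date(1990, 1, 1)  # assumed earliest date, used when no update exists

# Formatted as YYYY-MM-DD, the format expected by articles_from_date
articles_from_date = latest_update_date(previous_updates, fallback).strftime("%Y-%m-%d")
first_run_date = latest_update_date([], fallback).strftime("%Y-%m-%d")
```

The fallback covers a first run, where no previous update has been recorded and everything needs to be collected.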

arXiv enriched with MAG (API)

Luigi routine to query the Microsoft Academic Graph for additional data and append it to the existing data in the database.

class QueryMagTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Query the MAG for additional data to append to the arXiv articles, primarily the fields of study.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_env (str) – environment variable pointing to the db config file
  • db_config_path (str) – The output database configuration
  • mag_config_path (str) – Microsoft Academic Graph API key configuration path
  • insert_batch_size (int) – number of records to insert into the database at once (not used in this task but passed down to others)
  • articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format (not used in this task but passed down to others)
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_env = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
mag_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
articles_from_date = <luigi.parameter.Parameter object>
output()[source]

Points to the output database engine

requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires
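
The rule that a task only runs once everything it requires has completed can be sketched without Luigi as a depth-first walk over a dependency map. The task names and the map below are hypothetical; the real chain is defined by each task's requires() method and is not reproduced here.

```python
def run_order(requires, task, done=None):
    """Return tasks in an order where every requirement precedes its dependant.

    This sketch assumes the dependency map has no cycles.
    """
    if done is None:
        done = []
    for dependency in requires.get(task, []):
        run_order(requires, dependency, done)
    if task not in done:
        done.append(task)
    return done


# Hypothetical dependency map: each key lists the tasks it requires
requires = {
    "report": ["enrich", "index"],
    "enrich": ["collect"],
    "index": ["collect"],
    "collect": [],
}
order = run_order(requires, "report")
```

Note that "collect" appears only once in the resulting order even though two tasks require it, mirroring the way a completed task is not run again.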

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

arXiv enriched with MAG (SPARQL)

Luigi routine to query the Microsoft Academic Graph for additional data and append it to the existing data in the database. This is to collect information which is difficult to retrieve via the MAG API.

class MagSparqlTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Query the MAG for additional data to append to the arXiv articles, primarily the fields of study.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_env (str) – environment variable pointing to the db config file
  • db_config_path (str) – The output database configuration
  • mag_config_path (str) – Microsoft Academic Graph API key configuration path
  • insert_batch_size (int) – number of records to insert into the database at once (not used in this task but passed down to others)
  • articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format (not used in this task but passed down to others)
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_env = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
mag_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
articles_from_date = <luigi.parameter.Parameter object>
output()[source]

Points to the output database engine

requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

arXiv enriched with GRID

Luigi routine to look up arXiv authors’ institutes via the GRID data, in order to “geocode” arXiv articles. The matching of institute name to GRID data is done via smart(ish) fuzzy matching, which then gives a confidence score per match.
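
To illustrate fuzzy matching with a confidence score, here is a minimal sketch using the standard library's difflib. This is a swapped-in technique for illustration only and is not claimed to be the matching method used by the pipeline; the function names and the list of institute names are hypothetical.

```python
from difflib import SequenceMatcher


def normalise(name):
    """Lower-case and collapse whitespace so trivial differences are ignored."""
    return " ".join(name.lower().split())


def best_match(institute, candidates):
    """Return (candidate, score) for the closest candidate name.

    The score is difflib's similarity ratio, between 0 and 1.
    """
    target = normalise(institute)
    scored = [
        (candidate, SequenceMatcher(None, target, normalise(candidate)).ratio())
        for candidate in candidates
    ]
    return max(scored, key=lambda pair: pair[1])


# Hypothetical institute names standing in for GRID data
grid_names = ["University of Oxford", "University of Cambridge", "Imperial College London"]
match, confidence = best_match("Univ. of Oxford", grid_names)
```

A threshold on the score could then be used to discard low-confidence matches rather than geocoding an article to the wrong institute.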

class GridTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Join arXiv articles with GRID data for institute addresses and geocoding.

Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_env (str) – environment variable pointing to the db config file
  • db_config_path (str) – The output database configuration
  • mag_config_path (str) – Microsoft Academic Graph API key configuration path
  • insert_batch_size (int) – number of records to insert into the database at once (not used in this task but passed down to others)
  • articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format (not used in this task but passed down to others)
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_env = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
mag_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
articles_from_date = <luigi.parameter.Parameter object>
output()[source]

Points to the output database engine

requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

[AutoML] Topic modelling (CorEx)

Automated topic modelling of arXiv articles via the CorEx algorithm. See topic_process_task_chain.json for the full processing chain. In brief, vectorization is performed, followed by n-gramming (a lookup via Wiktionary), and then topics are generated via CorEx.
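
The n-gramming step can be pictured as merging adjacent tokens whenever they appear in a lookup of known phrases. The sketch below assumes a greedy, longest-first merge; the function name apply_ngrams and the small lookup set are hypothetical stand-ins, and the actual Wiktionary-based lookup may work differently.

```python
def apply_ngrams(tokens, known_ngrams, max_n=3):
    """Greedily merge adjacent tokens that form a known n-gram.

    Longer n-grams are tried first; merged tokens are joined with "_".
    """
    merged = []
    i = 0
    while i < len(tokens):
        for n in range(max_n, 1, -1):
            candidate = tuple(tokens[i:i + n])
            if len(candidate) == n and candidate in known_ngrams:
                merged.append("_".join(candidate))
                i += n
                break
        else:
            # No n-gram starts here, so keep the single token
            merged.append(tokens[i])
            i += 1
    return merged


# Hypothetical lookup, standing in for n-grams derived from Wiktionary
known_ngrams = {("machine", "learning"), ("support", "vector", "machine")}
tokens = "a support vector machine is a machine learning model".split()
result = apply_ngrams(tokens, known_ngrams)
```

Treating a phrase such as "machine learning" as one token lets a topic model count it as a single term rather than two unrelated words.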

class PrepareArxivS3Data(*args, **kwargs)[source]

Bases: luigi.task.Task

Task that pipes SQL text fields to a number of S3 JSON files. This is particularly useful for preparing autoML tasks.

s3_path_out = <luigi.parameter.Parameter object>
db_conf_env = <luigi.parameter.Parameter object>
chunksize = <luigi.parameter.IntParameter object>
test = <luigi.parameter.BoolParameter object>
grid_task_kwargs = <nesta.core.luigihacks.parameter.DictParameterPlus object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

output()[source]

The output that this Task produces.

The output of the Task determines if the Task needs to be run–the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

write_to_s3(data, ichunk)[source]
run()[source]

The task run method, to be overridden in a subclass.

See Task.run
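
The chunksize parameter and the write_to_s3(data, ichunk) signature suggest that rows are split into numbered chunks, each written as one JSON file. The following sketch shows only the chunking and serialisation, with no S3 access; the helper names, the key naming scheme and the sample rows are hypothetical.

```python
import json


def chunk_rows(rows, chunksize):
    """Yield (ichunk, chunk) pairs, each chunk holding up to `chunksize` rows."""
    for ichunk, start in enumerate(range(0, len(rows), chunksize)):
        yield ichunk, rows[start:start + chunksize]


def serialise_chunks(rows, chunksize, key_prefix):
    """Map a hypothetical object key per chunk to its JSON payload."""
    return {
        f"{key_prefix}-{ichunk}.json": json.dumps(chunk)
        for ichunk, chunk in chunk_rows(rows, chunksize)
    }


# Hypothetical rows standing in for SQL text fields
rows = [{"id": i, "abstract": f"text {i}"} for i in range(5)]
payloads = serialise_chunks(rows, chunksize=2, key_prefix="arxiv/abstracts")
```

Each payload could then be uploaded under its key, so that downstream tasks can process the files independently.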

class WriteTopicTask(*args, **kwargs)[source]

Bases: luigi.task.Task

s3_path_prefix = <luigi.parameter.Parameter object>
raw_s3_path_prefix = <luigi.parameter.Parameter object>
data_path = <luigi.parameter.Parameter object>
date = <luigi.parameter.DateParameter object>
db_config_path = <luigi.parameter.Parameter object>
db_conf_env = <luigi.parameter.Parameter object>
test = <luigi.parameter.BoolParameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
cherry_picked = <luigi.parameter.Parameter object>
grid_task_kwargs = <nesta.core.luigihacks.parameter.DictParameterPlus object>
output()[source]

Points to the output database engine

requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

run()[source]

The task run method, to be overridden in a subclass.

See Task.run

Pipe to Elasticsearch

Luigi routine to load the arXiv data from MySQL into Elasticsearch.

class ArxivESTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.sql2estask.Sql2EsTask

date = <luigi.parameter.DateParameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
grid_task_kwargs = <nesta.core.luigihacks.parameter.DictParameterPlus object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

Elasticsearch tokenize

Tokenize arXiv field, which allows the search to leverage high-quality n-grams, as provided by the Ngrammer module.

class ArxivESTokenTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.estask.ElasticsearchTask

done_ids()[source]

All document ids which do not require processing. If you want to avoid writing that function see LazyElasticsearchTask.

Returns: A set of document ids, not to be processed.
Return type: done_ids (set)
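
A set of already-processed ids is typically used to skip documents on later runs. The sketch below shows that filtering step under the assumption that all candidate ids are available as a list; the function name ids_to_process and the sample ids are hypothetical.

```python
def ids_to_process(all_ids, done_ids):
    """Return the ids that still need processing, in their original order."""
    return [doc_id for doc_id in all_ids if doc_id not in done_ids]


# Hypothetical document ids
all_ids = ["1901.00001", "1901.00002", "1901.00003", "1901.00004"]
done = {"1901.00002", "1901.00004"}  # a set, as returned by done_ids()
remaining = ids_to_process(all_ids, done)
```

Using a set for the completed ids keeps each membership check cheap, which matters when the index holds a large number of documents.
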
class ArxivTokenRootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

production = <luigi.parameter.BoolParameter object>
date = <luigi.parameter.DateParameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

Estimate novelty (lolvelty)

Estimate the novelty of each article via the lolvelty algorithm. This is performed on a document-by-document basis and is very slow, since it is computationally expensive for the Elasticsearch server.

class ArxivElasticsearchTask(*args, **kwargs)[source]

Bases: nesta.core.luigihacks.estask.ElasticsearchTask

date = <luigi.parameter.DateParameter object>
drop_and_recreate = <luigi.parameter.BoolParameter object>
grid_task_kwargs = <nesta.core.luigihacks.parameter.DictParameterPlus object>
done_ids()[source]

All document ids which do not require processing. If you want to avoid writing that function see LazyElasticsearchTask.

Returns: A set of document ids, not to be processed.
Return type: done_ids (set)
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

class ArxivLolveltyRootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

production = <luigi.parameter.BoolParameter object>
date = <luigi.parameter.DateParameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

“Deep learning, Deep Change” analysis

Luigi routine to perform the analysis from the “Deep learning, deep change” paper, placing the results in an S3 bucket to be picked up by the arXlive front end.

sql_queries()[source]
class AnalysisTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Extract and analyse arXiv data to produce data and charts for the arXlive front end to consume.

Proposed charts:
  1. distribution of DL/non-DL papers by country (horizontal bar)
  2. distribution of DL/non-DL papers by city (horizontal bar)
  3. % ML papers by year (line)
  4. share of ML activity in arXiv subjects, pre/post 2012 (horizontal point / slope)
  5. RCA, pre/post 2012 by country (horizontal point / slope)
  6. RCA over time, citation > mean & top 50 countries (horizontal violin) [NOT DONE]
Proposed table data:
  1. top countries by RCA (moving window of last 12 months?) [NOT DONE]
Parameters:
  • date (datetime) – Datetime used to label the outputs
  • _routine_id (str) – String used to label the AWS task
  • db_config_env (str) – environment variable pointing to the db config file
  • db_config_path (str) – The output database configuration
  • mag_config_path (str) – Microsoft Academic Graph API key configuration path
  • insert_batch_size (int) – number of records to insert into the database at once (not used in this task but passed down to others)
  • articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format (not used in this task but passed down to others)
date = <luigi.parameter.DateParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_env = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
mag_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
articles_from_date = <luigi.parameter.Parameter object>
s3_path_prefix = <luigi.parameter.Parameter object>
raw_data_path = <luigi.parameter.Parameter object>
grid_task_kwargs = <nesta.core.luigihacks.parameter.DictParameterPlus object>
cherry_picked = <luigi.parameter.Parameter object>
output()[source]

Points to the output database engine

requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

run()[source]

The task run method, to be overridden in a subclass.

See Task.run
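
Several of the proposed charts for AnalysisTask refer to RCA. Assuming this stands for revealed comparative advantage, computed as a country's share of deep learning papers divided by the share across all countries, a minimal sketch is given below. The function name, the input layout and the paper counts are hypothetical, and the task's own calculation may differ.

```python
def revealed_comparative_advantage(counts):
    """Compute RCA per country from {country: (dl_papers, all_papers)}.

    RCA = (country's DL share) / (DL share across all countries).
    Assumes every country has at least one paper and at least one DL paper exists overall.
    """
    total_dl = sum(dl for dl, _ in counts.values())
    total_all = sum(total for _, total in counts.values())
    global_share = total_dl / total_all
    return {
        country: (dl / total) / global_share
        for country, (dl, total) in counts.items()
    }


# Hypothetical paper counts: (deep learning papers, all papers)
counts = {"A": (30, 100), "B": (10, 100)}
rca = revealed_comparative_advantage(counts)
```

A value above 1 indicates that a country is more specialised in deep learning than the overall average, and a value below 1 indicates that it is less specialised.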

class StandaloneAnalysisTask(*args, **kwargs)[source]

Bases: nesta.core.routines.arxiv.deepchange_analysis_task.AnalysisTask

date = <luigi.parameter.DateParameter object>
production = <luigi.parameter.BoolParameter object>
test = <luigi.parameter.BoolParameter object>
db_config_env = <luigi.parameter.Parameter object>
db_config_path = <luigi.parameter.Parameter object>
mag_config_path = <luigi.parameter.Parameter object>
insert_batch_size = <luigi.parameter.IntParameter object>
articles_from_date = <luigi.parameter.Parameter object>
s3_path_prefix = <luigi.parameter.Parameter object>
raw_data_path = <luigi.parameter.Parameter object>
grid_task_kwargs = <nesta.core.luigihacks.parameter.DictParameterPlus object>
cherry_picked = <luigi.parameter.Parameter object>
requires()[source]

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires