nesta¶
Welcome to nesta! This repository houses our fully-audited tools and packages, as well as our in-house production system. If you’re reading this on our GitHub repo, you will find complete documentation at our Read the Docs site.
Packages¶
Nesta’s collection of tools for meaty tasks. Any processes that go into production come here first, but there are other good reasons for code to end up here.
Code and scripts¶
Meetup¶
NB: The meetup pipeline will not work until this issue has been resolved.
Data collection of Meetup data. The procedure starts with a single country and Meetup category. All of the groups within the country are discovered, from which all members are subsequently retrieved (no personal information!). In order to build a fuller picture, all other groups to which the members belong are retrieved, which may be in other categories or countries. Finally, all group details are retrieved.
The code should be executed in the following order, which mirrors the procedure described above:
- country_groups.py
- groups_members.py
- members_groups.py
- groups_details.py
Each script generates a list of dictionaries which can be ingested by the next script in the sequence.
Country \(\rightarrow\) Groups¶
Start with a country (and Meetup category) and end up with Meetup groups.
-
generate_coords
(x0, y0, x1, y1, n)[source]¶ Generate \(\mathcal{O}(\frac{n}{2}^2)\) coordinates in the bounding box \((x0, y0), (x1, y1)\), such that overlapping circles of equal radii (situated at each coordinate) entirely cover the area of the bounding box. The longitude and latitude are treated as euclidean variables, although the radius (calculated from the smallest side of the bounding box divided by \(n\)) is calculated correctly. In order for the circles to fully cover the region, an unjustified factor of 10% is included in the radius. Feel free to do the maths and work out a better strategy for covering a geographical area with circles.
The circles (centred on each X) are staggered as shown below (a single vertical line or four underscores corresponds to one circle radius):
____X____ ____X____
|
X________X________X
|
____X____ ____X____
This configuration corresponds to \(n=4\).
Parameters: - x0, y0, x1, y1 (float) – Bounding box coordinates (lat/lon)
- n (int) – The fraction by which to calculate the Meetup API radius parameter, with respect to the smallest side of the country’s shape bbox. This will generate \(\mathcal{O}(\frac{n}{2}^2)\) separate Meetup API radius searches. The total number of searches scales with the ratio of the bbox sides.
Returns: The radius and coordinates for the Meetup API request
Return type: float, list of tuple
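For orientation, here is a minimal, illustrative sketch of one way such a staggered covering could be generated; it is not the actual implementation, and the 10% inflation factor simply follows the description above:
from math import ceil

def staggered_coords(x0, y0, x1, y1, n):
    """Illustrative only: staggered grid of circle centres covering a bbox.
    The radius is the smallest bbox side divided by n, inflated by 10%."""
    width, height = abs(x1 - x0), abs(y1 - y0)
    radius = 1.1 * min(width, height) / n
    coords = []
    for j in range(ceil(height / radius) + 1):           # rows spaced by one radius
        y = min(y0, y1) + j * radius
        offset = radius if j % 2 else 0                   # stagger alternate rows
        for i in range(ceil(width / (2 * radius)) + 1):   # centres spaced by two radii
            coords.append((min(x0, x1) + offset + i * 2 * radius, y))
    return radius, coords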
-
get_coordinate_data
(n)[source]¶ Generate the radius and coordinate data (see
generate_coords
) for each shape (country) in the shapefile pointed to by the environmental variable WORLD_BORDERS.
Parameters: n (int) – The fraction by which to calculate the Meetup API radius parameter, with respect to the smallest side of the country’s shape bbox. This will generate \(\mathcal{O}(\frac{n}{2}^2)\) separate Meetup API radius searches. The total number of searches scales with the ratio of the bbox sides.
Returns: Coordinate and radius data for each country.
Return type: pd.DataFrame
-
class
MeetupCountryGroups
(country_code, coords, radius, category, n=10)[source]¶ Bases:
object
Extract all meetup groups for a given country.
-
country_code
¶ ISO2 code
Type: str
-
params (dict) – GET request parameters, including lat/lon.
-
groups
¶ List of meetup groups in this country, assigned after calling get_groups.
Type: list of str
-
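A hedged usage sketch (the import path is omitted and the argument values, including the category code, are purely illustrative; coordinate data would normally come from get_coordinate_data above):
radius, coords = 0.5, [(51.5, -0.1)]                  # dummy values for illustration
mcg = MeetupCountryGroups(country_code="GB", coords=coords,
                          radius=radius, category=34, n=10)
mcg.get_groups()                                      # populates mcg.groups via the Meetup API
print(len(mcg.groups), "groups found in GB")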
Groups \(\rightarrow\) Members¶
Start with Meetup groups and end up with Meetup members.
-
get_members
(params)[source]¶ Hit the Meetup API for the members of a specified group.
Parameters: params (dict) – https://api.meetup.com/members/ parameters
Returns: Meetup member IDs
Return type: list of str
-
get_all_members
(group_id, group_urlname, max_results, test=False)[source]¶ Get all of the Meetup members for a specified group.
Parameters: - group_id (int) – The Meetup ID of the group.
- group_urlname (str) – The URL name of the group.
- max_results (int) – The maximum number of results to return per API query.
- test (bool) – For testing.
Returns: A matchable list of Meetup members
Return type: list of dict
Members \(\rightarrow\) Groups¶
Start with Meetup members and end up with Meetup groups.
-
exception
NoMemberFound
(member_id)[source]¶ Bases:
Exception
Exception raised if no member is found by the Meetup API
Groups \(\rightarrow\) Group details¶
Start with Meetup groups and end up with Meetup group details.
-
exception
NoGroupFound
(group_urlname)[source]¶ Bases:
Exception
Exception raised if no group is found by the Meetup API
-
get_group_details
(group_urlname, max_results, avoid_exception=True)[source]¶ Hit the Meetup API for the details of a specified group.
Parameters: - group_urlname (str) – A Meetup group urlname
- max_results (int) – Total number of results to return per API request.
Returns: Meetup API response data
Return type: list of dict
Utils¶
Common tools between the different data collection points.
-
get_api_key
()[source]¶ Get a random API key from those listed in the environmental variable
MEETUP_API_KEYS
.
-
save_sample
(json_data, filename, k)[source]¶ Dump a sample of k items from the row-oriented JSON data json_data into a file with name filename.
-
flatten_data
(list_json_data, keys, **kwargs)[source]¶ Flatten nested JSON data from a list of JSON objects, by a list of desired keys. Each element in keys may also be an ordered iterable of keys, such that subsequent keys describe a path through the JSON to the desired value. For example, in order to extract key1 and key3 from {'key1': <some_value>, 'key2': {'key3': <some_value>}} one would specify keys as ['key1', ('key2', 'key3')].
Parameters: - list_json_data (json) – Row-orientated JSON data.
- keys (list) – Mixed list of either individual str keys (for data values which are not nested) or sublists of str, as described above.
- **kwargs – Any constants to include in every flattened row of the output.
Returns: Flattened row-orientated JSON data.
Return type: json
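As an illustration of the keys specification (the data values and the extra kwarg are invented, and the exact naming of flattened nested fields is an assumption):
data = [{'key1': 1, 'key2': {'key3': 'a'}},
        {'key1': 2, 'key2': {'key3': 'b'}}]
rows = flatten_data(data, keys=['key1', ('key2', 'key3')], source='meetup')
# Each output row is flat, e.g. something like:
# [{'key1': 1, 'key3': 'a', 'source': 'meetup'},
#  {'key1': 2, 'key3': 'b', 'source': 'meetup'}]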
-
get_members_by_percentile
(engine, perc=10)[source]¶ Get the number of meetup group members for a given percentile from the database.
Parameters: - engine – A SQLAlchemy connectable.
- perc (int) – A percentile to evaluate.
Returns: The number of members corresponding to this percentile.
Return type: members (float)
-
get_core_topics
(engine, core_categories, members_limit, perc=99)[source]¶ Get the most frequent topics from a selection of meetup categories, from the database.
Parameters: - engine – A SQLAlchemy connectable.
- core_categories (list) – A list of category_shortnames.
- members_limit (int) – Minimum number of members required in a group for it to be considered.
- perc (int) – A percentile to evaluate the most frequent topics.
Returns: The set of most frequent topics.
Return type: topics (set)
Health data¶
Initially for our project with the Robert Wood Johnson Foundation (RWJF), these procedures cover the collection of health-specific data.
Collect NIH¶
Extract all of the NIH World RePORTER data via their static data dump. N_TABS outputs are produced in CSV format (concatenated across all years), where N_TABS corresponds to the number of tabs in the main table found at:
The data is transferred to the Nesta intermediate data bucket.
-
get_data_urls
(tab_index)[source]¶ Get all CSV URLs from the
tab_index'th tab of the main table found at TOP_URL.
Parameters: tab_index (int) – Tab number (0-indexed) of the table to extract CSV URLs from.
Returns: - title (str): Title of the tab in the table.
- hrefs (list): List of URLs pointing to data CSVs.
Process NIH¶
Data cleaning and processing procedures for the NIH World Reporter data. Specifically, a lat/lon is generated for each city/country; and the formatting of date fields is unified.
NLP Utils¶
Standard tools for aiding natural language processing.
Preprocess¶
Tools for preprocessing text.
-
tokenize_document
(text, remove_stops=False)[source]¶ Preprocess a whole raw document.
Parameters: - text (str) – Raw string of text.
- remove_stops (bool) – Flag to remove English stopwords.
Returns: List of preprocessed and tokenized documents
-
clean_and_tokenize
(text, remove_stops)[source]¶ Preprocess a raw string/sentence of text.
Parameters: - text (str) – Raw string of text.
- remove_stops (bool) – Flag to remove English stopwords.
Returns: Preprocessed tokens.
Return type: tokens (list of str)
-
filter_by_idf
(documents, lower_idf_limit, upper_idf_limit)[source]¶ Remove (from documents) terms which are in a range of IDF values.
Parameters: - documents (list) – Either a list of str or a list of list of str to be filtered.
- lower_idf_limit (float) – Lower percentile (between 0 and 100) on which to exclude terms by their IDF.
- upper_idf_limit (float) – Upper percentile (between 0 and 100) on which to exclude terms by their IDF.
Returns: Filtered documents
Geo Utils¶
Tools for processing of geographical data, such as geocoding.
geocode¶
Tools for geocoding.
-
geocode
(**request_kwargs)[source]¶ Geocoder using the Open Street Map Nominatim API.
If there are multiple results the first one is returned (they are ranked by importance). The API usage policy allows a maximum of 1 request per second and no multithreading: https://operations.osmfoundation.org/policies/nominatim/
Parameters: request_kwargs (dict) – Parameters for OSM API. Returns: JSON from API response.
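A hedged example of calling the wrapper (the exact kwargs accepted and the response handling are assumptions; the parameters simply follow the Nominatim API):
# Nominatim accepts either a free-text q= query or structured city/country fields.
result = geocode(q="Tallinn, Estonia", format="json")
lat, lon = result["lat"], result["lon"]   # assuming the top-ranked hit is returned, as described above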
-
retry_if_not_value_error
(exception)[source]¶ Forces retry to exit if a ValueError is raised. Supplied to the ‘retry_on_exception’ argument in the retry decorator.
Parameters: exception (Exception) – the raised exception, to check Returns: False if a ValueError, else True Return type: (bool)
-
geocode_dataframe
(df)[source]¶ A wrapper for the geocode function to process a supplied dataframe using the city and country.
Parameters: df (dataframe) – a dataframe containing city and country fields. Returns: a dataframe with a ‘coordinates’ column appended.
-
geocode_batch_dataframe
(df, city='city', country='country', latitude='latitude', longitude='longitude', query_method='both')[source]¶ Geocodes a dataframe, first by supplying the city and country to the API; if this fails, a second attempt is made supplying the combined query via the q= method. The supplied dataframe df is returned with additional columns appended, containing the latitude and longitude as floats.
Parameters: - df (
pandas.DataFrame
) – input dataframe - city (str) – name of the input column containing the city
- country (str) – name of the input column containing the country
- latitude (str) – name of the output column containing the latitude
- longitude (str) – name of the output column containing the longitude
- query_method (str) – Query methods to attempt: ‘city_country_only’ (city and country only), ‘query_only’ (q method only), ‘both’ (city and country with fallback to the q method)
Returns: original dataframe with lat and lon appended as floats
Return type: pandas.DataFrame
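For example, a minimal sketch using the default column names documented above:
import pandas as pd

df = pd.DataFrame([{"city": "Paris", "country": "France"},
                   {"city": "Lagos", "country": "Nigeria"}])
df = geocode_batch_dataframe(df, query_method="both")
print(df[["city", "country", "latitude", "longitude"]])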
Format Utils¶
Tools for formatting data, such as dates.
Decorators¶
ratelimit¶
Apply rate limiting at a threshold per second
schema_transform¶
Apply a field name transformation to a data output from the wrapped function, such that specified field names are transformed and unspecified fields are dropped. A valid file would be formatted as shown:
{
    "tier0_to_tier1": {
        "bad_col": "good_col",
        "another_bad_col": "another_good_col"
    }
}
-
schema_transform
(filename)[source]¶ Parameters: filename (str) – A record-oriented JSON file path mapping field names Returns: Data in the format it was originally passed to the wrapper in, with specified field names transformed and unspecified fields dropped.
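A hedged sketch of how the decorator might be applied (the file path and field names are hypothetical, following the file format shown above):
@schema_transform("schema_transformations/example.json")   # file contains the "tier0_to_tier1" mapping
def collect_rows():
    # Raw (tier 0) rows; "dropped_col" is absent from the mapping so it is dropped.
    return [{"bad_col": 1, "another_bad_col": 2, "dropped_col": 3}]

collect_rows()
# => [{"good_col": 1, "another_good_col": 2}]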
-
schema_transformer
(data, *, filename, ignore=[])[source]¶ Function version of the schema_transform wrapper.
Parameters: - data (dataframe OR list of dicts) – The data requiring the schema transformation.
- filename (str) – The path to the schema json file.
- ignore (list) – Optional list of fields, e.g. ids or keys, which shouldn’t be dropped.
Returns: Supplied data with the schema applied
Code auditing¶
Packages are only accepted if they satisfy our internal auditing procedure:
- Common sense requirements:
- Either:
- The code produces at least one data or model output; or
- The code provides a service which abstracts away significant complexity.
- There is one unit test for each function or method, which lasts no longer than about 1 minute.
- Each data or model output is produced from a single function or method, as described in the
__main__
of a specified file.
- Can the nearest programmer (or equivalent) check out and execute your tests from scratch?
- Will the code be used to perform non-project specific tasks?
- Does the process perform a logical task or fulfil a logical purpose?
- If the code requires productionising, it satisfies one of the following conditions:
- There is a non-trivial pipeline, which would benefit from formal productionising.
- A procedure is expected to be re-run for new contexts with atomic differences in run conditions.
- The output is a service which requires a pipeline.
- The process is a regular / longitudinal data collection.
- Basic PEP8 and style requirements:
- Docstrings for every exposable class, method and function.
- Usage documented in a README.rst or in a docstring at the top of the file.
- CamelCase class names.
- Underscore separation of all other variable, function and module names.
- No glaring programming no-nos.
- Never use
print
: opt forlogging
instead.
- Bureaucratic requirements:
- A requirements file*.
- The README file specifies the operating system and python version.
Production¶
Nesta’s production system is based on Luigi pipelines, and is designed to be run entirely on AWS via the Batch service. The main Luigi server runs on a persistent EC2 instance. Beyond the well-documented Luigi code, the main features of the nesta production system are:
- luigihacks.autobatch, which facilitates a managed Luigi.Task which is split, batched and combined in a single step. Currently only synchronous jobs are accepted. Asynchronous jobs (where downstream Luigi.Task jobs can be triggered) are part of a longer-term plan.
- scripts.nesta_prepare_batch, which zips up the batchable with the specified environmental files and ships it to AWS S3.
- scripts.nesta_docker_build, which builds a specified docker environment and ships it to AWS ECS.
How to put code into production at nesta¶
If you’re completely new, check out our training slides. In short, the steps you should go through when building production code are to:
- Audit the package code, required to pass all auditing tests
- Understand what environment is required
- Write a Dockerfile and docker launch script for this under scripts/docker_recipes
- Build the Docker environment (run: docker_build <recipe_name> from any directory)
- Build and test the batchable(s)
- Build and test a Luigi pipeline
- […] Need to have steps here which estimate run time cost parameters. Could use tests.py to estimate this. […]
- Run the full chain
Code and scripts¶
Routines¶
All of our pipelines, implemented as Luigi routines. Some of these pipelines (at least partly) rely on batch computing (via AWS Batch), where the ‘batched’ scripts (run.py modules) are described in core.batchables
. Other than luigihacks.autobatch, which is documented separately, the routine procedure follows the core Luigi documentation.
Examples¶
Examples of Luigi routines, from which all other nesta production routines can be built. Currently we have examples of routines with S3 and database (MySQL) IO, and routines which are entirely batched.
We’d recommend reading Spotify’s Luigi documentation, and also checking the Luigi Hacks documentation which contains modified Luigi modules which (who knows) one day we will suggest as pull requests.
S3 Example¶
An example of building a pipeline with S3 Targets
-
class
InputData
(*args, **kwargs)[source]¶ Bases:
luigi.task.ExternalTask
Dummy task acting as the single input data source
-
class
SomeTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
An intermediate task which increments the age of the muppets by 1 year.
Parameters: date (datetime) – Date used to label the outputs -
date
= <luigi.parameter.DateParameter object>¶
-
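A minimal sketch of how such an S3-backed pipeline might be wired together, assuming the S3Target documented under Luigi Hacks (the import path, bucket and key names are invented):
import json
import luigi
from nesta.core.luigihacks.s3 import S3Target   # assumed import path

class InputData(luigi.ExternalTask):
    """Dummy task acting as the single input data source."""
    def output(self):
        return S3Target("s3://nesta-example-bucket/muppets_input.json")

class SomeTask(luigi.Task):
    """Increment the age of the muppets by 1 year."""
    date = luigi.DateParameter()

    def requires(self):
        return InputData()

    def output(self):
        return S3Target(f"s3://nesta-example-bucket/muppets_{self.date}.json")

    def run(self):
        with self.input().open("rb") as f:
            muppets = json.load(f)
        for muppet in muppets:
            muppet["age"] += 1
        with self.output().open("w") as f:
            json.dump(muppets, f)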
Database example¶
An example of building a pipeline with database Targets
-
class
InputData
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Dummy task acting as the single input data source.
Parameters: - date (datetime) – Date used to label the outputs
- db_config – (dict) The input database configuration
-
date
= <luigi.parameter.DateParameter object>¶
-
db_config
= <luigi.parameter.DictParameter object>¶
-
class
SomeTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Task which increments the age of the muppets, by first selecting muppets with an age less than max_age.
Parameters: - date (datetime) – Date used to label the outputs
- max_age (int) – Maximum age of muppets to select from the database
- in_db_config – (dict) The input database configuration
- out_db_config – (dict) The output database configuration
-
date
= <luigi.parameter.DateParameter object>¶
-
max_age
= <luigi.parameter.IntParameter object>¶
-
in_db_config
= <luigi.parameter.DictParameter object>¶
-
out_db_config
= <luigi.parameter.DictParameter object>¶
arXiv data (technical research)¶
Data collection and processing pipeline for arXiv data, principally for the arXlive platform. This pipeline orchestrates the collection of arXiv data, enrichment (via MAG and GRID), topic modelling, and novelty (lolvelty) measurement.
Collection task¶
Luigi routine to collect new data from the arXiv api and load it to MySQL.
-
class
CollectNewTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Collect new data from the arXiv api and dump the data in the MySQL server.
Parameters: - date (datetime) – Datetime used to label the outputs
- _routine_id (str) – String used to label the AWS task
- db_config_env (str) – environmental variable pointing to the db config file
- db_config_path (str) – The output database configuration
- insert_batch_size (int) – number of records to insert into the database at once
- articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format
-
date
= <luigi.parameter.DateParameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
-
db_config_env
= <luigi.parameter.Parameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
insert_batch_size
= <luigi.parameter.IntParameter object>¶
-
articles_from_date
= <luigi.parameter.Parameter object>¶
Date task¶
Luigi wrapper to identify the date since the last iterative data collection
-
class
DateTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
Collect new data from the arXiv api and dump the data in the MySQL server.
Parameters: - date (datetime) – Datetime used to label the outputs
- _routine_id (str) – String used to label the AWS task
- db_config_env (str) – environmental variable pointing to the db config file
- db_config_path (str) – The output database configuration
- insert_batch_size (int) – number of records to insert into the database at once
- articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format
-
date
= <luigi.parameter.DateParameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
db_config_env
= <luigi.parameter.Parameter object>¶
-
insert_batch_size
= <luigi.parameter.IntParameter object>¶
-
articles_from_date
= <luigi.parameter.Parameter object>¶
arXiv enriched with MAG (API)¶
Luigi routine to query the Microsoft Academic Graph for additional data and append it to the existing data in the database.
-
class
QueryMagTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Query the MAG for additional data to append to the arxiv articles, primarily the fields of study.
Parameters: - date (datetime) – Datetime used to label the outputs
- _routine_id (str) – String used to label the AWS task
- db_config_env (str) – environmental variable pointing to the db config file
- db_config_path (str) – The output database configuration
- mag_config_path (str) – Microsoft Academic Graph Api key configuration path
- insert_batch_size (int) – number of records to insert into the database at once (not used in this task but passed down to others)
- articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format (not used in this task but passed down to others)
-
date
= <luigi.parameter.DateParameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
-
db_config_env
= <luigi.parameter.Parameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
mag_config_path
= <luigi.parameter.Parameter object>¶
-
insert_batch_size
= <luigi.parameter.IntParameter object>¶
-
articles_from_date
= <luigi.parameter.Parameter object>¶
-
requires
()[source]¶ The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
arXiv enriched with MAG (SPARQL)¶
Luigi routine to query the Microsoft Academic Graph for additional data and append it to the existing data in the database. This is to collect information which is difficult to retrieve via the MAG API.
-
class
MagSparqlTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Query the MAG for additional data to append to the arxiv articles, primarily the fields of study.
Parameters: - date (datetime) – Datetime used to label the outputs
- _routine_id (str) – String used to label the AWS task
- db_config_env (str) – environmental variable pointing to the db config file
- db_config_path (str) – The output database configuration
- mag_config_path (str) – Microsoft Academic Graph Api key configuration path
- insert_batch_size (int) – number of records to insert into the database at once (not used in this task but passed down to others)
- articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format (not used in this task but passed down to others)
-
date
= <luigi.parameter.DateParameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
-
db_config_env
= <luigi.parameter.Parameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
mag_config_path
= <luigi.parameter.Parameter object>¶
-
insert_batch_size
= <luigi.parameter.IntParameter object>¶
-
articles_from_date
= <luigi.parameter.Parameter object>¶
-
requires
()[source]¶ The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
arXiv enriched with GRID¶
Luigi routine to look up arXiv authors’ institutes via the GRID data, in order to “geocode” arXiv articles. The matching of institute name to GRID data is done via smart(ish) fuzzy matching, which then gives a confidence score per match.
-
class
GridTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Join arxiv articles with GRID data for institute addresses and geocoding.
Parameters: - date (datetime) – Datetime used to label the outputs
- _routine_id (str) – String used to label the AWS task
- db_config_env (str) – environmental variable pointing to the db config file
- db_config_path (str) – The output database configuration
- mag_config_path (str) – Microsoft Academic Graph Api key configuration path
- insert_batch_size (int) – number of records to insert into the database at once (not used in this task but passed down to others)
- articles_from_date (str) – new and updated articles from this date will be retrieved. Must be in YYYY-MM-DD format (not used in this task but passed down to others)
-
date
= <luigi.parameter.DateParameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
-
db_config_env
= <luigi.parameter.Parameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
mag_config_path
= <luigi.parameter.Parameter object>¶
-
insert_batch_size
= <luigi.parameter.IntParameter object>¶
-
articles_from_date
= <luigi.parameter.Parameter object>¶
-
requires
()[source]¶ The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
-
class
GridRootTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.WrapperTask
-
date
= <luigi.parameter.DateParameter object>¶
-
db_config_path
= <luigi.parameter.Parameter object>¶
-
production
= <luigi.parameter.BoolParameter object>¶
-
drop_and_recreate
= <luigi.parameter.BoolParameter object>¶
-
articles_from_date
= <luigi.parameter.Parameter object>¶
-
insert_batch_size
= <luigi.parameter.IntParameter object>¶
-
debug
= <luigi.parameter.BoolParameter object>¶
-
CORDIS (EU funded research)¶
Generic pipeline (i.e. not project specific) to collect all CORDIS data, discovering all entities by crawling an unofficial API.
Crunchbase (private sector companies)¶
NB: The Crunchbase pipeline may not work until this issue has been resolved.
Data collection and processing pipeline of Crunchbase data, principally for the healthMosaic platform.
EURITO (piping data to Elasticsearch)¶
Pipeline specific to EURITO for piping existing data to Elasticsearch. A recent “EU” cut of patstat data is transferred from the “main” patstat database, to Nesta’s central database.
Gateway to Research (UK publicly funded research)¶
Generic pipeline (i.e. not project specific) to collect all GtR data, discovering all entities by crawling the official API. The routine then geocodes and loads data to MySQL.
NiH (health research)¶
Data collection and processing pipeline of NiH data, principally for the healthMosaic platform.
Meetup (social networking data)¶
NB: The Meetup pipeline will not work until this issue has been resolved.
Data collection and processing pipeline of Meetup data, principally for the healthMosaic platform.
Topic discovery¶
Task to automatically discover relevant topics from meetup data, defined as the most frequently occurring from a set of categories.
-
class
TopicDiscoveryTask
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Task to automatically discover relevant topics from meetup data, defined as the most frequently occurring from a set of categories.
Parameters: - db_config_env (str) – Environmental variable pointing to the path of the DB config.
- routine_id (str) – The routine UID.
- core_categories (list) – A list of category_shortnames from which to identify topics.
- members_perc (int) – A percentile to evaluate the minimum number of members.
- topic_perc (int) – A percentile to evaluate the most frequent topics.
- test (bool) – Test mode.
-
db_config_env
= <luigi.parameter.Parameter object>¶
-
routine_id
= <luigi.parameter.Parameter object>¶
-
core_categories
= <luigi.parameter.ListParameter object>¶
-
members_perc
= <luigi.parameter.IntParameter object>¶
-
topic_perc
= <luigi.parameter.IntParameter object>¶
-
test
= <luigi.parameter.BoolParameter object>¶
Batchables¶
Packets of code to be batched by core.routines
routines. Each packet should sit in its own directory, with a file called run.py
, containing a ‘main’ function called run()
which will be executed on the AWS batch system.
Each run.py should expect an environment parameter called BATCHPAR_outfile
which should provide information on the output location. Other input parameters should be prefixed with BATCHPAR_
, as set in the corresponding core.routines routine.
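A minimal, hypothetical run.py sketch of this convention (every parameter name other than BATCHPAR_outfile, and the S3 output format, are assumptions):
import json
import os

import boto3

def run():
    # Input parameters arrive as BATCHPAR_* environment variables, set by the routine.
    outfile = os.environ["BATCHPAR_outfile"]               # e.g. "s3://some-bucket/some-key"
    batch_size = int(os.environ["BATCHPAR_batch_size"])    # hypothetical input parameter
    results = [{"row": i} for i in range(batch_size)]
    # Write the output to the location indicated by BATCHPAR_outfile.
    bucket, key = outfile.replace("s3://", "").split("/", 1)
    boto3.resource("s3").Object(bucket, key).put(Body=json.dumps(results))

if __name__ == "__main__":
    run()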
Data / project specific batchables¶
Example¶
There are two batchable examples listed here. The first is a module which will be run if you try executing the batch_example luigi routine. The second is purely meant as a template, if you are learning the design pattern for nesta’s luigi batchables.
The batchable for the routines.examples.batch_example
,
which simply increments a muppet’s age by one unit.
arXiv data (technical research)¶
CORDIS (EU-funded research)¶
Transfer data on organisations, projects and outputs from the Cordis API on a project-by-project basis.
-
extract_core_orgs
(orgs, project_rcn)[source]¶ Separate out the core ‘parent’ organisations from project-organisations (which are likely to be departments, with non-unique addresses).
Parameters: - orgs (list) – List of organisations to process (NB: this will be modified)
- project_rcn (str) – The record number of this project
Returns: The unique ‘parent’ organisations.
Return type: core_orgs (list)
-
prepare_data
(items, rcn)[source]¶ Append the project code (‘RCN’) to each “row” (dict) of data (list)
Crunchbase data (private companies)¶
NB: The Crunchbase pipeline may not work until this issue has been resolved.
Batchables for the collection and processing of Crunchbase data. As documented under packages and routines, the pipeline is executed in the following order (documentation for the run.py files is given below, which isn’t super-informative. You’re better off looking under packages and routines).
The data is collected from proprietary data dumps, parsed into MySQL (tier 0) and then piped into Elasticsearch (tier 1), post-processing.
EURITO¶
Batchables for processing data (which has already been collected elsewhere within this codebase) for the EURITO project. All of these batchables pipe the data into an Elasticsearch database, which is then cloned by EURITO.
Transfer pre-collected PATSTAT data from MySQL to Elasticsearch. Only EU patents since the year 2000 are considered. The patents are grouped by patent families.
GtR (UK publicly funded research)¶
Batchable tools for collecting and processing GtR data. As documented under packages and routines, the pipeline is executed in the following order (documentation for the run.py files is given below, which isn’t super-informative. You’re better off looking under packages and routines).
The data is collected by traversing the graph exposed by the GtR API, and is parsed into MySQL (tier 0). There is a further module for directly generating document embeddings of GtR project descriptions, which can be used for finding topics.
Starting from GtR projects, iteratively and recursively discover all GtR entities by crawling the API.
Document embedding of GtR data. Would be better if this was generalized (i.e. not GtR specific), and migrated to batchables.nlp [see https://github.com/nestauk/nesta/issues/203]
NiH data (health research)¶
Batchables for the collection and processing of NiH data. As documented under packages and routines, the pipeline is executed in the following order (documentation for the run.py files is given below, which isn’t super-informative. You’re better off looking under packages and routines).
The data is collected from official data dumps, parsed into MySQL (tier 0) and then piped into Elasticsearch (tier 1), post-processing.
Geocode NiH data (from MySQL) and pipe into Elasticsearch.
Retrieve NiH abstracts from MySQL, assign pre-calculated MeSH terms for each abstract, and pipe data into Elasticsearch. Exact abstract duplicates are removed at this stage.
Deduplicate NiH articles based on similarity scores using Elasticsearch’s document similarity API. Similarity is calculated based on the description of the project, the project abstract and the title of the project. Funding information is aggregated (summed) across all deduplicated articles, for the total and annuals funds.
Meetup (social networking / knowledge exchange)¶
NB: The meetup pipeline will not work until this issue has been resolved.
Batchables for the Meetup data collection pipeline. As documented under packages and routines, the pipeline is executed in the following order (documentation for the run.py files is given below, which isn’t super-informative. You’re better off looking under packages and routines).
The topic_tag_elasticsearch
module is responsible for piping data to Elasticsearch, as well as applying topic tags and filtering small groups out of the data.
General-purpose batchables¶
Bulk geocoding¶
Natural Language Processing¶
Batchable utilities for NLP. Note that modules prefixed with [AUTOML]
are
designed to be launched by AutoMLTask
, and those with the addition * (i.e.
[AUTOML*]
) are designed to be the final task in an AutoMLTask
chain
(i.e. they provide a ‘loss’).
Generate topics based on the CorEx algorithm. Loss is calculated from the total correlation explained.
Find and replace ngrams in a body of text, based on Wiktionary N-Grams. Whilst at it, the ngrammer also tokenizes and removes stop words (unless they occur within an n-gram)
Applies TFIDF cuts to a dataset via environmental variables lower_tfidf_percentile and upper_tfidf_percentile.
Vectorizes (counts or binary) text data, and applies basic filtering of extreme term/document frequencies.
-
term_counts
(dct, row, binary=False)[source]¶ Convert a single document to term counts via a gensim dictionary.
Parameters: - dct (Dictionary) – Gensim dictionary.
- row (str) – A document.
- binary (bool) – Binary rather than total count?
Returns: dict of term id (from the Dictionary) to term count.
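A hedged sketch of the idea using a gensim Dictionary (this is not necessarily the exact implementation; the whitespace tokenisation is an assumption):
from gensim.corpora import Dictionary

docs = [["data", "science", "data"], ["open", "data"]]
dct = Dictionary(docs)                          # term <-> id mapping

def term_counts(dct, row, binary=False):
    """Convert a single document (str) to {term id: count} via a gensim Dictionary."""
    bow = dct.doc2bow(row.split())              # assumes whitespace tokenisation
    return {idx: (1 if binary else count) for idx, count in bow}

term_counts(dct, "data data open")              # e.g. {0: 2, 2: 1}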
ORMs¶
SQLAlchemy
ORMs for the routines, which allow easy integration of testing (including automatic setup of test databases and tables).
Meetup¶
NIH schema¶
The schema for the World RePORTER data.
-
class
Projects
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
application_id
¶
-
activity
¶
-
administering_ic
¶
-
application_type
¶
-
arra_funded
¶
-
award_notice_date
¶
-
budget_start
¶
-
budget_end
¶
-
cfda_code
¶
-
core_project_num
¶
-
ed_inst_type
¶
-
foa_number
¶
-
full_project_num
¶
-
funding_ics
¶
-
funding_mechanism
¶
-
fy
¶
-
ic_name
¶
-
org_city
¶
-
org_country
¶
-
org_dept
¶
-
org_district
¶
-
org_duns
¶
-
org_fips
¶
-
org_ipf_code
¶
-
org_name
¶
-
org_state
¶
-
org_zipcode
¶
-
phr
¶
-
pi_ids
¶
-
pi_names
¶
-
program_officer_name
¶
-
project_start
¶
-
project_end
¶
-
project_terms
¶
-
project_title
¶
-
serial_number
¶
-
study_section
¶
-
study_section_name
¶
-
suffix
¶
-
support_year
¶
-
direct_cost_amt
¶
-
indirect_cost_amt
¶
-
total_cost
¶
-
subproject_id
¶
-
total_cost_sub_project
¶
-
nih_spending_cats
¶
-
-
class
Abstracts
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
application_id
¶
-
abstract_text
¶
-
-
class
Publications
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
pmid
¶
-
affiliation
¶
-
country
¶
-
issn
¶
-
journal_issue
¶
-
journal_title
¶
-
journal_title_abbr
¶
-
journal_volume
¶
-
lang
¶
-
page_number
¶
-
pub_date
¶
-
pub_title
¶
-
pub_year
¶
-
pmc_id
¶
-
-
class
Patents
(**kwargs)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
patent_id
¶
-
patent_title
¶
-
project_id
¶
-
patent_org_name
¶
-
Ontologies and schemas¶
Tier 0¶
Raw data collections (“tier 0”) in the production system do not adhere to a fixed schema or ontology, but instead have a schema which is very close to the raw data. Modifications to field names tend to be quite basic, such as lowercasing and replacing whitespace with a single underscore.
Tier 1¶
Processed data (“tier 1”) is intended for public consumption, using a common ontology. The convention we use is as follows:
- Field names are composed of up to three terms: a firstName, middleName and lastName.
- Each term (e.g. firstName) is written in lowerCamelCase.
- firstName terms correspond to a restricted set of basic quantities.
- middleName terms correspond to a restricted set of modifiers (e.g. adjectives) which add nuance to the firstName term. Note, the special middleName term ‘of’ is reserved as the default value in case no middleName is specified.
- lastName terms correspond to a restricted set of entity types.
Valid examples are date_start_project
and title_of_project
.
Tier 0 fields are implicitly excluded from tier 1 if they are missing from the schema_transformation
file. Tier 1 schema field names are applied via nesta.packages.decorator.schema_transform
Tier 2¶
Although not-yet-implemented, the tier 2 schema is reserved for future graph ontologies. Don’t expect any changes any time soon!
Elasticsearch mappings¶
Our methodology for constructing Elasticsearch mappings is described here. It is intended to minimise duplication of efforts and enforce standardisation when referring to a common dataset whilst being flexible to individual project needs. It is implied in our framework that a single dataset
can be used across many projects, and each project is mapped to a single endpoint
. It is useful to start by looking at the structure of the nesta/core/schemas/tier_1/mappings/
directory:
.
├── datasets
│ ├── arxiv_mapping.json
│ ├── companies_mapping.json
│ ├── cordis_mapping.json
│ ├── gtr_mapping.json
│ ├── meetup_mapping.json
│ ├── nih_mapping.json
│ └── patstat_mapping.json
├── defaults
│ └── defaults.json
└── endpoints
├── arxlive
│ └── arxiv_mapping.json
├── eurito
│ ├── arxiv_mapping.json
│ ├── companies_mapping.json
│ └── patstat_mapping.json
└── health-scanner
├── aliases.json
├── config.yaml
├── nih_mapping.json
└── nulls.json
Firstly we consider defaults/defaults.json
which should contain all default fields for all mappings - for example standard analyzers and dynamic strictness. We might also consider putting global fields there.
Next consider the datasets
subdirectory. Each mapping file in here should contain the complete mappings
field for the respective dataset. The naming convention <dataset>_mapping.json
is a hard requirement, as <dataset>
will map to the index for this dataset
at any given endpoint
.
Finally consider the endpoints
subdirectory. Each sub-subdirectory here should map to any endpoint
which requires changes beyond the defaults
and datasets
mappings. Each mapping file within each endpoint
sub-subdirectory (e.g. arxlive
or health-scanner
) should satisfy the same naming convention (<dataset>_mapping.json
). All conventions here are also consistent with the elasticsearch.yaml
configuration file (to see this configuration, you will need to clone the repo and follow these steps to unencrypt the config), which looks a little like this:
## The following assumes the AWS host endpoint naming convention:
## {scheme}://search-{endpoint}-{id}.{region}.es.amazonaws.com
defaults:
scheme: https
port: 443
region: eu-west-2
type: _doc
endpoints:
# -------------------------------
# <AWS endpoint domain name>:
# id: <AWS endpoint UUID>
# <default override key>: <default override value> ## e.g.: scheme, port, region, _type
# indexes:
# <index name>: <incremental version number> ## Note: defaults to <index name>_dev in testing mode
# -------------------------------
arxlive:
id: <this is a secret>
indexes:
arxiv: 4
# -------------------------------
health-scanner:
id: <this is a secret>
indexes:
nih: 6
companies: 5
meetup: 4
... etc ...
Note that for the health-scanner
endpoint, companies
and meetup
will be generated from the datasets
mappings, as they are not specified under the endpoints/health-scanner
subdirectory. Also note that endpoints
sub-directories do not need to exist for each endpoint
to be generated: the mappings will simply be generated from the dataset defaults. For example, a new endpoint general
can be generated from the DAPS codebase using the above, even though there is no endpoints/general
sub-subdirectory.
Individual endpoints
can also specify aliases.json
which harmonises field names across datasets for specific endpoints. This uses a convention as follows:
{
#...the convention is...
"<new field name>": {
"<dataset 1>": "<old field name 1>",
"<dataset 2>": "<old field name 2>",
"<dataset 3>": "<old field name 3>"
},
#...an example is...
"city": {
"companies": "placeName_city_organisation",
"meetup": "placeName_city_group",
"nih": "placeName_city_organisation"
},
#...etc...#
}
By default, this applies (what Joel calls) a “soft” alias, which is an Elasticsearch alias, however by specifying hard-alias=true
in config.yaml
(see health-scanner
above), the alias is instead applied directly (i.e. field names are physically replaced, not aliased).
You will also notice the nulls.json
file in the health-scanner
endpoint. This is a relatively experimental feature for automatically nullifying values on ingestion through ElasticsearchPlus, in lieu of proper exploratory data analysis. The logic and format for this is documented here.
Mapping construction hierarchy¶
Each mapping is constructed by overriding nested fields using the defaults
datasets
and endpoints
, in that order (i.e. endpoints
override nested fields in datasets
, and datasets
override those in defaults
). If you would like to “switch off” a field from the defaults
or datasets
mappings, you should set the value of the nested field to null
. For example:
{
"mappings": {
"_doc": {
"dynamic": "strict",
"properties": {
"placeName_zipcode_organisation": null
}
}
}
}
will simply “switch off” the field placeName_zipcode_organisation
, which was specified in datasets
.
The logic for the mapping construction hierarchy is demonstrated in the respective orms.orm_utils.get_es_mapping
function:
def get_es_mapping(dataset, endpoint):
'''Load the ES mapping for this dataset and endpoint,
including aliases.
Args:
dataset (str): Name of the dataset for the ES mapping.
endpoint (str): Name of the AWS ES endpoint.
Returns:
:obj:`dict`
'''
mapping = _get_es_mapping(dataset, endpoint)
_apply_alias(mapping, dataset, endpoint)
_prune_nested(mapping) # prunes any nested keys with null values
return mapping
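For example, a mapping generated this way might be used to create an index (the endpoint URL, index name and client setup are illustrative only):
from elasticsearch import Elasticsearch

mapping = get_es_mapping('nih', 'health-scanner')
es = Elasticsearch('https://search-health-scanner-xyz.eu-west-2.es.amazonaws.com:443')
es.indices.create(index='nih_v6', body=mapping)   # hypothetical index name/version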
Integrated tests¶
The following pytest
tests are made (and triggered on PR via travis):
- aliases.json files are checked for consistency with available datasets.
- All mappings for each in datasets and endpoints are fully generated, and tested for compatibility with the schema transformations (which are, in turn, checked against the valid ontology in ontology.json).
Features in DAPS2¶
- The index version (e.g. 'arxiv': 4 in elasticsearch.yaml) will be automatically generated from semantic versioning and the git hash in DAPS2, therefore the indexes field will consolidate to an itemised list of indexes.
- The mappings under datasets will be automatically generated from the open ontology which will be baked into the tier-0 schemas. This will render schema_transformations redundant.
- Elasticsearch components will be factored out of orm_utils.
Luigi Hacks¶
Modifications and possible future contributions to the Luigi module.
autobatch¶
batchclient¶
NOTE: overwhelmingly based on this, from which the following documentation
has been directly lifted. The main difference from the latter is that
AWS jobs are submitted via **kwargs
in order to allow more
flexibility (and probably more future-proofing if new parameters are
added to boto3).
AWS Batch wrapper for Luigi
From the AWS website:
AWS Batch enables you to run batch computing workloads on the AWS Cloud.
Batch computing is a common way for developers, scientists, and engineers to access large amounts of compute resources, and AWS Batch removes the undifferentiated heavy lifting of configuring and managing the required infrastructure. AWS Batch is similar to traditional batch computing software. This service can efficiently provision resources in response to jobs submitted in order to eliminate capacity constraints, reduce compute costs, and deliver results quickly.
See AWS Batch User Guide for more details.
To use AWS Batch, you create a jobDefinition JSON that defines a docker run command, and then submit this JSON to the API to queue up the task. Behind the scenes, AWS Batch auto-scales a fleet of EC2 Container Service instances, monitors the load on these instances, and schedules the jobs.
This boto3-powered wrapper allows you to create Luigi Tasks to submit Batch jobDefinitions. You can either pass a dict (mapping directly to the jobDefinition JSON) OR an Amazon Resource Name (arn) for a previously registered jobDefinition.
Requires:
- boto3 package
- Amazon AWS credentials discoverable by boto3 (e.g., by using
aws configure
from awscli) - An enabled AWS Batch job queue configured to run on a compute environment.
Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)
-
class
BatchClient
(poll_time=10, **kwargs)[source]¶ Bases:
object
-
get_job_status
(job_id)[source]¶ Retrieve task statuses from ECS API
Parameters: job_id (str) – AWS Batch job uuid
Returns: one of {SUBMITTED|PENDING|RUNNABLE|STARTING|RUNNING|SUCCEEDED|FAILED}
-
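A hedged sketch of polling a submitted job until it reaches a terminal state (the job id is assumed to come from an earlier submission call, which is elided here):
import time

client = BatchClient(poll_time=10, region_name="eu-west-2")   # extra kwargs are passed to boto3
job_id = "..."   # returned by an earlier job submission (elided)
while True:
    status = client.get_job_status(job_id)
    if status in ("SUCCEEDED", "FAILED"):
        break
    time.sleep(10)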
misctools¶
A collection of miscellaneous tools.
-
get_config
(file_name, header, path='core/config/')[source]¶ Get the configuration from a file in the luigi config path directory, and convert the key-value pairs under the config
header
into a dict.
Parameters: - file_name (str) – The configuration file name.
- header (str) – The header key in the config file.
Returns: dict
-
get_paths_from_relative
(relative=1)[source]¶ A helper method used within
find_filepath_from_pathstub
. Prints all file and directory paths from a relative number of ‘backward steps’ from the current working directory.
-
find_filepath_from_pathstub
(path_stub)[source]¶ Find the full path of the ‘closest’ file (or directory) to the current working directory ending with
path_stub
. The closest file is determined by searching forwards from the current working directory. The algorithm is then repeated by moving the current working directory backwards, one step at a time, until the file (or directory) is found. If the HOME directory is reached, the algorithm raises FileNotFoundError
.Parameters: path_stub (str) – The partial file (or directory) path stub to find. Returns: The full path to the partial file (or directory) path stub.
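For example (the path stubs are hypothetical):
# Resolve full paths from anywhere inside the repository.
config_dir = find_filepath_from_pathstub("core/config")
luigi_cfg = find_filepath_from_pathstub("core/config/luigi.cfg")   # hypothetical filename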
-
load_yaml_from_pathstub
(pathstub, filename)[source]¶ Basic wrapper around
find_filepath_from_pathstub
which also opens the file (assumed to be yaml).Parameters: - pathstub (str) – Stub of filepath where the file should be found.
- filename (str) – The filename.
Returns: The file contents as a json-like object.
-
load_batch_config
(luigi_task, additional_env_files=[], **overrides)[source]¶ Load default luigi batch parameters, and apply any overrides if required. Note that the usage pattern for this is normally
load_batch_config(self, additional_env_files, **overrides)
from within a luigi Task, whereself
is the luigi Task.Parameters: - luigi_task (luigi.Task) – Task to extract test and date parameters from.
- additional_env_files (list) – List of files to pass directly to the batch local environment.
- overrides (**kwargs) – Any overrides or additional parameters to pass to the batch task as parameters.
Returns: Batch configuration parameters, which can be expanded as **kwargs in BatchTask.
Return type: config (dict)
mysqldb¶
NOTE: overwhelmingly based on this, from which the following documentation
has been directly lifted. The main difference from the latter is that
**cnx_kwargs
in the constructor can accept port as a key.
-
make_mysql_target
(luigi_task, mysqldb_env='MYSQLDB')[source]¶ Generate a MySQL target for a luigi Task, based on the Task’s
date
andtest
parameters, and indicated configuration file.Parameters: - luigi_task (luigi.Task) – Task to extract test and date parameters from.
- mysqldb_env (str) – Environmental variable storing the path to MySQL config.
Returns: target (MySqlTarget)
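A hedged sketch of the usage pattern from inside a luigi Task (the import path is an assumption):
import luigi
from nesta.core.luigihacks.mysqldb import make_mysql_target   # assumed import path

class SomeRoutine(luigi.Task):
    date = luigi.DateParameter()
    test = luigi.BoolParameter(default=True)

    def output(self):
        # The target is derived from the Task's date/test parameters and the MYSQLDB config.
        return make_mysql_target(self, mysqldb_env="MYSQLDB")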
-
class
MySqlTarget
(host, database, user, password, table, update_id, **cnx_kwargs)[source]¶ Bases:
luigi.target.Target
Target for a resource in MySql.
-
marker_table
= 'table_updates'¶
-
s3¶
A more recent implementation of AWS S3 support, stolen from: https://gitlab.com/ced/s3_helpers/blob/master/luigi_s3_target.py, but instead using modern boto3 commands.
-
class
S3FS
(**kwargs)[source]¶ Bases:
luigi.target.FileSystem
-
mkdir
(path, parents=True, raise_if_exists=False)[source]¶ Create directory at location
path
Creates the directory at
path
and implicitly create parent directories if they do not already exist.Parameters: - path (str) – a path within the FileSystem to create as a directory.
- parents (bool) – Create parent directories when necessary. When parents=False and the parent directory doesn’t exist, raise luigi.target.MissingParentDirectory
- raise_if_exists (bool) – raise luigi.target.FileAlreadyExists if the folder already exists.
-
isdir
(path)[source]¶ Return
True
if the location atpath
is a directory. If not, returnFalse
.Parameters: path (str) – a path within the FileSystem to check as a directory. Note: This method is optional, not all FileSystem subclasses implements it.
-
listdir
(path)[source]¶ Return a list of files rooted in path.
This returns an iterable of the files rooted at
path
. This is intended to be a recursive listing.Parameters: path (str) – a path within the FileSystem to list. Note: This method is optional, not all FileSystem subclasses implements it.
-
-
class
S3Target
(path, s3_args={}, **kwargs)[source]¶ Bases:
luigi.target.FileSystemTarget
-
fs
= None¶
-
open
(mode='rb')[source]¶ Open the FileSystem target.
This method returns a file-like object which can either be read from or written to depending on the specified mode.
Parameters: mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options.
-
Scripts¶
A set of helper scripts for the batching system.
Note that this directory is required to sit in $PATH. By convention, all executables in this directory start with nesta_ so that our developers know where to find them.
nesta_prepare_batch¶
Collect a batchable run.py
file, including dependencies and an automatically generated requirements file, which is all zipped up and sent to AWS S3 for batching. This script is executed automatically in luigihacks.autobatch.AutoBatchTask.run
.
Parameters:
- BATCHABLE_DIRECTORY: The path to the directory containing the batchable
run.py
file.
- ARGS: Space-separated list of files or directories to include in the zip file, for example imports.
nesta_docker_build¶
Build a docker environment and register it with the AWS ECS container repository.
Parameters:
- DOCKER_RECIPE: A docker recipe. See
docker_recipes/
for a good idea of how to build a new environment.
Elasticsearch¶
The following steps will take you through setting up elasticsearch on an EC2 instance.
Launch the EC2 instance and ssh in so the following can be installed:
docker¶
sudo yum install docker -y
docker-compose¶
curl -L https://github.com/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
more info: https://github.com/docker/compose/releases
docker permissions¶
sudo usermod -a -G docker $USER
vm.max_map_count¶
set permanently in /etc/sysctl.conf by adding the following line:
vm.max_map_count=262144
more info: https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html
Docker¶
- the docker-compose.yml needs to include ulimits settings:
ulimits:
  memlock:
    soft: -1
    hard: -1
  nofile:
    soft: 65536
    hard: 65536
Recipes for http or https clusters can be found in: nesta/core/scripts/elasticsearch
There is also an EC2 AMI for a http node stored in the London region: elasticsearch node London vX
Reindexing data from a remote cluster¶
- reindex permissions need to be set in the new cluster’s elasticsearch.yml
- if the existing cluster is AWS hosted ES the ip address needs to be added to the security settings
- follow this guide: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#reindex-from-remote
- index and query do not need to be supplied
- if reindexing from AWS ES the port should be 443 for https. This is mandatory in the json sent to the reindexing api
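A hedged sketch of the remote reindex request via the Python requests library (host names, index names and any authentication are illustrative only):
import requests

body = {
    "source": {
        "remote": {"host": "https://search-old-endpoint-xyz.eu-west-2.es.amazonaws.com:443"},
        "index": "nih_v5",
    },
    "dest": {"index": "nih_v6"},
}
resp = requests.post("https://new-cluster:9200/_reindex", json=body)
print(resp.json())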
Containerised Luigi¶
Requirements¶
To containerise a pipeline a few steps are required:
- All imports must be absolute, ie from nesta.packages, nesta.core etc
- Once testing is complete the code should be committed and pushed to github, as this prevents the need to use local build options
- If building and running locally, Docker must be installed on the local machine and given enough RAM in the settings to run the pipeline
- Any required configuration files must be in
nesta.core.config
ie luigi and mysql config files, and any API keys. This directory is ignored by git, but check before committing
Build¶
The build uses a multi-stage Dockerfile to reduce the size of the final image:
- Code is git cloned and requirements are pip installed into a virtual environment
- The environment is copied into the second image
From the root of the repository:
docker build -f docker/Dockerfile -t name:tag .
Where name
is the name of the created image and tag
is the chosen tag.
Eg arxlive:latest
. This just makes the run step easier rather than using the generated id
The two stage build will normally just rebuild the second stage pulling in new code only.
If a full rebuild is required, eg after requirements.txt has changed then include:
--no-cache
Python version defaults to 3.6 but can be set during build by including the flag:
--build-arg python-version=3.7
Tag defaults to dev but this can be overridden by including the flag:
--build-arg GIT_TAG=0.3
a branch name also works --build-arg GIT_TAG=my-branch
Work from a branch or locally while testing. Override the target branch from Github using the method above, or use the commented out code in the Dockerfile to switch to build from local files. Don’t commit this change!
Run¶
As only one pipeline runs in the container the luigid
scheduler is not used.
There is a docker-compose
file for arXlive which mounts your local ~/.aws folder for AWS credentials, as this is outside docker’s context:
docker-compose -f docker/docker-compose.yml run luigi --module module_path params
Where:
- docker-compose.yml is the docker-compose file containing the image: image_name:tag from the build
- module_path is the full python path to the module
- params are any other params to supply as per normal, ie --date --production etc
Eg: docker-compose -f docker/docker-compose-arxlive-dev.yml run luigi --module nesta.core.routines.arxiv.arxiv_iterative_root_task RootTask --date 2019-04-16
This could be adapted for each pipeline, or alternatively run with the volume specified
with -v
docker run -v ~/.aws:/root/.aws:ro name:tag --module ...
Where name
is the name of the created image and tag
is the chosen tag.
Eg arxlive:latest
--module ...
onwards contains the arguments you would pass to Luigi.
Scheduling¶
- Create an executable shell script in
nesta.core.scripts
to launch docker-compose with all the necessary parameters eg: production - Add a cron job to the shell script (there are some good online cron syntax checking tools, if needed)
- Set the cron job to run every few minutes while testing and check the logs with
docker logs mycontainterhash --tail 50
. Obtain the hash usingdocker ps
- It will run logged in as the user who set it up, but there may still be some permissions issues to deal with
Currently scheduled¶
arXlive:
- A shell script to launch docker-compose for arXlive is set up to run in a cron job on user
russellwinch
- This is scheduled for Sunday-Thursday at 0300 GMT. arXiv is updated on these days at 0200 GMT
- Logs are just stored in the container, use
docker logs container_name
to view
Important points¶
- Keep any built images secure, they contain credentials
- You need to rebuild if code has changed
- As there is no central scheduler there is nothing stopping you from running the task more than once at the same time, by launching the container multiple times
- The graphical interface is not enabled without the scheduler
Debugging¶
If necessary, it’s possible to debug inside the container, but the entrypoint
needs to be overridden with bash
:
docker run --entrypoint /bin/bash -itv ~/.aws:/root/.aws:ro image_name:tag
Where image_name:tag
is the image from the build step
This includes the mounting of the .aws folder
Almost nothing is installed (not even vi!!) other than Python so apt-get update
and then apt-get install
whatever you need
Todo¶
A few things are sub-optimal:
- The container runs on the prod box rather than in the cloud in ECS
- Credentials are held in the container and local AWS config is required, this is the cause of the above point
- Due to the Nesta monorepo everything is pip installed, making a large container size with many unrequired packages. Pipeline specific requirements should be considered
- As logs are stored in the old containers they are kept until the next run where they are pruned and the logs are lost. Add a method of getting the logs to the host logger and record centrally
- In the arXlive pipeline there are at least 500 calls to the MAG API each run as the process tries to pick up new title matches on existing articles. As the API key only allows 10,000 calls per month this is currently OK with the schedule as it is but could possibly go over at some point
FAQ¶
Where is the data?¶
As a general rule-of-thumb, our data is always stored in the London region (eu-west-2
), in either RDS (tier-0, MySQL) or Elasticsearch (tier-1). For the EURITO project we also use Neo4j (tier-1), and in the distant future we will use Neo4j for tier-2 (i.e. a knowledge graph).
Why don’t you use Aurora rather than MySQL?¶
Aurora is definitely cheaper for stable production and business processes but not for research and development. You are charged for every byte of data you have ever consumed. This quickly spirals out-of-control for big-data development. Maybe one day we’ll consider migrating back, once the situation stabilises.
Where are the production machines?¶
Production machines (EC2) run in Ohio (us-east-2).
Where is the latest config?¶
We use git-crypt
to encrypt our configuration files whilst allowing them to be versioned in git (meaning that we can also rollback configuration). To unlock the configuration encryption, you should install git-crypt
, then run bash install.sh
from the project root, and finally unlock the configuration using the key found here.
Where do I start with Elasticsearch?¶
All Elasticsearch indexes (aka “databases” to the rest of the world), mappings (aka “schemas”) and whitelisting can be found here.
I’d recommend using Postman for spinning up and knocking down indexes. Practice this on a new cluster (which you can spin up from the above link), and then practice PUT
, POST
and DELETE
requests to PUT
an index (remember: “database”) with a mapping (“schema”), inserting a “row” with POST
and then deleting the index with DELETE
. You will quickly learn that it’s very easy to delete everything in Elasticsearch.
Troubleshooting¶
I’m having problems using the config files!¶
We use git-crypt
to encrypt our configuration files whilst allowing them to be versioned in git (meaning that we can also rollback configuration). To unlock the configuration encryption, you should install git-crypt
, then run bash install.sh
from the project root, and finally unlock the configuration using the key.
How do I restart the apache server after downtime?¶
sudo service httpd restart
How do I restart the luigi server after downtime?¶
sudo su - luigi
source activate py36
luigid --background --pidfile /var/run/luigi/luigi.pid --logdir /var/log/luigi
How do I perform initial setup to ensure the batchables will run?¶
- AWS CLI needs to be installed and configured:
pip install awscli
aws configure
AWS Access Key ID and Secret Access Key are set up in IAM > Users > Security Credentials
Default region name should be eu-west-1
to enable the error emails to be sent
In AWS SES the sender and receiver email addresses need to be verified
- The config files need to be accessible and the PATH and LUIGI_CONFIG_PATH need to be amended accordingly
How can I send/receive emails from Luigi?¶
You should set the environmental variable export LUIGI_EMAIL="<your.email@something>"
in your .bashrc
. You can test this with luigi TestNotificationsTask --local-scheduler --email-force-send
. Make sure your email address has been registered under AWS SES.
How do I add a new user to the server?¶
- add the user with
useradd --create-home username
- add sudo privileges following these instructions
- add to ec2 user group with
sudo usermod -a -G ec2-user username
- set a temp password with
passwd username
- their home directory will be
/home/username/
- copy
.bashrc
to their home directory - create folder
.ssh
in their home directory - copy
.ssh/authorized_keys
to the same folder in their home directory (DONT MOVE IT!!) cd
to their home directory and perform the below- chown their copy of
.ssh/authorized_keys
to their username:chown username .ssh/authorized_keys
- clone the nesta repo
- copy
core/config
files - set password to be changed next login
chage -d 0 username
- share the temp password and core pem file
If necessary:
- sudo chmod g+w /var/tmp/batch