Luigi Hacks

Modifications and possible future contributions to the Luigi module.


Automatic preparation, submission and consolidation of AWS batch tasks, as a single Luigi Task.

command_line(command, verbose=False)[source]

Execute a command line task and return the final line of its output. This is particularly useful for extracting the AWS access keys directly from the OS, as well as for executing the environment preparation script under core/scripts/.

class AutoBatchTask(*args, **kwargs)[source]

Bases: luigi.task.Task, abc.ABC

A base class for automatically preparing and submitting AWS batch tasks.

Unlike regular Luigi Tasks, which require the user to override the requires, output and run methods, AutoBatchTask effectively replaces run with two new abstract methods: prepare and combine, which are respectively documented below. With these abstract methods specified, AutoBatchTask will automatically prepare, submit, and combine one batch task (specified in core.batchables) per parameter set returned by the prepare method. The combine method then combines the outputs from the batch tasks.

  • batchable (str) – Path to the directory containing the batchable
  • job_def (str) – Name of the AWS job definition
  • job_name (str) – Name given to this AWS batch job
  • job_queue (str) – AWS batch queue
  • region_name (str) – AWS region from which to batch
  • env_files (list of str, optional) – List of names pointing to local environmental files (for example local imports or scripts) which should be zipped up with the AWS batch job environment. Defaults to [].
  • vcpus (int, optional) – Number of CPUs to request for the AWS batch job. Defaults to 1.
  • memory (int, optional) – Memory to request for the AWS batch job. Defaults to 512 MiB.
  • max_runs (int, optional) – Number of batch jobs to run, which is useful for testing a subset of the full pipeline, or making cost predictions for AWS computing time. Defaults to None, implying that all jobs should be run.
  • poll_time (int, optional) – Time in seconds between querying the AWS batch job status. Defaults to 60.
  • success_rate (float, optional) – If the fraction of FAILED jobs exceeds success_rate then the entire Task, along with any submitted AWS batch jobs, is killed. The fraction is calculated with respect to any jobs with RUNNING, SUCCEEDED or FAILED status. Defaults to 0.75.
batchable = <luigi.parameter.Parameter object>
job_def = <luigi.parameter.Parameter object>
job_name = <luigi.parameter.Parameter object>
job_queue = <luigi.parameter.Parameter object>
region_name = <luigi.parameter.Parameter object>
env_files = <luigi.parameter.ListParameter object>
vcpus = <luigi.parameter.IntParameter object>
memory = <luigi.parameter.IntParameter object>
max_runs = <luigi.parameter.IntParameter object>
timeout = <luigi.parameter.IntParameter object>
poll_time = <luigi.parameter.IntParameter object>
success_rate = <luigi.parameter.FloatParameter object>
test = <luigi.parameter.BoolParameter object>
max_live_jobs = <luigi.parameter.IntParameter object>
worker_timeout = inf


run()[source]

An implementation of the Luigi Task run method, which wraps the prepare, execute and combine methods. Instead of overriding run, you should implement the prepare and combine methods in your subclass.


prepare()[source]

You should implement a method which returns a list of dict, where each dict corresponds to inputs to the batchable. Each dict in the list must contain at least the following keys:

  • done (bool): Indicates whether the job has already finished.
  • outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and in the combine method
Returns: list of dict

combine(job_params)[source]

You should implement a method which collects the outputs specified by the outinfo key of job_params (the output from the prepare method). This method should finally write to the luigi.Target output.

Parameters: job_params (list of dict) – The batchable job parameters, as returned from the prepare method.
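To make the contract concrete, here is a hypothetical sketch of the bodies one might give prepare and combine in an AutoBatchTask subclass. The chunking scheme and the S3-style outinfo paths are illustrative assumptions, not part of the documented API.

```python
# Hypothetical prepare/combine bodies for an AutoBatchTask subclass.
# The chunk size and "s3://some-bucket/..." output paths are placeholders.

def prepare(input_ids, chunk_size=100):
    """Split input ids into chunks, yielding one batch job per chunk."""
    job_params = []
    for i in range(0, len(input_ids), chunk_size):
        chunk = input_ids[i:i + chunk_size]
        job_params.append({
            "done": False,  # required key: has this job already finished?
            "outinfo": f"s3://some-bucket/output-{i}.json",  # required key: output location
            "first_id": chunk[0],
            "last_id": chunk[-1],
        })
    return job_params

def combine(job_params):
    """Collect the outputs named by each job's 'outinfo' key."""
    return [params["outinfo"] for params in job_params]
```

Each dict in the returned list becomes one AWS batch job, so the granularity of the chunking directly controls how many jobs are submitted.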
execute(job_params, s3file_timestamp)[source]

The secret sauce, which automatically submits and monitors the AWS batch jobs. Your AWS access key and id are automatically retrieved via the AWS CLI.

  • job_params (list of dict) – The batchable job parameters, as returned from the prepare method. One job is submitted for each item in this list. Each key-value pair in the dict is converted into an environment variable in the batch job, with the variable name formed from the key, prefixed by BATCHPAR_.
  • s3file_timestamp (str) – The timestamp of the batchable zip file to be found on S3 by the AWS batch job.
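As a sketch of the key-to-environment-variable conversion described above, one job's parameters could be turned into BATCHPAR_-prefixed variables in the {"name": ..., "value": ...} form that boto3's batch submit_job accepts under containerOverrides (the helper name make_env is an assumption, not the module's actual function):

```python
# Convert one job's parameter dict into BATCHPAR_-prefixed environment
# variables, in the list-of-dicts shape used by AWS Batch containerOverrides.

def make_env(params):
    return [{"name": f"BATCHPAR_{key}", "value": str(value)}
            for key, value in params.items()]
```

Note that all values are stringified, since environment variables are always strings; the batchable is responsible for parsing them back.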


NOTE: overwhelmingly based on this, from which the following documentation has been directly lifted. The main difference is that AWS jobs are submitted via **kwargs, in order to allow more flexibility (and probably more future-proofing if new parameters are added to boto3).

AWS Batch wrapper for Luigi

From the AWS website:

AWS Batch enables you to run batch computing workloads on the AWS Cloud.

Batch computing is a common way for developers, scientists, and engineers to access large amounts of compute resources, and AWS Batch removes the undifferentiated heavy lifting of configuring and managing the required infrastructure. AWS Batch is similar to traditional batch computing software. This service can efficiently provision resources in response to jobs submitted in order to eliminate capacity constraints, reduce compute costs, and deliver results quickly.

See AWS Batch User Guide for more details.

To use AWS Batch, you create a jobDefinition JSON that defines a docker run command, and then submit this JSON to the API to queue up the task. Behind the scenes, AWS Batch auto-scales a fleet of EC2 Container Service instances, monitors the load on these instances, and schedules the jobs.

This boto3-powered wrapper allows you to create Luigi Tasks to submit Batch jobDefinitions. You can either pass a dict (mapping directly to the jobDefinition JSON) OR an Amazon Resource Name (arn) for a previously registered jobDefinition.
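For reference, a jobDefinition dict passed this way might look like the following. The definition name, image URI and command are placeholders; the field layout follows the classic AWS Batch containerProperties schema.

```python
# An illustrative jobDefinition dict (name, image and command are placeholders),
# matching the shape AWS Batch expects for a container job definition.
job_definition = {
    "jobDefinitionName": "my-batchable",  # placeholder name
    "type": "container",
    "containerProperties": {
        "image": "123456789012.dkr.ecr.eu-west-1.amazonaws.com/my-image",
        "vcpus": 1,
        "memory": 512,       # MiB
        "command": ["python", "run.py"],
    },
}
```

Passing an arn instead simply skips the registration step, reusing a definition already known to AWS Batch.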


Requires:

  • boto3 package
  • Amazon AWS credentials discoverable by boto3 (e.g., by using aws configure from awscli)
  • An enabled AWS Batch job queue configured to run on a compute environment.

Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)

exception BatchJobException[source]

Bases: Exception

class BatchClient(poll_time=10, **kwargs)[source]

Bases: object


Get name of first active job queue


Retrieve the first job ID matching the given name


Retrieve task statuses from ECS API

Parameters: job_id (str) – AWS Batch job uuid


get_logs(log_stream_name, get_last=50)[source]

Retrieve log stream from CloudWatch


Wrap submit_job with useful defaults


Wrap terminate_job

hard_terminate(job_ids, reason, iattempt=0, **kwargs)[source]

Terminate all jobs with a hard(ish) exit via an Exception. The function also waits for the jobs to be explicitly terminated.


Poll task status until STOPPED


Register a job definition with AWS Batch, using a JSON


A collection of miscellaneous tools.

get_config(file_name, header, path='core/config/')[source]

Get the configuration from a file in the luigi config path directory, and convert the key-value pairs under the config header into a dict.

  • file_name (str) – The configuration file name.
  • header (str) – The header key in the config file.
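Assuming the configuration files are standard INI files (which the header/key-value description suggests), a minimal sketch of get_config could look like this; the real implementation may differ.

```python
# A minimal sketch of get_config, assuming standard INI-style config files:
# read the file from the config path and return one section as a dict.
import configparser
import os

def get_config(file_name, header, path="core/config/"):
    config = configparser.ConfigParser()
    config.read(os.path.join(path, file_name))
    return dict(config[header])
```

For example, a file containing a [mysqldb] section with host and port entries would be returned as {"host": ..., "port": ...} (note that configparser returns all values as strings).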



A helper method used within find_filepath_from_pathstub. Prints all file and directory paths found a given number of ‘backward steps’ from the current working directory.


Find the full path of the ‘closest’ file (or directory) to the current working directory ending with path_stub. The closest file is determined by first searching forwards from the current working directory. The search is then repeated after moving the working directory backwards, one step at a time, until the file (or directory) is found. If the HOME directory is reached without a match, the algorithm raises FileNotFoundError.

Parameters: path_stub (str) – The partial file (or directory) path stub to find.
Returns: The full path to the partial file (or directory) path stub.
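The search described above can be sketched as follows; this is an illustration of the documented behaviour, not the module's actual implementation.

```python
# Sketch of the described search: walk forwards (down) from the current working
# directory looking for a path ending with path_stub, then move the starting
# point backwards (up) one directory at a time, stopping at HOME.
import os

def find_filepath_from_pathstub(path_stub):
    directory = os.getcwd()
    home = os.path.expanduser("~")
    while True:
        for root, dirs, files in os.walk(directory):
            for name in dirs + files:
                candidate = os.path.join(root, name)
                if candidate.endswith(path_stub):
                    return candidate
        parent = os.path.dirname(directory)
        if directory == home or parent == directory:
            # Reached HOME (or the filesystem root) without a match
            raise FileNotFoundError(path_stub)
        directory = parent
```

This makes the function robust to where in the repository a task is launched from, at the cost of re-walking overlapping directory trees on each backward step.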


NOTE: overwhelmingly based on this, from which the following documentation has been directly lifted. The main difference is that **cnx_kwargs in the constructor can accept port as a key.

class MySqlTarget(host, database, user, password, table, update_id, **cnx_kwargs)[source]


Target for a resource in MySql.

marker_table = 'table_updates'

Mark this update as complete.

IMPORTANT: If the marker table doesn’t exist, the connection transaction will be aborted and the connection reset. The marker table will then be created.


Returns True if the Target exists and False otherwise.


Create marker table if it doesn’t exist.

Using a separate connection since the transaction might have to be reset.


A more recent implementation of AWS S3 support, stolen from an earlier implementation, but using modern boto3 commands.


Merge dicts together, with later entries overriding earlier ones.
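The merge behaviour described here is equivalent to the following sketch (the function name merge_dicts is an assumption):

```python
# Merge dicts left to right, so later entries override earlier ones.
def merge_dicts(*dicts):
    merged = {}
    for d in dicts:
        merged.update(d)
    return merged
```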


For a given S3 path, return the bucket and key values
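The bucket/key split presumably works along these lines (a sketch of the assumed behaviour, with an assumed function name):

```python
# Split an "s3://bucket/key" path into its bucket and key components.
def parse_s3_path(path):
    path = path.replace("s3://", "", 1)
    bucket, _, key = path.partition("/")
    return bucket, key
```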

class S3FS(**kwargs)[source]



Return True if the S3 key exists

remove(path, recursive=True)[source]

Remove a file or directory from S3

mkdir(path, parents=True, raise_if_exists=False)[source]

Create directory at location path

Creates the directory at path and implicitly creates parent directories if they do not already exist.

  • path (str) – a path within the FileSystem to create as a directory.
  • parents (bool) – Create parent directories when necessary. When parents=False and the parent directory doesn’t exist, raise luigi.target.MissingParentDirectory.
  • raise_if_exists (bool) – raise luigi.target.FileAlreadyExists if the folder already exists.

Return True if the location at path is a directory. If not, return False.

Parameters: path (str) – a path within the FileSystem to check as a directory.

Note: This method is optional; not all FileSystem subclasses implement it.


Return a list of files rooted in path.

This returns an iterable of the files rooted at path. This is intended to be a recursive listing.

Parameters: path (str) – a path within the FileSystem to list.

Note: This method is optional; not all FileSystem subclasses implement it.

copy(path, dest)[source]

Copy a file or a directory with contents. Currently, LocalFileSystem and MockFileSystem support only single file copying but S3Client copies either a file or a directory as required.

move(path, dest)[source]

Move a file, as one would expect.

class S3Target(path, s3_args={}, **kwargs)[source]


fs = None

Open the FileSystem target.

This method returns a file-like object which can either be read from or written to depending on the specified mode.

Parameters: mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options.
class AtomicS3File(path, s3_obj, **kwargs)[source]