Luigi Hacks

Modifications and possible future contributions to the Luigi module.

autobatch

Automatic preparation, submission and consolidation of AWS Batch tasks, as a single Luigi Task.

command_line(command, verbose=False)[source]

Execute command line tasks and return the final output line. This is particularly useful for extracting the AWS access keys directly from the OS, as well as for executing the environment preparation script (core/scripts/nesta_prepare_batch.sh).
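A minimal usage sketch; the commands shown are illustrative and assume the AWS CLI and git are installed and configured:

    # Only the final line printed by the command is returned.
    access_key_id = command_line("aws configure get aws_access_key_id")

    # verbose=True additionally surfaces the command's output (assumed behaviour).
    branch = command_line("git rev-parse --abbrev-ref HEAD", verbose=True)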

class AutoBatchTask(*args, **kwargs)[source]

Bases: luigi.task.Task, abc.ABC

A base class for automatically preparing and submitting AWS batch tasks.

Unlike regular Luigi Tasks, which require the user to override the requires, output and run methods, AutoBatchTask instead effectively replaces run with two new abstract methods: prepare and combine, each documented below. With these abstract methods specified, AutoBatchTask will automatically prepare and submit one batch job (specified in core.batchables) per parameter set returned by the prepare method. The combine method subsequently combines the outputs from the batch jobs.

Parameters:
  • batchable (str) – Path to the directory containing the run.py batchable
  • job_def (str) – Name of the AWS job definition
  • job_name (str) – Name given to this AWS batch job
  • job_queue (str) – AWS batch queue
  • region_name (str) – AWS region from which to batch
  • env_files (list of str, optional) – List of names pointing to local environmental files (for example local imports or scripts) which should be zipped up with the AWS batch job environment. Defaults to [].
  • vcpus (int, optional) – Number of CPUs to request for the AWS batch job. Defaults to 1.
  • memory (int, optional) – Memory to request for the AWS batch job. Defaults to 512 MiB.
  • max_runs (int, optional) – Number of batch jobs to run, which is useful for testing a subset of the full pipeline, or making cost predictions for AWS computing time. Defaults to None, implying that all jobs should be run.
  • poll_time (int, optional) – Time in seconds between querying the AWS batch job status. Defaults to 60.
  • success_rate (float, optional) – If the fraction of FAILED jobs exceeds success_rate, then the entire Task, along with any submitted AWS batch jobs, is killed. The fraction is calculated with respect to any jobs with RUNNING, SUCCEEDED or FAILED status. Defaults to 0.75.
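For example, a concrete subclass of AutoBatchTask (such as the ExampleBatchTask sketched after the combine method below) might be parameterised from a pipeline as follows; all parameter values here are illustrative:

    import luigi

    class RootTask(luigi.WrapperTask):
        def requires(self):
            return ExampleBatchTask(batchable="core/batchables/example/",
                                    job_def="py36_amzn1_image",   # illustrative job definition
                                    job_name="example-job",
                                    job_queue="HighPriority",     # illustrative queue name
                                    region_name="eu-west-2",
                                    env_files=["core/orms/"],     # local imports to ship with the job
                                    vcpus=2,
                                    memory=1024,
                                    poll_time=30)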
batchable = <luigi.parameter.Parameter object>
job_def = <luigi.parameter.Parameter object>
job_name = <luigi.parameter.Parameter object>
job_queue = <luigi.parameter.Parameter object>
region_name = <luigi.parameter.Parameter object>
env_files = <luigi.parameter.ListParameter object>
vcpus = <luigi.parameter.IntParameter object>
memory = <luigi.parameter.IntParameter object>
max_runs = <luigi.parameter.IntParameter object>
timeout = <luigi.parameter.IntParameter object>
poll_time = <luigi.parameter.IntParameter object>
success_rate = <luigi.parameter.FloatParameter object>
test = <luigi.parameter.BoolParameter object>
max_live_jobs = <luigi.parameter.IntParameter object>
worker_timeout = inf
run()[source]

DO NOT OVERRIDE THIS METHOD.

An implementation of the luigi.Task.run method, which is a wrapper around the prepare, execute and combine methods. Instead of overriding this method, you should implement the prepare and combine methods in your class.

prepare()[source]

You should implement a method which returns a list of dict, where each dict corresponds to the inputs to the batchable. Each dict in the output must contain at least the following keys:

  • done (bool): indicating whether the job has already finished.
  • outinfo (str): Text indicating e.g. the location of the output, for use in the batch job and in the combine method.
Returns: list of dict
combine(job_params)[source]

You should implement a method which collects the outputs specified by the outinfo key of job_params, which is the output from the prepare method. This method should finally write to the luigi.Target output.

Parameters: job_params (list of dict) – The batchable job parameters, as returned from the prepare method.
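A minimal sketch of a concrete subclass, assuming a hypothetical batchable that processes chunks of data and writes each job's output to S3; the bucket name and chunking scheme are illustrative:

    import boto3
    import luigi

    class ExampleBatchTask(AutoBatchTask):
        """Run one AWS Batch job per chunk of data, then concatenate the outputs."""

        def output(self):
            return luigi.LocalTarget("example_output.txt")

        def prepare(self):
            # One dict per batch job. "done" and "outinfo" are the mandatory keys;
            # every other key becomes a BATCHPAR_* environment variable in the job.
            return [{"done": False,
                     "outinfo": f"s3://example-bucket/chunk-{i}.txt",
                     "first_index": i * 1000,
                     "last_index": (i + 1) * 1000}
                    for i in range(100)]

        def combine(self, job_params):
            # Collect each job's output from the location in "outinfo",
            # then write the consolidated result to the luigi.Target output.
            s3 = boto3.resource("s3")
            with self.output().open("w") as f:
                for params in job_params:
                    bucket, key = params["outinfo"][len("s3://"):].split("/", 1)
                    body = s3.Object(bucket, key).get()["Body"].read().decode()
                    f.write(body + "\n")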
execute(job_params, s3file_timestamp)[source]

The secret sauce, which automatically submits and monitors the AWS batch jobs. Your AWS access key and ID are retrieved automatically via the AWS CLI.

Parameters:
  • job_params (list of dict) – The batchable job parameters, as returned from the prepare method. One job is submitted for each item in this list. Each dict key-value pair is converted into an environment variable in the batch job, with the variable name formed from the key, prefixed by BATCHPAR_.
  • s3file_timestamp (str) – The timestamp of the batchable zip file to be found on S3 by the AWS batch job.
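On the batchable side, these parameters are therefore recoverable from the environment. A sketch of the corresponding run.py, assuming the hypothetical parameter names from the ExampleBatchTask sketch above:

    import os

    def run():
        # Each key-value pair from prepare() arrives as a BATCHPAR_* variable.
        first_index = int(os.environ["BATCHPAR_first_index"])
        last_index = int(os.environ["BATCHPAR_last_index"])
        outinfo = os.environ["BATCHPAR_outinfo"]  # where this job should write its output
        # ... process rows first_index to last_index, then write the result to outinfo

    if __name__ == "__main__":
        run()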

batchclient

NOTE: overwhelmingly based on this, from which the following documentation has been directly lifted. The main difference is that AWS jobs are submitted via **kwargs, in order to allow more flexibility (and probably more future-proofing if new parameters are added to boto3).

AWS Batch wrapper for Luigi

From the AWS website:

AWS Batch enables you to run batch computing workloads on the AWS Cloud.

Batch computing is a common way for developers, scientists, and engineers to access large amounts of compute resources, and AWS Batch removes the undifferentiated heavy lifting of configuring and managing the required infrastructure. AWS Batch is similar to traditional batch computing software. This service can efficiently provision resources in response to jobs submitted in order to eliminate capacity constraints, reduce compute costs, and deliver results quickly.

See AWS Batch User Guide for more details.

To use AWS Batch, you create a jobDefinition JSON that defines a docker run command, and then submit this JSON to the API to queue up the task. Behind the scenes, AWS Batch auto-scales a fleet of EC2 Container Service instances, monitors the load on these instances, and schedules the jobs.

This boto3-powered wrapper allows you to create Luigi Tasks to submit Batch jobDefinitions. You can either pass a dict (mapping directly to the jobDefinition JSON) OR an Amazon Resource Name (arn) for a previously registered jobDefinition.
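For instance, a minimal jobDefinition dict, mirroring the JSON accepted by boto3; all values here are illustrative:

    job_def = {
        "jobDefinitionName": "example-job-definition",
        "type": "container",
        "containerProperties": {
            # An illustrative Docker image hosted on ECR.
            "image": "123456789012.dkr.ecr.eu-west-2.amazonaws.com/example:latest",
            "vcpus": 1,
            "memory": 512,
            "command": ["python", "run.py"],
        },
    }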

Requires:

  • boto3 package
  • Amazon AWS credentials discoverable by boto3 (e.g., by using aws configure from awscli)
  • An enabled AWS Batch job queue configured to run on a compute environment.

Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)
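A sketch of direct usage, assuming (as in the luigi.contrib wrapper this is based on) that submit_job forwards its **kwargs to boto3's batch.submit_job and returns the new job's ID:

    client = BatchClient(poll_time=10, region_name="eu-west-2")  # region is illustrative

    job_id = client.submit_job(
        jobName="example-job",
        jobQueue=client.get_active_queue(),
        jobDefinition="example-job-definition",  # a registered name, or an ARN
        containerOverrides={
            "vcpus": 1,
            "memory": 512,
            "environment": [{"name": "BATCHPAR_outinfo",
                             "value": "s3://example-bucket/out.txt"}],
        },
    )
    client.wait_on_job(job_id)  # poll until the job stops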

exception BatchJobException[source]

Bases: Exception

class BatchClient(poll_time=10, **kwargs)[source]

Bases: object

get_active_queue()[source]

Get name of first active job queue

get_job_id_from_name(job_name)[source]

Retrieve the first job ID matching the given name

get_job_status(job_id)[source]

Retrieve task statuses from ECS API

Parameters: job_id (str) – AWS Batch job uuid

Returns one of {SUBMITTED|PENDING|RUNNABLE|STARTING|RUNNING|SUCCEEDED|FAILED}

get_logs(log_stream_name, get_last=50)[source]

Retrieve log stream from CloudWatch

submit_job(**kwargs)[source]

Wrap submit_job with useful defaults

terminate_job(**kwargs)[source]

Wrap terminate_job

hard_terminate(job_ids, reason, iattempt=0, **kwargs)[source]

Terminate all jobs with a hard(ish) exit via an Exception. The function will also wait for the jobs to be explicitly terminated.

wait_on_job(job_id)[source]

Poll task status until STOPPED

register_job_definition(json_fpath)[source]

Register a job definition with AWS Batch, using a JSON file.

misctools

A collection of miscellaneous tools.

get_config(file_name, header, path='core/config/')[source]

Get the configuration from a file in the luigi config path directory, and convert the key-value pairs under the config header into a dict.

Parameters:
  • file_name (str) – The configuration file name.
  • header (str) – The header key in the config file.
Returns: dict
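A sketch of usage, assuming an INI-style file at core/config/mysqldb.config; the file name, header and values are all illustrative:

    # Hypothetical core/config/mysqldb.config:
    #
    #     [mysqldb]
    #     host = localhost
    #     port = 3306
    #     user = example_user
    #
    # The key-value pairs under the requested header come back as a dict.
    db_config = get_config("mysqldb.config", "mysqldb")
    assert db_config["host"] == "localhost"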

get_paths_from_relative(relative=1)[source]

A helper method used within find_filepath_from_pathstub. Prints all file and directory paths from a relative number of ‘backward steps’ from the current working directory.

find_filepath_from_pathstub(path_stub)[source]

Find the full path of the ‘closest’ file (or directory) to the current working directory ending with path_stub. The closest file is determined by first searching forwards from the current working directory. The search is then repeated after stepping the current working directory backwards, one directory at a time, until the file (or directory) is found. If the HOME directory is reached, the algorithm raises FileNotFoundError.

Parameters: path_stub (str) – The partial file (or directory) path stub to find.
Returns: The full path to the partial file (or directory) path stub.
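For example, run from anywhere inside the repository; the stubs shown are illustrative:

    # Searches forwards from the current working directory, then steps
    # backwards one directory at a time until a match is found.
    config_dir = find_filepath_from_pathstub("core/config")
    script_path = find_filepath_from_pathstub("scripts/nesta_prepare_batch.sh")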

mysqldb

NOTE: overwhelmingly based on this, from which the following documentation has been directly lifted. The main difference is that **cnx_kwargs in the constructor can accept port as a key.

class MySqlTarget(host, database, user, password, table, update_id, **cnx_kwargs)[source]

Bases: luigi.target.Target

Target for a resource in MySql.

marker_table = 'table_updates'
touch(connection=None)[source]

Mark this update as complete.

IMPORTANT: If the marker table doesn’t exist, the connection transaction will be aborted and the connection reset. The marker table will then be created.

exists(connection=None)[source]

Returns True if the Target exists and False otherwise.

connect(autocommit=False)[source]
create_marker_table()[source]

Create marker table if it doesn’t exist.

Using a separate connection since the transaction might have to be reset.
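A sketch of typical use as a Luigi Target; the credentials, database and table names are illustrative:

    import luigi

    class ExampleMySqlTask(luigi.Task):
        def output(self):
            # port is accepted via **cnx_kwargs: the main difference
            # from the upstream MySqlTarget.
            return MySqlTarget(host="localhost", database="example_db",
                               user="example_user", password="example_pass",
                               table="example_table",
                               update_id=self.task_id,
                               port=3306)

        def run(self):
            # ... do the work, then mark this update as complete:
            self.output().touch()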

s3

A more recent implementation of AWS S3 support, stolen from https://gitlab.com/ced/s3_helpers/blob/master/luigi_s3_target.py, but using modern boto3 commands instead.

merge_dicts(*dicts)[source]

Merge dicts together, with later entries overriding earlier ones.
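For example:

    merged = merge_dicts({"a": 1, "b": 2}, {"b": 3}, {"c": 4})
    assert merged == {"a": 1, "b": 3, "c": 4}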

parse_s3_path(path)[source]

For a given S3 path, return the bucket and key values
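For example, assuming the conventional s3://<bucket>/<key> layout:

    bucket, key = parse_s3_path("s3://example-bucket/path/to/key.txt")
    assert bucket == "example-bucket"
    assert key == "path/to/key.txt"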

class S3FS(**kwargs)[source]

Bases: luigi.target.FileSystem

exists(path)[source]

Return True if the S3 key exists

remove(path, recursive=True)[source]

Remove a file or directory from S3

mkdir(path, parents=True, raise_if_exists=False)[source]

Create directory at location path

Creates the directory at path, implicitly creating parent directories if they do not already exist.

Parameters:
  • path (str) – a path within the FileSystem to create as a directory.
  • parents (bool) – Create parent directories when necessary. When parents=False and the parent directory doesn’t exist, raise luigi.target.MissingParentDirectory
  • raise_if_exists (bool) – raise luigi.target.FileAlreadyExists if the folder already exists.
isdir(path)[source]

Return True if the location at path is a directory. If not, return False.

Parameters: path (str) – a path within the FileSystem to check as a directory.

Note: This method is optional; not all FileSystem subclasses implement it.

listdir(path)[source]

Return a list of files rooted in path.

This returns an iterable of the files rooted at path. This is intended to be a recursive listing.

Parameters: path (str) – a path within the FileSystem to list.

Note: This method is optional; not all FileSystem subclasses implement it.

copy(path, dest)[source]

Copy a file or a directory with contents. Currently, LocalFileSystem and MockFileSystem support only single file copying but S3Client copies either a file or a directory as required.

move(path, dest)[source]

Move a file, as one would expect.

du(path)[source]
class S3Target(path, s3_args={}, **kwargs)[source]

Bases: luigi.target.FileSystemTarget

fs = None
open(mode='rb')[source]

Open the FileSystem target.

This method returns a file-like object which can either be read from or written to depending on the specified mode.

Parameters: mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options.
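A sketch of reading and writing; the bucket and key are illustrative, and writes are assumed to be staged through AtomicS3File (below) so that partially written objects never appear on S3:

    target = S3Target("s3://example-bucket/data/output.txt")

    # Write mode: data goes to a local temporary file and is only
    # uploaded to S3 when the context manager closes it.
    with target.open("w") as f:
        f.write("hello from luigi")

    # Read mode (the default is "rb"):
    with target.open("rb") as f:
        raw = f.read()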
class AtomicS3File(path, s3_obj, **kwargs)[source]

Bases: luigi.target.AtomicLocalFile

move_to_final_destination()[source]