Luigi Hacks

Modifications and possible future contributions to the Luigi module.

autobatch

batchclient

NOTE: overwhelmingly based on this, from which the following documentation has been directly lifted. The main difference from the original is that AWS jobs are submitted via **kwargs, which allows more flexibility (and is probably more future-proof if new parameters are added to boto3).

AWS Batch wrapper for Luigi

From the AWS website:

AWS Batch enables you to run batch computing workloads on the AWS Cloud.

Batch computing is a common way for developers, scientists, and engineers to access large amounts of compute resources, and AWS Batch removes the undifferentiated heavy lifting of configuring and managing the required infrastructure. AWS Batch is similar to traditional batch computing software. This service can efficiently provision resources in response to jobs submitted in order to eliminate capacity constraints, reduce compute costs, and deliver results quickly.

See AWS Batch User Guide for more details.

To use AWS Batch, you create a jobDefinition JSON that defines a docker run command, and then submit this JSON to the API to queue up the task. Behind the scenes, AWS Batch auto-scales a fleet of EC2 Container Service instances, monitors the load on these instances, and schedules the jobs.

This boto3-powered wrapper allows you to create Luigi Tasks to submit Batch jobDefinitions. You can either pass a dict (mapping directly to the jobDefinition JSON) or an Amazon Resource Name (ARN) for a previously registered jobDefinition.
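As an illustration of the dict form, here is a minimal sketch of a job definition built in Python. The field names follow the AWS Batch job definition format, but every value (job name, image, command, resources) is a hypothetical placeholder rather than something this module ships with.

```python
import json

# Hypothetical job definition: all values below are placeholders.
job_definition = {
    "jobDefinitionName": "example-job",
    "type": "container",
    "containerProperties": {
        "image": "example-registry/example-image:latest",
        # Roughly equivalent to: docker run <image> echo "hello world"
        "command": ["echo", "hello world"],
        "resourceRequirements": [
            {"type": "VCPU", "value": "1"},
            {"type": "MEMORY", "value": "512"},
        ],
    },
}

# The dict serialises directly to a jobDefinition JSON document.
job_definition_json = json.dumps(job_definition, indent=2)
```

The same JSON text could be saved to a file and registered ahead of time, in which case the resulting ARN would be passed instead of the dict.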

Requires:

  • boto3 package
  • Amazon AWS credentials discoverable by boto3 (e.g., by using aws configure from awscli)
  • An enabled AWS Batch job queue configured to run on a compute environment.

Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)

exception BatchJobException[source]

Bases: Exception

class BatchClient(poll_time=10, **kwargs)[source]

Bases: object

get_active_queue()[source]

Get name of first active job queue

get_job_id_from_name(job_name)[source]

Retrieve the first job ID matching the given name

get_job_status(job_id)[source]

Retrieve the job status from the AWS Batch API

Parameters:job_id (str) – AWS Batch job UUID

Returns one of {SUBMITTED|PENDING|RUNNABLE|STARTING|RUNNING|SUCCEEDED|FAILED}

get_logs(log_stream_name, get_last=50)[source]

Retrieve log stream from CloudWatch

submit_job(**kwargs)[source]

Wrap submit_job with useful defaults
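The pattern of filling in defaults and forwarding everything else as **kwargs might look roughly like the sketch below. FakeBatchBackend is a hypothetical stand-in for a boto3 Batch client so that the example runs without AWS access, and the default values shown are assumptions, not the defaults this module actually applies.

```python
class FakeBatchBackend:
    """Hypothetical stand-in for a boto3 Batch client; records submissions."""

    def __init__(self):
        self.calls = []

    def submit_job(self, **kwargs):
        self.calls.append(kwargs)
        return {"jobId": "fake-job-{}".format(len(self.calls)),
                "jobName": kwargs["jobName"]}


def submit_job_with_defaults(backend, **kwargs):
    """Apply assumed defaults, then forward all parameters untouched."""
    defaults = {"jobName": "luigi-job", "jobQueue": "default-queue"}  # assumed values
    params = {**defaults, **kwargs}  # caller-supplied values override defaults
    response = backend.submit_job(**params)
    return response["jobId"]
```

Because the wrapper never names the individual parameters, any keyword accepted by the underlying client passes straight through.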

terminate_job(**kwargs)[source]

Wrap terminate_job

hard_terminate(job_ids, reason, iattempt=0, **kwargs)[source]

Terminate all jobs with a hard(ish) exit via an Exception. The function will also wait for jobs to be explicitly terminated

wait_on_job(job_id)[source]

Poll the job status until the job has stopped (SUCCEEDED or FAILED)
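A polling loop of this kind can be sketched as follows, assuming a job counts as finished once it reports SUCCEEDED or FAILED (the two terminal values among those returned by get_job_status). The get_status and sleep arguments are hypothetical injection points added so the sketch can run without AWS, and the exception class is a local stand-in for the module's BatchJobException.

```python
import time


class BatchJobException(Exception):
    """Local stand-in for the module's exception of the same name."""


def wait_on_job(get_status, job_id, poll_time=10, sleep=time.sleep):
    """Poll get_status(job_id) until the job succeeds or fails."""
    while True:
        status = get_status(job_id)
        if status == "SUCCEEDED":
            return True
        if status == "FAILED":
            raise BatchJobException("Job {} failed".format(job_id))
        # Any other status means the job is still in flight.
        sleep(poll_time)
```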

register_job_definition(json_fpath)[source]

Register a job definition with AWS Batch, using a JSON file

misctools

A collection of miscellaneous tools.

get_config(file_name, header, path='core/config/')[source]

Get the configuration from a file in the luigi config path directory, and convert the key-value pairs under the config header into a dict.

Parameters:
  • file_name (str) – The configuration file name.
  • header (str) – The header key in the config file.
Returns:

dict
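For illustration, reading one header (section) of an INI-style file into a dict can be sketched with the standard library's configparser. This is an assumption about the file format and about how path is resolved (here simply relative to the working directory); the real function locates the luigi config directory itself.

```python
import configparser
import os


def get_config(file_name, header, path="core/config/"):
    """Return the key-value pairs under [header] as a plain dict."""
    parser = configparser.ConfigParser()
    file_path = os.path.join(path, file_name)
    # read_file raises if the file is missing, whereas parser.read()
    # would silently skip it.
    with open(file_path) as f:
        parser.read_file(f)
    return dict(parser[header])
```

Note that configparser returns every value as a string, so numeric settings such as a port need converting by the caller.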

get_paths_from_relative(relative=1)[source]

A helper method for use within find_filepath_from_pathstub. Prints all file and directory paths from a relative number of ‘backward steps’ from the current working directory.

find_filepath_from_pathstub(path_stub)[source]

Find the full path of the ‘closest’ file (or directory) to the current working directory ending with path_stub. The closest file is determined by first searching forwards from the current working directory. The search is then repeated, moving the current working directory backwards one step at a time, until the file (or directory) is found. If the HOME directory is reached, the algorithm raises FileNotFoundError.

Parameters:path_stub (str) – The partial file (or directory) path stub to find.
Returns:The full path to the partial file (or directory) path stub.
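The search described above can be sketched as follows. This is not the module's implementation: the start and stop arguments are hypothetical additions (defaulting to the current working directory and HOME) that make the sketch testable, and it assumes the stop directory itself is searched once before giving up.

```python
import os


def find_filepath_from_pathstub(path_stub, start=None, stop=None):
    """Return the first path at or above `start` that ends with path_stub."""
    current = os.path.abspath(start or os.getcwd())
    stop = os.path.abspath(stop or os.path.expanduser("~"))
    while True:
        # Search 'forwards': every file and directory beneath `current`.
        for root, dirs, files in os.walk(current):
            for name in dirs + files:
                candidate = os.path.join(root, name)
                if candidate.endswith(path_stub):
                    return candidate
        parent = os.path.dirname(current)
        # Give up at the stop directory (HOME) or at the filesystem root.
        if current == stop or parent == current:
            raise FileNotFoundError("Could not find {}".format(path_stub))
        # Step 'backwards' one directory and repeat the search.
        current = parent
```

Each backward step re-walks the directories already searched, which is simple but can be slow when the working directory sits deep inside a large tree.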
f3p(path_stub)[source]

Shortened name for find_filepath_from_pathstub, for coding convenience

load_yaml_from_pathstub(pathstub, filename)[source]

Basic wrapper around find_filepath_from_pathstub which also opens the file (assumed to be yaml).

Parameters:
  • pathstub (str) – Stub of filepath where the file should be found.
  • filename (str) – The filename.
Returns:

The file contents as a json-like object.

load_batch_config(luigi_task, additional_env_files=[], **overrides)[source]

Load default luigi batch parameters, and apply any overrides if required. Note that the usage pattern for this is normally load_batch_config(self, additional_env_files, **overrides) from within a luigi Task, where self is the luigi Task.

Parameters:
  • luigi_task (luigi.Task) – Task to extract test and date parameters from.
  • additional_env_files (list) – List of files to pass directly to the batch local environment.
  • overrides (**kwargs) – Any overrides or additional parameters to pass to the batch task as parameters.
Returns:

Batch configuration parameters, which can be expanded as **kwargs in BatchTask.

Return type:

config (dict)

extract_task_info(luigi_task)[source]

Extract task name and generate a routine id from a luigi task, from the date and test fields.

Parameters:luigi_task (luigi.Task) – Task to extract test and date parameters from.
Returns:Test flag, and routine ID for this task.
Return type:{test, routine_id} (tuple)

mysqldb

NOTE: overwhelmingly based on this, from which the following documentation has been directly lifted. The main difference from the original is that **cnx_kwargs in the constructor can accept port as a key.

make_mysql_target(luigi_task, mysqldb_env='MYSQLDB')[source]

Generate a MySQL target for a luigi Task, based on the Task’s date and test parameters, and indicated configuration file.

Parameters:
  • luigi_task (luigi.Task) – Task to extract test and date parameters from.
  • mysqldb_env (str) – Environment variable storing the path to the MySQL config.
Returns:

target (MySqlTarget)

class MySqlTarget(host, database, user, password, table, update_id, **cnx_kwargs)[source]

Bases: luigi.target.Target

Target for a resource in MySql.

marker_table = 'table_updates'
touch(connection=None)[source]

Mark this update as complete.

IMPORTANT: If the marker table doesn’t exist, the connection transaction will be aborted and the connection reset. Then the marker table will be created.

exists(connection=None)[source]

Returns True if the Target exists and False otherwise.

connect(autocommit=False)[source]
create_marker_table()[source]

Create marker table if it doesn’t exist.

Using a separate connection since the transaction might have to be reset.
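The marker-table idea behind touch, exists and create_marker_table can be sketched as below. To keep the example runnable without a MySQL server it uses the standard library's sqlite3 instead of MySQL, and the column layout is an assumption made for illustration; only the table name table_updates comes from the class above.

```python
import sqlite3

MARKER_TABLE = "table_updates"


def create_marker_table(connection):
    """Create the marker table if it doesn't exist (assumed schema)."""
    connection.execute(
        "CREATE TABLE IF NOT EXISTS {} ("
        "update_id TEXT PRIMARY KEY, "
        "target_table TEXT, "
        "inserted TIMESTAMP DEFAULT CURRENT_TIMESTAMP)".format(MARKER_TABLE)
    )
    connection.commit()


def touch(connection, update_id, table):
    """Mark this update as complete by recording its update_id."""
    create_marker_table(connection)
    connection.execute(
        "INSERT OR REPLACE INTO {} (update_id, target_table) "
        "VALUES (?, ?)".format(MARKER_TABLE),
        (update_id, table),
    )
    connection.commit()


def exists(connection, update_id):
    """Return True if update_id has been marked complete."""
    try:
        row = connection.execute(
            "SELECT 1 FROM {} WHERE update_id = ? LIMIT 1".format(MARKER_TABLE),
            (update_id,),
        ).fetchone()
    except sqlite3.OperationalError:
        # Marker table missing: nothing has been marked complete yet.
        return False
    return row is not None
```

Luigi treats a task as done when its target's exists() returns True, so a row in the marker table is what prevents the same update from running twice.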

s3

A more recent implementation of AWS S3 support, stolen from: https://gitlab.com/ced/s3_helpers/blob/master/luigi_s3_target.py, but instead using modern boto3 commands.

merge_dicts(*dicts)[source]

Merge dicts together, with later entries overriding earlier ones.
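A minimal sketch of this behaviour, written from the description rather than taken from the module's source:

```python
def merge_dicts(*dicts):
    """Merge dicts left to right; later entries override earlier ones."""
    merged = {}
    for d in dicts:
        merged.update(d)
    return merged
```

The inputs are left unmodified, since the merge is built up in a fresh dict.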

parse_s3_path(path)[source]

For a given S3 path, return the bucket and key values
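One way to split an S3 path into bucket and key is with urllib.parse from the standard library, as in the sketch below. It assumes paths of the form s3://bucket/key, and raising ValueError for anything else is a choice made for this example rather than documented behaviour of the module.

```python
from urllib.parse import urlsplit


def parse_s3_path(path):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parts = urlsplit(path)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError("Not an S3 path: {}".format(path))
    # The bucket is the network location; the key is the path
    # without its leading slash.
    return parts.netloc, parts.path.lstrip("/")
```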

class S3FS(**kwargs)[source]

Bases: luigi.target.FileSystem

exists(path)[source]

Return True if the S3 key exists

remove(path, recursive=True)[source]

Remove a file or directory from S3

mkdir(path, parents=True, raise_if_exists=False)[source]

Create directory at location path

Creates the directory at path and implicitly creates parent directories if they do not already exist.

Parameters:
  • path (str) – a path within the FileSystem to create as a directory.
  • parents (bool) – Create parent directories when necessary. When parents=False and the parent directory doesn’t exist, raise luigi.target.MissingParentDirectory
  • raise_if_exists (bool) – raise luigi.target.FileAlreadyExists if the folder already exists.
isdir(path)[source]

Return True if the location at path is a directory. If not, return False.

Parameters:path (str) – a path within the FileSystem to check as a directory.

Note: This method is optional; not all FileSystem subclasses implement it.

listdir(path)[source]

Return a list of files rooted in path.

This returns an iterable of the files rooted at path. This is intended to be a recursive listing.

Parameters:path (str) – a path within the FileSystem to list.

Note: This method is optional; not all FileSystem subclasses implement it.

copy(path, dest)[source]

Copy a file or a directory with contents. Currently, LocalFileSystem and MockFileSystem support only single file copying but S3Client copies either a file or a directory as required.

move(path, dest)[source]

Move a file, as one would expect.

du(path)[source]
class S3Target(path, s3_args={}, **kwargs)[source]

Bases: luigi.target.FileSystemTarget

fs = None
open(mode='rb')[source]

Open the FileSystem target.

This method returns a file-like object which can either be read from or written to depending on the specified mode.

Parameters:mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options.
class AtomicS3File(path, s3_obj, **kwargs)[source]

Bases: luigi.target.AtomicLocalFile

move_to_final_destination()[source]