Luigi Hacks
Modifications and possible future contributions to the Luigi module.
autobatch
batchclient
NOTE: overwhelmingly based on this, from which the following documentation
has been directly lifted. The main difference from the original is that
AWS jobs are submitted via **kwargs, which allows more flexibility (and is
probably more future-proof if new parameters are added to boto3).
AWS Batch wrapper for Luigi
From the AWS website:
AWS Batch enables you to run batch computing workloads on the AWS Cloud.
Batch computing is a common way for developers, scientists, and engineers to access large amounts of compute resources, and AWS Batch removes the undifferentiated heavy lifting of configuring and managing the required infrastructure. AWS Batch is similar to traditional batch computing software. This service can efficiently provision resources in response to jobs submitted in order to eliminate capacity constraints, reduce compute costs, and deliver results quickly.
See AWS Batch User Guide for more details.
To use AWS Batch, you create a jobDefinition JSON that defines a docker run command, and then submit this JSON to the API to queue up the task. Behind the scenes, AWS Batch auto-scales a fleet of EC2 Container Service instances, monitors the load on these instances, and schedules the jobs.
This boto3-powered wrapper allows you to create Luigi Tasks to submit Batch
jobDefinitions. You can either pass a dict (mapping directly to the
jobDefinition JSON) OR an Amazon Resource Name (arn) for a previously
registered jobDefinition.
Requires:
- boto3 package
- Amazon AWS credentials discoverable by boto3 (e.g., by using aws configure from awscli)
- An enabled AWS Batch job queue configured to run on a compute environment.
Written and maintained by Jake Feala (@jfeala) for Outlier Bio (@outlierbio)
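The two ways of naming a job definition, and the idea of forwarding **kwargs unchanged to boto3, can be sketched as follows. This is a minimal illustration rather than the BatchClient implementation: the helper names resolve_job_definition and submit are hypothetical, and the client argument is assumed to behave like boto3.client("batch"), whose register_job_definition and submit_job calls return "jobDefinitionArn" and "jobId" respectively.

```python
def resolve_job_definition(client, job_definition):
    """Return an ARN given either a jobDefinition dict or an existing ARN.

    Hypothetical helper: `client` is assumed to be boto3.client("batch")
    or an object exposing the same methods.
    """
    if isinstance(job_definition, dict):
        # A dict is registered first; the response carries the new ARN.
        response = client.register_job_definition(**job_definition)
        return response["jobDefinitionArn"]
    # Anything else is assumed to be a previously registered ARN.
    return job_definition


def submit(client, job_definition, **kwargs):
    """Submit a job, passing every extra keyword argument straight through."""
    arn = resolve_job_definition(client, job_definition)
    response = client.submit_job(jobDefinition=arn, **kwargs)
    return response["jobId"]
```

With a real client this might be called as submit(boto3.client("batch"), arn, jobName="my-job", jobQueue="my-queue"), where the job and queue names are placeholders. Because the keyword arguments are not enumerated in the wrapper, any parameter that boto3 adds to submit_job later can be passed without changing this code.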
class BatchClient(poll_time=10, **kwargs)
Bases: object
get_job_status(job_id)
Retrieve task statuses from ECS API.
Parameters: job_id (str) – AWS Batch job uuid
Returns: one of {SUBMITTED|PENDING|RUNNABLE|STARTING|RUNNING|SUCCEEDED|FAILED}
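A rough sketch of how the poll_time argument and the statuses above might combine into a wait loop is given below. The function wait_for_job and its injectable get_status and sleep arguments are assumptions made for illustration; they are not part of the documented BatchClient interface.

```python
import time

# The two states after which a Batch job will not change again.
TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def wait_for_job(get_status, job_id, poll_time=10, sleep=time.sleep):
    """Poll get_status(job_id) every poll_time seconds until the job ends.

    `get_status` is any callable returning one of the Batch status strings,
    e.g. a bound BatchClient.get_job_status (assumed usage). With boto3
    directly, the status is found at
    client.describe_jobs(jobs=[job_id])["jobs"][0]["status"].
    """
    while True:
        status = get_status(job_id)
        if status in TERMINAL_STATES:
            return status
        sleep(poll_time)
```

Passing sleep as an argument keeps the loop easy to exercise in tests without real waiting.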
misctools
A collection of miscellaneous tools.
get_config(file_name, header, path='core/config/')
Get the configuration from a file in the luigi config path directory, and convert the key-value pairs under the config header into a dict.
Parameters:
- file_name (str) – The configuration file name.
- header (str) – The header key in the config file.
Returns: dict
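The behaviour described for get_config can be approximated with the standard library as below. This sketch assumes the configuration files are INI-style (as Luigi's own config files are) and uses the hypothetical name read_config_section; how the real function locates the luigi config path directory is not shown in this documentation.

```python
import configparser
import os


def read_config_section(file_name, header, path="core/config/"):
    """Return the key-value pairs under [header] in path/file_name as a dict."""
    full_path = os.path.join(path, file_name)
    parser = configparser.ConfigParser()
    # ConfigParser.read returns the list of files it managed to parse.
    if not parser.read(full_path):
        raise FileNotFoundError(full_path)
    # A missing header raises KeyError; all values come back as strings.
    return dict(parser[header])
```

Note that configparser lower-cases keys by default and does not convert values, so a port read this way is the string "3306" rather than an integer.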
get_paths_from_relative(relative=1)
A helper method for use within find_filepath_from_pathstub. Prints all file and directory paths from a relative number of ‘backward steps’ from the current working directory.
find_filepath_from_pathstub(path_stub)
Find the full path of the ‘closest’ file (or directory) to the current working directory ending with path_stub. The closest file is determined by first searching forwards from the current working directory. The search is then repeated, moving the current working directory backwards one step at a time, until the file (or directory) is found. If the HOME directory is reached, the algorithm raises FileNotFoundError.
Parameters: path_stub (str) – The partial file (or directory) path stub to find.
Returns: The full path to the partial file (or directory) path stub.
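The search described above can be sketched as follows. This is one possible reading of the algorithm, not the library's code: the function name find_from_stub and the start and stop arguments are hypothetical (they default to the current working directory and HOME), matching is done on whole path components, and "closest" is simply the first match in os.walk order.

```python
import os


def find_from_stub(path_stub, start=None, stop=None):
    """Search below `start`, then below each parent in turn, for path_stub."""
    current = os.path.abspath(start or os.getcwd())
    stop = os.path.abspath(stop or os.path.expanduser("~"))
    suffix = os.sep + os.path.normpath(path_stub)
    while current != stop:
        # Forward search: everything beneath the current directory.
        for root, dirs, files in os.walk(current):
            for name in dirs + files:
                candidate = os.path.join(root, name)
                if candidate.endswith(suffix):
                    return candidate
        # Backward step: move up one directory and try again.
        parent = os.path.dirname(current)
        if parent == current:  # reached the filesystem root
            break
        current = parent
    raise FileNotFoundError(path_stub)
```

Each backward step re-walks the subtree already searched, which keeps the sketch short at the cost of repeated work on deep directory trees.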
load_yaml_from_pathstub(pathstub, filename)
Basic wrapper around find_filepath_from_pathstub which also opens the file (assumed to be yaml).
Parameters:
- pathstub (str) – Stub of filepath where the file should be found.
- filename (str) – The filename.
Returns: The file contents as a json-like object.
load_batch_config(luigi_task, additional_env_files=[], **overrides)
Load default luigi batch parameters, and apply any overrides if required. Note that the usage pattern for this is normally load_batch_config(self, additional_env_files, **overrides) from within a luigi Task, where self is the luigi Task.
Parameters:
- luigi_task (luigi.Task) – Task to extract test and date parameters from.
- additional_env_files (list) – List of files to pass directly to the batch local environment.
- overrides (**kwargs) – Any overrides or additional parameters to pass to the batch task as parameters.
Returns: Batch configuration parameters, which can be expanded as **kwargs in BatchTask.
Return type: config (dict)
mysqldb
NOTE: overwhelmingly based on this2, from which the following documentation
has been directly lifted. The main difference from the original is that
**cnx_kwargs in the constructor can accept port as a key.
make_mysql_target(luigi_task, mysqldb_env='MYSQLDB')
Generate a MySQL target for a luigi Task, based on the Task’s date and test parameters, and indicated configuration file.
Parameters:
- luigi_task (luigi.Task) – Task to extract test and date parameters from.
- mysqldb_env (str) – Environmental variable storing the path to MySQL config.
Returns: target (MySqlTarget)
class MySqlTarget(host, database, user, password, table, update_id, **cnx_kwargs)
Bases: luigi.target.Target
Target for a resource in MySql.
marker_table = 'table_updates'
s3
A more recent implementation of AWS S3 support, stolen from: https://gitlab.com/ced/s3_helpers/blob/master/luigi_s3_target.py, but instead using modern boto3 commands.
class S3FS(**kwargs)
Bases: luigi.target.FileSystem
mkdir(path, parents=True, raise_if_exists=False)
Create directory at location path.
Creates the directory at path and implicitly creates parent directories if they do not already exist.
Parameters:
- path (str) – a path within the FileSystem to create as a directory.
- parents (bool) – Create parent directories when necessary. When parents=False and the parent directory doesn’t exist, raise luigi.target.MissingParentDirectory.
- raise_if_exists (bool) – raise luigi.target.FileAlreadyExists if the folder already exists.
isdir(path)
Return True if the location at path is a directory. If not, return False.
Parameters: path (str) – a path within the FileSystem to check as a directory.
Note: This method is optional; not all FileSystem subclasses implement it.
listdir(path)
Return a list of files rooted in path.
This returns an iterable of the files rooted at path. This is intended to be a recursive listing.
Parameters: path (str) – a path within the FileSystem to list.
Note: This method is optional; not all FileSystem subclasses implement it.
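Since S3 stores flat keys rather than real directories, a recursive listing amounts to enumerating every key that shares a prefix. The sketch below shows one way this might be done with boto3's list_objects_v2 paginator. The helpers split_s3_path and list_s3_files are hypothetical names, the client argument is assumed to behave like boto3.client("s3"), and nothing here is taken from the S3FS source.

```python
from urllib.parse import urlsplit


def split_s3_path(path):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parts = urlsplit(path)
    return parts.netloc, parts.path.lstrip("/")


def list_s3_files(client, path):
    """Yield the full s3:// path of every object stored under `path`."""
    bucket, prefix = split_s3_path(path)
    # list_objects_v2 returns results in pages; the paginator follows
    # the continuation tokens automatically.
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages for an empty prefix carry no "Contents" entry.
        for obj in page.get("Contents", []):
            yield "s3://{}/{}".format(bucket, obj["Key"])
```

Because the prefix match is purely textual, a path of s3://bucket/data would also match keys such as data2/x.csv; appending a trailing slash to the path avoids this.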
class S3Target(path, s3_args={}, **kwargs)
Bases: luigi.target.FileSystemTarget
fs = None
open(mode='rb')
Open the FileSystem target.
This method returns a file-like object which can either be read from or written to depending on the specified mode.
Parameters: mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options.