Examples

Examples of Luigi routines from which all other nesta production routines can be built. Currently there are examples of routines with S3 and database (MySQL) IO, and of routines which are entirely batched.

We recommend reading Spotify’s Luigi documentation, and also the Luigi Hacks documentation, which contains modified Luigi modules that (who knows) we may one day suggest as pull requests.

S3 Example

An example of building a pipeline with S3 Targets

class InputData(*args, **kwargs)

Bases: luigi.task.ExternalTask

Dummy task acting as the single input data source

output()

Points to the S3 Target
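Because InputData is an ExternalTask, it has no run() method: Luigi assumes its output is produced outside the pipeline and treats the task as complete exactly when that output exists. The standard-library sketch below mimics that rule, with a local file standing in for the S3 Target; `FakeTarget` and `FakeExternalTask` are hypothetical names, not part of Luigi or nesta.

```python
import os
import tempfile


class FakeTarget:
    """Hypothetical stand-in for an S3 Target: wraps a local file path."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


class FakeExternalTask:
    """Hypothetical mimic of an ExternalTask: an output() but no run()."""

    def __init__(self, path):
        self._target = FakeTarget(path)

    def output(self):
        return self._target

    def complete(self):
        # Mirrors Luigi's default rule: complete iff the output exists
        return self.output().exists()


with tempfile.TemporaryDirectory() as tmp:
    task = FakeExternalTask(os.path.join(tmp, "muppets.json"))
    print(task.complete())  # False: nothing has produced the file yet
    with open(task.output().path, "w") as f:
        f.write("[]")
    print(task.complete())  # True: the external data now exists
```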

class SomeTask(*args, **kwargs)

Bases: luigi.task.Task

An intermediate task which increments the age of the muppets by 1 year.

Parameters: date (datetime) – Date used to label the outputs
date = <luigi.parameter.DateParameter object>

requires()

Gets the input data (json containing muppet name and age)

output()

Points to the S3 Target
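Both SomeTask and FinalTask label their outputs with the date parameter. A minimal sketch of how a date-labelled S3 path might be built; the helper `s3_output_path`, the bucket `example-bucket` and the key layout are all hypothetical.

```python
from datetime import date


def s3_output_path(task_name: str, run_date: date,
                   bucket: str = "example-bucket") -> str:
    """Build a date-labelled S3 path (bucket and key layout are hypothetical)."""
    return f"s3://{bucket}/{task_name}/{run_date:%Y-%m-%d}.json"


print(s3_output_path("SomeTask", date(2018, 5, 1)))
# s3://example-bucket/SomeTask/2018-05-01.json
```

Because each date yields a different path, a run for a new date points at a target that does not yet exist, so Luigi will not treat the task as already complete.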

run()

Increments the muppets’ ages by 1
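The input is described as JSON containing each muppet’s name and age. Assuming it is a list of objects with `name` and `age` keys (the exact schema is an assumption), the core transformation of run() might look like the hypothetical helper below.

```python
import json


def increment_ages(raw_json: str) -> str:
    """Add one year to every muppet's age (assumed schema: name, age)."""
    muppets = json.loads(raw_json)
    for muppet in muppets:
        muppet["age"] += 1
    return json.dumps(muppets)


print(increment_ages('[{"name": "Kermit", "age": 62}]'))
# [{"name": "Kermit", "age": 63}]
```

In a real task the string would typically be read via `self.input().open('r')` and the result written via `self.output().open('w')`.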

class FinalTask(*args, **kwargs)

Bases: luigi.task.Task

The root task, which adds the surname ‘Muppet’ to the names of the muppets.

Parameters: date (datetime) – Date used to label the outputs
date = <luigi.parameter.DateParameter object>

requires()

Gets data from the intermediate task

output()

Points to the S3 Target

run()

Appends ‘Muppet’ to the muppets’ names
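Under an assumed schema of records with `name` and `age` keys, appending the surname could be sketched as follows. `add_surname` is a hypothetical helper; records are copied rather than mutated so the input list is left unchanged.

```python
def add_surname(muppets, surname="Muppet"):
    """Return new records with the surname appended (schema is assumed)."""
    return [{**m, "name": f"{m['name']} {surname}"} for m in muppets]


print(add_surname([{"name": "Kermit", "age": 63}]))
# [{'name': 'Kermit Muppet', 'age': 63}]
```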

Database example

An example of building a pipeline with database Targets

class InputData(*args, **kwargs)

Bases: luigi.task.Task

Dummy task acting as the single input data source.

Parameters:
  • date (datetime) – Date used to label the outputs
  • db_config (dict) – The input database configuration
date = <luigi.parameter.DateParameter object>
db_config = <luigi.parameter.DictParameter object>

output()

Points to the input database target

run()

Example of marking the update table
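Luigi’s own `luigi.contrib.mysqldb.MySqlTarget` tracks completion by inserting a row, keyed by an update ID, into a marker table (named `table_updates` by default) when `touch()` is called; `exists()` then checks for that row. The sketch below reproduces the idea with the standard-library `sqlite3` module instead of MySQL. The update ID and table names are hypothetical, and nesta’s own target may differ in detail.

```python
import sqlite3


def create_marker_table(conn):
    conn.execute(
        "CREATE TABLE IF NOT EXISTS table_updates ("
        " update_id TEXT PRIMARY KEY,"
        " target_table TEXT,"
        " inserted TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )


def touch(conn, update_id, target_table):
    """Record that the update identified by update_id has completed."""
    create_marker_table(conn)
    conn.execute(
        "INSERT OR REPLACE INTO table_updates (update_id, target_table) "
        "VALUES (?, ?)",
        (update_id, target_table),
    )
    conn.commit()


def exists(conn, update_id):
    """The target 'exists' iff its update_id is in the marker table."""
    create_marker_table(conn)
    row = conn.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ?", (update_id,)
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
print(exists(conn, "InputData_2018-05-01"))  # False
touch(conn, "InputData_2018-05-01", "muppets_input")
print(exists(conn, "InputData_2018-05-01"))  # True
```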

class SomeTask(*args, **kwargs)

Bases: luigi.task.Task

Task which increments the age of the muppets by first selecting muppets with an age less than max_age.

Parameters:
  • date (datetime) – Date used to label the outputs
  • max_age (int) – Maximum age of muppets to select from the database
  • in_db_config (dict) – The input database configuration
  • out_db_config (dict) – The output database configuration
date = <luigi.parameter.DateParameter object>
max_age = <luigi.parameter.IntParameter object>
in_db_config = <luigi.parameter.DictParameter object>
out_db_config = <luigi.parameter.DictParameter object>

requires()

Gets the input database engine

output()

Points to the output database engine

run()

Increments the muppets’ ages by 1
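The select-then-increment logic described for SomeTask can be sketched with the standard-library `sqlite3` module standing in for MySQL. The table name `muppets`, its columns and the helper `increment_young_muppets` are assumptions; the two connections play the roles of in_db_config and out_db_config.

```python
import sqlite3


def increment_young_muppets(in_conn, out_conn, max_age):
    """Copy muppets younger than max_age to the output DB, one year older."""
    rows = in_conn.execute(
        "SELECT name, age FROM muppets WHERE age < ?", (max_age,)
    ).fetchall()
    out_conn.execute(
        "CREATE TABLE IF NOT EXISTS muppets (name TEXT PRIMARY KEY, age INTEGER)"
    )
    out_conn.executemany(
        "INSERT OR REPLACE INTO muppets (name, age) VALUES (?, ?)",
        [(name, age + 1) for name, age in rows],
    )
    out_conn.commit()
    return len(rows)


in_conn = sqlite3.connect(":memory:")
in_conn.execute("CREATE TABLE muppets (name TEXT, age INTEGER)")
in_conn.executemany("INSERT INTO muppets VALUES (?, ?)",
                    [("Kermit", 62), ("Statler", 90)])
out_conn = sqlite3.connect(":memory:")
print(increment_young_muppets(in_conn, out_conn, max_age=80))  # 1
print(out_conn.execute("SELECT name, age FROM muppets").fetchall())
# [('Kermit', 63)]
```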

class RootTask(*args, **kwargs)

Bases: luigi.task.WrapperTask

A dummy root task, which collects the database configurations and executes the central task.

Parameters: date (datetime) – Date used to label the outputs
date = <luigi.parameter.DateParameter object>

requires()

Collects the database configurations and executes the central task.
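As a WrapperTask, RootTask does no work itself and is complete once everything it requires is complete; its role here is to gather the two database configurations and hand them on as DictParameters. A minimal sketch of the gathering step with the standard-library `configparser`; the section names, keys and the helper `collect_db_configs` are hypothetical.

```python
import configparser

# Hypothetical config file contents; section and key names are assumptions
CONFIG_TEXT = """
[input_db]
host = localhost
port = 3306
database = muppets_input

[output_db]
host = localhost
port = 3306
database = muppets_output
"""


def collect_db_configs(text):
    """Return (in_db_config, out_db_config) as plain dicts of strings."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return dict(parser["input_db"]), dict(parser["output_db"])


in_cfg, out_cfg = collect_db_configs(CONFIG_TEXT)
print(in_cfg["database"], out_cfg["database"])  # muppets_input muppets_output
```

Dicts like these could then be passed from requires() as `SomeTask(date=..., max_age=..., in_db_config=in_cfg, out_db_config=out_cfg)`.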

Batch Example

An example of building a pipeline with batched tasks.

class SomeInitialTask(*args, **kwargs)

Bases: luigi.task.ExternalTask

Dummy task acting as the single input data source

output()

Points to the S3 Target

class SomeBatchTask(*args, **kwargs)

Bases: nesta.core.luigihacks.autobatch.AutoBatchTask

A set of batched tasks which increment the age of the muppets by 1 year.

Parameters:
  • date (datetime) – Date used to label the outputs
  • batchable (str) – Path to the directory containing the run.py batchable
  • job_def (str) – Name of the AWS job definition
  • job_name (str) – Name given to this AWS batch job
  • job_queue (str) – AWS batch queue
  • region_name (str) – AWS region from which to batch
  • poll_time (int) – Time between querying the AWS batch job status
date = <luigi.parameter.DateParameter object>
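poll_time sets how often the status of the AWS batch job is queried. The loop below sketches that polling pattern under stated assumptions: `get_status` is a hypothetical callable standing in for an AWS Batch status query, poll_time is taken to be in seconds, and `max_polls` is an added safeguard not present in the original. SUCCEEDED and FAILED are the terminal states of an AWS Batch job.

```python
import time

# Terminal AWS Batch job states; any other status means "keep waiting"
TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def wait_for_job(get_status, poll_time, max_polls=100):
    """Query get_status() every poll_time seconds until a terminal state."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_time)
    raise TimeoutError("job did not finish within max_polls queries")


# Fake status sequence standing in for real AWS Batch responses
statuses = iter(["SUBMITTED", "RUNNABLE", "RUNNING", "SUCCEEDED"])
print(wait_for_job(lambda: next(statuses), poll_time=0))  # SUCCEEDED
```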

requires()

Gets the input data (json containing muppet name and age)

output()

Points to the S3 Target

prepare()

Prepares the batch job parameters
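prepare() builds the parameters for each batch job. One plausible scheme, sketched below, splits the input records into fixed-size chunks and emits one parameter dict per chunk. The helper name, the `chunk_size` argument and the dict keys (`data`, `outfile`) are hypothetical and not taken from AutoBatchTask.

```python
import json


def prepare_job_params(muppets, chunk_size, run_date):
    """Return one parameter dict per batch job (keys are hypothetical)."""
    job_params = []
    for i in range(0, len(muppets), chunk_size):
        job_params.append({
            "data": json.dumps(muppets[i:i + chunk_size]),
            "outfile": f"muppets_{run_date}_{i // chunk_size}.json",
        })
    return job_params


params = prepare_job_params(
    [{"name": "Kermit", "age": 62}, {"name": "Gonzo", "age": 46},
     {"name": "Animal", "age": 43}],
    chunk_size=2, run_date="2018-05-01")
print([p["outfile"] for p in params])
# ['muppets_2018-05-01_0.json', 'muppets_2018-05-01_1.json']
```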

combine(job_params)

Combines the outputs from the batch jobs
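Assuming each job’s parameters record where that job wrote its JSON list of records (the `outfile` key here is hypothetical), combining could be sketched as below. `read_output` is a hypothetical callable standing in for reading from S3; an in-memory dict plays that role in the example.

```python
import json


def combine_outputs(job_params, read_output):
    """Concatenate the JSON list written by each batch job, in job order."""
    combined = []
    for params in job_params:
        combined.extend(json.loads(read_output(params["outfile"])))
    return combined


# In-memory stand-in for the files each batch job would have written
written = {
    "part_0.json": '[{"name": "Kermit", "age": 63}]',
    "part_1.json": '[{"name": "Gonzo", "age": 47}]',
}
job_params = [{"outfile": "part_0.json"}, {"outfile": "part_1.json"}]
print(combine_outputs(job_params, written.__getitem__))
# [{'name': 'Kermit', 'age': 63}, {'name': 'Gonzo', 'age': 47}]
```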

class RootTask(*args, **kwargs)

Bases: luigi.task.Task

The root task, which adds the surname ‘Muppet’ to the names of the muppets.

Parameters: date (datetime) – Date used to label the outputs
date = <luigi.parameter.DateParameter object>

requires()

Gets the output from the batch task

output()

Points to the S3 Target

run()

Appends ‘Muppet’ to the muppets’ names