Examples

These examples of Luigi routines are templates from which all other nesta production routines can be built. Currently we have examples of routines with S3 and database (MySQL) IO, and of routines which are entirely batched.

We’d recommend reading Spotify’s Luigi documentation, and also checking the Luigi Hacks documentation, which contains modified Luigi modules that we may one day suggest as pull requests.

S3 Example

An example of building a pipeline with S3 Targets

class InputData(*args, **kwargs)[source]

Bases: luigi.task.ExternalTask

Dummy task acting as the single input data source

output()[source]

Points to the S3 Target

class SomeTask(*args, **kwargs)[source]

Bases: luigi.task.Task

An intermediate task which increments the age of the muppets by 1 year.

Parameters: date (datetime) – Date used to label the outputs
date = <luigi.parameter.DateParameter object>
requires()[source]

Gets the input data (json containing muppet name and age)

output()[source]

Points to the S3 Target
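One way the date parameter could label an S3 output is by embedding it in the object key. The following is a minimal sketch only: the helper s3_output_path, the bucket name and the key layout are all hypothetical and are not taken from the example's source.

```python
import datetime


def s3_output_path(task_name, date, bucket="example-bucket"):
    """Build a date-labelled S3 path for a task's output.

    The bucket name and the key layout are assumptions for illustration.
    """
    # ISO formatting gives sortable, unambiguous labels such as 2018-01-01
    return f"s3://{bucket}/{task_name}/{date.isoformat()}.json"


path = s3_output_path("SomeTask", datetime.date(2018, 1, 1))
```

Because the date is part of the path, running the same task with a different date points to a different target, so earlier outputs are not overwritten.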

run()[source]

Increments the muppets’ ages by 1
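The data step performed in run() can be sketched in plain Python. This is a minimal sketch rather than the task's actual code: the record keys "name" and "age", the helper increment_ages and the sample records are assumptions for illustration, and reading from and writing to S3 is left out so that only the transformation is shown.

```python
import json


def increment_ages(raw_json, years=1):
    """Return a JSON string in which every muppet is `years` older.

    Assumes the input is a JSON list of objects with "name" and "age"
    keys (hypothetical field names, not confirmed by the source).
    """
    muppets = json.loads(raw_json)
    # Build new records rather than mutating the parsed input in place
    aged = [{**muppet, "age": muppet["age"] + years} for muppet in muppets]
    return json.dumps(aged)


# Made-up example data, standing in for the contents of the input target
example_input = json.dumps([
    {"name": "Kermit", "age": 64},
    {"name": "Gonzo", "age": 49},
])
example_output = json.loads(increment_ages(example_input))
```

Keeping the transformation in a plain function like this makes it possible to test it without any S3 access.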

class FinalTask(*args, **kwargs)[source]

Bases: luigi.task.Task

The root task, which adds the surname ‘Muppet’ to the names of the muppets.

Parameters: date (datetime) – Date used to label the outputs
date = <luigi.parameter.DateParameter object>
requires()[source]

Get data from the intermediate task

output()[source]

Points to the S3 Target

run()[source]

Appends ‘Muppet’ to the muppets’ names
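The renaming step of the root task can be sketched in the same way. Again this is only an illustration under stated assumptions: the "name" key, the helper add_surname and the sample record are hypothetical, and the S3 IO is omitted.

```python
import json

SURNAME = "Muppet"


def add_surname(raw_json, surname=SURNAME):
    """Return a JSON string with `surname` appended to every name.

    Assumes a JSON list of objects with a "name" key (hypothetical).
    """
    muppets = json.loads(raw_json)
    # Other fields, such as the age, are carried over unchanged
    renamed = [
        {**muppet, "name": f"{muppet['name']} {surname}"}
        for muppet in muppets
    ]
    return json.dumps(renamed)


# Made-up example data, standing in for the intermediate task's output
renamed_output = json.loads(add_surname('[{"name": "Kermit", "age": 65}]'))
```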

Database example

An example of building a pipeline with database Targets

class InputData(*args, **kwargs)[source]

Bases: luigi.task.Task

Dummy task acting as the single input data source.

Parameters:
  • date (datetime) – Date used to label the outputs
  • db_config (dict) – The input database configuration
date = <luigi.parameter.DateParameter object>
db_config = <luigi.parameter.DictParameter object>
output()[source]

Points to the input database target

run()[source]

Example of marking the update table
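The idea behind marking an update table can be illustrated with a small stand-in. The sketch below uses an in-memory SQLite database instead of MySQL so that it runs anywhere. The helper names touch and exists, the table and column names, and the update identifier are assumptions for illustration, not the code used by this task.

```python
import sqlite3


def touch(conn, update_id, target_table):
    """Record that `update_id` has been written to `target_table`."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS table_updates ("
        "update_id TEXT PRIMARY KEY, target_table TEXT, "
        "inserted TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )
    conn.execute(
        "INSERT OR REPLACE INTO table_updates (update_id, target_table) "
        "VALUES (?, ?)",
        (update_id, target_table),
    )
    conn.commit()


def exists(conn, update_id):
    """Return True if `update_id` has already been marked as complete."""
    try:
        row = conn.execute(
            "SELECT 1 FROM table_updates WHERE update_id = ?", (update_id,)
        ).fetchone()
    except sqlite3.OperationalError:
        # The marker table has not been created yet, so nothing is marked
        return False
    return row is not None


conn = sqlite3.connect(":memory:")
before = exists(conn, "InputData_2018-01-01")  # hypothetical update id
touch(conn, "InputData_2018-01-01", "muppets_input")
after = exists(conn, "InputData_2018-01-01")
```

A scheduler can then treat a task as complete when its update identifier is present in the marker table, which is why the task marks the table once its work is done.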

class SomeTask(*args, **kwargs)[source]

Bases: luigi.task.Task

Task which selects the muppets with an age less than max_age and increments their ages by 1.

Parameters:
  • date (datetime) – Date used to label the outputs
  • max_age (int) – Maximum age of muppets to select from the database
  • in_db_config (dict) – The input database configuration
  • out_db_config (dict) – The output database configuration
date = <luigi.parameter.DateParameter object>
max_age = <luigi.parameter.IntParameter object>
in_db_config = <luigi.parameter.DictParameter object>
out_db_config = <luigi.parameter.DictParameter object>
requires()[source]

Gets the input database engine

output()[source]

Points to the output database engine

run()[source]

Increments the muppets’ ages by 1
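The select-then-increment logic described for this task can be sketched as follows. SQLite is used here as a stand-in for MySQL so that the example is self-contained. The table names muppets_input and muppets_output, the column names, the helper increment_young_muppets and the sample rows are all assumptions for illustration.

```python
import sqlite3


def increment_young_muppets(in_conn, out_conn, max_age):
    """Copy muppets younger than `max_age` to the output, aged by 1 year."""
    # Parameterised query: only muppets with age < max_age are selected
    rows = in_conn.execute(
        "SELECT name, age FROM muppets_input WHERE age < ?", (max_age,)
    ).fetchall()
    out_conn.execute(
        "CREATE TABLE IF NOT EXISTS muppets_output (name TEXT, age INTEGER)"
    )
    out_conn.executemany(
        "INSERT INTO muppets_output (name, age) VALUES (?, ?)",
        [(name, age + 1) for name, age in rows],
    )
    out_conn.commit()
    return len(rows)


# Made-up input data in a throwaway in-memory database
in_conn = sqlite3.connect(":memory:")
in_conn.execute("CREATE TABLE muppets_input (name TEXT, age INTEGER)")
in_conn.executemany(
    "INSERT INTO muppets_input VALUES (?, ?)",
    [("Kermit", 64), ("Gonzo", 49), ("Robin", 12)],
)
out_conn = sqlite3.connect(":memory:")
n_written = increment_young_muppets(in_conn, out_conn, max_age=50)
result = out_conn.execute(
    "SELECT name, age FROM muppets_output ORDER BY name"
).fetchall()
```

Using separate input and output connections mirrors the separate in_db_config and out_db_config parameters, although in the real task these would describe MySQL databases.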

class RootTask(*args, **kwargs)[source]

Bases: luigi.task.WrapperTask

A dummy root task, which collects the database configurations and executes the central task.

Parameters: date (datetime) – Date used to label the outputs
date = <luigi.parameter.DateParameter object>
requires()[source]

Collects the database configurations and executes the central task.