'''
Batch Example
=============
An example of building a pipeline with batched tasks.
'''
from nesta.core.luigihacks import autobatch
from nesta.core.luigihacks import s3
import luigi
import datetime
import json
import time
# Common prefix of the S3 paths used by the tasks in this module for their
# input, intermediate-output and final-output targets.
S3PREFIX = "s3://nesta-dev/production_batch_example_"
class SomeInitialTask(luigi.ExternalTask):
    '''External placeholder task standing in for the single input data source.'''

    def output(self):
        '''Return the S3 target that holds the pipeline's input json.'''
        input_path = S3PREFIX + 'input.json'
        return s3.S3Target(input_path)
class SomeBatchTask(autobatch.AutoBatchTask):
    '''A set of batched tasks which increments the age of the muppets by 1 year.

    Args:
        date (datetime.date): Date used to label the outputs
        batchable (str): Path to the directory containing the run.py batchable
        job_def (str): Name of the AWS job definition
        job_name (str): Name given to this AWS batch job
        job_queue (str): AWS batch queue
        region_name (str): AWS region from which to batch
        poll_time (int): Time between querying the AWS batch job status
    '''
    # Use a plain date (not a datetime) as the default: the value is
    # interpolated into S3 paths below, and a datetime would embed the
    # time-of-day (including microseconds) in those paths.
    # Note the default is evaluated once, when the class is defined.
    date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        '''Gets the input data (json containing muppet name and age)'''
        return SomeInitialTask()

    def output(self):
        '''Points to the S3 Target'''
        return s3.S3Target(S3PREFIX+"intermediate_output_%s.json" % self.date)

    def prepare(self):
        '''Prepare the batch job parameters.

        Loads the list of job parameters from the input json and adds
        the `outinfo` and `done` fields to each entry.

        Returns:
            list: One parameter dict per batch job.
        '''
        # Read the input file, closing the stream explicitly once parsed
        instream = self.input().open("rb")
        try:
            job_params = json.load(instream)
        finally:
            instream.close()

        s3fs = s3.S3FS()
        # Add the mandatory `outinfo' and `done' fields
        for i, params in enumerate(job_params):
            params["outinfo"] = ("s3://nesta-production-intermediate/"
                                 "batch-example-{}-{}".format(self.date, i))
            # A job counts as done if its output location already exists
            params["done"] = s3fs.exists(params["outinfo"])
        return job_params

    def combine(self, job_params):
        '''Combine the outputs from the batch jobs.

        Reads the json found at each job's `outinfo` location and writes
        the collected results, as a single json list, to this task's output.

        Args:
            job_params (list): Parameter dicts, each holding an `outinfo` path.
        '''
        outdata = []
        for params in job_params:
            body = s3.S3Target(params["outinfo"]).open("rb")
            try:
                raw = body.read()
            finally:
                # Close each per-job stream as soon as it has been read
                body.close()
            outdata.append(json.loads(raw.decode('utf-8')))

        with self.output().open("wb") as f:
            f.write(json.dumps(outdata).encode('utf-8'))
class RootTask(luigi.Task):
    '''The root task, which adds the surname 'Muppet'
    to the names of the muppets.

    Args:
        date (datetime.date): Date used to label the outputs
    '''
    # Use a plain date (not a datetime) as the default: the value is
    # interpolated into the job name and S3 paths below, and a datetime
    # would embed the time-of-day (spaces, colons, microseconds) in them.
    # Note the default is evaluated once, when the class is defined.
    date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        '''Get the output from the batchtask'''
        return SomeBatchTask(date=self.date,
                             batchable=("~/nesta/nesta/core/"
                                        "batchables/examples/batch_example/"),
                             job_def="standard_image",
                             job_name="batch-example-%s" % self.date,
                             job_queue="HighPriority",
                             region_name="eu-west-2",
                             poll_time=60)

    def output(self):
        '''Points to the S3 Target'''
        return s3.S3Target(S3PREFIX+"final_output_%s.json" % self.date)

    def run(self):
        '''Appends 'Muppet' to the muppets' names'''
        # NOTE(review): the purpose of this fixed pause is not evident
        # from this file -- confirm whether it is still required.
        time.sleep(10)

        # Read the batch task's output, closing the stream once parsed
        instream = self.input().open('rb')
        try:
            data = json.load(instream)
        finally:
            instream.close()

        for row in data:
            row["name"] += " Muppet"

        with self.output().open('wb') as outstream:
            outstream.write(json.dumps(data).encode('utf8'))