Source code for core.routines.examples.db_example.db_example
'''
Database example
================
An example of building a pipeline with database Targets
'''
import luigi
import datetime
from nesta.core.luigihacks import misctools
from nesta.core.luigihacks.mysqldb import MySqlTarget
[docs]class InputData(luigi.Task):
'''Dummy task acting as the single input data source.
Args:
date (datetime): Date used to label the outputs
db_config: (dict) The input database configuration
'''
date = luigi.DateParameter()
db_config = luigi.DictParameter()
[docs] def output(self):
'''Points to the input database target'''
update_id = self.db_config["table"]+str(self.date)
return MySqlTarget(update_id=update_id, **self.db_config)
[docs]class SomeTask(luigi.Task):
'''Task which increments the age of the muppets, by first selecting
muppets with an age less than `max_age`.
Args:
date (datetime): Date used to label the outputs
max_age (int): Maximum age of muppets to select from the database
in_db_config: (dict) The input database configuration
out_db_config: (dict) The output database configuration
'''
date = luigi.DateParameter(default=datetime.datetime.today())
max_age = luigi.IntParameter()
in_db_config = luigi.DictParameter()
out_db_config = luigi.DictParameter()
[docs] def requires(self):
'''Gets the input database engine'''
return InputData(date=self.date, db_config=self.in_db_config)
[docs] def output(self):
'''Points to the output database engine'''
update_id = self.out_db_config["table"]+str(self.date)
return MySqlTarget(update_id=update_id, **self.out_db_config)
[docs] def run(self):
'''Increments the muppets' ages by 1'''
# Extract the data from the input connection
cnx = self.input().connect()
cursor = cnx.cursor()
query = ("SELECT name, age FROM muppets_input WHERE age <= %s")
cursor.execute(query, (self.max_age, ))
data = [dict(name=name, age=age+1) for name, age in cursor]
cursor.close()
cnx.close()
# Write the data to the output connection
cnx = self.output().connect(autocommit=False)
cursor = cnx.cursor()
insert = ("INSERT INTO muppets_output "
"(name, age) VALUES (%(name)s, %(age)s) "
"ON DUPLICATE KEY UPDATE age=%(age)s")
for row in data:
print(row)
cursor.execute(insert, row)
# Commit, close and update the marker table
cnx.commit()
cursor.close()
cnx.close()
self.output().touch()
[docs]class RootTask(luigi.WrapperTask):
'''A dummy root task, which collects the database configurations
and executes the central task.
Args:
date (datetime): Date used to label the outputs
'''
date = luigi.DateParameter(default=datetime.date.today())
[docs] def requires(self):
'''Collects the database configurations
and executes the central task.'''
db_config = misctools.get_config("mysqldb.config", "mysqldb")
db_config["database"] = "dev"
# Prepare the input DB config
in_db_config = db_config.copy()
in_db_config["table"] = "muppets_input"
# Prepare the output DB config
out_db_config = db_config.copy()
out_db_config["table"] = "muppets_output"
yield SomeTask(date=self.date, max_age=40,
in_db_config=in_db_config,
out_db_config=out_db_config)