Source code for nesta.core.routines.crunchbase.crunchbase_geocode_task

'''
Geocoding
=========

Luigi routines to geocode the Organization, FundingRound, Investor, Ipo and People tables.
'''

import logging
import luigi
import os

from nesta.core.routines.crunchbase.crunchbase_non_org_collect_task import NonOrgCollectTask
from nesta.core.luigihacks.batchgeocode import GeocodeBatchTask
from nesta.core.luigihacks.misctools import find_filepath_from_pathstub, get_config
from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.orms.crunchbase_orm import FundingRound, Investor, Ipo, People


[docs]class OrgGeocodeTask(GeocodeBatchTask): date = luigi.DateParameter() _routine_id = luigi.Parameter() insert_batch_size = luigi.IntParameter()
[docs] def output(self): '''Points to the output database engine''' db_config = get_config(os.environ[self.db_config_env], "mysqldb") db_config["database"] = 'dev' if self.test else 'production' db_config["table"] = "Crunchbase <dummy>" # Note, not a real table update_id = "CrunchbaseGeocodeOrgs_{}".format(self.date) return MySqlTarget(update_id=update_id, **db_config)
[docs] def requires(self): '''Collects the database configurations and executes the central task.''' logging.getLogger().setLevel(logging.INFO) yield FundingRoundGeocodeTask(date=self.date, _routine_id=self._routine_id, test=self.test, db_config_env="MYSQLDB", city_col=FundingRound.city, country_col=FundingRound.country, location_key_col=FundingRound.location_id, insert_batch_size=self.insert_batch_size, env_files=[find_filepath_from_pathstub("nesta/nesta/"), find_filepath_from_pathstub("config/mysqldb.config"), find_filepath_from_pathstub("config/crunchbase.config")], job_def="py36_amzn1_image", job_name=f"CrunchBaseFundingRoundGeocodeTask-{self._routine_id}", job_queue="HighPriority", region_name="eu-west-2", poll_time=10, memory=4096, max_live_jobs=2)
[docs] def combine(self, job_params): '''Touch the checkpoint''' self.output().touch()
[docs]class FundingRoundGeocodeTask(GeocodeBatchTask): date = luigi.DateParameter() _routine_id = luigi.Parameter() insert_batch_size = luigi.IntParameter()
[docs] def output(self): '''Points to the output database engine''' db_config = get_config(os.environ[self.db_config_env], "mysqldb") db_config["database"] = 'dev' if self.test else 'production' db_config["table"] = "Crunchbase <dummy>" # Note, not a real table update_id = "CrunchbaseGeocodeFundingRound_{}".format(self.date) return MySqlTarget(update_id=update_id, **db_config)
[docs] def requires(self): '''Collects the database configurations and executes the central task.''' logging.getLogger().setLevel(logging.INFO) yield InvestorGeocodeTask(date=self.date, _routine_id=self._routine_id, test=self.test, db_config_env="MYSQLDB", city_col=Investor.city, country_col=Investor.country, location_key_col=Investor.location_id, insert_batch_size=self.insert_batch_size, env_files=[find_filepath_from_pathstub("nesta/nesta/"), find_filepath_from_pathstub("config/mysqldb.config"), find_filepath_from_pathstub("config/crunchbase.config")], job_def="py36_amzn1_image", job_name=f"CrunchBaseInvestorGeocodeTask-{self._routine_id}", job_queue="HighPriority", region_name="eu-west-2", poll_time=10, memory=4096, max_live_jobs=2)
[docs] def combine(self, job_params): '''Touch the checkpoint''' self.output().touch()
[docs]class InvestorGeocodeTask(GeocodeBatchTask): date = luigi.DateParameter() _routine_id = luigi.Parameter() insert_batch_size = luigi.IntParameter()
[docs] def output(self): '''Points to the output database engine''' db_config = get_config(os.environ[self.db_config_env], "mysqldb") db_config["database"] = 'dev' if self.test else 'production' db_config["table"] = "Crunchbase <dummy>" # Note, not a real table update_id = "CrunchbaseGeocodeInvestor_{}".format(self.date) return MySqlTarget(update_id=update_id, **db_config)
[docs] def requires(self): '''Collects the database configurations and executes the central task.''' logging.getLogger().setLevel(logging.INFO) yield IpoGeocodeTask(date=self.date, _routine_id=self._routine_id, test=self.test, db_config_env="MYSQLDB", city_col=Ipo.city, country_col=Ipo.country, location_key_col=Ipo.location_id, insert_batch_size=self.insert_batch_size, env_files=[find_filepath_from_pathstub("nesta/nesta/"), find_filepath_from_pathstub("config/mysqldb.config"), find_filepath_from_pathstub("config/crunchbase.config")], job_def="py36_amzn1_image", job_name=f"CrunchBaseIpoGeocodeTask-{self._routine_id}", job_queue="HighPriority", region_name="eu-west-2", poll_time=10, memory=4096, max_live_jobs=2)
[docs] def combine(self, job_params): '''Touch the checkpoint''' self.output().touch()
[docs]class IpoGeocodeTask(GeocodeBatchTask): date = luigi.DateParameter() _routine_id = luigi.Parameter() insert_batch_size = luigi.IntParameter()
[docs] def output(self): '''Points to the output database engine''' db_config = get_config(os.environ[self.db_config_env], "mysqldb") db_config["database"] = 'dev' if self.test else 'production' db_config["table"] = "Crunchbase <dummy>" # Note, not a real table update_id = "CrunchbaseGeocodeIpo_{}".format(self.date) return MySqlTarget(update_id=update_id, **db_config)
[docs] def requires(self): '''Collects the database configurations and executes the central task.''' logging.getLogger().setLevel(logging.INFO) yield PeopleGeocodeTask(date=self.date, _routine_id=self._routine_id, test=self.test, db_config_env="MYSQLDB", city_col=People.city, country_col=People.country, location_key_col=People.location_id, insert_batch_size=self.insert_batch_size, env_files=[find_filepath_from_pathstub("nesta/nesta/"), find_filepath_from_pathstub("config/mysqldb.config"), find_filepath_from_pathstub("config/crunchbase.config")], job_def="py36_amzn1_image", job_name=f"CrunchBasePeopleGeocodeTask-{self._routine_id}", job_queue="HighPriority", region_name="eu-west-2", poll_time=10, memory=4096, max_live_jobs=2)
[docs] def combine(self, job_params): '''Touch the checkpoint''' self.output().touch()
[docs]class PeopleGeocodeTask(GeocodeBatchTask): date = luigi.DateParameter() _routine_id = luigi.Parameter() insert_batch_size = luigi.IntParameter()
[docs] def output(self): '''Points to the output database engine''' db_config = get_config(os.environ[self.db_config_env], "mysqldb") db_config["database"] = 'dev' if self.test else 'production' db_config["table"] = "Crunchbase <dummy>" # Note, not a real table update_id = "CrunchbaseGeocodePeople_{}".format(self.date) return MySqlTarget(update_id=update_id, **db_config)
[docs] def requires(self): '''Collects the database configurations and executes the central task.''' logging.getLogger().setLevel(logging.INFO) yield NonOrgCollectTask(date=self.date, _routine_id=self._routine_id, test=self.test, db_config_path=find_filepath_from_pathstub("mysqldb.config"), insert_batch_size=self.insert_batch_size, batchable=find_filepath_from_pathstub("batchables/crunchbase/crunchbase_collect"), env_files=[find_filepath_from_pathstub("nesta/nesta/"), find_filepath_from_pathstub("config/mysqldb.config"), find_filepath_from_pathstub("config/crunchbase.config")], job_def="py36_amzn1_image", job_name=f"CrunchBaseNonOrgCollectTask-{self._routine_id}", job_queue="HighPriority", region_name="eu-west-2", poll_time=10, memory=4096, max_live_jobs=20)
[docs] def combine(self, job_params): '''Touch the checkpoint''' self.output().touch()