Source code for nesta.core.routines.gtr.gtr_geocode

"""
Geocode
=======

Apply geocoding to the collected GtR data.
Add country name, iso codes and continent.
"""
import logging
import luigi
from math import ceil
import os

from nesta.packages.gtr.get_gtr_data import add_country_details
from nesta.packages.gtr.get_gtr_data import geocode_uk_with_postcode
from nesta.packages.gtr.get_gtr_data import get_orgs_to_process
from nesta.packages.misc_utils.batches import split_batches
from nesta.core.luigihacks.misctools import get_config
from nesta.core.luigihacks.mysqldb import MySqlTarget
from nesta.core.orms.orm_utils import db_session, get_mysql_engine
from nesta.core.orms.orm_utils import insert_data, try_until_allowed
from nesta.core.orms.gtr_orm import Base, Organisation, OrganisationLocation
from nesta.core.luigihacks.misctools import find_filepath_from_pathstub
from nesta.core.routines.gtr.gtr_collect import GtrTask


[docs]class GtrGeocode(luigi.Task): """Perform geocoding on the collected GtR organisations data Args: _routine_id (str): String used to label the AWS task db_config_path: (str) The output database configuration """ date = luigi.DateParameter() _routine_id = luigi.Parameter() test = luigi.BoolParameter() db_config_env = luigi.Parameter() page_size = luigi.IntParameter()
[docs] def requires(self): '''Collects the database configurations and executes the central task.''' yield GtrTask(date=self.date, page_size=self.page_size, batchable=find_filepath_from_pathstub("core/batchables/gtr/collect_gtr"), env_files=[find_filepath_from_pathstub("/nesta/nesta"), find_filepath_from_pathstub("/config/mysqldb.config")], job_def="py36_amzn1_image", job_name=f"GtR-{self.date}-{self.page_size}-{not self.test}", job_queue="HighPriority", region_name="eu-west-2", poll_time=10, test=self.test)
[docs] def output(self): """Points to the output database engine""" self.db_config_path = os.environ[self.db_config_env] db_config = get_config(self.db_config_path, "mysqldb") db_config["database"] = 'dev' if self.test else 'production' db_config["table"] = "GtR Geocode <dummy>" # Note, not a real table update_id = "GtRGeocode_{}".format(self.date) return MySqlTarget(update_id=update_id, **db_config)
[docs] def run(self): """Collect and process organizations, categories and long descriptions.""" # database setup database = 'dev' if self.test else 'production' logging.warning(f"Using {database} database") self.engine = get_mysql_engine(self.db_config_env, 'mysqldb', database) try_until_allowed(Base.metadata.create_all, self.engine) limit = 2000 if self.test else None batch_size = 30 if self.test else 1000 with db_session(self.engine) as session: all_orgs = session.query(Organisation.id, Organisation.addresses).limit(limit).all() existing_org_location_ids = session.query(OrganisationLocation.id).all() logging.info(f"{len(all_orgs)} organisations retrieved from database") logging.info(f"{len(existing_org_location_ids)} organisations have previously been processed") # convert to a list of dictionaries with the nested addresses unpacked orgs = get_orgs_to_process(all_orgs, existing_org_location_ids) logging.info(f"{len(orgs)} new organisations to geocode") total_batches = ceil(len(orgs)/batch_size) logging.info(f"{total_batches} batches") completed_batches = 0 for batch in split_batches(orgs, batch_size=batch_size): # geocode first to add missing country for UK batch = map(geocode_uk_with_postcode, batch) batch = map(add_country_details, batch) # remove data not in OrganisationLocation columns org_location_cols = OrganisationLocation.__table__.columns.keys() batch = [{k: v for k, v in org.items() if k in org_location_cols} for org in batch] insert_data(self.db_config_env, 'mysqldb', database, Base, OrganisationLocation, batch) completed_batches += 1 logging.info(f"Completed {completed_batches} of {total_batches} batches") if self.test and completed_batches > 1: logging.warning("Breaking after 2 batches in test mode") break # mark as done logging.warning("Finished task") self.output().touch()