"""
run.py (topic_tag_elasticsearch)
--------------------------------
Batchable for piping data to Elasticsearch,
whilst implementing topic tags, and filtering groups with too
few members (given by the 10th percentile of group size, to avoid
"junk" groups).
"""
import logging
import lxml # To force pipreqs' hand
from nesta.packages.geo_utils.geocode import generate_composite_key
from nesta.packages.geo_utils.country_iso_code import country_iso_code_to_name
from nesta.packages.health_data.process_mesh import retrieve_mesh_terms
from nesta.packages.health_data.process_mesh import format_mesh_terms
from nesta.core.luigihacks.elasticsearchplus import ElasticsearchPlus
from nesta.core.orms.orm_utils import db_session, get_mysql_engine
from nesta.core.orms.meetup_orm import Group
from nesta.core.orms.geographic_orm import Geographic
from nesta.packages.meetup.meetup_utils import get_members_by_percentile
from nesta.core.orms.orm_utils import load_json_from_pathstub
from bs4 import BeautifulSoup
import json
from ast import literal_eval
from datetime import datetime as dt
import boto3
import os
import requests
def run():
    """Pipe Meetup groups to Elasticsearch with topic tags.

    Reads its configuration from ``BATCHPAR_*`` environment variables,
    then for each group id in the batch file:

    * drops groups below the members-count percentile threshold;
    * drops groups with none of the routine's core topics;
    * attaches MeSH terms, geographic and continent metadata;
    * cleans dates/descriptions and indexes the document into ES.
    """
    # Fetch the input parameters
    s3_bucket = os.environ["BATCHPAR_bucket"]
    batch_file = os.environ["BATCHPAR_batch_file"]
    members_perc = int(os.environ["BATCHPAR_members_perc"])
    db_name = os.environ["BATCHPAR_db_name"]
    es_host = os.environ['BATCHPAR_outinfo']
    es_port = int(os.environ['BATCHPAR_out_port'])
    es_index = os.environ['BATCHPAR_out_index']
    es_type = os.environ['BATCHPAR_out_type']
    entity_type = os.environ["BATCHPAR_entity_type"]
    aws_auth_region = os.environ["BATCHPAR_aws_auth_region"]
    routine_id = os.environ["BATCHPAR_routine_id"]

    # Continent code -> continent name lookup, fetched from open data
    url = ("https://nesta-open-data.s3.eu-west-2"
           ".amazonaws.com/rwjf-viz/continent_codes_names.json")
    continent_lookup = {row["Code"]: row["Name"]
                        for row in requests.get(url).json()}
    continent_lookup[None] = None  # Allow rows with no continent code

    # Extract the core topics for this routine
    logging.debug('Getting topics')
    s3 = boto3.resource('s3')
    topics_key = f'meetup-topics-{routine_id}.json'
    topics_obj = s3.Object(s3_bucket, topics_key)
    core_topics = set(json.loads(topics_obj.get()['Body']._raw_stream.read()))

    # Extract the group ids for this task
    ids_obj = s3.Object(s3_bucket, batch_file)
    group_ids = set(json.loads(ids_obj.get()['Body']._raw_stream.read()))

    # Extract the mesh terms for this task
    # (NB: retrieve_mesh_terms takes the bucket and key directly,
    #  so no intermediate s3.Object is required)
    df_mesh = retrieve_mesh_terms('innovation-mapping-general',
                                  'meetup_mesh/meetup_mesh_processed.txt')
    mesh_terms = format_mesh_terms(df_mesh)  # zero-padded id -> [terms]

    # Setup ES+
    field_null_mapping = load_json_from_pathstub(("tier_1/"
                                                  "field_null_mappings/"),
                                                 "health_scanner.json")
    strans_kwargs = {'filename': 'meetup.json',
                     'from_key': 'tier_0',
                     'to_key': 'tier_1',
                     'ignore': []}
    es = ElasticsearchPlus(hosts=es_host,
                           port=es_port,
                           aws_auth_region=aws_auth_region,
                           # Dry-run mode when testing locally
                           no_commit=("AWSBATCHTEST" in os.environ),
                           entity_type=entity_type,
                           strans_kwargs=strans_kwargs,
                           field_null_mapping=field_null_mapping,
                           null_empty_str=True,
                           coordinates_as_floats=True,
                           country_detection=True,
                           auto_translate=True)

    # Generate the lookup for geographies, keyed by composite id
    engine = get_mysql_engine("BATCHPAR_config", "mysqldb", db_name)
    geo_lookup = {}
    with db_session(engine) as session:
        query_result = session.query(Geographic).all()
        for geography in query_result:
            geo_lookup[geography.id] = {k: v for k, v in
                                        geography.__dict__.items()
                                        if k in geography.__table__.columns}

    # Pipe the groups, ignoring any below the size threshold
    members_limit = get_members_by_percentile(engine, perc=members_perc)
    with db_session(engine) as session:
        query_result = (session
                        .query(Group)
                        .filter(Group.members >= members_limit)
                        .filter(Group.id.in_(group_ids))
                        .all())
        for count, group in enumerate(query_result, 1):
            row = {k: v for k, v in group.__dict__.items()
                   if k in group.__table__.columns}
            # Filter groups without the required topics
            topics = [topic['name'] for topic in group.topics
                      if topic['name'] in core_topics]
            if not topics:
                continue
            # Assign mesh terms; mesh file keys are zero-padded group ids
            mesh_id = f'{row["id"]}'.zfill(8)
            row['mesh_terms'] = mesh_terms.get(mesh_id)
            # Get the geographic data for this row
            country_name = country_iso_code_to_name(row['country'], iso2=True)
            geo_key = generate_composite_key(row['city'], country_name)
            geo = geo_lookup[geo_key]
            # Clean up the input data
            row['topics'] = topics
            row['urlname'] = f"https://www.meetup.com/{row['urlname']}"
            row['coordinate'] = dict(lat=geo['latitude'], lon=geo['longitude'])
            # 'created' is a millisecond epoch timestamp
            row['created'] = dt.fromtimestamp(row['created'] / 1000).strftime("%Y-%m-%d")
            if row['description'] is not None:
                # Strip HTML markup from the free-text description
                row['description'] = BeautifulSoup(row['description'], 'lxml').text
            row['continent'] = continent_lookup[geo['continent']]
            row['country_name'] = geo['country']
            row['continent_id'] = geo['continent']
            row['country'] = geo['country_alpha_2']
            row['iso3'] = geo['country_alpha_3']
            row['isoNumeric'] = geo['country_numeric']
            # Insert to ES
            _row = es.index(index=es_index, doc_type=es_type,
                            id=row['id'], body=row)
            if not count % 1000:
                logging.info(f"{count} rows loaded to elasticsearch")
    logging.info("Batch job complete.")
# For local debugging
if __name__ == "__main__":
    # Use verbose logging when no batch output target is configured,
    # which indicates a local (non-AWS-Batch) invocation. NB: this check
    # runs before the defaults below are injected into the environment.
    log_level = logging.INFO
    if 'BATCHPAR_outinfo' not in os.environ:
        log_level = logging.DEBUG
    debug_params = {'batch_file': ('2019-08-22-community-environment'
                                   '--health-wellbeing--fitness-'
                                   '10-99-False-1566471880891235.json'),
                    'config': ('/home/ec2-user/nesta-lol/nesta/core/'
                               'config/mysqldb.config'),
                    'db_name': 'dev',
                    'bucket': 'nesta-production-intermediate',
                    'outinfo': ('https://search-health-scanner'
                                '-5cs7g52446h7qscocqmiky5dn4.'
                                'eu-west-2.es.amazonaws.com'),
                    'out_port': '443',
                    'out_index': 'meetup_dev',
                    'out_type': '_doc',
                    'aws_auth_region': 'eu-west-2',
                    'entity_type': 'meetup',
                    'members_perc': '10',
                    'routine_id': ('2019-08-22-community-environment-'
                                   '-health-wellbeing--fitness-10-99-False')}
    # Expose the debug parameters under the BATCHPAR_ prefix expected by run()
    for param_name, param_value in debug_params.items():
        os.environ[f"BATCHPAR_{param_name}"] = param_value
    #os.environ["AWSBATCHTEST"] = ""
    handler = logging.StreamHandler()
    logging.basicConfig(handlers=[handler, ],
                        level=log_level,
                        format="%(asctime)s:%(levelname)s:%(message)s")
    run()