Source code for core.batchables.meetup.country_groups.run

"""
run.py (country_groups)
-----------------------

Batchable for expanding from countries to groups
"""

import logging
import os
from ast import literal_eval
import boto3
from urllib.parse import urlsplit

from nesta.packages.meetup.country_groups import MeetupCountryGroups
from nesta.packages.meetup.meetup_utils import flatten_data
from nesta.core.orms.orm_utils import insert_data
from nesta.core.orms.meetup_orm import Base
from nesta.core.orms.meetup_orm import Group
from nesta.core.luigihacks.s3 import parse_s3_path

from sqlalchemy.orm import sessionmaker
from sqlalchemy import func

[docs]def run(): logging.getLogger().setLevel(logging.INFO) # Fetch the input parameters iso2 = os.environ["BATCHPAR_iso2"] name = os.environ["BATCHPAR_name"] category = os.environ["BATCHPAR_cat"] coords = literal_eval(os.environ["BATCHPAR_coords"]) radius = float(os.environ["BATCHPAR_radius"]) s3_path = os.environ["BATCHPAR_outinfo"] db = os.environ["BATCHPAR_db"] # Get the data mcg = MeetupCountryGroups(country_code=iso2, category=category, coords=coords, radius=radius) mcg.get_groups_recursive() output = flatten_data(mcg.groups, country_name=name, country=iso2, timestamp=func.utc_timestamp(), keys=[('category', 'name'), ('category', 'shortname'), ('category', 'id'), 'description', 'created', 'country', 'city', 'id', 'lat', 'lon', 'members', 'name', 'topics', 'urlname']) # Add the data objs = insert_data("BATCHPAR_config", "mysqldb", db, Base, Group, output) # Mark the task as done s3 = boto3.resource('s3') s3_obj = s3.Object(*parse_s3_path(s3_path)) s3_obj.put(Body="") # Mainly for testing return len(objs)
if __name__ == "__main__": run()