"""
[AutoML*] run.py (corex_topic_model)
====================================
Generate topics based on the CorEx algorithm. Loss is calculated from the total correlation explained.
"""
import pandas as pd
import json
from itertools import chain
from scipy.sparse import csr_matrix
from corextopic import corextopic as ct
from nesta.core.luigihacks.s3 import parse_s3_path
import os
import boto3
from ast import literal_eval
# Minimum topic weight kept in an output row: run() only retains topic
# weights strictly greater than this value.
WEIGHT_THRESHOLD = 0.01
def run():
    """Fit a CorEx topic model to term counts held on S3 and save the result.

    Configuration is read from environment variables:

    - ``BATCHPAR_s3_path_in``: S3 path of a JSON file containing a list of
      rows; each row is a ``{term: count}`` mapping plus an ``'id'`` key.
    - ``BATCHPAR_n_hidden``: number of topics, given as a Python literal.
    - ``BATCHPAR_outinfo`` (optional): S3 path the output JSON is written
      to. When it is not set, nothing is saved.

    The output written to S3 has the form
    ``{'loss': <total correlation>,
    'data': {'topic_names': {...}, 'rows': [...]}}``,
    where each entry of ``rows`` holds the row ``id`` and every topic weight
    strictly greater than ``WEIGHT_THRESHOLD``.

    Raises:
        KeyError: if ``BATCHPAR_s3_path_in`` or ``BATCHPAR_n_hidden`` is not
            set, or if an input row has no ``'id'`` key.
    """
    s3_path_in = os.environ['BATCHPAR_s3_path_in']
    n_hidden = int(literal_eval(os.environ['BATCHPAR_n_hidden']))

    # Load and shape the data
    s3 = boto3.resource('s3')
    s3_obj_in = s3.Object(*parse_s3_path(s3_path_in))
    data = json.load(s3_obj_in.get()['Body'])

    # Pack the data into a sparse matrix (CSR layout)
    ids = []       # Identifier of each row, in matrix row order
    indptr = [0]   # Cumulative offsets: row i spans indices[indptr[i]:indptr[i+1]]
    indices = []   # Column positions of the non-null entries
    counts = []    # Term counts/weights per position
    vocab = {}     # {Term: position} lookup
    for row in data:
        ids.append(row.pop('id'))
        for term, count in row.items():
            # A previously unseen term is assigned the next free column
            indices.append(vocab.setdefault(term, len(vocab)))
            counts.append(count)
        indptr.append(len(indices))
    # NOTE(review): dtype=int casts the values to integers; presumably the
    # inputs are integer (or binary) counts -- confirm against the producer.
    X = csr_matrix((counts, indices, indptr), dtype=int)

    # {Position: term} lookup
    _vocab = {position: term for term, position in vocab.items()}

    # Fit the model
    topic_model = ct.Corex(n_hidden=n_hidden)
    topic_model.fit(X)
    topics = topic_model.get_topics()

    # Generate topic names
    topic_names = {f'topic_{itop}': [_vocab[idx] for idx, _ in topic]
                   for itop, topic in enumerate(topics)}

    # Calculate topic weights as sum({term count in doc} * {term weight}),
    # zip the row identifiers back in, and ignore small weights
    rows = []
    for doc_id, sparse_row in zip(ids, X):
        # Densify each row once, rather than slicing out a one-element
        # sparse column for every (topic, term) pair.
        dense_row = sparse_row.toarray()[0]
        topic_weights = {f'topic_{itop}': sum(dense_row[idx] * weight
                                              for idx, weight in topic)
                         for itop, topic in enumerate(topics)}
        rows.append(dict(id=doc_id,
                         **{name: value
                            for name, value in topic_weights.items()
                            if value > WEIGHT_THRESHOLD}))

    # Curate the output
    output = {'loss': topic_model.tc,
              'data': {'topic_names': topic_names,
                       'rows': rows}}

    # Mark the task as done and save the data
    if "BATCHPAR_outinfo" in os.environ:
        s3_path_out = os.environ["BATCHPAR_outinfo"]
        s3_obj = s3.Object(*parse_s3_path(s3_path_out))
        s3_obj.put(Body=json.dumps(output))
if __name__ == "__main__":
    # With no output target configured, fall back to a hard-coded input
    # path and topic count before running.
    if os.environ.get("BATCHPAR_outinfo") is None:
        os.environ.update({
            "BATCHPAR_s3_path_in": 's3://nesta-arxlive/automl/2019-07-04/VECTORIZER.binary_True.min_df_0.001.NGRAM.TEST_True-0_5164.json',
            "BATCHPAR_n_hidden": '39',
        })
    run()