Source code for nesta.core.batchables.nlp.tfidf.run

"""
[AutoML] run.py (tfidf)
-----------------------

Applies TF-IDF cuts to a dataset, configured via the
environment variables BATCHPAR_lower_tfidf_percentile
and BATCHPAR_upper_tfidf_percentile.
"""

from sklearn.feature_extraction.text import TfidfVectorizer
from nesta.core.luigihacks.s3 import parse_s3_path
import os
import boto3
import json
import numpy as np
import json


def chunker(_transformed, n_chunks):
    """Yield the rows of a matrix as dense arrays, densifying chunk by chunk.

    Only one slice of rows is converted with ``.toarray()`` at a time, so
    the whole matrix never has to be held in dense form at once. Every row
    is yielded exactly once, in order, regardless of whether the number of
    rows divides evenly by ``n_chunks``.

    Args:
        _transformed: Two-dimensional matrix exposing ``.shape``, row
            slicing and ``.toarray()`` on the slices (e.g. a scipy
            sparse matrix).
        n_chunks (int): Maximum number of chunks to split the rows into.
            Must be at least 1.

    Yields:
        row (np.array): One dense row of ``_transformed`` at a time.

    Raises:
        ValueError: If ``n_chunks`` is less than 1.
    """
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1, got %r" % (n_chunks,))
    n_rows, _ = _transformed.shape
    # Ceiling division (never below 1): rounding to nearest could give a
    # chunk size of zero, or leave rows uncovered / covered twice.
    chunk_size = max(1, -(-n_rows // n_chunks))
    for start in range(0, n_rows, chunk_size):
        chunk = _transformed[start:start + chunk_size].toarray()
        for row in chunk:
            yield row
def _build_corpus(rows):
    """Build one document string per row from its list-valued fields."""
    corpus = []
    for row in rows:
        doc = []
        for v in row.values():
            if not isinstance(v, list):
                continue
            # Assumes each list-valued field is a list of token lists
            # (the ngrammer output); each inner list is space-joined.
            doc.extend(" ".join(item) for item in v)
        corpus.append(" ".join(doc))
    return corpus


def _good_words_per_doc(csr, lookup, lower_cut, upper_cut):
    """Return, per matrix row, the set of terms strictly between the cuts."""
    indptr, indices, values = csr.indptr, csr.indices, csr.data
    good_words_corpus = []
    # Only stored (non-zero) entries are inspected: the lower cut is a
    # percentile of strictly positive values, so a zero can never pass.
    for start, end in zip(indptr[:-1], indptr[1:]):
        good_words_doc = {lookup[int(idx)]
                          for idx, value in zip(indices[start:end],
                                                values[start:end])
                          if lower_cut < value < upper_cut}
        good_words_corpus.append(good_words_doc)
    return good_words_corpus


def _filter_row(row, good_words):
    """Copy ``row``, keeping only allowed terms in its list-valued fields."""
    new_row = dict(row)
    for k, v in row.items():
        if not isinstance(v, list):
            continue
        new_row[k] = [" ".join(term for term in sentence
                               if term in good_words)
                      for sentence in v]
    return new_row


def run():
    """Apply lower and upper TF-IDF percentile cuts to a slice of the input.

    All parameters are read from ``BATCHPAR_*`` environment variables:
    ``s3_path_in``, ``outinfo``, ``first_index``, ``last_index``,
    ``lower_tfidf_percentile`` and ``upper_tfidf_percentile``.

    The JSON document at ``s3_path_in`` is loaded and the rows
    ``first_index:last_index`` are processed. In every list-valued field,
    terms whose TF-IDF value in that row's document is not strictly
    between the two percentile cuts are removed.

    Returns:
        list: The filtered rows if ``BATCHPAR_outinfo`` is an empty
        string. Otherwise the rows are written as JSON to that S3 path
        and ``None`` is returned.
    """
    # Get variables out
    s3_path_in = os.environ['BATCHPAR_s3_path_in']
    s3_path_out = os.environ["BATCHPAR_outinfo"]
    first_index = int(os.environ['BATCHPAR_first_index'])
    last_index = int(os.environ['BATCHPAR_last_index'])
    lower_tfidf_percentile = int(os.environ['BATCHPAR_lower_tfidf_percentile'])
    upper_tfidf_percentile = int(os.environ['BATCHPAR_upper_tfidf_percentile'])

    # Load the data
    s3 = boto3.resource('s3')
    s3_obj_in = s3.Object(*parse_s3_path(s3_path_in))
    data = json.load(s3_obj_in.get()['Body'])
    rows = data[first_index:last_index]

    # Create a "corpus" by joining together text fields
    # which have been analysed by the ngrammer already
    corpus = _build_corpus(rows)

    # Calculate tfidf values for all terms
    tvec = TfidfVectorizer()
    _transformed = tvec.fit_transform(corpus).tocsr()

    # Extract a reverse lookup for indexes to terms
    lookup = {idx: term for term, idx in tvec.vocabulary_.items()}

    # Calculate the lower and upper bounds from the percentiles
    # of the strictly positive tfidf values
    tfidf_values = _transformed.data[_transformed.data > 0]
    lower_cut = np.percentile(tfidf_values, lower_tfidf_percentile)
    upper_cut = np.percentile(tfidf_values, upper_tfidf_percentile)
    del tfidf_values

    # Generate the list of allowed terms for each document. There is
    # exactly one entry per matrix row, so the zip below cannot
    # silently drop rows.
    good_words_corpus = _good_words_per_doc(_transformed, lookup,
                                            lower_cut, upper_cut)

    # Finally, filter the input data
    outdata = [_filter_row(row, good_words)
               for row, good_words in zip(rows, good_words_corpus)]

    # With no output path (local testing) hand the data back to the caller
    if s3_path_out == "":
        return outdata

    # Mark the task as done
    s3_obj = s3.Object(*parse_s3_path(s3_path_out))
    s3_obj.put(Body=json.dumps(outdata))
if __name__ == "__main__":
    # When no batch parameters have been provided, fall back to
    # defaults for local testing
    if "BATCHPAR_outinfo" not in os.environ:
        os.environ.update({
            'BATCHPAR_s3_path_in': "s3://clio-data/gtr/ngram/NGRAM.test_False.json",
            'BATCHPAR_outinfo': "",
            'BATCHPAR_first_index': '0',
            "BATCHPAR_last_index": '20000',
            "BATCHPAR_upper_tfidf_percentile": '90',
            "BATCHPAR_lower_tfidf_percentile": '5',
        })
    run()