Source code for packages.nih.preprocess_nih

"""
preprocess_nih
==============

Data cleaning / wrangling of raw NIH data before ingestion,
specifically:

  * Systematically removing generic prefixes, using very hard-coded logic.
  * Inferring how to correctly deal with mystery question marks,
    using very hard-coded logic.
  * Splitting strings into arrays, as indicated by JSON fields in the ORM.
  * Converting ALL CAPS to Title Case for any string field which isn't
    a VARCHAR(n) with n < 10.
  * Dealing consistently with null values.
  * Explicit conversion of relevant fields to datetime.
"""
from sqlalchemy.dialects.mysql import VARCHAR
from sqlalchemy.types import JSON, DATETIME
from datetime import datetime as dt
from functools import lru_cache
import pandas as pd
import string

GENERIC_PREFIXES = ['proposal narrative', 'project narrative',
                    'relevance to public health', 'public health relevance',
                    'narrative', 'relevance', 'statement',
                    '(relevance to veterans)', 'relevance to the va',
                    'description', '(provided by applicant)',
                    'project summary', 'abstract', 'overall', 'project',
                    '? ', ' ', '/', 'administrative', '.', 'core', 'title',
                    'summary', '*', 'pilot project', '#']


@lru_cache()
def get_json_cols(orm):
    """Return the column names in the ORM which are of JSON type."""
    return {col.name for col in orm.__table__.columns
            if type(col.type) is JSON}

@lru_cache()
def get_long_text_cols(orm, min_length=10):
    """Return the column names in the ORM which are of a text type
    (i.e. TEXT or VARCHAR) and, if a max length is specified, with
    max length >= `min_length`. The length requirement is because we
    don't want to preprocess ID-like or code fields (e.g. ISO codes).
    """
    return {col.name for col in orm.__table__.columns
            if col.type.python_type is str
            and not (type(col.type) == VARCHAR
                     and col.type.length < min_length)}

@lru_cache()
def get_date_cols(orm):
    """Return the column names in the ORM which are of DATETIME type."""
    return {col.name for col in orm.__table__.columns
            if type(col.type) is DATETIME}
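
# --- Illustrative usage (editor's sketch, not part of the original module).
# `ExampleProject` is a hypothetical ORM, invented purely to demonstrate the
# three column-inspection helpers above; it is not the real NIH schema.
# Assumes SQLAlchemy >= 1.4 for `sqlalchemy.orm.declarative_base`.
from sqlalchemy import Column, INTEGER, TEXT
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ExampleProject(Base):
    __tablename__ = 'example_projects'
    application_id = Column(INTEGER, primary_key=True)
    iso_code = Column(VARCHAR(2))  # VARCHAR(n) with n < 10: not "long text"
    abstract_text = Column(TEXT)
    mesh_terms = Column(JSON)
    project_start = Column(DATETIME)


assert get_json_cols(ExampleProject) == {'mesh_terms'}
assert get_long_text_cols(ExampleProject) == {'abstract_text'}
assert get_date_cols(ExampleProject) == {'project_start'}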

def is_nih_null(value, nulls=('', [], {}, 'N/A', 'Not Required', 'None')):
    """Return True if the value is listed in the `nulls` argument,
    or the value is NaN, null or None."""
    try:
        # Only run pd.isnull if the value is not iterable (list, dict, etc)
        iter(value)
    except TypeError:
        is_null = pd.isnull(value)
    else:
        # Otherwise set a dummy value of False
        is_null = False
    finally:
        # Then check whether the value is either null,
        # or in the tuple of null values
        return is_null or value in nulls
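
# Illustrative checks (editor's sketch, not part of the original module):
# scalar NaNs, empty containers and NIH's sentinel strings all count as null.
assert is_nih_null(float('nan'))
assert is_nih_null([]) and is_nih_null('N/A')
assert not is_nih_null('Some abstract text')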

@lru_cache()
def expand_prefix_list():
    """Expand GENERIC_PREFIXES to include single digits, and then a large
    number of permutations with additional characters, upper case and
    title case. From tests, this covers 19 out of 20 generic prefixes
    in either the abstract text or the "PHR" field."""
    prefixes = []
    numbers = list(str(i) for i in range(0, 10))
    for prefix in GENERIC_PREFIXES + numbers:
        _prefixes = [prefix]
        _prefixes += [prefix + ext for ext in ['', '/', ':', '-', ' -', '.']]
        _prefixes += [ext + prefix for ext in ['?']]
        prefixes += _prefixes
    prefixes += [p + ' ' for p in prefixes]
    prefixes += [p.upper() for p in prefixes]
    prefixes += [p.title() for p in prefixes]
    # Sort by the longest first
    return sorted(set(prefixes), key=lambda x: len(x), reverse=True)
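
# Illustrative checks (editor's sketch): the expansion contains punctuated and
# upper-cased variants of each base prefix, sorted longest-first so that the
# most specific prefix is matched first in `remove_generic_suffixes` below.
assert 'PROJECT SUMMARY: ' in expand_prefix_list()
_lengths = [len(p) for p in expand_prefix_list()]
assert _lengths == sorted(_lengths, reverse=True)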

def remove_generic_suffixes(text):
    """Iteratively remove any of the generic terms in `expand_prefix_list`
    from the front of the text, until none remain."""
    still_replacing = True
    while still_replacing:
        still_replacing = False
        for prefix in expand_prefix_list():  # NB: lru_cached
            if not text.startswith(prefix):
                continue
            text = text.replace(prefix, '', 1)
            still_replacing = True
            break
    return text
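
# Illustrative check (editor's sketch): the longest matching generic prefix is
# stripped repeatedly, leaving only the informative part of the text.
assert (remove_generic_suffixes('PROJECT SUMMARY: Novel biomarkers for sepsis')
        == 'Novel biomarkers for sepsis')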

def remove_large_spaces(text):
    """Iteratively replace any tabs or runs of spaces with a single space,
    until none remain, then strip any leading or trailing spaces."""
    for pattern in ['\t', '  ']:
        while pattern in text:
            text = text.replace(pattern, ' ')
    while text.startswith(' '):
        text = text[1:]
    while text.endswith(' '):
        text = text[:-1]
    return text
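
# Illustrative check (editor's sketch): tabs and runs of spaces collapse to
# single spaces, and leading/trailing spaces are removed.
assert (remove_large_spaces(' The  quick\t\tbrown  fox ')
        == 'The quick brown fox')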

def replace_question_with_best_guess(text):
    """Somewhere in NIH's backend, they have a unicode processing problem.
    From inspection, most of the '?' symbols have quite an intuitive origin,
    and so this function contains the hard-coded logic for inferring which
    symbol used to be in the place of each '?'.
    """
    # Straightforward find and replace
    for find, replace in [('?s', "'s"), (' ? ', ' - '), ('.?', "'."),
                          (',?', "',"), ('n?t', "n't")]:
        if find in text:
            text = text.replace(find, replace)
        if find.upper() in text:
            text = text.replace(find.upper(), replace.upper())
    # Most remaining '?' will be replaced with a single quote,
    # though some will be replaced by a hyphen.
    replace_quote, replace_hyphen = set(), set()
    # Iterate through chars in the text to find those that should be changed
    for i, char in enumerate(text):
        if char != '?':
            continue
        # Ignore a final '?', assuming it marks a question
        is_final_char = (i == len(text) - 1)
        # Ignore e.g. '? Title', but not '?. Title' or '? title'
        is_question = (i < len(text) - 2 and  # a char exists after '? '
                       text[i+1] == ' ' and text[i+2].isupper())
        if is_final_char or is_question:
            continue
        # The case 'sometext?somemoretext' --> 'sometext - somemoretext'
        if i > 0 and text[i-1].isalpha() and text[i+1].isalpha():
            replace_hyphen.add(i)
        # Everything else
        else:
            replace_quote.add(i)
    # Replace from back to front, so that the indexes don't get shuffled
    for i in reversed(range(len(text))):
        if i in replace_quote:
            text = text[:i] + "'" + text[i+1:]
        elif i in replace_hyphen:
            text = text[:i] + ' - ' + text[i+1:]
    return text
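
# Illustrative checks (editor's sketch) of the mojibake heuristics above:
# '?s' is treated as a lost apostrophe, an intra-word '?' as a lost hyphen.
assert (replace_question_with_best_guess("the patient?s heart")
        == "the patient's heart")
assert replace_question_with_best_guess("long?term care") == "long - term care"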

def remove_trailing_exclamation(text):
    """A lot of abstracts end with '!' and then a bunch of spaces."""
    while text.endswith('!'):
        text = text[:-1]
    return text

def upper_to_title(text, force_title=False):
    """NIH inconsistently stores some fields in all upper case.
    Convert any such fields to title case."""
    if text == text.upper() or force_title:
        text = string.capwords(text.lower())
    return text
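
# Illustrative checks (editor's sketch): all-caps text is title-cased, while
# mixed-case text is left alone unless the conversion is forced.
assert upper_to_title('PUBLIC HEALTH RELEVANCE') == 'Public Health Relevance'
assert upper_to_title('mRNA vaccines') == 'mRNA vaccines'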

def clean_text(text, suffix_removal_length=100):
    """Apply the full text-cleaning procedure."""
    operations = [remove_unspecified_unicode,
                  remove_large_spaces,
                  remove_trailing_exclamation]
    # Only strip generic prefixes from reasonably long texts
    if len(text) > suffix_removal_length:
        operations.append(remove_generic_suffixes)
    operations += [replace_question_with_best_guess,
                   upper_to_title,
                   remove_trailing_exclamation]
    # Sequentially apply operations, and clean up spaces
    # after each operation
    for f in operations:
        text = f(text)
        text = remove_large_spaces(text)
    return text
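
# Illustrative check (editor's sketch): prefix removal only applies above
# `suffix_removal_length`, so this short all-caps title is just tidied up,
# stripped of trailing '!' and title-cased.
assert (clean_text('NOVEL BIOMARKERS  FOR SEPSIS!!')
        == 'Novel Biomarkers For Sepsis')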

def detect_and_split(value):
    """Split values either by semicolons or commas. If there are at least
    as many semicolons as commas (minus one), then semicolons are used for
    splitting (this takes into account that NIH name fields are written as
    'last_name, first_name; next_last_name, next_first_name'). Otherwise
    NIH list fields are delimited by commas."""
    n_commas = value.count(",")
    n_semicolons = value.count(";")
    # e.g. "last_name, first_name; next_last_name, next_first_name"
    if n_semicolons >= n_commas - 1:  # also includes the case where both are 0
        value = value.split(";")
    else:
        value = value.split(",")
    return value
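
# Illustrative checks (editor's sketch): name lists keep their internal commas
# by splitting on semicolons, whereas plain lists are split on commas.
assert detect_and_split('Smith, Jane; Doe, John') == ['Smith, Jane', ' Doe, John']
assert (detect_and_split('cancer, genomics, imaging')
        == ['cancer', ' genomics', ' imaging'])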

def split_and_clean(col_value):
    """Apply `detect_and_split`, then apply some general cleaning."""
    values = []
    for value in detect_and_split(col_value):
        # Remove contact boilerplate
        value = value.replace('(contact)', '').strip()
        # The splitting procedure inconsistently leads to blanks
        if value == '':
            continue
        # Force all fields to be title case
        value = upper_to_title(value, force_title=True)
        values.append(value)
    return values
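
# Illustrative check (editor's sketch): splitting, boilerplate removal,
# stripping and forced title-casing are applied in a single pass.
assert (split_and_clean('SMITH, JANE (contact); DOE, JOHN')
        == ['Smith, Jane', 'Doe, John'])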

def parse_date(value):
    """Convert to datetime, unless the value is null or poorly formatted."""
    try:
        value = dt.strptime(value, '%m/%d/%Y')
    except (ValueError, TypeError):
        # Poorly formatted (ValueError) or null (TypeError) values
        value = None
    finally:
        return value
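
# Illustrative checks (editor's sketch): NIH dates arrive as 'MM/DD/YYYY';
# anything else, including None, falls back to None.
assert parse_date('09/30/2019') == dt(2019, 9, 30)
assert parse_date('2019-09-30') is None and parse_date(None) is None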

def remove_unspecified_unicode(value):
    """In a very small number of cases, NIH data contains badly formatted,
    unspecified unicode characters. The following recipe seems to clean up
    all of the discovered cases."""
    # These are all spaces
    for char in ('\xa0\xa0', '\xa0 ', '\xa0'):
        value = value.replace(char, ' ')
    # This is a hyphen
    value = value.replace('-\xad', '-')
    # Clean up any bonus spaces
    while '  ' in value:
        value = value.replace('  ', ' ')
    return value
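
# Illustrative check (editor's sketch): non-breaking spaces and soft hyphens
# are normalised before any other cleaning happens.
assert (remove_unspecified_unicode('gene\xa0therapy for\xa0 X-\xadlinked disease')
        == 'gene therapy for X-linked disease')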

def preprocess_row(row, orm):
    """Clean text, split values and standardise nulls, as required.

    Args:
        row (dict): Row of data to clean, which should match the provided ORM.
        orm (SqlAlchemy selectable): ORM from which to infer JSON, text
                                     and date fields.
    """
    json_cols = get_json_cols(orm)
    long_text_cols = get_long_text_cols(orm)
    date_cols = get_date_cols(orm)
    for col_name, col_value in row.copy().items():
        # If it's: TEXT -> clean it, JSON -> split it, DATE -> parse it
        if col_name in long_text_cols and not pd.isnull(col_value):
            col_value = clean_text(col_value)
        elif col_name in json_cols:
            col_value = split_and_clean(col_value)
        elif col_name in date_cols:
            col_value = parse_date(col_value)
        # Standardise nulls
        if is_nih_null(col_value):
            col_value = None
        row[col_name] = col_value
    return row
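
# Illustrative end-to-end sketch (editor's addition), reusing the hypothetical
# `ExampleProject` ORM defined after the column helpers above. The row values
# are invented for demonstration only.
_example_row = {'application_id': 12345,
                'iso_code': 'US',
                'abstract_text': ('PROJECT NARRATIVE We study the heart?s '
                                  'development, because heart disease remains '
                                  'a leading cause of death worldwide!'),
                'mesh_terms': 'HEART DISEASES; NEONATAL SCREENING',
                'project_start': '09/30/2019'}
_example_row = preprocess_row(_example_row, ExampleProject)
assert _example_row['mesh_terms'] == ['Heart Diseases', 'Neonatal Screening']
assert _example_row['project_start'] == dt(2019, 9, 30)
assert not _example_row['abstract_text'].startswith('PROJECT NARRATIVE')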