Source code for packages.decorators.schema_transform
'''
schema_transform
================
Apply a field name transformation to a data output from the wrapped function,
such that specified field names are transformed and unspecified fields are dropped.
A valid file would be formatted as shown:
{ "tier0_to_tier1":
{ "bad_col": "good_col",
"another_bad_col": "another_good_col"
}
}
'''
import pandas
import json
[docs]def load_transformer(filename):
with open(filename) as f:
_data = json.load(f)
return _data['tier0_to_tier1']
return transformer
[docs]def schema_transform(filename):
'''
Args:
filename (str): A record-oriented JSON file path mapping field names
Returns:
Data in the format it was originally passed to the wrapper in, with
specified field names transformed and unspecified fields dropped.
'''
transformer = load_transformer(filename)
def wrapper(func):
def transformed(*args, **kwargs):
data = func(*args,**kwargs)
# Accept DataFrames...
if type(data) == pandas.DataFrame:
drop_cols = [c for c in data.columns
if c not in transformer]
data.drop(drop_cols, axis=1, inplace=True)
data.rename(columns=transformer, inplace=True)
# ... OR list of dicts
elif type(data) == list and all(type(row) == dict for row in data):
data = [{transformer[k]:v for k, v in row.items()
if k in transformer} for row in data]
# Otherwise throw an error
else:
raise ValueError("Schema transform expects EITHER a "
"pandas.DataFrame "
"OR a list of dict from the wrapped "
"function.")
return data
return transformed
return wrapper
[docs]def schema_transformer(data, *, filename, ignore=[]):
'''Function version of the schema_transformer wrapper.
Args:
data (dataframe OR list of dicts): the data requiring the schama transformation
filename (str): the path to the schema json file
ignore (list): optional list of fields, eg ids or keys which shouldn't be dropped
Returns:
supplied data with schema applied
'''
# Accept DataFrames...
transformer = load_transformer(filename)
if type(data) == pandas.DataFrame:
drop_cols = [c for c in data.columns
if c not in transformer
and c not in ignore]
data.drop(drop_cols, axis=1, inplace=True)
data.rename(columns=transformer, inplace=True)
return data
# ... OR list of dicts
elif type(data) == list and all(type(row) == dict for row in data):
transformed_data = []
for row in data:
transformed = {transformer[k]: v for k, v in row.items()
if k in transformer}
ignored = {k: v for k, v in row.items() if k in ignore}
transformed_data.append({**transformed, **ignored})
return transformed_data
# ... OR a single dict
elif type(data) == dict:
transformed = {transformer[k]: v for k, v in data.items() if k in transformer}
ignored = {k: v for k, v in data.items() if k in ignore}
return {**transformed, **ignored}
# Otherwise throw an error
else:
raise ValueError("Schema transform expects EITHER a "
"pandas.DataFrame "
"OR a list of dict, "
"OR a single dict from the "
"wrapped function.")