Commit 7973c6ab authored by Martino Bertoni 🌋

safe chunk size for large matrices, better logging, imputation based on auxiliary signature

parent 6b7aa911
@@ -24,10 +24,10 @@ class Sanitizer(object):
def __init__(self, *args, impute_missing=True, trim=True,
max_features=10000,
- check_features=True, min_feature_abs=5, min_feature_freq=1.0,
- check_keys=True, min_keys_abs=1, min_keys_freq=1.0,
- sample_size=1000, max_categories=10, zero_as_missing=True,
- chunk_size=1000, tmp_path=None, **kwargs):
+ check_features=True, min_feature_abs=5, max_feature_freq=0.8,
+ check_keys=True, min_keys_abs=1, max_keys_freq=0.8,
+ sample_size=1000, max_categories=20, zero_as_missing=True,
+ chunk_size=10000, tmp_path=None, **kwargs):
"""Initialize a Sanitizer instance.
Args:
@@ -40,16 +40,16 @@ class Sanitizer(object):
frequency arguments. For categorical data, 0 is considered as
`missing`. For continuous, any non numerical value.
min_feature_abs (int): Minimum number (counts) of occurrences
- of feature, row-wise. (default=5).
- min_feature_freq (float): Minimum proportion of occurrences of
- the feature, row-wise. (default=0.8).
+ of feature, column-wise. (default=5).
+ max_feature_freq (float): Maximum proportion of occurrences of
+ the feature, column-wise. (default=0.8).
check_keys (bool): True if we want to drop keys based on
frequency arguments. For categorical data, 0 is considered as
`missing`. For continuous, any non numerical value.
- min_key_abs (int): Minimum number (counts) of non-zero
- features per row (column-wise) (default=1).
- min_key_freq (float): Minimum proportion of non-zero
- occurrences per row (column-wise) (default=0.8).
+ min_keys_abs (int): Minimum number (counts) of occurrences
+ of features, row-wise. (default=1).
+ max_keys_freq (float): Maximum proportion of occurrences of
+ features, row-wise. (default=0.8).
sample_size (int): rows used for determining data type.
max_categories (int): Maximum number of categories we can expect.
zero_as_missing (bool): Only applied to categorical data (usually)
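Editor's note on the renamed thresholds: `min_*_abs` is a lower bound on the count of present (non-missing) values, while the new `max_*_freq` is an upper bound on their proportion, applied only to categorical data. A minimal NumPy sketch of the intended column-wise test (illustrative names, not the class API):

import numpy as np

X = np.array([[1.0, np.nan, 1.0],
              [2.0, np.nan, np.nan],
              [3.0, np.nan, 2.0],
              [4.0, 5.0,    2.0],
              [5.0, np.nan, np.nan]])

present = np.sum(np.isfinite(X), axis=0)  # non-missing count per column
present_freq = present / X.shape[0]       # proportion per column
min_feature_abs, max_feature_freq = 2, 0.8
# the frequency cut applies only to categorical data in the real code
keep = (present >= min_feature_abs) & (present_freq <= max_feature_freq)
# -> [False, False, True]: col 0 too frequent, col 1 too rare, col 2 kept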
@@ -64,8 +64,6 @@ class Sanitizer(object):
continue
self.__log.debug("{:<22}: {:>12}".format(str(k), str(v)))
def transform(self, data=None, V=None, keys=None, keys_raw=None,
features=None, sign=None):
"""Sanitize data
@@ -76,7 +74,7 @@ class Sanitizer(object):
keys (array): Keys (default=None).
keys_raw (array): Keys raw (default=None).
features (array): Features (default=None).
- sign (signature): Auxiliary data used to impute (default=None).
+ sign (DataSignature): Auxiliary data used to impute (default=None).
"""
if data is not None and V is not None:
raise Exception("Too many inputs! Either provide `data` or `V`.")
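As a usage note, the two calling conventions are mutually exclusive; a sketch, assuming `Sanitizer` from this module is in scope (array contents are made up, and `sign` stands for an optional `DataSignature` used for imputation):

import numpy as np

san = Sanitizer(impute_missing=True, trim=True)

# raw arrays in, sanitized arrays out
V = np.random.rand(100, 50)
keys = np.array(['key_%03d' % i for i in range(100)])
features = np.array(['feat_%02d' % i for i in range(50)])
V, keys, keys_raw, features = san.transform(V=V, keys=keys, features=features)

# alternatively, an HDF5-backed object sanitized in place:
# san.transform(data=data, sign=sign)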
@@ -85,7 +83,7 @@ class Sanitizer(object):
was_data = False
if V is None or keys is None or features is None:
raise Exception("`data` not provided so "
- "`V`, `keys`, `features` are expected")
+ "`V`, `keys`, `features` are expected.")
if keys_raw is None:
keys_raw = keys
tag = str(uuid.uuid4())
@@ -111,71 +109,87 @@ class Sanitizer(object):
# check data type
self.__log.debug("Data type: %s" % str(self.data[0].dtype))
self.__log.debug("Data shape: %s" % str(self.data.shape))
+ self.__log.debug("Data size: %s" % str(self.data.size))
+ if self.data.size > 1e9:
+ self.__log.debug("Data size exceeds 1e9, reducing `chunk_size`.")
+ self.chunk_size = 100
cs = self.chunk_size
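The new guard above is the "safe chunk size" part of the commit: beyond 1e9 elements, the chunk size falls back from 10000 to 100 to keep per-chunk memory bounded. The same idea as a standalone sketch (function and parameter names are illustrative):

def safe_chunk_size(n_elements, chunk_size=10000, max_size=1e9, fallback=100):
    """Use a small chunk when the full matrix is very large."""
    return fallback if n_elements > max_size else chunk_size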
vals = data[:self.sample_size].ravel()
unique_vals = np.unique(vals[np.isfinite(vals)])
- if len(unique_vals) < self.max_categories:
+ if len(unique_vals) <= self.max_categories:
self.is_categorical = True
self.categories = unique_vals
self.__log.debug("Data is categorical: %s" % str(unique_vals))
else:
self.is_categorical = False
- self.__log.debug("Data is continuous")
+ self.__log.debug("Data is continuous.")
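Type detection samples the first `sample_size` rows; with the change from `<` to `<=`, a sample with exactly `max_categories` distinct finite values is still treated as categorical. A self-contained sketch of the same check:

import numpy as np

def detect_categories(V, sample_size=1000, max_categories=20):
    vals = np.asarray(V[:sample_size]).ravel()
    unique_vals = np.unique(vals[np.isfinite(vals)])
    if len(unique_vals) <= max_categories:
        return True, unique_vals   # categorical, with its categories
    return False, None             # continuous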
# if a signature is specified make sure the new input has equal columns
if sign is not None:
try:
ref_features = sign.get_h5_dataset('features')
except Exception:
- raise Exception("`sign` should have the `features` dataset")
+ raise Exception("`sign` should have the `features` dataset.")
if len(set(features) & set(ref_features)) != len(set(ref_features)):
raise Exception("`data` must contain at least all features "
- "present in `sign`")
+ "present in `sign`.")
add_features = sorted(list(set(features) - set(ref_features)))
if add_features:
self.__log.info("Some input features are skipped as they are not "
"present in reference: %s" % str(add_features))
# we assume that features are in the same order
mask = np.isin(list(features), list(ref_features))
- data.filter_h5_dataset('V', keep, axis=1)
- data.filter_h5_dataset('features', keep, axis=1)
+ data.filter_h5_dataset('V', mask, axis=1)
+ data.filter_h5_dataset('features', mask, axis=1)
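The `keep` to `mask` change fixes an undefined name in this alignment step. With plain arrays the same column alignment looks like this; note `np.isin` preserves the input order, hence the code's assumption that features are ordered consistently with the reference:

import numpy as np

V = np.arange(8).reshape(2, 4)
features = np.array(['a', 'b', 'c', 'd'])
ref_features = np.array(['a', 'c', 'd'])

mask = np.isin(features, ref_features)  # [True, False, True, True]
V = V[:, mask]                          # columns now match ref_features
features = features[mask]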
# check features frequencies
if self.check_features:
self.__log.debug('Checking features:')
features = data.get_h5_dataset('features')
- keep = np.full((data.shape[1],), False, dtype=bool)
+ drop_abs = np.full((data.shape[1],), False, dtype=bool)
+ drop_freq = np.full((data.shape[1],), False, dtype=bool)
for chunk, cols in data.chunk_iter('V', cs, axis=1, chunk=True):
missing = np.sum(~np.isfinite(cols), axis=0)
if self.is_categorical and self.zero_as_missing:
missing += np.sum(cols == 0, axis=0)
- drop_abs = data.shape[0] - missing < self.min_feature_abs
- drop_freq = missing / data.shape[0] > self.min_feature_freq
- keep[chunk] = ~np.logical_or(drop_abs, drop_freq)
+ present = data.shape[0] - missing
+ present_freq = present / data.shape[0]
+ drop_abs[chunk] = present < self.min_feature_abs
+ if self.is_categorical:
+ drop_freq[chunk] = present_freq > self.max_feature_freq
+ self.__log.info('Filter %s features (min_feature_abs): %s'
+ % (np.sum(drop_abs), str(features[drop_abs])))
+ self.__log.info('Filter %s features (max_feature_freq): %s'
+ % (np.sum(drop_freq), str(features[drop_freq])))
+ keep = ~np.logical_or(drop_abs, drop_freq)
if np.any(~keep):
data.filter_h5_dataset('V', keep, axis=1)
data.filter_h5_dataset('features', keep, axis=1)
- self.__log.info('Filter %s features (frequency): %s'
- % (np.sum(~keep), str(features[~keep])))
# check keys frequencies
if self.check_keys:
self.__log.debug('Checking keys:')
keys = data.get_h5_dataset('keys')
- keep = np.full((data.shape[0],), False, dtype=bool)
+ drop_abs = np.full((data.shape[0],), False, dtype=bool)
+ drop_freq = np.full((data.shape[0],), False, dtype=bool)
for chunk, rows in data.chunk_iter('V', cs, axis=0, chunk=True):
missing = np.sum(~np.isfinite(rows), axis=1)
if self.is_categorical and self.zero_as_missing:
missing += np.sum(rows == 0, axis=1)
- drop_abs = data.shape[1] - missing < self.min_keys_abs
- drop_freq = missing / data.shape[1] > self.min_keys_freq
- keep[chunk] = ~np.logical_or(drop_abs, drop_freq)
+ present = data.shape[1] - missing
+ present_freq = present / data.shape[1]
+ drop_abs[chunk] = present < self.min_keys_abs
+ if self.is_categorical:
+ drop_freq[chunk] = present_freq > self.max_keys_freq
+ self.__log.info('Filter %s keys (min_keys_abs): %s'
+ % (np.sum(drop_abs), str(keys[drop_abs])))
+ self.__log.info('Filter %s keys (max_keys_freq): %s'
+ % (np.sum(drop_freq), str(keys[drop_freq])))
+ keep = ~np.logical_or(drop_abs, drop_freq)
if np.any(~keep):
data.filter_h5_dataset('V', keep, axis=0)
data.filter_h5_dataset('keys', keep, axis=0)
data.filter_h5_dataset('keys_raw', keep, axis=0)
- self.__log.info('Filter %s keys (frequency): %s'
- % (np.sum(~keep), str(keys[~keep])))
# count NaN & infs
self.__log.debug('Missing values:')
@@ -192,7 +206,7 @@ class Sanitizer(object):
nan_counts['all'] += nan_counts['+inf']
nan_counts['all'] += nan_counts['-inf']
for k, v in nan_counts.items():
- if k =='all':
+ if k == 'all':
continue
self.__log.debug("{:<5}: {:>12}".format(str(k), str(v)))
@@ -202,14 +216,18 @@ class Sanitizer(object):
hf = h5py.File(data.data_path, 'a')
for chunk, cols in data.chunk_iter('V', cs, axis=1, chunk=True):
# get values for replacements
- nan_vals = np.nanmedian(cols, axis=0)
+ ref_cols = cols
+ if sign is not None:
+ ref_cols = sign[:, chunk]
+ nan_vals = np.nanmedian(ref_cols, axis=0)
if self.is_categorical:
- # if categorical replace nans with most frequent instead
+ # if categorical replace NaNs with most frequent instead
count = np.zeros((cols.shape[1], len(self.categories)))
for idx, cat in enumerate(self.categories):
- count[:, idx] = np.sum(cols == cat, axis=0)
+ count[:, idx] = np.sum(ref_cols == cat, axis=0)
nan_vals = np.argmax(count, axis=1)
- cols_masked = np.ma.masked_array(cols, mask=~np.isfinite(cols))
+ cols_masked = np.ma.masked_array(
+ ref_cols, mask=~np.isfinite(ref_cols))
posinf_vals = np.max(cols_masked, axis=0).data
neginf_vals = np.min(cols_masked, axis=0).data
# replace
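This hunk is the "imputation based on auxiliary signature" from the commit message: when `sign` is given, the replacement statistics (median, or most frequent category, plus the min/max used for ±inf) are computed on the reference signature's columns instead of the incoming data, so new samples are imputed consistently with the reference. A condensed sketch of the value selection (here the argmax index is mapped back to the actual category value):

import numpy as np

def replacement_values(ref_cols, categories=None):
    """Per-column replacements for NaN, +inf and -inf from reference data."""
    if categories is None:
        nan_vals = np.nanmedian(ref_cols, axis=0)
    else:
        # categorical: most frequent category per column
        count = np.zeros((ref_cols.shape[1], len(categories)))
        for idx, cat in enumerate(categories):
            count[:, idx] = np.sum(ref_cols == cat, axis=0)
        nan_vals = np.asarray(categories)[np.argmax(count, axis=1)]
    masked = np.ma.masked_array(ref_cols, mask=~np.isfinite(ref_cols))
    posinf_vals = np.max(masked, axis=0).data
    neginf_vals = np.min(masked, axis=0).data
    return nan_vals, posinf_vals, neginf_vals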
@@ -235,14 +253,14 @@ class Sanitizer(object):
nan_counts['+inf'] += np.sum(np.isposinf(chunk))
nan_counts['-inf'] += np.sum(np.isneginf(chunk))
for k, v in nan_counts.items():
- if k =='all':
+ if k == 'all':
continue
self.__log.debug("{:<5}: {:>12}".format(str(k), str(v)))
# trim if there are too many features
if self.trim and data.shape[1] > self.max_features:
self.__log.debug("More than %d features, trimming the "
- "least informative ones" % self.max_features)
+ "least informative ones." % self.max_features)
if self.is_categorical:
entropy_vals = np.zeros((data.shape[1],))
for chunk, cols in data.chunk_iter('V', cs, axis=1, chunk=True):
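The trimming step ranks columns by information content and keeps the `max_features` most informative. The actual per-chunk scoring falls outside this hunk; for categorical data a plausible per-column Shannon entropy would look like this (a sketch, not necessarily the implementation):

import numpy as np

def column_entropy(cols, categories):
    """Shannon entropy of each column over the known categories."""
    probs = np.stack([np.mean(cols == cat, axis=0) for cat in categories])
    with np.errstate(divide='ignore', invalid='ignore'):
        plogp = np.where(probs > 0, probs * np.log2(probs), 0.0)
    return -np.sum(plogp, axis=0)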
@@ -258,10 +276,10 @@ class Sanitizer(object):
filtered = data.get_h5_dataset('features', mask=~keep)
data.filter_h5_dataset('V', keep, axis=1)
data.filter_h5_dataset('features', keep, axis=1)
- self.__log.info("Removed %i features: %s"
- % (len(filtered), str(filtered)))
+ self.__log.info("Removed %i features (max_features): %s"
+ % (len(filtered), str(filtered)))
- self.__log.debug("Final data shape: %s" % str(self.data.shape))
+ self.__log.info("Sanitized data shape: %s" % str(self.data.shape))
# return if input was raw data
if not was_data:
@@ -272,4 +290,3 @@ class Sanitizer(object):
features = hf["features"][:].astype(str)
os.remove(data.data_path)
return V, keys, keys_raw, features