Useful functions for converting between different types (dicts, lists, tuples, etc.)
Expand source code
"""Useful functions for converting between different types (dicts, lists, tuples, etc.)
"""
from copy import deepcopy
from typing import Union
from uuid import uuid4
import numpy as np
import pandas as pd
import ray
from ray.remote_function import RemoteFunction as RayRemoteFun
from vflow.subkey import Subkey
from vflow.vfunc import VfuncPromise
# Reserved dictionary key. The helpers in this module strip, skip, or pass
# through the entry stored under it instead of treating it as regular data.
PREV_KEY = "__prev__"
def init_step(idx, cols):
    """Build the ``init-<name>`` label for an ``'init'`` column.

    Walks ``cols`` from position ``idx`` to the end and uses the first entry
    that is not literally ``"init"`` as the suffix.

    Parameters
    ----------
    idx: int
        Position in ``cols`` at which the scan starts (the index of the
        ``'init'`` column being renamed).
    cols: list[str]
        List of column names.

    Returns
    -------
    str or None
        ``"init-" + name`` for the first non-``"init"`` name found, or ``None``
        when every entry from ``idx`` onward is ``"init"``.
    """
    position = idx
    while position < len(cols):
        name = cols[position]
        if name != "init":
            return "init-" + name
        position += 1
    return None
def base_dict(d: dict):
    """Return a shallow copy of ``d`` with the ``PREV_KEY`` entry left out.

    The input dictionary is not modified.
    """
    trimmed = dict(d)
    trimmed.pop(PREV_KEY, None)
    return trimmed
def dict_data(d: dict):
    """Return the values of ``d`` as a list, leaving out the ``PREV_KEY`` entry."""
    return [value for key, value in d.items() if key != PREV_KEY]
def dict_keys(d: dict):
    """Return the keys of ``d`` as a list, leaving out ``PREV_KEY``."""
    return [key for key in d if key != PREV_KEY]
def to_tuple(lists: list):
    """Convert from lists to unpacked tuple

    Allows us to write `X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])`

    Parameters
    ----------
    lists: list
        list of objects to convert to unpacked tuple. When the first element
        is itself a list, every element is treated as one packed group and the
        groups are transposed; the width of the first group decides how many
        output lists are produced.

    Returns
    -------
    tuple or list
        A tuple with one list per position of the packed groups, or `lists`
        itself (unchanged) when it is empty or its first element is not a list.

    Examples
    --------
    >>> to_tuple([[x1, y1], [x2, y2], [x3, y3]])
    ([x1, x2, x3], [y1, y2, y3])
    >>> to_tuple([[x1, y1]])
    ([x1], [y1])
    >>> to_tuple([m1, m2, m3])
    [m1, m2, m3]
    >>> to_tuple([])
    []
    """
    # Only the empty list and lists of plain (non-list) items pass through.
    # A single packed group must still be transposed, otherwise
    # `X, y = to_tuple([[x1, y1]])` cannot unpack.
    if len(lists) == 0 or not isinstance(lists[0], list):
        return lists
    n_tup = len(lists[0])
    return tuple([group[j] for group in lists] for j in range(n_tup))
def to_list(tup: tuple):
    """Pack a tuple of arguments into a list of per-call argument lists.

    This is what lets a function be called in a loop, once per packed group.

    Parameters
    ----------
    tup: tuple
        Either a tuple of equally long lists (one list per argument
        position), or a flat tuple of plain values whose first half holds the
        inputs and whose second half holds the matching outcomes.

    Returns
    -------
    list
        One inner list per group; an empty list for an empty tuple.

    Raises
    ------
    ValueError
        If given an odd number (greater than one) of plain values that are
        not wrapped in lists.

    Examples
    --------
    >>> to_list(([x1, x2, x3], [y1, y2, y3]))
    [[x1, y1], [x2, y2], [x3, y3]]
    >>> to_list(([x1], [y1]))
    [[x1, y1]]
    >>> to_list(([x1, x2, x3], ))
    [[x1], [x2], [x3]]
    >>> to_list((x1, ))
    [[x1]]
    >>> to_list((x1, y1))
    [[x1, y1]]
    >>> to_list((x1, x2, x3, y1, y2, y3))
    [[x1, y1], [x2, y2], [x3, y3]]
    """
    size = len(tup)
    if size == 0:
        return []

    if isinstance(tup[0], list):
        # list-wrapped arguments: transpose, the first list sets the group count
        return [[column[row] for column in tup] for row in range(len(tup[0]))]

    # from here on the first element is plain data
    if size == 1:
        return [list(tup)]
    if size % 2 != 0:
        raise ValueError(
            "Don't know how to handle uneven number of args "
            "without a list. Please wrap your args in a list."
        )
    # first half of the args are inputs, second half the matching outcomes
    half = size // 2
    return [[first, second] for first, second in zip(tup[:half], tup[half:])]
def sep_dicts(d: dict, n_out: int = 1, keys=None):
    """Converts dictionary with value being saved as an iterable into multiple dictionaries

    Assumes every value has same length n_out. Entries stored under
    `PREV_KEY` are skipped and do not appear in the separated dictionaries.

    Parameters
    ----------
    d: dict
        Dictionary with iterable values to be converted. Keys other than
        `PREV_KEY` are indexed and sliced as tuples of subkeys.
    n_out: int, default 1
        The number of dictionaries to separate d into.
    keys: list-like, default None
        Optional list of keys to use in output dicts. When given, a new
        `Subkey` with value `keys[i]` is prepended to each original key for the
        i-th output; when empty, the i-th subkey of the original key followed
        by everything after position `n_out` is used instead.

    Returns
    -------
    sep_dicts_list: list
        List of separated dictionaries. Note that when `n_out <= 1` the input
        `d` itself is returned unchanged rather than a list.

    Raises
    ------
    ValueError
        If `keys` is non-empty and its length differs from `n_out`.

    Examples
    --------
    >>> sep_dicts({k1: (x1, y1), k2: (x2, y2), ..., '__prev__': p}, n_out=2)
    [{k1': x1, k2': x2, ...}, {k1'': y1, k2'': y2, ...}]

    where the primed keys are the derived keys described under `keys`.
    """
    if keys is None:
        keys = []
    if len(keys) > 0 and len(keys) != n_out:
        raise ValueError(f"keys should be empty or have length n_out={n_out}")
    # nothing to separate -- hand back the input dict itself (not a list)
    if n_out <= 1:
        return d
    # try separating dict into multiple dicts
    sep_dicts_id = str(uuid4())  # w/ high prob, uuid4 is unique
    sep_dicts_list = [{} for _ in range(n_out)]
    for key, value in d.items():
        if key != PREV_KEY:
            for i in range(n_out):
                # assumes the correct sub-key for item i is in the i-th position
                if len(keys) == 0:
                    new_key = (key[i],) + key[n_out:]
                else:
                    new_sub = Subkey(
                        value=keys[i], origin=key[-1].origin + "-" + str(i)
                    )
                    new_key = (new_sub,) + key
                # tag the last subkey of the new key with this call's id; that
                # subkey object is shared with the input key, so the input's
                # subkey is mutated as well
                # NOTE(review): nesting reconstructed from flattened source --
                # confirm this runs for both branches above
                new_key[-1].sep_dicts_id = sep_dicts_id
                if isinstance(value, VfuncPromise):
                    # return a promise to get the value at index i of the
                    # original promise
                    value_i = VfuncPromise(lambda v, x: v[x], value, i)
                else:
                    value_i = value[i]
                sep_dicts_list[i][new_key] = value_i
    return sep_dicts_list
def dict_to_df(d: dict, param_key=None):
    """Converts a dictionary with tuple keys
    into a pandas DataFrame, optionally separating
    parameters in `param_key` if not None

    Parameters
    ----------
    d: dict
        Output dictionary with tuple keys from a Vset. Every subkey in a key
        must expose `.value` and `.origin`; an entry under `PREV_KEY` is
        ignored.
    param_key: str (optional), default None
        Name of parameter to separate into multiple columns. With `"out"` and
        iterable output values, the values are expanded into extra
        `out-<i>` columns. Otherwise the named column is expected to hold
        iterables of `"name=value"` strings, which replace it with one
        `<name>-<param_key>` column each.

    Returns
    -------
    df: pandas.DataFrame
        A DataFrame with `d` tuple keys separated into columns, plus an
        `"out"` column holding the dictionary values. When `d` has no regular
        entries, the columns are left unnamed.
    """
    # index the values by the plain subkey values instead of Subkey objects
    d_copy = {tuple(sk.value for sk in k): d[k] for k in d if k != PREV_KEY}
    df = pd.Series(d_copy).reset_index()
    if len(d_copy.keys()) > 0:
        key_list = list(d.keys())
        # take column names from the first regular key
        # (assumes all keys share the same subkey origins -- TODO confirm)
        subkey_list = key_list[0] if key_list[0] != PREV_KEY else key_list[1]
        cols = [sk.origin for sk in subkey_list] + ["out"]
        # set each init col to init-{next_vfunc_set}
        cols = [
            c if c != "init" else init_step(idx, cols) for idx, c in enumerate(cols)
        ]
        df = df.set_axis(cols, axis=1)
        if param_key:
            param_keys = df[
                param_key
            ].tolist()  # pylint: disable=unsubscriptable-object
            if param_key == "out" and hasattr(param_keys[0], "__iter__"):
                # expand iterable outputs into out-<i> columns, keeping "out"
                param_df = pd.DataFrame(param_keys)
                param_df.columns = [f"{param_key}-{col}" for col in param_df.columns]
                df = df.join(param_df)
            else:
                param_loc = df.columns.get_loc(param_key)
                # column names come from the first row's "name=value" entries
                param_key_cols = [
                    f"{p.split('=')[0]}-{param_key}" for p in param_keys[0]
                ]
                param_keys = [[s.split("=")[1] for s in t] for t in param_keys]
                df = df.join(pd.DataFrame(param_keys)).drop(columns=param_key)
                new_cols = df.columns[: len(cols) - 1].tolist() + param_key_cols
                df = df.set_axis(new_cols, axis=1)
                # move the new parameter columns (currently at the end) into
                # the position the original param_key column occupied
                new_idx = list(range(len(new_cols)))
                new_idx = (
                    new_idx[:param_loc]
                    + new_idx[len(cols) - 1 :]
                    + new_idx[param_loc : len(cols) - 1]
                )
                df = df.iloc[:, new_idx]
    return df
def _grouped_array_stat(wrt_arrays, group_keys, index_names, stat, prefix, split):
    """Element-wise ``stat`` ("mean" or "std") of each group's stacked array, as a DataFrame."""
    if stat == "mean":
        values = [arr.mean(axis=0) for arr in wrt_arrays]
    else:
        # sample standard deviation, matching pandas' default ddof
        values = [arr.std(axis=0, ddof=1) for arr in wrt_arrays]
    if split:
        # one column per entry position; the width is taken from the first group
        n_cols = wrt_arrays[0].shape[1]
        frame = pd.DataFrame(
            values,
            columns=[f"{prefix}{i}-{stat}" for i in range(n_cols)],
            index=group_keys,
        )
    else:
        # wrapping each array in a dict keeps it as a single object cell
        frame = pd.DataFrame(
            [{f"{prefix}-{stat}": value} for value in values], index=group_keys
        )
    frame.index.names = index_names
    return frame


def perturbation_stats(
    data: Union[pd.DataFrame, dict],
    *group_by: str,
    wrt: str = "out",
    func=None,
    prefix: str = None,
    split: bool = False,
):
    """Compute statistics for `wrt` in `data`, conditional on `group_by`

    Parameters
    ----------
    data: Union[pandas.DataFrame, dict]
        DataFrame, as from calling `dict_to_df` on an output dict from a Vset,
        or the output dict itself.
    *group_by: str
        Vset names in `data` to group on. If none provided, treats everything as one big
        group.
    wrt: str (optional)
        Column name in `data` or `dict_to_df(data)` on which to compute statistics.
        Defaults to `'out'`, the values of the original Vset output dict.
    func: function, str, list or dict (optional), default None
        A list of functions or function names to use for computing
        statistics, analogous to the parameter of the same name in
        pandas.core.groupby.DataFrameGroupBy.aggregate. If `None`, defaults to
        `['count', 'mean', 'std']`. A single function or name is treated as a
        one-element list. When `wrt` holds `list`/`numpy.ndarray` entries and
        `'mean'` or `'std'` is requested, only `'count'`, `'mean'` and `'std'`
        are computed (element-wise); other entries of `func` are ignored.
    prefix: str (optional), default None
        A string to prefix to new columns in output DataFrame. If `None`,
        uses the value of `wrt`. Only used for array-valued `wrt`.
    split: bool (optional), default False
        If `True` and `wrt` in `data` has `list` or `numpy.ndarray` entries, will
        attempt to split the entries into multiple columns for the output.

    Returns
    -------
    df: pandas.DataFrame
        A DataFrame with summary statistics on `wrt`, columns sorted by name
        and rows sorted by the first `group_by` column when one is given.
    """
    if func is None:
        func = ["count", "mean", "std"]
    elif isinstance(func, str) or callable(func):
        # a lone aggregation would come back from pandas as a Series, which
        # has no columns to sort below
        func = [func]
    elif isinstance(func, tuple):
        func = list(func)
    if prefix is None:
        prefix = wrt
    df = dict_to_df(data) if isinstance(data, dict) else data
    group_by = list(group_by)
    if len(group_by) == 1:
        # group on the bare label so group keys are scalars; how pandas
        # represents keys for a length-1 list has changed between versions
        gb = df.groupby(group_by[0])[wrt]
    elif group_by:
        gb = df.groupby(group_by)[wrt]
    else:
        gb = df.groupby(lambda x: True)[wrt]

    wants_moments = isinstance(func, (list, dict)) and (
        "mean" in func or "std" in func
    )
    has_array_entries = len(df) > 0 and isinstance(
        df[wrt].iloc[0], (list, np.ndarray)
    )
    if wants_moments and has_array_entries:
        # pandas cannot average list/array cells, so stack each group's
        # entries into a 2-D array and reduce along the rows
        group_keys = list(gb.groups)
        wrt_arrays = [np.stack(gb.get_group(key).tolist()) for key in group_keys]
        df_out = pd.DataFrame(gb.agg("count"))
        df_out.columns = [f"{prefix}-count"]
        for stat in ("mean", "std"):
            if stat in func:
                stat_frame = _grouped_array_stat(
                    wrt_arrays, group_keys, df_out.index.names, stat, prefix, split
                )
                df_out = df_out.join(stat_frame)
        if "count" not in func:
            # the count frame was only scaffolding for the joins
            df_out = df_out.drop(columns=f"{prefix}-count")
    else:
        df_out = gb.agg(func)
    df_out = df_out.reindex(sorted(df_out.columns), axis=1)
    df_out = df_out.reset_index()
    if len(group_by) > 0:
        return df_out.sort_values(group_by[0])
    return df_out
def combine_keys(left_key, right_key):
    """Merge two tuple keys, de-duplicating subkeys that match across them.

    The shorter key (``right_key`` on a tie) provides the candidate subkeys:
    those for which `vflow.subkey.Subkey.is_matching` is `True`. Each
    candidate is compared against the subkeys of the other key in order.
    The first `vflow.subkey.Subkey.matches` hit records the candidate as
    matched, while a `vflow.subkey.Subkey.mismatches` hit rejects the whole
    combination.

    Parameters
    ----------
    left_key: tuple
        Left tuple key to combine.
    right_key: tuple
        Right tuple key to combine.

    Returns
    -------
    combined_key: tuple
        ``()`` when any candidate mismatches; ``left_key`` followed by the
        subkeys of ``right_key`` that are not among the matched candidates
        when at least one matched; otherwise ``left_key + right_key``.
    """
    if len(left_key) < len(right_key):
        shorter, longer = left_key, right_key
    else:
        shorter, longer = right_key, left_key

    candidates = [sk for sk in shorter if sk.is_matching()]
    matched = []
    for candidate in candidates:
        for other in longer:
            if candidate.matches(other):
                matched.append(candidate)
                break
            if candidate.mismatches(other):
                # subkeys with same origin but different values are rejected
                return ()

    if not matched:
        return left_key + right_key
    # filtering is always applied to the right key, the left key is kept whole
    remainder = tuple(sk for sk in right_key if sk not in matched)
    return left_key + remainder
def combine_dicts(*args: dict, base_case=True):
    """Combines any number of dictionaries into a single dictionary. Dictionaries
    are combined left to right matching all keys according to `combine_keys`

    Parameters
    ----------
    *args: dict
        Dictionaries to recursively combine left to right.
    base_case: bool, default True
        Used by the recursion. When `True`, the values of the two
        dictionaries are paired into a fresh 2-tuple. When `False`, the values
        of the first dictionary are assumed to already be tuples (from an
        earlier combination) and the second dictionary's value is appended.

    Returns
    -------
    combined_dict: dict
        Combined dictionary. Values are tuples holding one value per input
        dictionary, in order. With a single input, each value is wrapped in a
        1-tuple and a `PREV_KEY` entry is passed through unwrapped; with two
        or more inputs, `PREV_KEY` entries are dropped, as are key pairs for
        which `combine_keys` returns an empty key.
    """
    n_args = len(args)
    if n_args == 0:
        return {}
    if n_args == 1:
        # wrap the dict values in tuples; this is helpful so that when we
        # pass the values to a vfunc fun in we can just use * expansion
        return {
            k: (v,) if k != PREV_KEY else v
            for k, v in args[0].items()
        }
    if n_args > 2:
        # Combine the first two dicts, then fold in the rest. The inner call
        # must inherit base_case: when this call is itself a recursive step,
        # args[0] already holds tuples and pairing it afresh would nest them.
        head = combine_dicts(args[0], args[1], base_case=base_case)
        return combine_dicts(head, *args[2:], base_case=False)

    left, right = args
    combined_dict = {}
    for k0, v0 in left.items():
        if k0 == PREV_KEY:
            continue
        for k1, v1 in right.items():
            if k1 == PREV_KEY:
                continue
            combined_key = combine_keys(k0, k1)
            if len(combined_key) == 0:
                # mismatching subkeys: this pair does not combine
                continue
            if base_case:
                combined_dict[combined_key] = (v0, v1)
            else:
                combined_dict[combined_key] = v0 + (v1,)
    return combined_dict
def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False):
    """Apply a dictionary of functions `vfuncs` to each item of `data_dict`,
    optionally returning a dictionary of `vflow.vfunc.VfuncPromise` objects if `lazy` is True

    Output keys are determined by applying `combine_keys` to each pair of items from
    `vfuncs` and `data_dict`. This function is used by all Vsets to apply functions.

    Parameters
    ----------
    vfuncs: dict
        Dictionary of functions to apply to `data_dict`.
    data_dict: dict
        Dictionary of parameters to call each function in `vfuncs`. Each value
        is unpacked with `*` as the positional arguments of the function. If
        `data_dict` is empty, every function is called with no arguments and
        stored under its own key.
    lazy: bool (optional), default False
        If True, `vfuncs` are applied lazily, returning `vflow.vfunc.VfuncPromise`
        objects.

    Returns
    -------
    out_dict: dict
        Output dictionary of applying `vfuncs` to `data_dict`. Pairs whose
        combined key is empty, and any pair involving `PREV_KEY`, are left out.
    """
    out_dict = {}
    for vf_k in vfuncs:
        if len(data_dict) == 0:
            # no data at all: run (or promise) the function without arguments
            # NOTE(review): PREV_KEY is not filtered on this path -- confirm
            # vfuncs never carries it
            func = deepcopy(vfuncs[vf_k])
            if lazy:
                out_dict[vf_k] = VfuncPromise(func)
            else:
                out_dict[vf_k] = func()
        for data_k in data_dict:
            if PREV_KEY in (vf_k, data_k):
                continue
            combined_key = combine_keys(data_k, vf_k)
            if not len(combined_key) > 0:
                # combine_keys rejected this pairing
                continue
            # every output gets its own deep copy of the function
            func = deepcopy(vfuncs[vf_k])
            if lazy:
                # return a promise
                out_dict[combined_key] = VfuncPromise(func, *data_dict[data_k])
            else:
                data_list = list(data_dict[data_k])
                for i, data in enumerate(data_list):
                    if isinstance(data, VfuncPromise):
                        # resolve upstream promises before calling
                        data_list[i] = data()
                    if isinstance(func, RayRemoteFun) and not isinstance(
                        data_list[i], ray.ObjectRef
                    ):
                        # send data to Ray's remote object store
                        data_list[i] = ray.put(data_list[i])
                    elif isinstance(data_list[i], ray.ObjectRef):
                        # this is not a remote function so get the data
                        data_list[i] = ray.get(data_list[i])
                out_dict[combined_key] = func(*data_list)
    return out_dict
Functions
def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False)
-
Apply a dictionary of functions `vfuncs` to each item of `data_dict`, optionally returning a dictionary of `VfuncPromise` objects if `lazy` is True.
Output keys are determined by applying `combine_keys()` to each pair of items from `vfuncs` and `data_dict`. This function is used by all Vsets to apply functions.
Parameters
vfuncs : dict
- Dictionary of functions to apply to `data_dict`.
data_dict : dict
- Dictionary of parameters to call each function in `vfuncs`.
lazy : bool (optional), default False
- If True, `vfuncs` are applied lazily, returning `VfuncPromise` objects.
Returns
out_dict : dict
- Output dictionary of applying `vfuncs` to `data_dict`.
Expand source code
def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False): """Apply a dictionary of functions `vfuncs` to each item of `data_dict`, optionally returning a dictionary of `vflow.vfunc.VfuncPromise` objects if `lazy` is True Output keys are determined by applying `combine_keys` to each pair of items from `vfuncs` and `data_dict`. This function is used by all Vsets to apply functions. Parameters ---------- vfuncs: dict Dictionary of functions to apply to `data_dict`. data_dict: dict Dictionary of parameters to call each function in `vfuncs`. lazy: bool (option), default False If True, `vfuncs` are applied lazily, returning `vflow.vfunc.VfuncPromise` objects, Returns ------- out_dict: dict Output dictionary of applying `vfuncs` to `data_dict`. """ out_dict = {} for vf_k in vfuncs: if len(data_dict) == 0: func = deepcopy(vfuncs[vf_k]) if lazy: out_dict[vf_k] = VfuncPromise(func) else: out_dict[vf_k] = func() for data_k in data_dict: if PREV_KEY in (vf_k, data_k): continue combined_key = combine_keys(data_k, vf_k) if not len(combined_key) > 0: continue func = deepcopy(vfuncs[vf_k]) if lazy: # return a promise out_dict[combined_key] = VfuncPromise(func, *data_dict[data_k]) else: data_list = list(data_dict[data_k]) for i, data in enumerate(data_list): if isinstance(data, VfuncPromise): data_list[i] = data() if isinstance(func, RayRemoteFun) and not isinstance( data_list[i], ray.ObjectRef ): # send data to Ray's remote object store data_list[i] = ray.put(data_list[i]) elif isinstance(data_list[i], ray.ObjectRef): # this is not a remote function so get the data data_list[i] = ray.get(data_list[i]) out_dict[combined_key] = func(*data_list) return out_dict
def base_dict(d: dict)
-
Remove PREV_KEY from dict d if present
Expand source code
def base_dict(d: dict): """Remove PREV_KEY from dict d if present""" return {k: v for k, v in d.items() if k != PREV_KEY}
def combine_dicts(*args: dict, base_case=True)
-
Combines any number of dictionaries into a single dictionary. Dictionaries are combined left to right, matching all keys according to `combine_keys()`.
Parameters
*args
:dict
- Dictionaries to recursively combine left to right.
Returns
combined_dict
:dict
- Combined dictionary.
Expand source code
def combine_dicts(*args: dict, base_case=True): """Combines any number of dictionaries into a single dictionary. Dictionaries are combined left to right matching all keys according to `combine_keys` Parameters ---------- *args: dict Dictionaries to recursively combine left to right. Returns ------- combined_dict: dict Combined dictionary. """ n_args = len(args) combined_dict = {} if n_args == 0: return combined_dict if n_args == 1: for k in args[0]: # wrap the dict values in tuples; this is helpful so that when we # pass the values to a vfunc fun in we can just use * expansion if k != PREV_KEY: combined_dict[k] = (args[0][k],) else: combined_dict[k] = args[0][k] return combined_dict if n_args == 2: for k0 in args[0]: for k1 in args[1]: if PREV_KEY in (k0, k1): continue combined_key = combine_keys(k0, k1) if len(combined_key) > 0: if base_case: combined_dict[combined_key] = (args[0][k0], args[1][k1]) else: combined_dict[combined_key] = args[0][k0] + (args[1][k1],) return combined_dict # combine the first two dicts and call recursively with remaining args return combine_dicts(combine_dicts(args[0], args[1]), *args[2:], base_case=False)
def combine_keys(left_key, right_key)
-
Combines `left_key` and `right_key`, attempting to match on any `Subkey` where `Subkey.is_matching()` is `True`.
Returns an empty key on failed matches when `Subkey.mismatches()` is `True`. Always filters on `right_key` and returns `combined_key` with `left_key` prefix.
Parameters
left_key
:tuple
- Left tuple key to combine.
right_key
:tuple
- Right tuple key to combine.
Returns
combined_key : tuple
- Combined tuple key filtered according to `Subkey.matches()` rules, which is empty according to the `Subkey.mismatches()` rule.
Expand source code
def combine_keys(left_key, right_key): """Combines `left_key` and `right_key`, attempting to match on any `Subkey` where `vflow.subkey.Subkey.is_matching` is `True`. Returns an empty key on failed matches when `vflow.subkey.Subkey.mismatches` is `True`. Always filters on `right_key` and returns `combined_key` with `left_key` prefix. Parameters ---------- left_key: tuple Left tuple key to combine. right_key: tuple Right tuple key to combine. Returns ------- combined_key: tuple Combined tuple key filtered according to `vflow.subkey.Subkey.matches` rules, which is empty according to `vflow.subkey.Subkey.mismatches` rule. """ if len(left_key) < len(right_key): match_key = left_key compare_key = right_key else: match_key = right_key compare_key = left_key match_subkeys = [subkey for subkey in match_key if subkey.is_matching()] if len(match_subkeys) > 0: matched_subkeys = [] for subkey in match_subkeys: for c_subkey in compare_key: if subkey.matches(c_subkey): matched_subkeys.append(subkey) break if subkey.mismatches(c_subkey): # subkeys with same origin but different values are rejected return () if len(matched_subkeys) > 0: # always filter on right key filtered_key = tuple( subkey for subkey in right_key if subkey not in matched_subkeys ) combined_key = left_key + filtered_key return combined_key return left_key + right_key return left_key + right_key
def dict_data(d: dict)
-
Returns a list containing all data in dict d
Expand source code
def dict_data(d: dict): """Returns a list containing all data in dict d""" return list(base_dict(d).values())
def dict_keys(d: dict)
-
Returns a list containing all keys in dict d
Expand source code
def dict_keys(d: dict): """Returns a list containing all keys in dict d""" return list(base_dict(d).keys())
def dict_to_df(d: dict, param_key=None)
-
Converts a dictionary with tuple keys into a pandas DataFrame, optionally separating parameters in `param_key` if not None.
Parameters
d
:dict
- Output dictionary with tuple keys from a Vset.
param_key : str (optional), default None
- Name of parameter to separate into multiple columns.
Returns
df : pandas.DataFrame
- A DataFrame with `d` tuple keys separated into columns.
Expand source code
def dict_to_df(d: dict, param_key=None): """Converts a dictionary with tuple keys into a pandas DataFrame, optionally seperating parameters in `param_key` if not None Parameters ---------- d: dict Output dictionary with tuple keys from a Vset. param_key: str (optional), default None Name of parameter to seperate into multiple columns. Returns ------- df: pandas.DataFrame A DataFrame with `d` tuple keys seperated into columns. """ d_copy = {tuple(sk.value for sk in k): d[k] for k in d if k != PREV_KEY} df = pd.Series(d_copy).reset_index() if len(d_copy.keys()) > 0: key_list = list(d.keys()) subkey_list = key_list[0] if key_list[0] != PREV_KEY else key_list[1] cols = [sk.origin for sk in subkey_list] + ["out"] # set each init col to init-{next_vfunc_set} cols = [ c if c != "init" else init_step(idx, cols) for idx, c in enumerate(cols) ] df = df.set_axis(cols, axis=1) if param_key: param_keys = df[ param_key ].tolist() # pylint: disable=unsubscriptable-object if param_key == "out" and hasattr(param_keys[0], "__iter__"): param_df = pd.DataFrame(param_keys) param_df.columns = [f"{param_key}-{col}" for col in param_df.columns] df = df.join(param_df) else: param_loc = df.columns.get_loc(param_key) param_key_cols = [ f"{p.split('=')[0]}-{param_key}" for p in param_keys[0] ] param_keys = [[s.split("=")[1] for s in t] for t in param_keys] df = df.join(pd.DataFrame(param_keys)).drop(columns=param_key) new_cols = df.columns[: len(cols) - 1].tolist() + param_key_cols df = df.set_axis(new_cols, axis=1) new_idx = list(range(len(new_cols))) new_idx = ( new_idx[:param_loc] + new_idx[len(cols) - 1 :] + new_idx[param_loc : len(cols) - 1] ) df = df.iloc[:, new_idx] return df
def init_step(idx, cols)
-
Helper function to find init suffix in a column
Parameters
idx
:int
- Index of 'init' column in cols.
cols
:list[str]
- List of column names.
Expand source code
def init_step(idx, cols): """Helper function to find init suffix in a column Parameters ---------- idx: int Index of 'init' column in cols. cols: list[str] List of column names. """ for i in range(idx, len(cols)): if cols[i] != "init": return "init-" + cols[i] return None
def perturbation_stats(data: Union[pandas.core.frame.DataFrame, dict], *group_by: str, wrt: str = 'out', func=None, prefix: str = None, split: bool = False)
-
Compute statistics for `wrt` in `data`, conditional on `group_by`.
Parameters
data : Union[pandas.DataFrame, dict]
- DataFrame, as from calling `dict_to_df()` on an output dict from a Vset, or the output dict itself.
*group_by : str
- Vset names in `data` to group on. If none provided, treats everything as one big group.
wrt : str (optional)
- Column name in `data` or `dict_to_df(data)` on which to compute statistics. Defaults to `'out'`, the values of the original Vset output dict.
func : function, str, list or dict (optional), default None
- A list of functions or function names to use for computing statistics, analogous to the parameter of the same name in pandas.core.groupby.DataFrameGroupBy.aggregate. If `None`, defaults to `['count', 'mean', 'std']`.
prefix : str (optional), default None
- A string to prefix to new columns in output DataFrame. If `None`, uses the value of `wrt`.
split : bool (optional), default False
- If `True` and `wrt` in `data` has `list` or `numpy.ndarray` entries, will attempt to split the entries into multiple columns for the output.
Returns
df : pandas.DataFrame
- A DataFrame with summary statistics on `wrt`.
Expand source code
def perturbation_stats( data: Union[pd.DataFrame, dict], *group_by: str, wrt: str = "out", func=None, prefix: str = None, split: bool = False, ): """Compute statistics for `wrt` in `data`, conditional on `group_by` Parameters ---------- data: Union[pandas.DataFrame, dict] DataFrame, as from calling `dict_to_df` on an output dict from a Vset, or the output dict itself. *group_by: str Vset names in `data` to group on. If none provided, treats everything as one big group. wrt: str (optional) Column name in `data` or `dict_to_df(data)` on which to compute statistics. Defaults to `'out'`, the values of the original Vset output dict. func: function, str, list or dict (optional), default None A list of functions or function names to use for computing statistics, analogous to the parameter of the same name in pandas.core.groupby.DataFrameGroupBy.aggregate. If `None`, defaults to `['count', 'mean', 'std']`. prefix: str (optional), default None A string to prefix to new columns in output DataFrame. If `None`, uses the value of `wrt`. split: bool (optional), default False If `True` and `wrt` in `data` has `list` or `numpy.ndarray` entries, will attempt to split the entries into multiple columns for the output. Returns ------- df: pandas.DataFrame A DataFrame with summary statistics on `wrt`. 
""" if func is None: func = ["count", "mean", "std"] if prefix is None: prefix = wrt if isinstance(data, dict): df = dict_to_df(data) else: df = data group_by = list(group_by) if len(group_by) > 0: gb = df.groupby(group_by)[wrt] else: gb = df.groupby(lambda x: True)[wrt] if (isinstance(func, list) and "mean" in func or "std" in func) and ( type(df[wrt].iloc[0]) in [list, np.ndarray] ): wrt_arrays = [ np.stack(d.tolist()) for d in (gb.get_group(grp) for grp in gb.groups) ] n_cols = wrt_arrays[0].shape[1] df_out = pd.DataFrame(gb.agg("count")) df_out.columns = [f"{prefix}-count"] if "mean" in func: if split: col_means = [arr.mean(axis=0) for arr in wrt_arrays] wrt_means = pd.DataFrame( col_means, columns=[f"{prefix}{i}-mean" for i in range(n_cols)], index=gb.groups.keys(), ) else: col_means = [{f"{prefix}-mean": arr.mean(axis=0)} for arr in wrt_arrays] wrt_means = pd.DataFrame(col_means, index=gb.groups.keys()) wrt_means.index.names = df_out.index.names df_out = df_out.join(wrt_means) if "std" in func: if split: col_stds = [arr.std(axis=0, ddof=1) for arr in wrt_arrays] wrt_stds = pd.DataFrame( col_stds, columns=[f"{prefix}{i}-std" for i in range(n_cols)], index=gb.groups.keys(), ) else: col_stds = [ {f"{prefix}-std": arr.std(axis=0, ddof=1)} for arr in wrt_arrays ] wrt_stds = pd.DataFrame(col_stds, index=gb.groups.keys()) wrt_stds.index.names = df_out.index.names df_out = df_out.join(wrt_stds) if "count" not in func: df_out = df_out.drop(f"{prefix}-count") else: df_out = gb.agg(func) df_out = df_out.reindex(sorted(df_out.columns), axis=1) df_out.reset_index(inplace=True) if len(group_by) > 0: return df_out.sort_values(group_by[0]) return df_out
def sep_dicts(d: dict, n_out: int = 1, keys=None)
-
Converts dictionary with value being saved as an iterable into multiple dictionaries
Assumes every value has same length n_out
Parameters
d : dict
- Dictionary with iterable values to be converted.
n_out : int, default 1
- The number of dictionaries to separate d into.
keys : list-like, default None
- Optional list of keys to use in output dicts.
Returns
sep_dicts_list : list
- List of separated dictionaries.
Examples
>>> sep_dicts({k1: (x1, y1), k2: (x2, y2), ..., '__prev__': p}) [{k1: x1, k2: x2, ..., '__prev__': p}, {k1: y1, k2: y2, ..., '__prev__': p}]
Expand source code
def sep_dicts(d: dict, n_out: int = 1, keys=None): """Converts dictionary with value being saved as an iterable into multiple dictionaries Assumes every value has same length n_out Parameters ---------- d: dict Dictionary with iterable values to be converted. n_out: int, default 1 The number of dictionaries to separate d into. keys: list-like, default None Optional list of keys to use in output dicts. Returns ------- sep_dicts_list: list List of seperated dictionaries. Examples -------- >>> sep_dicts({k1: (x1, y1), k2: (x2, y2), ..., '__prev__': p}) [{k1: x1, k2: x2, ..., '__prev__': p}, {k1: y1, k2: y2, ..., '__prev__': p}] """ if keys is None: keys = [] if len(keys) > 0 and len(keys) != n_out: raise ValueError(f"keys should be empty or have length n_out={n_out}") # empty dict -- return empty dict if n_out <= 1: return d # try separating dict into multiple dicts sep_dicts_id = str(uuid4()) # w/ high prob, uuid4 is unique sep_dicts_list = [{} for _ in range(n_out)] for key, value in d.items(): if key != PREV_KEY: for i in range(n_out): # assumes the correct sub-key for item i is in the i-th position if len(keys) == 0: new_key = (key[i],) + key[n_out:] else: new_sub = Subkey( value=keys[i], origin=key[-1].origin + "-" + str(i) ) new_key = (new_sub,) + key new_key[-1].sep_dicts_id = sep_dicts_id if isinstance(value, VfuncPromise): # return a promise to get the value at index i of the # original promise value_i = VfuncPromise(lambda v, x: v[x], value, i) else: value_i = value[i] sep_dicts_list[i][new_key] = value_i return sep_dicts_list
def to_list(tup: tuple)
-
Convert from tuple to packed list
Allows us to call function with arguments in a loop
Parameters
tup
:tuple
- tuple of objects to convert to packed list
Raises
ValueError
- If passed uneven number of arguments without a list. Please wrap your args in a list.
Examples
>>> to_list(([x1, x2, x3], [y1, y2, y3])) [[x1, y1], [x2, y2], [x3, y3]] >>> to_list(([x1], [y1])) [[x1, y1]] >>> to_list(([x1, x2, x3], )) [[x1], [x2], [x3]] >>> to_list((x1, )) [[x1]] >>> to_list((x1, y1)) [[x1, y1]] >>> to_list((x1, x2, x3, y1, y2, y3)) [[x1, y1], [x2, y2], [x3, y3]]
Expand source code
def to_list(tup: tuple): """Convert from tuple to packed list Allows us to call function with arguments in a loop Parameters ---------- tup: tuple tuple of objects to convert to packed list Raises ------ ValueError If passed uneven number of arguments without a list. Please wrap your args in a list. Examples -------- >>> to_list(([x1, x2, x3], [y1, y2, y3])) [[x1, y1], [x2, y2], [x3, y3]] >>> to_list(([x1], [y1])) [[x1, y1]] >>> to_list(([x1, x2, x3], )) [[x1], [x2], [x3]] >>> to_list((x1, )) [[x1]] >>> to_list((x1, y1)) [[x1, y1]] >>> to_list((x1, x2, x3, y1, y2, y3)) [[x1, y1], [x2, y2], [x3, y3]] """ n_tup = len(tup) if n_tup == 0: return [] if not isinstance(tup[0], list): # the first element is data if n_tup == 1: return [list(tup)] if n_tup % 2 != 0: raise ValueError( "Don't know how to handle uneven number of args " "without a list. Please wrap your args in a list." ) # assume first half of args is input and second half is outcome return [list(el) for el in zip(tup[: (n_tup // 2)], tup[(n_tup // 2) :])] if n_tup == 1: return [[x] for x in tup[0]] n_mods = len(tup[0]) lists_packed = [[] for _ in range(n_mods)] for i in range(n_mods): for j in range(n_tup): lists_packed[i].append(tup[j][i]) return lists_packed
def to_tuple(lists: list)
-
Convert from lists to unpacked tuple
Allows us to write
X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])
Parameters
lists
:list
- list of objects to convert to unpacked tuple
Examples
>>> to_tuple([[x1, y1], [x2, y2], [x3, y3]]) ([x1, x2, x3], [y1, y2, y3]) >>> to_tuple([[x1, y1]]) ([x1], [y1]) >>> to_tuple([m1, m2, m3]) [m1, m2, m3]
Expand source code
def to_tuple(lists: list): """Convert from lists to unpacked tuple Allows us to write `X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])` Parameters ---------- lists: list list of objects to convert to unpacked tuple Examples -------- >>> to_tuple([[x1, y1], [x2, y2], [x3, y3]]) ([x1, x2, x3], [y1, y2, y3]) >>> to_tuple([[x1, y1]]) ([x1], [y1]) >>> to_tuple([m1, m2, m3]) [m1, m2, m3] """ n_mods = len(lists) if n_mods <= 1: return lists if not isinstance(lists[0], list): return lists n_tup = len(lists[0]) tup = [[] for _ in range(n_tup)] for i in range(n_mods): for j in range(n_tup): tup[j].append(lists[i][j]) return tuple(tup)