A library for making stability analysis simple. Easily evaluate the effect of judgment calls in your data-science pipeline (e.g. the choice of imputation strategy)!
Why use vflow?

Using vflow's simple wrappers facilitates many best practices for data science, as laid out in the predictability, computability, and stability (PCS) framework for veridical data science. The goal of vflow is to easily enable data-science pipelines that follow PCS by providing intuitive low-code syntax, efficient and flexible computational backends via Ray, and well-documented, reproducible experimentation via MLflow.
Computation | Reproducibility | Prediction | Stability |
---|---|---|---|
Automatic parallelization and caching throughout the pipeline | Automatic experiment tracking and saving | Filter the pipeline by training and validation performance | Replace a single function (e.g. preprocessing) with a set of functions and easily assess the stability of downstream results |
Here we show a simple example of an entire data-science pipeline with several perturbations (e.g. different data subsamples, models, and metrics), written simply using vflow.
```python
import sklearn
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, balanced_accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from vflow import Vset, init_args
# initialize data
X, y = make_classification()
X_train, X_test, y_train, y_test = init_args(
train_test_split(X, y),
names=["X_train", "X_test", "y_train", "y_test"], # optionally name the args
)
# subsample data
subsampling_funcs = [sklearn.utils.resample for _ in range(3)]
subsampling_set = Vset(
name="subsampling", vfuncs=subsampling_funcs, output_matching=True
)
X_trains, y_trains = subsampling_set(X_train, y_train)
# fit models
models = [LogisticRegression(), DecisionTreeClassifier()]
modeling_set = Vset(name="modeling", vfuncs=models, vfunc_keys=["LR", "DT"])
modeling_set.fit(X_trains, y_trains)
preds_test = modeling_set.predict(X_test)
# get metrics
binary_metrics_set = Vset(
name="binary_metrics",
vfuncs=[accuracy_score, balanced_accuracy_score],
vfunc_keys=["Acc", "Bal_Acc"],
)
binary_metrics = binary_metrics_set.evaluate(preds_test, y_test)
```
Once we've written this pipeline, we can easily measure the stability of metrics (e.g. accuracy) with respect to our choice of subsampling or model.
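For instance, continuing the example above, we can flatten the metrics dictionary into a DataFrame and summarize each metric across the three subsamples (a minimal sketch; `dict_to_df` and `perturbation_stats` are documented below):

```python
from vflow import dict_to_df, perturbation_stats

# one row per (subsample, model, metric) combination
df = dict_to_df(binary_metrics)

# mean and std of each metric across subsamples, per model
stats = perturbation_stats(df, "modeling", "binary_metrics")
print(stats)
```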
Documentation
See the docs for a reference on the API.
Notebook examples
Note that some of these require more dependencies than those required for vflow alone. To install all of them, run `pip install vflow[nb]`.
Installation
Stable version
pip install vflow
Development version (unstable)
pip install vflow@git+https://github.com/Yu-Group/veridical-flow
References
- interface: easily build on scikit-learn and dvc (data version control)
- computation: integration with ray and caching with joblib
- tracking: mlflow
- pull requests very welcome! (see contributing.md)
```bibtex
@software{duncan2020vflow,
author = {Duncan, James and Kapoor, Rush and Agarwal, Abhineet and Singh, Chandan and Yu, Bin},
doi = {10.21105/joss.03895},
month = {1},
title = {{VeridicalFlow: a Python package for building trustworthy data science pipelines with PCS}},
url = {https://doi.org/10.21105/joss.03895},
year = {2022}
}
```
Source code:

```python
"""
.. include:: ../README.md
"""
from .helpers import (
build_vset,
cum_acc_by_uncertainty,
filter_vset_by_metric,
init_args,
)
from .pipeline import PCSPipeline, build_graph
from .subkey import Subkey
from .utils import (
apply_vfuncs,
base_dict,
combine_dicts,
combine_keys,
dict_data,
dict_keys,
dict_to_df,
init_step,
perturbation_stats,
sep_dicts,
to_list,
to_tuple,
)
from .vfunc import AsyncVfunc, Vfunc, VfuncPromise
from .vset import Vset
__all__ = [
# vflow.helpers
"init_args",
"build_vset",
"filter_vset_by_metric",
"cum_acc_by_uncertainty",
# vflow.pipeline
"PCSPipeline",
"build_graph",
# vflow.subkey
"Subkey",
# vflow.utils
"apply_vfuncs",
"base_dict",
"combine_dicts",
"combine_keys",
"dict_data",
"dict_keys",
"dict_to_df",
"init_step",
"perturbation_stats",
"sep_dicts",
"to_list",
"to_tuple",
# vflow.vfunc
"Vfunc",
"AsyncVfunc",
"VfuncPromise",
# vflow.vset
"Vset",
]
```
Sub-modules
vflow.helpers
-
User-facing helper functions included at import vflow
vflow.pipeline
-
Class that stores the entire pipeline of steps in a data-science workflow
vflow.subkey
-
Defines a parameter from some origin Vset
vflow.utils
-
Useful functions for converting between different types (dicts, lists, tuples, etc.)
vflow.vfunc
-
A perturbation that can be used as a step in a pipeline
vflow.vset
-
Set of vfuncs to be parallelized over in a pipeline. Function arguments are each a list
Functions
def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False)
-
Apply a dictionary of functions `vfuncs` to each item of `data_dict`, optionally returning a dictionary of `VfuncPromise` objects if `lazy` is True.

Output keys are determined by applying `combine_keys()` to each pair of items from `vfuncs` and `data_dict`. This function is used by all Vsets to apply functions.

Parameters

`vfuncs` : `dict`
- Dictionary of functions to apply to `data_dict`.

`data_dict` : `dict`
- Dictionary of parameters to call each function in `vfuncs`.

`lazy` : `bool (optional)`, default `False`
- If True, `vfuncs` are applied lazily, returning `VfuncPromise` objects.

Returns

`out_dict` : `dict`
- Output dictionary of applying `vfuncs` to `data_dict`.
Source code:

```python
def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False):
    """Apply a dictionary of functions `vfuncs` to each item of `data_dict`,
    optionally returning a dictionary of `vflow.vfunc.VfuncPromise` objects
    if `lazy` is True

    Output keys are determined by applying `combine_keys` to each pair of
    items from `vfuncs` and `data_dict`. This function is used by all Vsets
    to apply functions.

    Parameters
    ----------
    vfuncs: dict
        Dictionary of functions to apply to `data_dict`.
    data_dict: dict
        Dictionary of parameters to call each function in `vfuncs`.
    lazy: bool (optional), default False
        If True, `vfuncs` are applied lazily, returning
        `vflow.vfunc.VfuncPromise` objects.

    Returns
    -------
    out_dict: dict
        Output dictionary of applying `vfuncs` to `data_dict`.
    """
    out_dict = {}
    for vf_k in vfuncs:
        if len(data_dict) == 0:
            func = deepcopy(vfuncs[vf_k])
            if lazy:
                out_dict[vf_k] = VfuncPromise(func)
            else:
                out_dict[vf_k] = func()
        for data_k in data_dict:
            if PREV_KEY in (vf_k, data_k):
                continue
            combined_key = combine_keys(data_k, vf_k)
            if not len(combined_key) > 0:
                continue
            func = deepcopy(vfuncs[vf_k])
            if lazy:
                # return a promise
                out_dict[combined_key] = VfuncPromise(func, *data_dict[data_k])
            else:
                data_list = list(data_dict[data_k])
                for i, data in enumerate(data_list):
                    if isinstance(data, VfuncPromise):
                        data_list[i] = data()
                    if isinstance(func, RayRemoteFun) and not isinstance(
                        data_list[i], ray.ObjectRef
                    ):
                        # send data to Ray's remote object store
                        data_list[i] = ray.put(data_list[i])
                    elif isinstance(data_list[i], ray.ObjectRef):
                        # this is not a remote function so get the data
                        data_list[i] = ray.get(data_list[i])
                out_dict[combined_key] = func(*data_list)
    return out_dict
```
def base_dict(d: dict)
-
Remove PREV_KEY from dict d if present
Source code:

```python
def base_dict(d: dict):
    """Remove PREV_KEY from dict d if present"""
    return {k: v for k, v in d.items() if k != PREV_KEY}
```
def build_graph(node, draw=True)
-
Helper function that just calls build_graph_recur with an empty graph
Parameters

`node` : `dict` or `Vset`

Returns

`G` : `nx.DiGraph`
Source code:

```python
def build_graph(node, draw=True):
    """Helper function that just calls build_graph_recur with an empty graph

    Parameters
    ----------
    node: dict or Vset

    Returns
    -------
    G: nx.Digraph()
    """

    def unnest_node(node):
        """Unnest a node, if necessary (i.e., when node is a tuple)

        Parameters
        ----------
        node: str, dict, Vset, or tuple

        Returns
        -------
        unnested_node: str, Vset, or None
        """
        node_type = type(node)
        if node_type is str or "Vset" in str(node_type):
            return node
        if node_type is tuple:
            return unnest_node(node[0])
        return None

    def build_graph_recur(node, G):
        """Builds a graph up using __prev__ and PREV_KEY pointers

        Parameters
        ----------
        node: str, dict, Vset, or tuple
        G: nx.Digraph()

        Returns
        -------
        G: nx.Digraph()
        """
        # base case: reached starting node
        if isinstance(node, str):
            return G
        # initial case: starting at dict
        if isinstance(node, dict):
            s_node = "End"
            nodes_prev = node[PREV_KEY]
            G.add_edge(nodes_prev[0], s_node)
            for node_prev in nodes_prev[1:]:
                G.add_edge(unnest_node(node_prev), nodes_prev[0])
                G = build_graph_recur(node_prev, G)
            return G
        # main case: at a vfuncset
        if "Vset" in str(type(node)):
            if hasattr(node, PREV_KEY):
                nodes_prev = getattr(node, PREV_KEY)
                for node_prev in nodes_prev:
                    G.add_edge(unnest_node(node_prev), node)
                    G = build_graph_recur(node_prev, G)
            return G
        # nested prev key case
        if isinstance(node, tuple):
            func_node = unnest_node(node[0])
            G = build_graph_recur(func_node, G)
            for arg_node in node[1:]:
                G.add_edge(unnest_node(arg_node), func_node)
                G = build_graph_recur(arg_node, G)
            return G
        return G

    G = nx.DiGraph()
    G = build_graph_recur(node, G)
    if draw:
        nx.draw(G, with_labels=True, node_color="#CCCCCC")
    return G
```
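A usage sketch, continuing the README example above; each Vset output carries a `__prev__` pointer, so the pipeline DAG can be rebuilt from any output dict:

```python
from vflow import build_graph

# builds (and, with draw=True, plots via networkx) the pipeline graph
G = build_graph(binary_metrics, draw=True)
```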
def build_vset(name: str, func, param_dict=None, reps: int = 1, is_async: bool = False, output_matching: bool = False, lazy: bool = False, cache_dir: str = None, tracking_dir: str = None, **kwargs) -> Vset
-
Builds a new Vset by currying or instantiating callable `func` with all combinations of parameters in `param_dict` and optional additional `**kwargs`. If `func` and `param_dict` are lists, then the ith entry of `func` will be curried with the ith entry of `param_dict`. If only one of `func` or `param_dict` is a list, the same `func`/`param_dict` will be curried for all entries in the list. Vfuncs are named with `param_dict` items as tuples of str("param_name=param_val").

Parameters

`name` : `str`
- A name for the output Vset.

`func` : `callable` or `list[callable]`
- A callable to use as the base for Vfuncs in the output Vset. Can also be a class object, in which case the class is immediately instantiated with the parameter combinations from `param_dict`. Can also be a list of callables, where the ith entry corresponds to `param_dict` or the ith entry of `param_dict` (if `param_dict` is a list).

`param_dict` : `dict[str, list]` or `list[dict[str, list]]`, optional
- A dict with string keys corresponding to argument names of `func` and entries which are lists of values to pass to `func` at run time (or when instantiating `func` if it's a class object). Can also be a list of dicts, where the ith dict entry corresponds to `func` or the ith entry of `func` (if `func` is a list). If no parameters are required for the ith function, the ith entry of `param_dict` can be `None`.

`reps` : `int`, optional
- The number of times to repeat `func` in the output Vset's vfuncs for each combination of the parameters in `param_dict`.

`is_async` : `bool`, optional
- If True, vfuncs are computed asynchronously.

`output_matching` : `bool`, optional
- If True, then output keys from the Vset will be matched when used in other Vsets.

`lazy` : `bool`, optional
- If True, vfuncs are evaluated lazily, returning `VfuncPromise` objects.

`cache_dir` : `str`, optional
- If provided, do caching and use `cache_dir` as the data store for `joblib.Memory`.

`tracking_dir` : `str`, optional
- If provided, use the `mlflow.tracking` API to log outputs as metrics with parameters determined by input keys.

`**kwargs`
- Additional fixed keyword arguments to pass to `func`.

Returns

`new_vset` : `Vset`
Source code:

```python
def build_vset(
    name: str,
    func,
    param_dict=None,
    reps: int = 1,
    is_async: bool = False,
    output_matching: bool = False,
    lazy: bool = False,
    cache_dir: str = None,
    tracking_dir: str = None,
    **kwargs,
) -> Vset:
    """Builds a new Vset by currying or instantiating callable `func` with all
    combinations of parameters in `param_dict` and optional additional
    `**kwargs`. If `func` and `param_dict` are lists, then the ith entry of
    `func` will be curried with ith entry of `param_dict`. If only one of
    `func` or `param_dict` is a list, the same `func`/`param_dict` will be
    curried for all entries in the list. Vfuncs are named with `param_dict`
    items as tuples of str("param_name=param_val").

    Parameters
    ----------
    name : str
        A name for the output Vset.
    func : callable or list[callable]
        A callable to use as the base for Vfuncs in the output Vset. Can also
        be a class object, in which case the class is immediately instantiated
        with the parameter combinations from `param_dict`. Can also be a list
        of callables, where the ith entry corresponds to `param_dict` or the
        ith entry of `param_dict` (if `param_dict` is a list).
    param_dict : dict[str, list] or list[dict[str, list]], optional
        A dict with string keys corresponding to argument names of `func` and
        entries which are lists of values to pass to `func` at run time (or
        when instantiating `func` if it's a class object). Can also be a list
        of dicts, where the ith dict entry corresponds to `func` or the ith
        entry of `func` (if `func` is a list). If no parameters are required
        for the ith function, the ith entry of `param_dict` can be `None`.
    reps : int, optional
        The number of times to repeat `func` in the output Vset's vfuncs for
        each combination of the parameters in `param_dict`.
    is_async : bool, optional
        If True, vfuncs are computed asynchronously.
    output_matching : bool, optional
        If True, then output keys from Vset will be matched when used in
        other Vsets.
    cache_dir : str, optional
        If provided, do caching and use `cache_dir` as the data store for
        joblib.Memory.
    tracking_dir : str, optional
        If provided, use the mlflow.tracking API to log outputs as metrics
        with parameters determined by input keys.
    **kwargs
        Additional fixed keyword arguments to pass to `func`.

    Returns
    -------
    new_vset : vflow.vset.Vset
    """
    f_list = []
    pd_list = []
    if isinstance(func, list):
        if isinstance(param_dict, list):
            assert len(param_dict) == len(
                func
            ), "list of param_dicts must be same length as list of funcs"
            f_list.extend(func)
            pd_list.extend(param_dict)
        else:
            pd_list.extend([param_dict] * len(func))
            f_list.extend(func)
    elif isinstance(param_dict, list):
        f_list.extend([func] * len(param_dict))
        pd_list.extend(param_dict)
    else:
        f_list.append(func)
        pd_list.append(param_dict)

    vfuncs = []
    vkeys = []

    for f, pd in zip(f_list, pd_list):
        if pd is None:
            pd = {}
        assert callable(f), "func must be callable"
        kwargs_tuples = product(*list(pd.values()))
        for tup in kwargs_tuples:
            kwargs_dict = {}
            vkey_tup = (f"func={f.__name__}",)
            for param_name, param_val in zip(list(pd.keys()), tup):
                kwargs_dict[param_name] = param_val
                vkey_tup += (f"{param_name}={param_val}",)
            # add additional fixed kwargs to kwargs_dict
            for k, v in kwargs.items():
                kwargs_dict[k] = v
            for i in range(reps):
                # add vfunc key to vkeys
                if reps > 1:
                    vkeys.append((f"rep={i}",) + vkey_tup)
                else:
                    vkeys.append(vkey_tup)
                # check if func is a class
                if isinstance(f, type):
                    # instantiate func
                    vfuncs.append(Vfunc(vfunc=f(**kwargs_dict), name=str(vkey_tup)))
                else:
                    # use partial to wrap func
                    vfuncs.append(
                        Vfunc(vfunc=partial(f, **kwargs_dict), name=str(vkey_tup))
                    )
    if all(pd is None for pd in pd_list) and reps == 1:
        vkeys = None

    return Vset(
        name,
        vfuncs,
        is_async=is_async,
        vfunc_keys=vkeys,
        output_matching=output_matching,
        lazy=lazy,
        cache_dir=cache_dir,
        tracking_dir=tracking_dir,
    )
```
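A usage sketch: a Vset of decision trees over a small illustrative hyperparameter grid (2 x 2 = 4 vfuncs, one per parameter combination):

```python
from sklearn.tree import DecisionTreeClassifier
from vflow import build_vset

# DecisionTreeClassifier is a class, so each parameter
# combination is instantiated immediately
modeling_set = build_vset(
    name="modeling",
    func=DecisionTreeClassifier,
    param_dict={"max_depth": [2, 4], "criterion": ["gini", "entropy"]},
)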
def combine_dicts(*args: dict, base_case=True)
-
Combines any number of dictionaries into a single dictionary. Dictionaries are combined left to right, matching all keys according to `combine_keys()`.

Parameters

`*args` : `dict`
- Dictionaries to recursively combine left to right.

Returns

`combined_dict` : `dict`
- Combined dictionary.
Source code:

```python
def combine_dicts(*args: dict, base_case=True):
    """Combines any number of dictionaries into a single dictionary.
    Dictionaries are combined left to right matching all keys according to
    `combine_keys`

    Parameters
    ----------
    *args: dict
        Dictionaries to recursively combine left to right.

    Returns
    -------
    combined_dict: dict
        Combined dictionary.
    """
    n_args = len(args)
    combined_dict = {}
    if n_args == 0:
        return combined_dict
    if n_args == 1:
        for k in args[0]:
            # wrap the dict values in tuples; this is helpful so that when we
            # pass the values to a vfunc fun in we can just use * expansion
            if k != PREV_KEY:
                combined_dict[k] = (args[0][k],)
            else:
                combined_dict[k] = args[0][k]
        return combined_dict
    if n_args == 2:
        for k0 in args[0]:
            for k1 in args[1]:
                if PREV_KEY in (k0, k1):
                    continue
                combined_key = combine_keys(k0, k1)
                if len(combined_key) > 0:
                    if base_case:
                        combined_dict[combined_key] = (args[0][k0], args[1][k1])
                    else:
                        combined_dict[combined_key] = args[0][k0] + (args[1][k1],)
        return combined_dict
    # combine the first two dicts and call recursively with remaining args
    return combine_dicts(combine_dicts(args[0], args[1]), *args[2:], base_case=False)
```
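A minimal sketch of the base case with keys that do not require matching (in practice, keys are tuples of `Subkey` objects):

```python
from vflow import Subkey, combine_dicts

d1 = {(Subkey("X_train", "init"),): "data_X"}
d2 = {(Subkey("y_train", "init"),): "data_y"}
combine_dicts(d1, d2)
# {(X_train, y_train): ('data_X', 'data_y')} -- keys concatenated, values paired
```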
def combine_keys(left_key, right_key)
-
Combines `left_key` and `right_key`, attempting to match on any `Subkey` where `Subkey.is_matching()` is `True`.

Returns an empty key on failed matches when `Subkey.mismatches()` is `True`. Always filters on `right_key` and returns `combined_key` with the `left_key` prefix.

Parameters

`left_key` : `tuple`
- Left tuple key to combine.

`right_key` : `tuple`
- Right tuple key to combine.

Returns

`combined_key` : `tuple`
- Combined tuple key filtered according to the `Subkey.matches()` rules; empty according to the `Subkey.mismatches()` rule.
Source code:

```python
def combine_keys(left_key, right_key):
    """Combines `left_key` and `right_key`, attempting to match on any
    `Subkey` where `vflow.subkey.Subkey.is_matching` is `True`.

    Returns an empty key on failed matches when
    `vflow.subkey.Subkey.mismatches` is `True`. Always filters on `right_key`
    and returns `combined_key` with `left_key` prefix.

    Parameters
    ----------
    left_key: tuple
        Left tuple key to combine.
    right_key: tuple
        Right tuple key to combine.

    Returns
    -------
    combined_key: tuple
        Combined tuple key filtered according to `vflow.subkey.Subkey.matches`
        rules, which is empty according to `vflow.subkey.Subkey.mismatches`
        rule.
    """
    if len(left_key) < len(right_key):
        match_key = left_key
        compare_key = right_key
    else:
        match_key = right_key
        compare_key = left_key
    match_subkeys = [subkey for subkey in match_key if subkey.is_matching()]
    if len(match_subkeys) > 0:
        matched_subkeys = []
        for subkey in match_subkeys:
            for c_subkey in compare_key:
                if subkey.matches(c_subkey):
                    matched_subkeys.append(subkey)
                    break
                if subkey.mismatches(c_subkey):
                    # subkeys with same origin but different values are rejected
                    return ()
        if len(matched_subkeys) > 0:
            # always filter on right key
            filtered_key = tuple(
                subkey for subkey in right_key if subkey not in matched_subkeys
            )
            combined_key = left_key + filtered_key
            return combined_key
        return left_key + right_key
    return left_key + right_key
```
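A minimal sketch of the match and mismatch rules:

```python
from vflow import Subkey, combine_keys

a = (Subkey("subsample_0", "subsampling", output_matching=True),)
b = (Subkey("subsample_1", "subsampling", output_matching=True),)

combine_keys(a, b)  # () -- rejected: same origin, different values
combine_keys(a, a)  # (subsample_0,) -- matched subkey filtered from the right key
```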
def cum_acc_by_uncertainty(mean_preds, std_preds, true_labels)
-
Returns uncertainty and cumulative accuracy for grouped class predictions, sorted in increasing order of uncertainty.

Parameters

`mean_preds` : `dict`
- Mean predictions, as output by `Vset.predict` with `with_uncertainty=True`.

`std_preds` : `dict`
- Standard-deviation predictions, as output by `Vset.predict` with `with_uncertainty=True`.

`true_labels` : `dict` or list-like

TODO: generalize to multi-class classification
Source code:

```python
def cum_acc_by_uncertainty(mean_preds, std_preds, true_labels):
    """Returns uncertainty and cumulative accuracy for grouped class
    predictions, sorted in increasing order of uncertainty

    Params
    ------
    mean_preds: dict
        mean predictions, output from Vset.predict_with_uncertainties
    std_preds: dict
        std predictions, output from Vset.predict_with_uncertainties
    true_labels: dict or list-like

    TODO: generalize to multi-class classification
    """
    assert dict_keys(mean_preds) == dict_keys(
        std_preds
    ), "mean_preds and std_preds must share the same keys"
    # match predictions on keys
    paired_preds = [
        [d[k] for d in (mean_preds, std_preds)] for k in dict_keys(mean_preds)
    ]
    mean_preds, std_preds = (np.array(p)[:, :, 1] for p in zip(*paired_preds))
    if isinstance(true_labels, dict):
        true_labels = dict_data(true_labels)
        assert (
            len(true_labels) == 1
        ), "true_labels should have a single 1D vector entry"
        true_labels = true_labels[0]
    n_obs = len(mean_preds[0])
    assert (
        len(true_labels) == n_obs
    ), f"true_labels has {len(true_labels)} obs. but should have same as predictions ({n_obs})"
    sorted_idx = np.argsort(std_preds, axis=1)
    correct_labels = np.take_along_axis(
        np.around(mean_preds) - true_labels == 0, sorted_idx, 1
    )
    uncertainty = np.take_along_axis(std_preds, sorted_idx, 1)
    cum_acc = np.cumsum(correct_labels, axis=1) / range(1, n_obs + 1)
    return uncertainty, cum_acc, sorted_idx
```
def dict_data(d: dict)
-
Returns a list containing all data in dict d
Source code:

```python
def dict_data(d: dict):
    """Returns a list containing all data in dict d"""
    return list(base_dict(d).values())
```
def dict_keys(d: dict)
-
Returns a list containing all keys in dict d
Source code:

```python
def dict_keys(d: dict):
    """Returns a list containing all keys in dict d"""
    return list(base_dict(d).keys())
```
def dict_to_df(d: dict, param_key=None)
-
Converts a dictionary with tuple keys into a pandas DataFrame, optionally separating parameters in `param_key` if it is not None.

Parameters

`d` : `dict`
- Output dictionary with tuple keys from a Vset.

`param_key` : `str (optional)`, default `None`
- Name of parameter to separate into multiple columns.

Returns

`df` : `pandas.DataFrame`
- A DataFrame with `d` tuple keys separated into columns.
Source code:

```python
def dict_to_df(d: dict, param_key=None):
    """Converts a dictionary with tuple keys into a pandas DataFrame,
    optionally seperating parameters in `param_key` if not None

    Parameters
    ----------
    d: dict
        Output dictionary with tuple keys from a Vset.
    param_key: str (optional), default None
        Name of parameter to seperate into multiple columns.

    Returns
    -------
    df: pandas.DataFrame
        A DataFrame with `d` tuple keys seperated into columns.
    """
    d_copy = {tuple(sk.value for sk in k): d[k] for k in d if k != PREV_KEY}
    df = pd.Series(d_copy).reset_index()
    if len(d_copy.keys()) > 0:
        key_list = list(d.keys())
        subkey_list = key_list[0] if key_list[0] != PREV_KEY else key_list[1]
        cols = [sk.origin for sk in subkey_list] + ["out"]
        # set each init col to init-{next_vfunc_set}
        cols = [
            c if c != "init" else init_step(idx, cols) for idx, c in enumerate(cols)
        ]
        df = df.set_axis(cols, axis=1)
        if param_key:
            param_keys = df[
                param_key
            ].tolist()  # pylint: disable=unsubscriptable-object
            if param_key == "out" and hasattr(param_keys[0], "__iter__"):
                param_df = pd.DataFrame(param_keys)
                param_df.columns = [f"{param_key}-{col}" for col in param_df.columns]
                df = df.join(param_df)
            else:
                param_loc = df.columns.get_loc(param_key)
                param_key_cols = [
                    f"{p.split('=')[0]}-{param_key}" for p in param_keys[0]
                ]
                param_keys = [[s.split("=")[1] for s in t] for t in param_keys]
                df = df.join(pd.DataFrame(param_keys)).drop(columns=param_key)
                new_cols = df.columns[: len(cols) - 1].tolist() + param_key_cols
                df = df.set_axis(new_cols, axis=1)
                new_idx = list(range(len(new_cols)))
                new_idx = (
                    new_idx[:param_loc]
                    + new_idx[len(cols) - 1 :]
                    + new_idx[param_loc : len(cols) - 1]
                )
                df = df.iloc[:, new_idx]
    return df
```
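A usage sketch, continuing the README example at the top of this page:

```python
from vflow import dict_to_df

df = dict_to_df(binary_metrics)
# columns: one per Vset name in the pipeline (e.g. 'subsampling',
# 'modeling', 'binary_metrics') plus 'out' with the metric values
```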
def filter_vset_by_metric(metric_dict: dict, vset: Vset, *vsets: Vset, n_keep: int = 1, bigger_is_better: bool = True, filter_on=None, group: bool = False) -> Union[Vset, list]
-
Returns a new Vset by filtering `vset.vfuncs` based on values in `metric_dict`.

Parameters

`metric_dict` : `dict`
- Output from a Vset, typically with metrics or other numeric values to use when filtering `vset.vfuncs`.

`vset` : `Vset`
- A Vset to filter.

`*vsets` : `Vset`
- Zero or more additional Vsets to filter.

`n_keep` : `int (optional)`
- Number of entries to keep from `vset.vfuncs`.

`bigger_is_better` : `bool (optional)`
- If True, then the top `n_keep` largest values are retained.

`filter_on` : `list[str] (optional)`
- If there are multiple metrics in `metric_dict`, you can specify a subset to consider.

`group` : `bool (optional)`
- If True, average metrics after grouping values in `metric_dict` by the input Vset names.

Returns

`new_vsets` : `Vset` or `list[Vset]`
- Copies of the input Vsets but with vfuncs filtered based on metrics; a single Vset if only one was passed.
Source code:

```python
def filter_vset_by_metric(
    metric_dict: dict,
    vset: Vset,
    *vsets: Vset,
    n_keep: int = 1,
    bigger_is_better: bool = True,
    filter_on=None,
    group: bool = False,
) -> Union[Vset, list]:
    """Returns a new Vset by filtering `vset.vfuncs` based on values in
    filter_dict.

    Parameters
    ----------
    metric_dict: dict
        output from a Vset, typically with metrics or other numeric values to
        use when filtering `vset.vfuncs`
    vset: Vset
        a Vsets
    *vsets: Vset
        zero or more additional Vsets
    n_keep: int (optional)
        number of entries to keep from `vset.vfuncs`
    bigger_is_better: bool (optional)
        if True, then the top `n_keep` largest values are retained
    filter_on: list[str] (optional)
        if there are multiple metrics in `metric_dict`, you can specify a
        subset to consider
    group: bool (optional)
        if True, average metrics after grouping values in `metric_dict` by
        the input Vset names

    Returns
    -------
    *new_vset : Vset
        Copies of the input Vsets but with Vfuncs filtered based on metrics
    """
    if filter_on is None:
        filter_on = []
    df = dict_to_df(metric_dict)
    vsets = [vset, *vsets]
    vset_names = []
    for vset_i in vsets:
        if vset_i.name not in df.columns:
            raise ValueError(
                (
                    f"{vset_i.name} should be one "
                    "of the columns of dict_to_df(metric_dict)"
                )
            )
        vset_names.append(vset_i.name)
    if len(filter_on) > 0:
        filter_col = list(metric_dict.keys())[0][-1].origin
        df = df[df[filter_col].isin(filter_on)]
    if group:
        df = df.groupby(by=vset_names, as_index=False).mean(numeric_only=True)
    if bigger_is_better:
        df = df.sort_values(by="out", ascending=False)
    else:
        df = df.sort_values(by="out")
    df = df.iloc[0:n_keep]
    for i, vset_i in enumerate(vsets):
        vfuncs = vset_i.vfuncs
        vfunc_filter = [str(name) for name in df[vset_i.name].to_numpy()]
        new_vfuncs = {k: v for k, v in vfuncs.items() if str(v.name) in vfunc_filter}
        tracking_dir = None if vset_i._mlflow is None else mlflow.get_tracking_uri()
        new_vset = Vset(
            "filtered_" + vset_i.name,
            new_vfuncs,
            is_async=vset_i._async,
            output_matching=vset_i._output_matching,
            lazy=vset_i._lazy,
            cache_dir=vset_i._cache_dir,
            tracking_dir=tracking_dir,
        )
        setattr(
            new_vset,
            FILTER_PREV_KEY,
            (
                metric_dict[PREV_KEY],
                vset_i,
            ),
        )
        setattr(new_vset, PREV_KEY, getattr(new_vset, FILTER_PREV_KEY))
        vsets[i] = new_vset
    if len(vsets) == 1:
        return vsets[0]
    return vsets
```
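A usage sketch, continuing the README example: keep only the best model according to the computed metrics (grouping averages the metric values over subsamples):

```python
from vflow import filter_vset_by_metric

best_modeling_set = filter_vset_by_metric(
    binary_metrics, modeling_set, n_keep=1, group=True
)
```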
def init_args(args_tuple: Union[tuple, list], names=None)
-
Converts a tuple of arguments into a list of dicts.

Parameters

`args_tuple` : `tuple` or `list`
- The arguments to convert, one output dict per entry.

`names` : list-like (optional), default `None`
- Given names for each of the arguments in the tuple.
Source code:

```python
def init_args(args_tuple: Union[tuple, list], names=None):
    """Converts tuple of arguments to a list of dicts

    Parameters
    ----------
    names: list-like (optional), default None
        given names for each of the arguments in the tuple
    """
    if names is None:
        names = ["start"] * len(args_tuple)
    else:
        assert len(names) == len(
            args_tuple
        ), "names should be same length as args_tuple"
    output_dicts = []
    for i, _ in enumerate(args_tuple):
        output_dicts.append(
            {
                (Subkey(names[i], "init"),): args_tuple[i],
                PREV_KEY: ("init",),
            }
        )
    return output_dicts
```
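A minimal sketch:

```python
from vflow import init_args

x_dict, y_dict = init_args(([1, 2, 3], [0, 1, 0]), names=["X", "y"])
# each dict maps a one-Subkey tuple to its argument and marks
# '__prev__' as the pipeline start
```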
def init_step(idx, cols)
-
Helper function to find init suffix in a column
Parameters
idx
:int
- Index of 'init' column in cols.
cols
:list[str]
- List of column names.
Source code:

```python
def init_step(idx, cols):
    """Helper function to find init suffix in a column

    Parameters
    ----------
    idx: int
        Index of 'init' column in cols.
    cols: list[str]
        List of column names.
    """
    for i in range(idx, len(cols)):
        if cols[i] != "init":
            return "init-" + cols[i]
    return None
```
def perturbation_stats(data: Union[pandas.core.frame.DataFrame, dict], *group_by: str, wrt: str = 'out', func=None, prefix: str = None, split: bool = False)
-
Compute statistics for `wrt` in `data`, conditional on `group_by`.

Parameters

`data` : `Union[pandas.DataFrame, dict]`
- DataFrame, as from calling `dict_to_df()` on an output dict from a Vset, or the output dict itself.

`*group_by` : `str`
- Vset names in `data` to group on. If none provided, treats everything as one big group.

`wrt` : `str (optional)`
- Column name in `data` or `dict_to_df(data)` on which to compute statistics. Defaults to `'out'`, the values of the original Vset output dict.

`func` : `function`, `str`, `list` or `dict (optional)`, default `None`
- A list of functions or function names to use for computing statistics, analogous to the parameter of the same name in `pandas.core.groupby.DataFrameGroupBy.aggregate`. If `None`, defaults to `['count', 'mean', 'std']`.

`prefix` : `str (optional)`, default `None`
- A string to prefix to new columns in the output DataFrame. If `None`, uses the value of `wrt`.

`split` : `bool (optional)`, default `False`
- If `True` and `wrt` in `data` has `list` or `numpy.ndarray` entries, will attempt to split the entries into multiple columns for the output.

Returns

`df` : `pandas.DataFrame`
- A DataFrame with summary statistics on `wrt`.
Source code:

```python
def perturbation_stats(
    data: Union[pd.DataFrame, dict],
    *group_by: str,
    wrt: str = "out",
    func=None,
    prefix: str = None,
    split: bool = False,
):
    """Compute statistics for `wrt` in `data`, conditional on `group_by`

    Parameters
    ----------
    data: Union[pandas.DataFrame, dict]
        DataFrame, as from calling `dict_to_df` on an output dict from a Vset,
        or the output dict itself.
    *group_by: str
        Vset names in `data` to group on. If none provided, treats everything
        as one big group.
    wrt: str (optional)
        Column name in `data` or `dict_to_df(data)` on which to compute
        statistics. Defaults to `'out'`, the values of the original Vset
        output dict.
    func: function, str, list or dict (optional), default None
        A list of functions or function names to use for computing
        statistics, analogous to the parameter of the same name in
        pandas.core.groupby.DataFrameGroupBy.aggregate. If `None`, defaults
        to `['count', 'mean', 'std']`.
    prefix: str (optional), default None
        A string to prefix to new columns in output DataFrame. If `None`,
        uses the value of `wrt`.
    split: bool (optional), default False
        If `True` and `wrt` in `data` has `list` or `numpy.ndarray` entries,
        will attempt to split the entries into multiple columns for the
        output.

    Returns
    -------
    df: pandas.DataFrame
        A DataFrame with summary statistics on `wrt`.
    """
    if func is None:
        func = ["count", "mean", "std"]
    if prefix is None:
        prefix = wrt
    if isinstance(data, dict):
        df = dict_to_df(data)
    else:
        df = data
    group_by = list(group_by)
    if len(group_by) > 0:
        gb = df.groupby(group_by)[wrt]
    else:
        gb = df.groupby(lambda x: True)[wrt]
    if (isinstance(func, list) and "mean" in func or "std" in func) and (
        type(df[wrt].iloc[0]) in [list, np.ndarray]
    ):
        wrt_arrays = [
            np.stack(d.tolist()) for d in (gb.get_group(grp) for grp in gb.groups)
        ]
        n_cols = wrt_arrays[0].shape[1]
        df_out = pd.DataFrame(gb.agg("count"))
        df_out.columns = [f"{prefix}-count"]
        if "mean" in func:
            if split:
                col_means = [arr.mean(axis=0) for arr in wrt_arrays]
                wrt_means = pd.DataFrame(
                    col_means,
                    columns=[f"{prefix}{i}-mean" for i in range(n_cols)],
                    index=gb.groups.keys(),
                )
            else:
                col_means = [
                    {f"{prefix}-mean": arr.mean(axis=0)} for arr in wrt_arrays
                ]
                wrt_means = pd.DataFrame(col_means, index=gb.groups.keys())
            wrt_means.index.names = df_out.index.names
            df_out = df_out.join(wrt_means)
        if "std" in func:
            if split:
                col_stds = [arr.std(axis=0, ddof=1) for arr in wrt_arrays]
                wrt_stds = pd.DataFrame(
                    col_stds,
                    columns=[f"{prefix}{i}-std" for i in range(n_cols)],
                    index=gb.groups.keys(),
                )
            else:
                col_stds = [
                    {f"{prefix}-std": arr.std(axis=0, ddof=1)} for arr in wrt_arrays
                ]
                wrt_stds = pd.DataFrame(col_stds, index=gb.groups.keys())
            wrt_stds.index.names = df_out.index.names
            df_out = df_out.join(wrt_stds)
        if "count" not in func:
            df_out = df_out.drop(f"{prefix}-count")
    else:
        df_out = gb.agg(func)
    df_out = df_out.reindex(sorted(df_out.columns), axis=1)
    df_out.reset_index(inplace=True)
    if len(group_by) > 0:
        return df_out.sort_values(group_by[0])
    return df_out
```
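A minimal sketch on a toy DataFrame shaped like `dict_to_df` output:

```python
import pandas as pd
from vflow import perturbation_stats

df = pd.DataFrame(
    {"modeling": ["LR", "LR", "DT", "DT"], "out": [0.80, 0.90, 0.70, 0.75]}
)
perturbation_stats(df, "modeling")
# one row per model with columns: modeling, count, mean, std
```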
def sep_dicts(d: dict, n_out: int = 1, keys=None)
-
Converts a dictionary with iterable values into multiple dictionaries. Assumes every value has the same length `n_out`.

Parameters

`d` : `dict`
- Dictionary with iterable values to be converted.

`n_out` : `int`, default `1`
- The number of dictionaries to separate `d` into.

`keys` : list-like, default `None`
- Optional list of keys to use in the output dicts.

Returns

`sep_dicts_list` : `list`
- List of separated dictionaries.

Examples

>>> sep_dicts({k1: (x1, y1), k2: (x2, y2), ..., '__prev__': p})
[{k1: x1, k2: x2, ..., '__prev__': p}, {k1: y1, k2: y2, ..., '__prev__': p}]
Source code:

```python
def sep_dicts(d: dict, n_out: int = 1, keys=None):
    """Converts dictionary with value being saved as an iterable into multiple
    dictionaries

    Assumes every value has same length n_out

    Parameters
    ----------
    d: dict
        Dictionary with iterable values to be converted.
    n_out: int, default 1
        The number of dictionaries to separate d into.
    keys: list-like, default None
        Optional list of keys to use in output dicts.

    Returns
    -------
    sep_dicts_list: list
        List of seperated dictionaries.

    Examples
    --------
    >>> sep_dicts({k1: (x1, y1), k2: (x2, y2), ..., '__prev__': p})
    [{k1: x1, k2: x2, ..., '__prev__': p}, {k1: y1, k2: y2, ..., '__prev__': p}]
    """
    if keys is None:
        keys = []
    if len(keys) > 0 and len(keys) != n_out:
        raise ValueError(f"keys should be empty or have length n_out={n_out}")
    # empty dict -- return empty dict
    if n_out <= 1:
        return d
    # try separating dict into multiple dicts
    sep_dicts_id = str(uuid4())  # w/ high prob, uuid4 is unique
    sep_dicts_list = [{} for _ in range(n_out)]
    for key, value in d.items():
        if key != PREV_KEY:
            for i in range(n_out):
                # assumes the correct sub-key for item i is in the i-th position
                if len(keys) == 0:
                    new_key = (key[i],) + key[n_out:]
                else:
                    new_sub = Subkey(
                        value=keys[i], origin=key[-1].origin + "-" + str(i)
                    )
                    new_key = (new_sub,) + key
                new_key[-1].sep_dicts_id = sep_dicts_id
                if isinstance(value, VfuncPromise):
                    # return a promise to get the value at index i of the
                    # original promise
                    value_i = VfuncPromise(lambda v, x: v[x], value, i)
                else:
                    value_i = value[i]
                sep_dicts_list[i][new_key] = value_i
    return sep_dicts_list
```
def to_list(tup: tuple)
-
Convert from tuple to packed list. Allows us to call a function with arguments in a loop.

Parameters

`tup` : `tuple`
- Tuple of objects to convert to a packed list.

Raises

`ValueError`
- If passed an uneven number of arguments without a list. Please wrap your args in a list.

Examples

>>> to_list(([x1, x2, x3], [y1, y2, y3]))
[[x1, y1], [x2, y2], [x3, y3]]
>>> to_list(([x1], [y1]))
[[x1, y1]]
>>> to_list(([x1, x2, x3], ))
[[x1], [x2], [x3]]
>>> to_list((x1, ))
[[x1]]
>>> to_list((x1, y1))
[[x1, y1]]
>>> to_list((x1, x2, x3, y1, y2, y3))
[[x1, y1], [x2, y2], [x3, y3]]
Source code:

```python
def to_list(tup: tuple):
    """Convert from tuple to packed list

    Allows us to call function with arguments in a loop

    Parameters
    ----------
    tup: tuple
        tuple of objects to convert to packed list

    Raises
    ------
    ValueError
        If passed uneven number of arguments without a list.
        Please wrap your args in a list.

    Examples
    --------
    >>> to_list(([x1, x2, x3], [y1, y2, y3]))
    [[x1, y1], [x2, y2], [x3, y3]]
    >>> to_list(([x1], [y1]))
    [[x1, y1]]
    >>> to_list(([x1, x2, x3], ))
    [[x1], [x2], [x3]]
    >>> to_list((x1, ))
    [[x1]]
    >>> to_list((x1, y1))
    [[x1, y1]]
    >>> to_list((x1, x2, x3, y1, y2, y3))
    [[x1, y1], [x2, y2], [x3, y3]]
    """
    n_tup = len(tup)
    if n_tup == 0:
        return []
    if not isinstance(tup[0], list):
        # the first element is data
        if n_tup == 1:
            return [list(tup)]
        if n_tup % 2 != 0:
            raise ValueError(
                "Don't know how to handle uneven number of args "
                "without a list. Please wrap your args in a list."
            )
        # assume first half of args is input and second half is outcome
        return [list(el) for el in zip(tup[: (n_tup // 2)], tup[(n_tup // 2) :])]
    if n_tup == 1:
        return [[x] for x in tup[0]]
    n_mods = len(tup[0])
    lists_packed = [[] for _ in range(n_mods)]
    for i in range(n_mods):
        for j in range(n_tup):
            lists_packed[i].append(tup[j][i])
    return lists_packed
```
def to_tuple(lists: list)
-
Convert from lists to unpacked tuple. Allows us to write `X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])`.

Parameters

`lists` : `list`
- List of objects to convert to an unpacked tuple.

Examples

>>> to_tuple([[x1, y1], [x2, y2], [x3, y3]])
([x1, x2, x3], [y1, y2, y3])
>>> to_tuple([[x1, y1]])
([x1], [y1])
>>> to_tuple([m1, m2, m3])
[m1, m2, m3]
Source code:

```python
def to_tuple(lists: list):
    """Convert from lists to unpacked tuple

    Allows us to write `X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])`

    Parameters
    ----------
    lists: list
        list of objects to convert to unpacked tuple

    Examples
    --------
    >>> to_tuple([[x1, y1], [x2, y2], [x3, y3]])
    ([x1, x2, x3], [y1, y2, y3])
    >>> to_tuple([[x1, y1]])
    ([x1], [y1])
    >>> to_tuple([m1, m2, m3])
    [m1, m2, m3]
    """
    n_mods = len(lists)
    if n_mods <= 1:
        return lists
    if not isinstance(lists[0], list):
        return lists
    n_tup = len(lists[0])
    tup = [[] for _ in range(n_tup)]
    for i in range(n_mods):
        for j in range(n_tup):
            tup[j].append(lists[i][j])
    return tuple(tup)
```
Classes
class AsyncVfunc (name: str = '', vfunc=<function AsyncVfunc.<lambda>>)
-
An asynchronous version of the Vfunc class.
Source code:

```python
class AsyncVfunc:
    """An asynchronous version of the Vfunc class."""

    def __init__(self, name: str = "", vfunc=lambda x: x):
        self.name = name
        if isinstance(vfunc, Vfunc):
            self.vfunc = vfunc.vfunc
        else:
            assert hasattr(vfunc, "fit") or callable(
                vfunc
            ), "vfunc must be an object with a fit method or a callable"
            self.vfunc = vfunc

    def fit(self, *args, **kwargs):
        """This function fits params for this vfunc"""
        if hasattr(self.vfunc, "fit"):
            return _remote_fun.remote(self.vfunc.fit, *args, **kwargs)
        return _remote_fun.remote(self.vfunc, *args, **kwargs)

    def transform(self, *args, **kwargs):
        """This function transforms its input in some way"""
        if hasattr(self.vfunc, "transform"):
            return _remote_fun.remote(self.vfunc.transform, *args, **kwargs)
        return _remote_fun.remote(self.vfunc, *args, **kwargs)

    def __call__(self, *args, **kwargs):
        """This should decide what to call"""
        return self.fit(*args, **kwargs)
```
Methods
def fit(self, *args, **kwargs)
-
This function fits params for this vfunc
def transform(self, *args, **kwargs)
-
This function transforms its input in some way
class PCSPipeline (steps=None, cache_dir=None)
-
Parameters

`steps` : `list`
- A list of Vset instances.

`cache_dir` : `str`, default `None`
- The directory to use as data store by `joblib`. If None, won't do caching.
Source code:

```python
class PCSPipeline:
    def __init__(self, steps=None, cache_dir=None):
        """
        Parameters
        ----------
        steps: list
            a list of Vset instances
        cache_dir: str, default=None
            The directory to use as data store by `joblib`. If None, won't do
            caching.
        """
        if steps is None:
            steps = []
        self.steps = steps
        # set up the cache
        self.memory = joblib.Memory(location=cache_dir)

    def run(self, *args, **kwargs):
        """Runs the pipeline"""
        run_step_cached = self.memory.cache(_run_step)
        for i, step in enumerate(self.steps):
            try:
                step_name = step.name
            except AttributeError:
                step_name = f"Step {i}"
            print(step_name)
            _, fitted_step = run_step_cached(step, *args, **kwargs)
            self.steps[i] = fitted_step

    def __getitem__(self, i):
        """Accesses ith step of pipeline"""
        return self.steps[i]

    def __len__(self):
        return len(self.steps)

    def generate_names(self, as_pandas=True):
        name_lists = []
        if as_pandas:
            for step in self.steps:
                name_lists.append(
                    [f"{i}_{str(mod)[:8]}" for i, mod in enumerate(step)]
                )
            indexes = list(itertools.product(*name_lists))
            return pd.DataFrame(indexes, columns=[step.name for step in self.steps])
        for step in self.steps:
            name_lists.append(
                [f"{step.name}_{i}_{str(mod)[:8]}" for i, mod in enumerate(step)]
            )
        return list(itertools.product(*name_lists))
```
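A construction sketch, reusing the Vsets from the README example; `run` fits each step in order, caching results with joblib:

```python
from vflow import PCSPipeline

pipeline = PCSPipeline(steps=[subsampling_set, modeling_set])
len(pipeline)  # 2
pipeline[0]    # the subsampling Vset
```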
Methods
def generate_names(self, as_pandas=True)
-
Generates a name for every combination of vfuncs across the pipeline's steps, returning a pandas DataFrame if `as_pandas` is True and a list of tuples otherwise.
def run(self, *args, **kwargs)
-
Runs the pipeline
class Subkey (value, origin: str, output_matching: bool = False)
-
Parameters

`value` : `Any`
- Subkey value corresponding to a Vset vfunc.

`origin` : `str`
- Name of the origin Vset of this Subkey.

`output_matching` : `bool (optional)`, default `False`
- Inherited from the Vset where the Subkey is created.
Source code:

```python
class Subkey:
    def __init__(self, value, origin: str, output_matching: bool = False):
        """
        Parameters
        ----------
        value: Any
            subkey value corresponding to a Vset vfunc
        origin: str
            name of the origin Vset of this Subkey
        output_matching: bool (optional), default False
            inherited from the Vset where the Subkey is created
        """
        self.value = value
        self.origin = origin
        self.output_matching = output_matching
        # sep_dicts_id identifies the particular call to sep_dicts() that this
        # key's dictionary went through (if any).
        self.sep_dicts_id = None

    def is_matching(self):
        """Checks if subkey should be matched in other Vsets"""
        return self.output_matching or self.sep_dicts_id is not None

    def matches_sep_dict_id(self, other: object):
        """Helper to match Subkey by _sep_dict_id"""
        if isinstance(other, self.__class__):
            return (
                self.sep_dicts_id is not None
                and self.sep_dicts_id == other.sep_dicts_id
            )
        return False

    def matches(self, other: object):
        """When Subkey matching is required, determines if this Subkey is
        compatible with another, meaning that the origins and values match,
        and either the _sep_dicts_id matches or both Subkeys have
        _output_matching True.
        """
        if isinstance(other, self.__class__):
            # they're both matching
            cond0 = self.is_matching() and other.is_matching()
            # value and origins match
            cond1 = self.value == other.value and self.origin == other.origin
            # sep_dicts_id matches
            cond2 = self.sep_dicts_id == other.sep_dicts_id or (
                self.output_matching and other.output_matching
            )
            return cond0 and cond1 and cond2
        return False

    def mismatches(self, other: object):
        """When Subkey matching is required, determines if this Subkey and
        another are a bad match, meaning either:

        1. output_matching is True, origin is same, value is different
        2. output_matching is False, sep_dicts_id is same and not None,
           origin is same, value is different
        """
        if isinstance(other, self.__class__):
            # one of the two keys is output_matching
            cond0 = self.output_matching or other.output_matching
            # neither key is output_matching but sep_dict_ids not None and match
            cond1 = not cond0 and self.matches_sep_dict_id(other)
            # origins match and values mismatch
            cond2 = self.origin == other.origin and self.value != other.value
            return (cond0 or cond1) and cond2
        return True

    def __eq__(self, other: object):
        """Mainly used for testing purposes."""
        if isinstance(other, self.__class__):
            # value and origins match
            return self.value == other.value and self.origin == other.origin
        return False

    def __repr__(self):
        return str(self.value)

    def __hash__(self):
        """Mainly used for testing purposes."""
        return hash(self.value) ^ hash(self.origin) ^ hash(self.output_matching)
```
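A minimal sketch of the matching rules:

```python
from vflow import Subkey

a = Subkey("LR", origin="modeling", output_matching=True)
b = Subkey("LR", origin="modeling", output_matching=True)
c = Subkey("DT", origin="modeling", output_matching=True)

a.matches(b)     # True: same origin and value, both output-matching
a.mismatches(c)  # True: same origin but different values
```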
Methods
def is_matching(self)
-
Checks if subkey should be matched in other Vsets
def matches(self, other: object)
-
When Subkey matching is required, determines if this Subkey is compatible with another, meaning that the origins and values match, and either the _sep_dicts_id matches or both Subkeys have _output_matching True.
def matches_sep_dict_id(self, other: object)
-
Helper to match Subkey by _sep_dict_id
def mismatches(self, other: object)
-
When Subkey matching is required, determines if this Subkey and another are a bad match, meaning either:
- output_matching is True, origin is same, value is different
- output_matching is False, sep_dicts_id is same and not None, origin is same, value is different
class Vfunc (name: str = '', vfunc=<function Vfunc.<lambda>>)
-
Vfunc is basically a function along with a name attribute. It may support a "fit" function, but may also just have a "transform" function. If none of these is supported, it need only be a function
Source code:

```python
class Vfunc:
    """Vfunc is basically a function along with a name attribute.
    It may support a "fit" function, but may also just have a "transform"
    function. If none of these is supported, it need only be a function
    """

    def __init__(self, name: str = "", vfunc=lambda x: x):
        assert hasattr(vfunc, "fit") or callable(
            vfunc
        ), "vfunc must be an object with a fit method or a callable"
        self.name = name
        self.vfunc = vfunc

    def fit(self, *args, **kwargs):
        """This function fits params for this vfunc"""
        if hasattr(self.vfunc, "fit"):
            return self.vfunc.fit(*args, **kwargs)
        return self.vfunc(*args, **kwargs)

    def transform(self, *args, **kwargs):
        """This function transforms its input in some way"""
        if hasattr(self.vfunc, "transform"):
            return self.vfunc.transform(*args, **kwargs)
        return self.vfunc(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        """This should decide what to call"""
        return self.fit(*args, **kwargs)
```
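Any callable (or object with a fit method) can be wrapped; a minimal sketch:

```python
import numpy as np
from vflow import Vfunc

log_vfunc = Vfunc(name="log1p", vfunc=lambda x: np.log1p(x))
log_vfunc(np.array([0.0, 1.0, 2.0]))  # falls back to calling the wrapped function
```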
Methods
def fit(self, *args, **kwargs)
-
This function fits params for this vfunc
def transform(self, *args, **kwargs)
-
This function transforms its input in some way
class VfuncPromise (vfunc: callable, *args)
-
A Vfunc promise used for lazy evaluation.
Source code:

```python
class VfuncPromise:
    """A Vfunc promise used for lazy evaluation."""

    def __init__(self, vfunc: callable, *args):
        self.vfunc = vfunc
        self.args = args
        self.called = False
        self.value = None

    def __call__(self):
        """This should decide what to call"""
        if self.called:
            return self.value
        tmp_args = []
        for i, arg in enumerate(self.args):
            tmp_args.append(arg)
            while isinstance(tmp_args[i], VfuncPromise):
                tmp_args[i] = tmp_args[i]()
        while isinstance(self.vfunc, VfuncPromise):
            self.vfunc = self.vfunc()
        self.value = self.vfunc(*tmp_args)
        self.called = True
        return self.value

    def _get_value(self):
        if isinstance(self(), ray.ObjectRef):
            self.value = ray.get(self.value)
        return self.value

    def transform(self, *args):
        """This function transforms its input in some way"""
        return self._get_value().transform(*args)

    def predict(self, *args):
        """This function calls predict on its inputs"""
        return self._get_value().predict(*args)

    def predict_proba(self, *args):
        """This function calls predict_proba on its inputs"""
        return self._get_value().predict_proba(*args)

    def __repr__(self):
        if self.called:
            return f"Fulfilled VfuncPromise({self.value})"
        return f"Unfulfilled VfuncPromise(func={self.vfunc}, args={self.args})"
```
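A minimal sketch of the lazy-evaluation behavior:

```python
from vflow import VfuncPromise

promise = VfuncPromise(lambda a, b: a + b, 2, 3)
print(promise)    # Unfulfilled VfuncPromise(...)
print(promise())  # 5 -- computed on first call
print(promise)    # Fulfilled VfuncPromise(5) -- the value is cached
```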
Methods
def predict(self, *args)
-
This function calls predict on its inputs
def predict_proba(self, *args)
-
This function calls predict_proba on its inputs
def transform(self, *args)
-
This function transforms its input in some way
class Vset (name: str, vfuncs, vfunc_keys: list = None, is_async: bool = False, output_matching: bool = False, lazy: bool = False, cache_dir: str = None, tracking_dir: str = None)
-
Parameters

`name` : `str`
- Name of this Vset.

`vfuncs` : `list` or `dict`
- Dictionary of functions that we want to associate with this Vset.

`vfunc_keys` : `list (optional)`
- List of names corresponding to each vfunc.

`is_async` : `bool (optional)`
- If True, `vfuncs` are computed asynchronously.

`output_matching` : `bool (optional)`
- If True, then output keys from this Vset will be matched when used in other Vsets.

`lazy` : `bool (optional)`
- If True, then vfuncs are evaluated lazily, i.e. outputs are `VfuncPromise` objects.

`cache_dir` : `str (optional)`
- If provided, do caching and use `cache_dir` as the data store for `joblib.Memory`.

`tracking_dir` : `str (optional)`
- If provided, use the `mlflow.tracking` API to log outputs as metrics with params determined by input keys.
Source code:

```python
class Vset:
    def __init__(
        self,
        name: str,
        vfuncs,
        vfunc_keys: list = None,
        is_async: bool = False,
        output_matching: bool = False,
        lazy: bool = False,
        cache_dir: str = None,
        tracking_dir: str = None,
    ):
        """
        Parameters
        ----------
        name: str
            Name of this Vset.
        vfuncs: list or dict
            Dictionary of functions that we want to associate with
        vfunc_keys: list (optional)
            List of names corresponding to each vfunc
        is_async: bool (optional)
            If True, `vfuncs` are computed asynchronously
        output_matching: bool (optional)
            If True, then output keys from this Vset will be matched when
            used in other Vsets
        lazy: bool (optional)
            If True, then vfuncs are evaluated lazily, i.e. outputs are
            `vset.vfunc.VfuncPromise`
        cache_dir: str (optional)
            If provided, do caching and use `cache_dir` as the data store for
            `joblib.Memory`.
        tracking_dir: str (optional)
            If provided, use the `mlflow.tracking` api to log outputs as
            metrics with params determined by input keys.
        """
        self.name = name
        self._fitted = False
        self.fitted_vfuncs = None  # outputs
        self._async = is_async
        self._output_matching = output_matching
        self._lazy = lazy
        self._cache_dir = cache_dir
        self._memory = joblib.Memory(self._cache_dir)
        if tracking_dir is not None:
            self._mlflow = MlflowClient(tracking_uri=tracking_dir)
            experiment = self._mlflow.get_experiment_by_name(name=self.name)
            if experiment is None:
                self._exp_id = self._mlflow.create_experiment(name=self.name)
            else:
                self._exp_id = experiment.experiment_id
        else:
            self._mlflow = None
        # check if any of the vfuncs are AsyncVfuncs
        # if so, we'll make then all AsyncVfuncs later on
        if not self._async and np.any([isinstance(vf, AsyncVfunc) for vf in vfuncs]):
            self._async = True
        if isinstance(vfuncs, dict):
            self.vfuncs = vfuncs
        elif isinstance(vfuncs, list):
            if vfunc_keys is not None:
                assert isinstance(
                    vfunc_keys, list
                ), "vfuncs passed as list but vfunc_keys is not a list"
                assert len(vfuncs) == len(
                    vfunc_keys
                ), "vfuncs list and vfunc_keys list do not have the same length"
                # TODO: how best to handle tuple subkeys?
                vfunc_keys = [(self.__create_subkey(k),) for k in vfunc_keys]
            else:
                vfunc_keys = [
                    (self.__create_subkey(f"{name}_{i}"),) for i in range(len(vfuncs))
                ]
            # convert vfunc keys to singleton tuples
            self.vfuncs = dict(zip(vfunc_keys, vfuncs))
        # if needed, wrap the vfuncs in the Vfunc or AsyncVfunc class
        for k, v in self.vfuncs.items():
            if self._async:
                if not isinstance(v, AsyncVfunc):
                    self.vfuncs[k] = AsyncVfunc(k[0], v)
            elif not isinstance(v, Vfunc):
                self.vfuncs[k] = Vfunc(k[0], v)

    def _apply_func(self, *args, out_dict: dict = None):
        """Apply functions in out_dict to combined args dict

        Optionally logs output Subkeys and values as params and metrics using
        `mlflow.tracking` if this Vset has a `_tracking_dir`.

        Parameters
        ----------
        *args: dict
            Takes multiple dicts and combines them into one.
            Then runs vfuncs on each item in combined dict.
        out_dict: dict (optional), default None
            The dictionary to pass to the matching function. If None,
            defaults to self.vfuncs.

        Returns
        -------
        out_dict: dict
            Dictionary with items being determined by functions in vfunc set.
            Functions and input dictionaries are currently matched using a
            cartesian matching format.

        Examples
        --------
        >>> vfuncs, data = {LR : logistic}, {train_1 : [X1,y1], train2 : [X2,y2]}
        {(train_1, LR) : fitted logistic, (train_2, LR) : fitted logistic}
        """
        if out_dict is None:
            out_dict = deepcopy(self.vfuncs)
        apply_func_cached = self._memory.cache(_apply_func_cached)
        out_dict = apply_func_cached(out_dict, self._async, self._lazy, *args)
        prev = tuple()
        for arg in args:
            if PREV_KEY in arg:
                prev += (arg[PREV_KEY],)
        out_dict[PREV_KEY] = (self,) + prev
        if self._mlflow is not None:
            run_dict = {}
            # log subkeys as params and value as metric
            for k, v in out_dict.items():
                if k == PREV_KEY:
                    continue
                origins = np.array([subk.origin for subk in k])
                # ignore init origins and the last origin (this Vset)
                param_idx = [i for i in range(len(k[:-1])) if origins[i] != "init"]
                # get or create mlflow run
                run_dict_key = tuple(subk.value for subk in k[:-1])
                if run_dict_key in run_dict:
                    run_id = run_dict[run_dict_key]
                else:
                    run = self._mlflow.create_run(self._exp_id)
                    run_id = run.info.run_id
                    run_dict[run_dict_key] = run_id
                # log params
                for idx in param_idx:
                    subkey = k[idx]
                    param_name = subkey.origin
                    # check if the origin occurs multiple times
                    if np.sum(origins == param_name) > 1:
                        occurence = np.sum(origins[:idx] == param_name)
                        param_name = param_name + str(occurence)
                    self._mlflow.log_param(run_id, param_name, subkey.value)
                self._mlflow.log_metric(run_id, k[-1].value, v)
        return out_dict

    def fit(self, *args):
        """Fits to args using `_apply_func`"""
        out_dict = {}
        for k, v in self.vfuncs.items():
            out_dict[k] = v.fit
        self.fitted_vfuncs = self._apply_func(*args, out_dict=out_dict)
        prev = self.fitted_vfuncs[PREV_KEY][1:]
        if hasattr(self, FILTER_PREV_KEY):
            prev = getattr(self, FILTER_PREV_KEY) + prev
        setattr(self, PREV_KEY, prev)
        self._fitted = True
        return self

    def fit_transform(self, *args):
        """Fits to args and transforms only the first arg."""
        return self.fit(*args).transform(args[0])

    def transform(self, *args):
        """Transforms args using `_apply_func`"""
        if not self._fitted:
            raise AttributeError(
                "Please fit the Vset object before calling the transform method."
            )
        out_dict = {}
        for k, v in self.fitted_vfuncs.items():
            if hasattr(v, "transform"):
                out_dict[k] = v.transform
        return self._apply_func(*args, out_dict=out_dict)

    def predict(self, *args, with_uncertainty: bool = False, group_by: list = None):
        """Predicts args using `_apply_func`"""
        if not self._fitted:
            raise AttributeError("Please fit the Vset object before calling predict.")
        pred_dict = {}
        for k, v in self.fitted_vfuncs.items():
            if hasattr(v, "predict"):
                pred_dict[k] = v.predict
        preds = self._apply_func(*args, out_dict=pred_dict)
        if with_uncertainty:
            return prediction_uncertainty(preds, group_by)
        return preds

    def predict_proba(
        self, *args, with_uncertainty: bool = False, group_by: list = None
    ):
        """Calls predict_proba on args using `_apply_func`"""
        if not self._fitted:
            raise AttributeError(
                "Please fit the Vset object before calling predict_proba."
            )
        pred_dict = {}
        for k, v in self.fitted_vfuncs.items():
            if hasattr(v, "predict_proba"):
                pred_dict[k] = v.predict_proba
        preds = self._apply_func(*args, out_dict=pred_dict)
        if with_uncertainty:
            return prediction_uncertainty(preds, group_by)
        return preds

    def evaluate(self, *args):
        """Combines dicts before calling `_apply_func`"""
        return self._apply_func(*args)

    def __call__(self, *args, n_out: int = None, keys=None, **kwargs):
        """Call args using `_apply_func`, optionally seperating output
        dictionary into `n_out` dictionaries with `keys`
        """
        if keys is None:
            keys = []
        if n_out is None:
            n_out = len(args)
        out_dict = self._apply_func(*args)
        if n_out == 1:
            return out_dict
        out_dicts = sep_dicts(out_dict, n_out=n_out, keys=keys)
        # add back prev
        prev = out_dict[PREV_KEY]
        for i in range(n_out):
            if n_out == len(args):
                out_dicts[i][PREV_KEY] = (prev[0],) + (prev[i + 1],)
            else:
                out_dicts[i][PREV_KEY] = prev
        return out_dicts

    def __getitem__(self, i):
        """Accesses ith item in the vfunc set"""
        return self.vfuncs[i]

    def __contains__(self, key):
        """Returns true if vfuncs is a dict and key is one of its keys"""
        if isinstance(self.vfuncs, dict):
            return key in self.vfuncs.keys()
        return False

    def keys(self):
        """Returns Vset vfunc keys"""
        if isinstance(self.vfuncs, dict):
            return self.vfuncs.keys()
        return {}.keys()

    def __len__(self):
        return len(self.vfuncs)

    def __str__(self):
        return "Vset(" + self.name + ")"

    def __create_subkey(self, value):
        """Helper function to construct `Subkey` with this Vset determining
        origin and output_matching
        """
        return Subkey(value, self.name, self._output_matching)
```
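A fit/transform sketch echoing the imputation example from the README intro; `SimpleImputer` here is illustrative, and the data dicts come from the pipeline at the top of this page:

```python
from sklearn.impute import SimpleImputer
from vflow import Vset

# perturb the preprocessing step: two imputation strategies
preproc_set = Vset(
    name="preproc",
    vfuncs=[SimpleImputer(strategy="mean"), SimpleImputer(strategy="median")],
    vfunc_keys=["mean_imp", "median_imp"],
)
preproc_set.fit(X_trains)
# one imputed copy of each subsample per strategy
X_trains_imputed = preproc_set.transform(X_trains)
```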
Methods
def evaluate(self, *args)
-
Combines dicts before calling `_apply_func`.
def fit(self, *args)
-
Fits to args using `_apply_func`.
def fit_transform(self, *args)
-
Fits to args and transforms only the first arg.
def keys(self)
-
Returns Vset vfunc keys
def predict(self, *args, with_uncertainty: bool = False, group_by: list = None)
-
Predicts args using `_apply_func`.
def predict_proba(self, *args, with_uncertainty: bool = False, group_by: list = None)
-
Calls predict_proba on args using `_apply_func`.
def transform(self, *args)
-
Transforms args using `_apply_func`.