Class that stores the entire pipeline of steps in a data-science workflow
Expand source code
"""Class that stores the entire pipeline of steps in a data-science workflow
"""
import itertools
import joblib
import networkx as nx
import pandas as pd
from vflow.vset import PREV_KEY
class PCSPipeline:
def __init__(self, steps=None, cache_dir=None):
"""
Parameters
----------
steps: list
a list of Vset instances
cache_dir: str, default=None
The directory to use as data store by `joblib`. If None, won't do
caching.
"""
if steps is None:
steps = []
self.steps = steps
# set up the cache
self.memory = joblib.Memory(location=cache_dir)
def run(self, *args, **kwargs):
"""Runs the pipeline"""
run_step_cached = self.memory.cache(_run_step)
for i, step in enumerate(self.steps):
try:
step_name = step.name
except AttributeError:
step_name = f"Step {i}"
print(step_name)
_, fitted_step = run_step_cached(step, *args, **kwargs)
self.steps[i] = fitted_step
def __getitem__(self, i):
"""Accesses ith step of pipeline"""
return self.steps[i]
def __len__(self):
return len(self.steps)
def generate_names(self, as_pandas=True):
name_lists = []
if as_pandas:
for step in self.steps:
name_lists.append([f"{i}_{str(mod)[:8]}" for i, mod in enumerate(step)])
indexes = list(itertools.product(*name_lists))
return pd.DataFrame(indexes, columns=[step.name for step in self.steps])
for step in self.steps:
name_lists.append(
[f"{step.name}_{i}_{str(mod)[:8]}" for i, mod in enumerate(step)]
)
return list(itertools.product(*name_lists))
def build_graph(node, draw=True):
"""Helper function that just calls build_graph_recur with an empty graph
Parameters
----------
node: dict or Vset
Returns
-------
G: nx.Digraph()
"""
def unnest_node(node):
"""Unnest a node, if necessary (i.e., when node is a tuple)
Parameters
----------
node: str, dict, Vset, or tuple
Returns
-------
unnested_node: str, Vset, or None
"""
node_type = type(node)
if node_type is str or "Vset" in str(node_type):
return node
if node_type is tuple:
return unnest_node(node[0])
return None
def build_graph_recur(node, G):
"""Builds a graph up using __prev__ and PREV_KEY pointers
Parameters
----------
node: str, dict, Vset, or tuple
G: nx.Digraph()
Returns
-------
G: nx.Digraph()
"""
# base case: reached starting node
if isinstance(node, str):
return G
# initial case: starting at dict
if isinstance(node, dict):
s_node = "End"
nodes_prev = node[PREV_KEY]
G.add_edge(nodes_prev[0], s_node)
for node_prev in nodes_prev[1:]:
G.add_edge(unnest_node(node_prev), nodes_prev[0])
G = build_graph_recur(node_prev, G)
return G
# main case: at a vfuncset
if "Vset" in str(type(node)):
if hasattr(node, PREV_KEY):
nodes_prev = getattr(node, PREV_KEY)
for node_prev in nodes_prev:
G.add_edge(unnest_node(node_prev), node)
G = build_graph_recur(node_prev, G)
return G
# nested prev key case
if isinstance(node, tuple):
func_node = unnest_node(node[0])
G = build_graph_recur(func_node, G)
for arg_node in node[1:]:
G.add_edge(unnest_node(arg_node), func_node)
G = build_graph_recur(arg_node, G)
return G
return G
G = nx.DiGraph()
G = build_graph_recur(node, G)
if draw:
nx.draw(G, with_labels=True, node_color="#CCCCCC")
return G
def _run_step(step, *args, **kwargs):
if step._fitted:
return step.vfuncs, step
outputs = step(*args, **kwargs)
return outputs, step
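The caching in run above is plain joblib memoization: self.memory.cache(_run_step) wraps _run_step, so with cache_dir=None nothing is cached, while a real directory persists each step's result to disk. A minimal sketch of that pattern with a hypothetical toy function (not a real Vset):

import joblib

def toy_fit(x):
    print("fitting")  # printed only on the first, uncached call
    return x + 1

memory = joblib.Memory(location="./cache_demo", verbose=0)  # analogous to cache_dir
toy_fit_cached = memory.cache(toy_fit)  # same pattern as self.memory.cache(_run_step)
print(toy_fit_cached(1))  # computes, prints "fitting", and stores the result on disk
print(toy_fit_cached(1))  # served from the joblib cache; "fitting" is not printed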
Functions
def build_graph(node, draw=True)
-
Helper function that just calls build_graph_recur with an empty graph.
Parameters
node : dict or Vset
draw : bool, default=True
If True, draw the resulting graph with networkx.
Returns
G : nx.DiGraph
Expand source code
def build_graph(node, draw=True):
    """Helper function that just calls build_graph_recur with an empty graph

    Parameters
    ----------
    node: dict or Vset

    Returns
    -------
    G: nx.DiGraph
    """

    def unnest_node(node):
        """Unnest a node, if necessary (i.e., when node is a tuple)

        Parameters
        ----------
        node: str, dict, Vset, or tuple

        Returns
        -------
        unnested_node: str, Vset, or None
        """
        node_type = type(node)
        if node_type is str or "Vset" in str(node_type):
            return node
        if node_type is tuple:
            return unnest_node(node[0])
        return None

    def build_graph_recur(node, G):
        """Builds a graph up using __prev__ and PREV_KEY pointers

        Parameters
        ----------
        node: str, dict, Vset, or tuple
        G: nx.DiGraph

        Returns
        -------
        G: nx.DiGraph
        """
        # base case: reached starting node
        if isinstance(node, str):
            return G
        # initial case: starting at dict
        if isinstance(node, dict):
            s_node = "End"
            nodes_prev = node[PREV_KEY]
            G.add_edge(nodes_prev[0], s_node)
            for node_prev in nodes_prev[1:]:
                G.add_edge(unnest_node(node_prev), nodes_prev[0])
                G = build_graph_recur(node_prev, G)
            return G
        # main case: at a vfuncset
        if "Vset" in str(type(node)):
            if hasattr(node, PREV_KEY):
                nodes_prev = getattr(node, PREV_KEY)
                for node_prev in nodes_prev:
                    G.add_edge(unnest_node(node_prev), node)
                    G = build_graph_recur(node_prev, G)
            return G
        # nested prev key case
        if isinstance(node, tuple):
            func_node = unnest_node(node[0])
            G = build_graph_recur(func_node, G)
            for arg_node in node[1:]:
                G.add_edge(unnest_node(arg_node), func_node)
                G = build_graph_recur(arg_node, G)
            return G
        return G

    G = nx.DiGraph()
    G = build_graph_recur(node, G)
    if draw:
        nx.draw(G, with_labels=True, node_color="#CCCCCC")
    return G
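As a quick, hedged illustration (assuming this module is importable as vflow.pipeline): the argument to build_graph only needs a PREV_KEY entry recording what produced it. In practice that dict is typically the output of a Vset call; plain strings work as stand-in nodes here so the snippet runs on its own:

from vflow.pipeline import build_graph
from vflow.vset import PREV_KEY

# Toy output dict: the first PREV_KEY entry is the node that produced the
# output, the remaining entries feed into it (strings stand in for Vsets).
out = {PREV_KEY: ("modeling", "preprocessing", "data")}
G = build_graph(out, draw=False)  # returns an nx.DiGraph
print(sorted(G.edges()))
# [('data', 'modeling'), ('modeling', 'End'), ('preprocessing', 'modeling')]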
Classes
class PCSPipeline (steps=None, cache_dir=None)
-
Class that stores the entire pipeline of steps in a data-science workflow.
Parameters
steps : list
A list of Vset instances.
cache_dir : str, default=None
The directory to use as data store by joblib. If None, won't do caching.
Expand source code
class PCSPipeline:
    def __init__(self, steps=None, cache_dir=None):
        """
        Parameters
        ----------
        steps: list
            a list of Vset instances
        cache_dir: str, default=None
            The directory to use as data store by `joblib`. If None, won't do
            caching.
        """
        if steps is None:
            steps = []
        self.steps = steps
        # set up the cache
        self.memory = joblib.Memory(location=cache_dir)

    def run(self, *args, **kwargs):
        """Runs the pipeline"""
        run_step_cached = self.memory.cache(_run_step)
        for i, step in enumerate(self.steps):
            try:
                step_name = step.name
            except AttributeError:
                step_name = f"Step {i}"
            print(step_name)
            _, fitted_step = run_step_cached(step, *args, **kwargs)
            self.steps[i] = fitted_step

    def __getitem__(self, i):
        """Accesses ith step of pipeline"""
        return self.steps[i]

    def __len__(self):
        return len(self.steps)

    def generate_names(self, as_pandas=True):
        name_lists = []
        if as_pandas:
            for step in self.steps:
                name_lists.append([f"{i}_{str(mod)[:8]}" for i, mod in enumerate(step)])
            indexes = list(itertools.product(*name_lists))
            return pd.DataFrame(indexes, columns=[step.name for step in self.steps])
        for step in self.steps:
            name_lists.append(
                [f"{step.name}_{i}_{str(mod)[:8]}" for i, mod in enumerate(step)]
            )
        return list(itertools.product(*name_lists))
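A minimal usage sketch, not taken from the vflow docs: it assumes Vset and init_args are importable from vflow, that Vset accepts Vset(name=..., vfuncs=[...]), and that init_args((...), names=[...]) wraps raw data for the first step; adjust if the actual Vset API differs.

import numpy as np
from vflow import Vset, init_args  # assumed public API
from vflow.pipeline import PCSPipeline

def standardize(X):
    return (X - X.mean(axis=0)) / X.std(axis=0)

def log_abs(X):
    return np.log1p(np.abs(X))

# one step with two preprocessing choices (assumed Vset signature)
preprocessing = Vset(name="preprocessing", vfuncs=[standardize, log_abs])

X, = init_args((np.random.randn(20, 3),), names=["X"])  # assumed init_args signature
pipeline = PCSPipeline(steps=[preprocessing], cache_dir=None)
pipeline.run(X)       # prints "preprocessing"; the fitted step is stored back into pipeline.steps
print(len(pipeline))  # 1
print(pipeline[0])    # the fitted preprocessing Vset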
Methods
def generate_names(self, as_pandas=True)
-
Generates a name for every combination of vfuncs across the pipeline's steps. With as_pandas=True, returns a pandas DataFrame with one column per step; otherwise returns a list of tuples whose entries are prefixed with the step name.
Expand source code
def generate_names(self, as_pandas=True):
    name_lists = []
    if as_pandas:
        for step in self.steps:
            name_lists.append([f"{i}_{str(mod)[:8]}" for i, mod in enumerate(step)])
        indexes = list(itertools.product(*name_lists))
        return pd.DataFrame(indexes, columns=[step.name for step in self.steps])
    for step in self.steps:
        name_lists.append(
            [f"{step.name}_{i}_{str(mod)[:8]}" for i, mod in enumerate(step)]
        )
    return list(itertools.product(*name_lists))
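Continuing the sketch above (same assumptions about the Vset API), the two output formats differ only in how names are prefixed:

names_df = pipeline.generate_names(as_pandas=True)    # DataFrame; columns are step names, entries like "0_<first 8 chars of the vfunc>"
names_list = pipeline.generate_names(as_pandas=False) # list of tuples; entries like "preprocessing_0_<first 8 chars>"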
def run(self, *args, **kwargs)
-
Runs the pipeline: each step is called in order on the given arguments (memoized via joblib) and replaced in place by its fitted version.
Expand source code
def run(self, *args, **kwargs):
    """Runs the pipeline"""
    run_step_cached = self.memory.cache(_run_step)
    for i, step in enumerate(self.steps):
        try:
            step_name = step.name
        except AttributeError:
            step_name = f"Step {i}"
        print(step_name)
        _, fitted_step = run_step_cached(step, *args, **kwargs)
        self.steps[i] = fitted_step
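Because run memoizes _run_step through joblib.Memory, pointing cache_dir at a directory makes repeated runs cheap: a step that is already fitted short-circuits inside _run_step, and identical calls can be served from the on-disk cache. A hedged continuation of the sketch above:

preproc2 = Vset(name="preprocessing", vfuncs=[standardize, log_abs])  # a fresh, unfitted copy of the step
cached_pipeline = PCSPipeline(steps=[preproc2], cache_dir="./vflow_cache")  # hypothetical cache directory
cached_pipeline.run(X)  # fits the step and, assuming the result is picklable, stores it under ./vflow_cache
cached_pipeline.run(X)  # the step is now fitted, so _run_step returns immediately without refitting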