"""
Directed Acyclic Graphs (DAGs) may be used to construct complex workflows for
scikit-learn estimators. As the name suggests, data may only flow in one
direction and can't go back on itself to a previously run step.
"""
from collections import UserDict
from copy import deepcopy
from inspect import signature
from itertools import chain
from typing import Iterable
import networkx as nx
import numpy as np
from joblib import Parallel, delayed
from scipy.sparse import dok_matrix, issparse
from skdag.dag._render import DAGRenderer
from skdag.dag._utils import (
_format_output,
_in_notebook,
_is_pandas,
_is_passthrough,
_is_predictor,
_is_transformer,
_stack,
)
from sklearn.base import clone
from sklearn.exceptions import NotFittedError
from sklearn.utils import Bunch, _print_elapsed_time, _safe_indexing, deprecated
from sklearn.utils._tags import _safe_tags
from sklearn.utils.metaestimators import _BaseComposition, available_if
from sklearn.utils.validation import check_is_fitted, check_memory
__all__ = ["DAG", "DAGStep"]
def _get_columns(X, dep, cols, is_root, dep_is_passthrough, axis=1):
    """
    Select ``cols`` from ``X``, the data produced by the dependency ``dep``.

    Parameters
    ----------
    X : array-like or dataframe
        The data to index into.
    dep : str
        Name of the dependency that produced ``X``.
    cols : index-like, str, iterable, callable or None
        The selection to apply. A callable (e.g. a
        ``sklearn.compose.make_column_selector``) is first called with ``X``
        to obtain the actual selection.
    is_root : bool
        Whether the consuming node is a root node.
    dep_is_passthrough : bool
        Whether the dependency is a passthrough step.
    axis : int, default=1
        Axis forwarded to ``_safe_indexing``.

    Returns
    -------
    The result of ``_safe_indexing(X, cols, axis=axis)``.
    """
    if callable(cols):
        # e.g. sklearn.compose.make_column_selector
        cols = cols(X)

    if not (is_root or dep_is_passthrough):
        # The DAG prepends output columns with the step name, so string
        # selectors lacking that prefix get it added here. This keeps
        # user-provided deps readable.
        prefix = f"{dep}__"

        def qualify(col):
            # Non-string selectors (ints, bools, ...) are left untouched.
            if isinstance(col, str) and not col.startswith(prefix):
                return f"{prefix}{col}"
            return col

        if isinstance(cols, str):
            cols = qualify(cols)
        elif isinstance(cols, Iterable):
            cols = [qualify(col) for col in cols]

    return _safe_indexing(X, cols, axis=axis)
def _stack_inputs(dag, X, node):
    """
    Gather the inputs of ``node`` from ``X`` and stack them into one object.

    Parameters
    ----------
    dag : DAG
        The DAG that owns ``node``; used to look up each dependency's step.
    X : mapping of str to data
        Data keyed by step name.
    node : DAGStep
        The step whose inputs are being assembled.

    Returns
    -------
    The selected inputs combined by ``_stack`` along ``node.axis``.
    """
    # For root nodes, the dependency is just the node name itself.
    sources = {node.name: None} if node.is_root else node.deps

    selected = [
        _get_columns(
            X[dep_name],
            dep_name,
            dep_cols,
            node.is_root,
            _is_passthrough(dag.graph_.nodes[dep_name]["step"].estimator),
            axis=1,
        )
        for dep_name, dep_cols in sources.items()
    ]

    to_stack = []
    for chunk, dep_name in zip(selected, sources):
        explicit_selection = sources[dep_name] is not None
        if chunk is not None and explicit_selection and chunk.ndim < 2:
            # A single column was sliced from an input: restore a 2d shape.
            chunk = chunk.reshape(-1, 1)
        to_stack.append(chunk)

    return _stack(to_stack, axis=node.axis)
def _leaf_estimators_have(attr, how="all"):
"""Check that leaves have `attr`.
Used together with `avaliable_if` in `DAG`."""
def check_leaves(self):
# raises `AttributeError` with all details if `attr` does not exist
failed = []
for leaf in self.leaves_:
try:
_is_passthrough(leaf.estimator) or getattr(leaf.estimator, attr)
except AttributeError:
failed.append(leaf.estimator)
if (how == "all" and failed) or (
how == "any" and len(failed) != len(self.leaves_)
):
raise AttributeError(
f"{', '.join([repr(type(est)) for est in failed])} "
f"object(s) has no attribute '{attr}'"
)
return True
return check_leaves
def _transform_one(transformer, X, weight, allow_predictor=True, **fit_params):
    """
    Apply an already fitted ``transformer`` to ``X``.

    Passthrough transformers return ``X`` unchanged. When the estimator has
    no ``transform`` method and ``allow_predictor`` is true, the first
    available of ``predict_proba``, ``decision_function`` and ``predict`` is
    used instead, and 1d results are reshaped into a single column. If
    ``weight`` is not ``None`` the result is multiplied by it. Extra keyword
    arguments are accepted but not used.

    Raises
    ------
    AttributeError
        If the estimator offers neither ``transform`` nor (when allowed) any
        of the prediction methods.
    """
    if _is_passthrough(transformer):
        out = X
    elif not allow_predictor or hasattr(transformer, "transform"):
        out = transformer.transform(X)
    else:
        method = next(
            (
                fn
                for fn in ("predict_proba", "decision_function", "predict")
                if hasattr(transformer, fn)
            ),
            None,
        )
        if method is None:
            raise AttributeError(
                f"'{type(transformer).__name__}' object has no attribute 'transform'"
            )
        out = getattr(transformer, method)(X)
        if out.ndim < 2:
            out = out.reshape(-1, 1)

    # if we have a weight for this transformer, multiply output
    return out if weight is None else out * weight
def _fit_transform_one(
    transformer,
    X,
    y,
    weight,
    message_clsname="",
    message=None,
    allow_predictor=True,
    **fit_params,
):
    """
    Fit ``transformer`` on ``X`` and ``y`` and return its output for ``X``.

    The output comes from ``fit_transform`` when available, otherwise from
    ``fit`` followed by ``transform``. If neither exists and
    ``allow_predictor`` is true, the estimator is fitted and the first
    available of ``predict_proba``, ``decision_function`` and ``predict`` is
    used, with 1d results reshaped into a single column. Passthrough
    transformers return ``X`` untouched.

    Returns
    -------
    (result, transformer) : tuple
        The (optionally ``weight``-scaled) output and the transformer that
        was passed in.

    Raises
    ------
    AttributeError
        If no usable method is found on the estimator.
    """
    with _print_elapsed_time(message_clsname, message):
        if _is_passthrough(transformer):
            out = X
        elif hasattr(transformer, "fit_transform"):
            out = transformer.fit_transform(X, y, **fit_params)
        elif hasattr(transformer, "transform"):
            out = transformer.fit(X, y, **fit_params).transform(X)
        else:
            method = None
            if allow_predictor:
                method = next(
                    (
                        fn
                        for fn in ("predict_proba", "decision_function", "predict")
                        if hasattr(transformer, fn)
                    ),
                    None,
                )
            if method is None:
                raise AttributeError(
                    f"'{type(transformer).__name__}' object has no attribute 'transform'"
                )
            out = getattr(transformer.fit(X, y, **fit_params), method)(X)
            if out.ndim < 2:
                out = out.reshape(-1, 1)

    if weight is not None:
        out = out * weight
    return out, transformer
def _parallel_fit(dag, step, Xin, Xs, y, fit_transform_fn, memory, **fit_params):
    """
    Fit a single non-leaf ``step`` and return its formatted output.

    Parameters
    ----------
    dag : DAG
        The DAG that owns ``step``.
    step : DAGStep
        The step to fit.
    Xin : mapping
        The resolved DAG inputs, keyed by root step name.
    Xs : mapping
        Outputs of the steps computed so far, keyed by step name.
    y : iterable or None
        Training targets.
    fit_transform_fn : callable
        Function with the ``_fit_transform_one`` signature (possibly cached).
    memory : object
        The memory object used for caching.
    **fit_params
        Parameters forwarded to ``fit_transform_fn``.

    Returns
    -------
    (Xt, fitted_transformer) : tuple
    """
    estimator = step.estimator
    # Steps with deps read upstream outputs; for root nodes the destination
    # rather than the source is specified, so they read from the DAG inputs.
    X = _stack_inputs(dag, Xs if step.deps else Xin, step)

    dag_name = type(dag).__name__
    with _print_elapsed_time(dag_name, dag._log_message(step)):
        if estimator is None or estimator == "passthrough":
            Xt, fitted = X, estimator
        else:
            caching_disabled = hasattr(memory, "location") and memory.location is None
            # we do not clone when caching is disabled to preserve backward
            # compatibility
            candidate = estimator if caching_disabled else clone(estimator)
            # Fit or load from cache the current transformer
            Xt, fitted = fit_transform_fn(
                candidate,
                X,
                y,
                None,
                message_clsname=dag_name,
                message=dag._log_message(step),
                **fit_params,
            )

    return _format_output(Xt, X, step), fitted
def _parallel_transform(dag, step, Xin, Xs, transform_fn, **fn_params):
    """
    Run an already fitted non-leaf ``step`` and return its formatted output.

    Parameters
    ----------
    dag : DAG
        The DAG that owns ``step``.
    step : DAGStep
        The step to run.
    Xin : mapping
        The resolved DAG inputs, keyed by root step name.
    Xs : mapping
        Outputs of the steps computed so far, keyed by step name.
    transform_fn : callable
        Function with the ``_transform_one`` signature (possibly cached).
    **fn_params
        Parameters forwarded to ``transform_fn``.

    Returns
    -------
    Xt
        The step output after ``_format_output``.
    """
    estimator = step.estimator
    # Steps with deps read upstream outputs; for root nodes the destination
    # rather than the source is specified, so they read from the DAG inputs.
    X = _stack_inputs(dag, Xs if step.deps else Xin, step)

    dag_name = type(dag).__name__
    with _print_elapsed_time(dag_name, dag._log_message(step)):
        is_passthrough = estimator is None or estimator == "passthrough"
        if is_passthrough:
            Xt = X
        else:
            Xt = transform_fn(
                estimator,
                X,
                None,
                message_clsname=dag_name,
                message=dag._log_message(step),
                **fn_params,
            )

    return _format_output(Xt, X, step)
def _parallel_fit_leaf(dag, leaf, Xts, y, **fit_params):
    """
    Fit the estimator of a single leaf step.

    Parameters
    ----------
    dag : DAG
        The DAG that owns ``leaf``.
    leaf : DAGStep
        The leaf step to fit.
    Xts : mapping
        Outputs of the upstream steps, keyed by step name.
    y : iterable or None
        Training targets.
    **fit_params
        Parameters forwarded to the estimator's ``fit``.

    Returns
    -------
    fitted_estimator
        The return value of ``fit``, or the estimator itself unchanged when
        it is a passthrough (``None`` or ``"passthrough"``).
    """
    estimator = leaf.estimator
    with _print_elapsed_time(type(dag).__name__, dag._log_message(leaf)):
        # ``None`` is treated as passthrough, as in ``_parallel_fit``; without
        # this a ``None`` leaf would fail on ``None.fit``.
        if estimator is None or estimator == "passthrough":
            fitted_estimator = estimator
        else:
            Xt = _stack_inputs(dag, Xts, leaf)
            fitted_estimator = estimator.fit(Xt, y, **fit_params)

    return fitted_estimator
def _parallel_execute(
    dag, leaf, fn, Xts, y=None, fit_first=False, fit_params=None, fn_params=None
):
    """
    Call the method named ``fn`` on a leaf estimator, optionally fitting first.

    Parameters
    ----------
    dag : DAG
        The DAG that owns ``leaf``.
    leaf : DAGStep
        The leaf step to execute.
    fn : str
        Name of the estimator method to call (e.g. ``"predict"``).
    Xts : mapping
        Outputs of the upstream steps, keyed by step name.
    y : iterable or None, default=None
        Targets; passed to ``fit`` and to ``fn`` if ``fn`` accepts ``y``.
    fit_first : bool, default=False
        If true, fit the estimator before calling ``fn``. A combined
        ``fit_<fn>`` method is preferred when the estimator has one.
    fit_params : dict or None, default=None
        Parameters for ``fit`` / ``fit_<fn>``.
    fn_params : dict or None, default=None
        Parameters for ``fn``.

    Returns
    -------
    (Xout, fitted_estimator) : tuple
        The formatted output and the leaf's estimator.
    """
    estimator = leaf.estimator
    # Use the concrete class name, consistent with the other parallel helpers.
    with _print_elapsed_time(type(dag).__name__, dag._log_message(leaf)):
        Xt = _stack_inputs(dag, Xts, leaf)
        fit_params = fit_params or {}
        fn_params = fn_params or {}

        # ``None`` is treated as passthrough, as in ``_parallel_fit``; without
        # this a ``None`` leaf would fail on the attribute lookups below.
        if estimator is None or estimator == "passthrough":
            Xout = Xt
        elif fit_first and hasattr(estimator, f"fit_{fn}"):
            Xout = getattr(estimator, f"fit_{fn}")(Xt, y, **fit_params)
        else:
            if fit_first:
                estimator.fit(Xt, y, **fit_params)

            est_fn = getattr(estimator, fn)
            # Only hand over ``y`` to methods that declare it (e.g. ``score``).
            if "y" in signature(est_fn).parameters:
                Xout = est_fn(Xt, y=y, **fn_params)
            else:
                Xout = est_fn(Xt, **fn_params)

        Xout = _format_output(Xout, Xt, leaf)

    return Xout, estimator
class DAGStep:
    """
    One estimator node of a DAG.

    Parameters
    ----------

    name : str
        The name used to refer to this step.
    estimator : estimator-like
        The transformer or predictor that this step executes.
    deps : dict
        Maps each dependency name to the columns taken from it. A value of
        ``None`` selects all of that dependency's columns.
    dataframe_columns : list of str or "infer" (optional)
        Column names for the step's output. Either an explicit list of names or
        the string "infer". With "infer", output columns are assumed to match the
        input columns when the output is 2d and not already a dataframe, the
        estimator is a transformer, and the final axis dimensions match the
        inputs; otherwise names are assumed to be the step name + index if the
        output is not already a dataframe. With ``None``, or when inference is
        not possible, outputs are left unmodified.
    axis : int, default = 1
        How inputs are merged when there is more than one upstream dependency.
        ``axis=0`` assumes all inputs share the same features and stacks their
        rows; ``axis=1`` assumes each input contributes different features for
        the same samples.
    """

    def __init__(self, name, estimator, deps, dataframe_columns, axis=1):
        # User-supplied configuration.
        self.name = name
        self.estimator = estimator
        self.deps = deps
        self.dataframe_columns = dataframe_columns
        self.axis = axis

        # Bookkeeping with neutral defaults; updated later by the owning DAG.
        self.index = None
        self.is_root = self.is_leaf = self.is_fitted = False

    def __repr__(self):
        cls_name = type(self).__name__
        return f"{cls_name}({self.name!r}, {self.estimator!r})"
[docs]class DAG(_BaseComposition):
"""
A Directed Acyclic Graph (DAG) of estimators, that itself implements the estimator
interface.
A DAG may consist of a simple chain of estimators (being exactly equivalent to a
:mod:`sklearn.pipeline.Pipeline`) or a more complex path of dependencies. But as the
name suggests, it may not contain any cyclic dependencies and data may only flow
from one or more start points (roots) to one or more endpoints (leaves).
Parameters
----------
graph : :class:`networkx.DiGraph`
A directed graph with string node IDs indicating the step name. Each node must
have a ``step`` attribute, which contains a :class:`skdag.dag.DAGStep`.
memory : str or object with the joblib.Memory interface, default=None
Used to cache the fitted transformers of the DAG. By default, no caching is
performed. If a string is given, it is the path to the caching directory.
Enabling caching triggers a clone of the transformers before fitting. Therefore,
the transformer instance given to the DAG cannot be inspected directly. Use the
attribute ``named_steps`` or ``steps`` to inspect estimators within the
pipeline. Caching the transformers is advantageous when fitting is time
consuming.
n_jobs : int, default=None
Number of jobs to run in parallel. ``None`` means 1 unless in a
:obj:`joblib.parallel_backend` context.
verbose : bool, default=False
If True, the time elapsed while fitting each step will be printed as it is
completed.
Attributes
----------
graph_ : :class:`networkx.DiGraph`
A read-only view of the workflow.
classes_ : ndarray of shape (n_classes,)
The class labels. Only exists if the last step of the pipeline is a
classifier.
n_features_in_ : int
Number of features seen during :term:`fit`. Only defined if all of the
underlying root estimators in `graph_` expose such an attribute when fit.
feature_names_in_ : ndarray of shape (`n_features_in_`,)
Names of features seen during :term:`fit`. Only defined if the underlying
estimators expose such an attribute when fit.
See Also
--------
:class:`skdag.DAGBuilder` : Convenience utility for simplified DAG construction.
Examples
--------
The simplest DAGs are just a chain of singular dependencies. These DAGs may be
created from the :meth:`skdag.dag.DAG.from_pipeline` method in the same way as a
:class:`sklearn.pipeline.Pipeline`:
>>> from sklearn.decomposition import PCA
>>> from sklearn.impute import SimpleImputer
>>> from sklearn.linear_model import LogisticRegression
>>> dag = DAG.from_pipeline(
... steps=[
... ("impute", SimpleImputer()),
... ("pca", PCA()),
... ("lr", LogisticRegression())
... ]
... )
>>> print(dag.draw().strip())
o impute
|
o pca
|
o lr
For more complex DAGs, it is recommended to use a :class:`skdag.dag.DAGBuilder`,
which allows you to define the graph by specifying the dependencies of each new
estimator:
>>> from skdag import DAGBuilder
>>> dag = (
... DAGBuilder()
... .add_step("impute", SimpleImputer())
... .add_step("vitals", "passthrough", deps={"impute": slice(0, 4)})
... .add_step("blood", PCA(n_components=2, random_state=0), deps={"impute": slice(4, 10)})
... .add_step("lr", LogisticRegression(random_state=0), deps=["blood", "vitals"])
... .make_dag()
... )
>>> print(dag.draw().strip())
o impute
|\\
o o blood,vitals
|/
o lr
In the above examples we pass the first four columns directly to a regressor, but
the remaining columns have dimensionality reduction applied first before being
passed to the same regressor. Note that we can define our graph edges in two
different ways: as a dict (if we need to select only certain columns from the source
node) or as a simple list (if we want to simply grab all columns from all input
nodes).
The DAG may now be used as an estimator in its own right:
>>> from sklearn import datasets
>>> X, y = datasets.load_diabetes(return_X_y=True)
>>> dag.fit_predict(X, y)
array([...
In an extension to the scikit-learn estimator interface, DAGs also support multiple
inputs and multiple outputs. Let's say we want to compare two different classifiers:
>>> from sklearn.ensemble import RandomForestClassifier
>>> cal = DAG.from_pipeline(
... [("rf", RandomForestClassifier(random_state=0))]
... )
>>> dag2 = dag.join(cal, edges=[("blood", "rf"), ("vitals", "rf")])
>>> print(dag2.draw().strip())
o impute
|\\
o o blood,vitals
|x|
o o lr,rf
Now our DAG will return two outputs: one from each classifier. Multiple outputs are
returned as a :class:`sklearn.utils.Bunch<Bunch>`:
>>> y_pred = dag2.fit_predict(X, y)
>>> y_pred.lr
array([...
>>> y_pred.rf
array([...
Similarly, multiple inputs are also acceptable and inputs can be provided by
specifying ``X`` and ``y`` as a ``dict``-like object.
"""
# BaseEstimator interface
_required_parameters = ["graph"]
@classmethod
@deprecated(
    "DAG.from_pipeline is deprecated in 0.0.3 and will be removed in a future "
    "release. Please use DAGBuilder.from_pipeline instead."
)
def from_pipeline(cls, steps, **kwargs):
    """
    Build a DAG from pipeline-style ``steps`` (deprecated).

    Delegates to ``DAGBuilder().from_pipeline(steps, **kwargs)`` and returns
    the result of its ``make_dag()``.
    """
    # Imported at call time rather than module level
    # (presumably to avoid a circular import -- TODO confirm).
    from skdag.dag._builder import DAGBuilder

    builder = DAGBuilder().from_pipeline(steps, **kwargs)
    return builder.make_dag()
def __init__(self, graph, *, memory=None, n_jobs=None, verbose=False):
    # Constructor arguments are stored verbatim, without validation.
    self.graph = graph
    self.memory = memory
    self.n_jobs = n_jobs
    self.verbose = verbose
def get_params(self, deep=True):
    """
    Return the parameters of this metaestimator.

    Covers the constructor parameters together with the estimators held in
    the ``steps_`` of the ``DAG``.

    Parameters
    ----------
    deep : bool, default=True
        When True, parameters of contained estimators are included as well.

    Returns
    -------
    params : mapping of string to any
        Parameter names mapped to their values.
    """
    params = self._get_params("steps_", deep=deep)
    return params
def set_params(self, **params):
    """
    Set the parameters of this metaestimator.

    Valid parameter keys can be listed with ``get_params()``. A key that is
    exactly a step name replaces that step's estimator; parameters of the
    estimators contained in ``steps_`` may be set using the step name and
    the parameter name separated by a '__'.

    Parameters
    ----------
    **params : dict
        Parameters of this metaestimator or parameters of estimators
        contained in ``steps_``.

    Returns
    -------
    self : object
        DAG class instance.
    """
    step_names = set(self.step_names)
    replaced_any = False
    for param in list(params):
        if "__" not in param and param in step_names:
            self.graph_.nodes[param]["step"].estimator = params.pop(param)
            replaced_any = True

    if replaced_any:
        # ``steps_`` caches (name, estimator) pairs in ``_steps``. Drop the
        # cache so that ``get_params`` and the nested ``step__param`` routing
        # below see the new estimators, not the ones just replaced.
        vars(self).pop("_steps", None)

    super().set_params(**params)
    return self
def _log_message(self, step):
    """Return the progress message for ``step``, or ``None`` if not verbose."""
    if self.verbose:
        return f"(step {step.name}: {step.index} of {len(self.graph_)}) Processing {step.name}"
    return None
def _iter(self, with_leaves=True, filter_passthrough=True):
    """
    Generate stage lists from self.graph_.

    Each stage is one topological generation of the graph, yielded as a list
    of ``DAGStep`` objects. Stages that end up empty after filtering are
    skipped.

    Parameters
    ----------
    with_leaves : bool, default=True
        When False, leaf steps are left out.
    filter_passthrough : bool, default=True
        When True, 'passthrough' and None transformers are filtered out.
    """
    for generation in nx.topological_generations(self.graph_):
        stage = [self.graph_.nodes[name]["step"] for name in generation]
        if not with_leaves:
            stage = [step for step in stage if not step.is_leaf]
        if filter_passthrough:
            # The literal was previously misspelt ("passthough"), so
            # 'passthrough' steps were never filtered out.
            stage = [
                step
                for step in stage
                if step.estimator is not None and step.estimator != "passthrough"
            ]
        if stage:
            yield stage
def __len__(self):
    """
    Return the number of nodes in the DAG's graph.
    """
    n_nodes = len(self.graph_)
    return n_nodes
def __getitem__(self, name):
    """
    Look up the estimator of the step called ``name``.
    """
    step = self.graph_.nodes[name]["step"]
    return step.estimator
def _fit(self, X, y=None, **fit_params_steps):
    """
    Fit every non-leaf step, stage by stage, and collect the leaf inputs.

    Parameters
    ----------
    X : data or mapping of data
        DAG input(s), resolved through ``_resolve_inputs``.
    y : iterable, default=None
        Training targets.
    **fit_params_steps : dict of dict
        Fit parameters keyed by step name.

    Returns
    -------
    Xs : dict
        Step outputs still held after fitting, keyed by step name, plus the
        raw input of any root that was not part of a fitted stage.
    """
    # Setup the memory
    memory = check_memory(self.memory)
    cached_fit_transform = memory.cache(_fit_transform_one)

    # Roots that have not been processed by any stage yet.
    pending_roots = {root.name for root in self.roots_}
    Xin = self._resolve_inputs(X)
    Xs = {}
    with Parallel(n_jobs=self.n_jobs) as parallel:
        for stage in self._iter(with_leaves=False, filter_passthrough=False):
            stage_names = [step.name for step in stage]
            jobs = (
                delayed(_parallel_fit)(
                    self,
                    step,
                    Xin,
                    Xs,
                    y,
                    cached_fit_transform,
                    memory,
                    **fit_params_steps[step.name],
                )
                for step in stage
            )
            outputs, fitted_transformers = zip(*parallel(jobs))

            # Store the fitted transformer back on its step. This is
            # necessary when the transformer was loaded from the cache.
            for step, fitted in zip(stage, fitted_transformers):
                step.estimator = fitted
                step.is_fitted = True

            Xs.update(zip(stage_names, outputs))

            # Once every dependent of a dep has been computed, the dep's
            # output can be dropped to free up some memory.
            pending_roots -= set(stage_names)
            upstream = {dep for step in stage for dep in step.deps}
            for dep in upstream:
                dependents = self.graph_.successors(dep)
                if all(d in Xs and d not in pending_roots for d in dependents):
                    del Xs[dep]

    # A root that is also a leaf hasn't been fit yet, so its input is passed
    # on for later.
    for name in pending_roots:
        Xs[name] = Xin[name]
    return Xs
def _transform(self, X, **fn_params_steps):
    """
    Run every fitted non-leaf step, stage by stage, and collect the leaf inputs.

    Parameters
    ----------
    X : data or mapping of data
        DAG input(s), resolved through ``_resolve_inputs``.
    **fn_params_steps : dict of dict
        Call parameters keyed by step name.

    Returns
    -------
    Xs : dict
        Step outputs still held after the run, keyed by step name, plus the
        raw input of any root that was not part of a processed stage.
    """
    # Setup the memory
    memory = check_memory(self.memory)
    cached_transform = memory.cache(_transform_one)

    # Roots that have not been processed by any stage yet.
    pending_roots = {root.name for root in self.roots_}
    Xin = self._resolve_inputs(X)
    Xs = {}
    with Parallel(n_jobs=self.n_jobs) as parallel:
        for stage in self._iter(with_leaves=False, filter_passthrough=False):
            stage_names = [step.name for step in stage]
            jobs = (
                delayed(_parallel_transform)(
                    self,
                    step,
                    Xin,
                    Xs,
                    cached_transform,
                    **fn_params_steps[step.name],
                )
                for step in stage
            )
            outputs = parallel(jobs)
            Xs.update(zip(stage_names, outputs))

            # Once every dependent of a dep has been computed, the dep's
            # output can be dropped to free up some memory.
            pending_roots -= set(stage_names)
            upstream = {dep for step in stage for dep in step.deps}
            for dep in upstream:
                dependents = self.graph_.successors(dep)
                if all(d in Xs and d not in pending_roots for d in dependents):
                    del Xs[dep]

    # A root that is also a leaf hasn't been processed yet, so its input is
    # passed on for later.
    for name in pending_roots:
        Xs[name] = Xin[name]
    return Xs
def _resolve_inputs(self, X):
    """
    Normalise the DAG input into a dict keyed by root step name.

    A dict-like ``X`` (other than a ``dok_matrix``) must have exactly one key
    per root. Anything else is accepted only when the DAG has a single root,
    and is wrapped under that root's name. Values that are neither sparse
    nor pandas objects are converted with ``np.asarray``.

    Raises
    ------
    ValueError
        If the keys do not match the roots, or a non-dict input is given to
        a DAG with several roots.
    """
    is_mapping = isinstance(X, (dict, Bunch, UserDict)) and not isinstance(
        X, dok_matrix
    )
    if is_mapping:
        inputs = sorted(X.keys())
        expected = sorted(root.name for root in self.roots_)
        if inputs != expected:
            raise ValueError(
                "Input dicts must contain one key per entry node. "
                f"Entry nodes are {self.roots_}, got {inputs}."
            )
    elif len(self.roots_) != 1:
        raise ValueError(
            "Must provide a dictionary of inputs for a DAG with multiple entry "
            "points."
        )
    else:
        X = {self.roots_[0].name: X}

    resolved = {}
    for step, x in X.items():
        keep_as_is = issparse(x) or _is_pandas(x)
        resolved[step] = x if keep_as_is else np.asarray(x)
    return resolved
def _match_input_format(self, Xin, Xout):
    """
    Shape the outputs to mirror how the inputs were supplied.

    With a single leaf and a non-dict input (a ``dok_matrix`` counts as
    non-dict) the bare output of that leaf is returned. Otherwise all
    outputs are returned as a ``Bunch`` keyed by leaf name.
    """
    if len(self.leaves_) != 1:
        return Bunch(**Xout)

    dict_like = isinstance(Xin, (dict, Bunch, UserDict))
    if dict_like and not isinstance(Xin, dok_matrix):
        return Bunch(**Xout)

    return Xout[self.leaves_[0].name]
def fit(self, X, y=None, **fit_params):
    """
    Fit the model.

    The non-leaf steps are fitted and applied in dependency order, then each
    leaf estimator is fitted on the data that reaches it.

    Parameters
    ----------
    X : iterable
        Training data. Must fulfill input requirements of first step of the
        DAG.
    y : iterable, default=None
        Training targets. Must fulfill label requirements for all steps of
        the DAG.
    **fit_params : dict of string -> object
        Parameters passed to the ``fit`` method of each step, where
        each parameter name is prefixed such that parameter ``p`` for step
        ``s`` has key ``s__p``.

    Returns
    -------
    self : object
        DAG fitted steps.
    """
    self._validate_graph()
    params_by_step = self._check_fit_params(**fit_params)
    Xts = self._fit(X, y, **params_by_step)

    run = Parallel(n_jobs=self.n_jobs)
    fitted_estimators = run(
        [
            delayed(_parallel_fit_leaf)(
                self, leaf, Xts, y, **params_by_step[leaf.name]
            )
            for leaf in self.leaves_
        ]
    )
    for leaf, est in zip(self.leaves_, fitted_estimators):
        leaf.estimator = est
        leaf.is_fitted = True

    # If we have a single root, mirror certain attributes in the DAG.
    if len(self.roots_) == 1:
        root_estimator = self.roots_[0].estimator
        for attr in ("n_features_in_", "feature_names_in_"):
            if hasattr(root_estimator, attr):
                setattr(self, attr, getattr(root_estimator, attr))

    return self
def _fit_execute(self, fn, X, y=None, **fit_params):
    """
    Fit the whole DAG, then call the method named ``fn`` on every leaf.

    Returns the leaf outputs in the format chosen by
    ``_match_input_format``.
    """
    self._validate_graph()
    params_by_step = self._check_fit_params(**fit_params)
    Xts = self._fit(X, y, **params_by_step)

    leaf_names = [leaf.name for leaf in self.leaves_]
    jobs = (
        delayed(_parallel_execute)(
            self,
            leaf,
            fn,
            Xts,
            y,
            fit_first=True,
            fit_params=params_by_step[leaf.name],
        )
        for leaf in self.leaves_
    )
    outputs, fitted_estimators = zip(*Parallel(n_jobs=self.n_jobs)(jobs))

    Xout = dict(zip(leaf_names, outputs))
    for leaf, fitted in zip(self.leaves_, fitted_estimators):
        leaf.estimator = fitted
        leaf.is_fitted = True

    return self._match_input_format(X, Xout)
def _execute(self, fn, X, y=None, **fn_params):
    """
    Run the fitted DAG on ``X`` and call the method named ``fn`` on every leaf.

    Returns the leaf outputs in the format chosen by
    ``_match_input_format``.
    """
    params_by_step = self._check_fn_params(**fn_params)
    Xts = self._transform(X, **params_by_step)

    leaf_names = [leaf.name for leaf in self.leaves_]
    jobs = (
        delayed(_parallel_execute)(
            self,
            leaf,
            fn,
            Xts,
            y,
            fit_first=False,
            fn_params=params_by_step[leaf.name],
        )
        for leaf in self.leaves_
    )
    # The second element of each result (the estimator) is not needed here.
    outputs, _ = zip(*Parallel(n_jobs=self.n_jobs)(jobs))

    Xout = dict(zip(leaf_names, outputs))
    return self._match_input_format(X, Xout)
@available_if(_leaf_estimators_have("transform"))
def fit_transform(self, X, y=None, **fit_params):
    """
    Fit the model and transform with the final estimator(s).

    All upstream steps are fitted and applied, then each leaf is fitted and
    its ``transform`` output returned.

    Parameters
    ----------
    X : iterable
        Training data. Must fulfill input requirements of first step of the
        DAG.
    y : iterable, default=None
        Training targets. Must fulfill label requirements for all steps of
        the DAG.
    **fit_params : dict of string -> object
        Parameters passed to the ``fit`` method of each step, where
        each parameter name is prefixed such that parameter ``p`` for step
        ``s`` has key ``s__p``.

    Returns
    -------
    Xt : ndarray of shape (n_samples, n_transformed_features)
        Transformed samples.
    """
    Xt = self._fit_execute("transform", X, y, **fit_params)
    return Xt
@available_if(_leaf_estimators_have("transform"))
def transform(self, X):
    """
    Transform the data, and apply `transform` with the final estimator.

    Every upstream step of the DAG is applied to the data, and the result is
    handed to the final estimator's `transform` method. Only valid if the
    final estimator implements `transform`.

    This also works where final estimator is `None` in which case all prior
    transformations are applied.

    Parameters
    ----------
    X : iterable
        Data to transform. Must fulfill input requirements of first step
        of the DAG.

    Returns
    -------
    Xt : ndarray of shape (n_samples, n_transformed_features)
        Transformed data.
    """
    Xt = self._execute("transform", X)
    return Xt
@available_if(_leaf_estimators_have("predict"))
def fit_predict(self, X, y=None, **fit_params):
    """
    Fit the DAG and return the predictions of the final estimator(s).

    All upstream steps are fitted and applied, then each leaf is fitted and
    asked for its predictions on the data that reaches it.

    Parameters
    ----------
    X : iterable
        Training data. Must fulfill input requirements of first step of
        the DAG.
    y : iterable, default=None
        Training targets. Must fulfill label requirements for all steps
        of the DAG.
    **fit_params : dict of string -> object
        Parameters passed to the ``fit`` method of each step, where
        each parameter name is prefixed such that parameter ``p`` for step
        ``s`` has key ``s__p``.

    Returns
    -------
    y_pred : ndarray
        Predictions of the final estimator.
    """
    y_pred = self._fit_execute("predict", X, y, **fit_params)
    return y_pred
@available_if(_leaf_estimators_have("predict"))
def predict(self, X, **predict_params):
    """
    Transform the data, and apply `predict` with the final estimator.

    Every upstream step of the DAG is applied to the data, and the result is
    handed to the final estimator's `predict` method. Only valid if the
    final estimators implement `predict`.

    Parameters
    ----------
    X : iterable
        Data to predict on. Must fulfill input requirements of first step
        of the DAG.
    **predict_params : dict of string -> object
        Parameters to the ``predict`` called at the end of all
        transformations in the DAG. Note that while this may be
        used to return uncertainties from some models with return_std
        or return_cov, uncertainties that are generated by the
        transformations in the DAG are not propagated to the
        final estimator.

    Returns
    -------
    y_pred : ndarray
        Result of calling `predict` on the final estimator.
    """
    y_pred = self._execute("predict", X, **predict_params)
    return y_pred
@available_if(_leaf_estimators_have("predict_proba"))
def predict_proba(self, X, **predict_proba_params):
    """
    Transform the data, and apply `predict_proba` with the final estimator.

    Every upstream step of the DAG is applied to the data, and the result is
    handed to the final estimator's `predict_proba` method. Only valid if
    the final estimators implement `predict_proba`.

    Parameters
    ----------
    X : iterable
        Data to predict on. Must fulfill input requirements of first step
        of the DAG.
    **predict_proba_params : dict of string -> object
        Parameters to the `predict_proba` called at the end of all
        transformations in the DAG.

    Returns
    -------
    y_proba : ndarray of shape (n_samples, n_classes)
        Result of calling `predict_proba` on the final estimator.
    """
    y_proba = self._execute("predict_proba", X, **predict_proba_params)
    return y_proba
@available_if(_leaf_estimators_have("decision_function"))
def decision_function(self, X):
    """
    Transform the data, and apply `decision_function` with the final estimator.

    Every upstream step of the DAG is applied to the data, and the result is
    handed to the final estimator's `decision_function` method. Only valid
    if the final estimators implement `decision_function`.

    Parameters
    ----------
    X : iterable
        Data to predict on. Must fulfill input requirements of first step
        of the DAG.

    Returns
    -------
    y_score : ndarray of shape (n_samples, n_classes)
        Result of calling `decision_function` on the final estimator.
    """
    y_score = self._execute("decision_function", X)
    return y_score
@available_if(_leaf_estimators_have("score_samples"))
def score_samples(self, X):
    """
    Transform the data, and apply `score_samples` with the final estimator.

    Every upstream step of the DAG is applied to the data, and the result is
    handed to the final estimator's `score_samples` method. Only valid if
    the final estimators implement `score_samples`.

    Parameters
    ----------
    X : iterable
        Data to predict on. Must fulfill input requirements of first step
        of the DAG.

    Returns
    -------
    y_score : ndarray of shape (n_samples,)
        Result of calling `score_samples` on the final estimator.
    """
    y_score = self._execute("score_samples", X)
    return y_score
@available_if(_leaf_estimators_have("score"))
def score(self, X, y=None, **score_params):
    """
    Transform the data, and apply `score` with the final estimator.

    Every upstream step of the DAG is applied to the data, and the result is
    handed to the final estimator's `score` method. Only valid if the final
    estimators implement `score`.

    Parameters
    ----------
    X : iterable
        Data to predict on. Must fulfill input requirements of first step
        of the DAG.
    y : iterable, default=None
        Targets used for scoring. Must fulfill label requirements for all
        steps of the DAG.
    **score_params : dict of string -> object
        Extra keyword arguments for the scoring call, for example
        ``sample_weight``.

    Returns
    -------
    score : float
        Result of calling `score` on the final estimator.
    """
    result = self._execute("score", X, y, **score_params)
    return result
@available_if(_leaf_estimators_have("predict_log_proba"))
def predict_log_proba(self, X, **predict_log_proba_params):
    """
    Transform the data, and apply `predict_log_proba` with the final estimator.

    Every upstream step of the DAG is applied to the data, and the result is
    handed to the final estimator's `predict_log_proba` method. Only valid
    if the final estimator implements `predict_log_proba`.

    Parameters
    ----------
    X : iterable
        Data to predict on. Must fulfill input requirements of first step
        of the DAG.
    **predict_log_proba_params : dict of string -> object
        Parameters to the ``predict_log_proba`` called at the end of all
        transformations in the DAG.

    Returns
    -------
    y_log_proba : ndarray of shape (n_samples, n_classes)
        Result of calling `predict_log_proba` on the final estimator.
    """
    y_log_proba = self._execute("predict_log_proba", X, **predict_log_proba_params)
    return y_log_proba
def _check_fit_params(self, **fit_params):
    """
    Split ``stepname__param`` fit parameters into one dict per step.

    Parameters
    ----------
    **fit_params : dict of string -> object
        Parameters keyed as ``stepname__param``. Entries whose value is
        ``None`` are ignored.

    Returns
    -------
    fit_params_steps : dict of str -> dict
        One (possibly empty) parameter dict for every step in ``steps_``.

    Raises
    ------
    ValueError
        If a parameter name does not contain ``__``.
    KeyError
        If the step prefix does not name a step of the DAG.
    """
    # Every step gets an entry, including steps whose estimator is ``None``.
    # The fitting code indexes this dict by every step name, so leaving such
    # steps out raised ``KeyError``.
    fit_params_steps = {name: {} for name, _ in self.steps_}
    for pname, pval in fit_params.items():
        if pval is None:
            continue

        if "__" not in pname:
            raise ValueError(
                f"DAG.fit does not accept the {pname} parameter. "
                "You can pass parameters to specific steps of your "
                "DAG using the stepname__parameter format, e.g. "
                "`DAG.fit(X, y, logisticregression__sample_weight"
                "=sample_weight)`."
            )

        step, param = pname.split("__", 1)
        fit_params_steps[step][param] = pval
    return fit_params_steps
def _check_fn_params(self, **fn_params):
    """
    Split call parameters into one dict per step.

    Names containing ``__`` are routed to the step named by the part before
    the first ``__``. Names without ``__`` are treated as global and copied
    into every step's dict, overriding a step-specific value of the same
    name. Entries whose value is ``None`` are ignored.

    Returns
    -------
    fn_params_steps : dict of str -> dict
        One (possibly empty) parameter dict for every step in ``steps_``.

    Raises
    ------
    KeyError
        If a step prefix does not name a step of the DAG.
    """
    global_params = {}
    # Every step gets an entry, including steps whose estimator is ``None``.
    # The execution code indexes this dict by every step name, so leaving
    # such steps out raised ``KeyError``.
    fn_params_steps = {name: {} for name, _ in self.steps_}
    for pname, pval in fn_params.items():
        if pval is None:
            continue

        if "__" not in pname:
            global_params[pname] = pval
        else:
            step, param = pname.split("__", 1)
            fn_params_steps[step][param] = pval

    for step_params in fn_params_steps.values():
        step_params.update(global_params)

    return fn_params_steps
def _validate_graph(self):
    """
    Check that the graph is usable and record each step's position.

    Each step's ``index`` is set to its position in ``steps_``. Step names
    are validated with ``_validate_names``. Non-leaf steps must be
    passthrough, a transformer or a predictor; leaf steps must be
    passthrough or provide ``fit``.

    Raises
    ------
    ValueError
        If the graph has no nodes.
    TypeError
        If a step's estimator does not meet the requirements above.
    """
    if len(self.graph_) == 0:
        raise ValueError("DAG has no nodes.")

    for position, (name, _) in enumerate(self.steps_):
        self.graph_.nodes[name]["step"].index = position

    # validate names
    self._validate_names([name for name, _ in self.steps_])

    # validate transformers
    for step in self.roots_ + self.branches_:
        if step in self.leaves_:
            # A root that is also a leaf is validated with the leaves below.
            continue

        est = step.estimator
        # Unlike pipelines we also allow predictors to be used as a
        # transformer, to support model stacking.
        acceptable = (
            _is_passthrough(est) or _is_transformer(est) or _is_predictor(est)
        )
        if not acceptable:
            raise TypeError(
                "All intermediate steps should be "
                "transformers and implement fit and transform "
                "or be the string 'passthrough' "
                f"'{est}' (type {type(est)}) doesn't"
            )

    # Validate final estimator(s)
    for step in self.leaves_:
        est = step.estimator
        if _is_passthrough(est) or hasattr(est, "fit"):
            continue
        raise TypeError(
            "Leaf nodes of a DAG should implement fit "
            "or be the string 'passthrough'. "
            f"'{est}' (type {type(est)}) doesn't"
        )
@property
def graph_(self):
    """View of ``self.graph``, created on first access and then cached."""
    if hasattr(self, "_graph"):
        return self._graph

    # Read-only view of the graph. We should not modify the original graph.
    self._graph = self.graph.copy(as_view=True)
    return self._graph
@property
def leaves_(self):
    """Steps flagged ``is_leaf``, computed on first access and then cached."""
    if hasattr(self, "_leaves"):
        return self._leaves

    self._leaves = [step for step in self.nodes_ if step.is_leaf]
    return self._leaves
@property
def branches_(self):
    """Steps that are neither root nor leaf, computed once and then cached."""
    if hasattr(self, "_branches"):
        return self._branches

    self._branches = [
        step for step in self.nodes_ if not (step.is_leaf or step.is_root)
    ]
    return self._branches
@property
def roots_(self):
    """Steps flagged ``is_root``, computed on first access and then cached."""
    if hasattr(self, "_roots"):
        return self._roots

    self._roots = [step for step in self.nodes_ if step.is_root]
    return self._roots
@property
def nodes_(self):
    """
    All ``DAGStep`` objects in ``steps_`` order, computed once and then cached.

    While building the list, a step with no outgoing edges is flagged
    ``is_leaf`` and a step with no incoming edges is flagged ``is_root``.
    """
    if hasattr(self, "_nodes"):
        return self._nodes

    self._nodes = []
    graph = self.graph_
    for name, _ in self.steps_:
        step = graph.nodes[name]["step"]
        if not graph.out_degree(name):
            step.is_leaf = True
        if not graph.in_degree(name):
            step.is_root = True
        self._nodes.append(step)
    return self._nodes
@property
def steps_(self):
    """
    List of (name, estimator) tuples, to conform with the Pipeline interface.

    Names follow a lexicographical topological sort of the graph. The list
    is computed on first access and then cached.
    """
    if hasattr(self, "_steps"):
        return self._steps

    ordered_names = nx.lexicographical_topological_sort(self.graph_)
    self._steps = [
        (name, self.graph_.nodes[name]["step"].estimator) for name in ordered_names
    ]
    return self._steps
def join(self, other, edges, **kwargs):
    """
    Create a new DAG by joining this DAG to another one, according to the edges
    specified.

    Parameters
    ----------

    other : :class:`skdag.dag.DAG`
        The other DAG to connect to.
    edges : iterable of (str, str) or (str, str, index-like)
        ``(u, v)`` edges that connect the two DAGs. ``u`` and ``v`` should be the
        names of steps in the first and second DAG respectively. Optionally a third
        parameter may be included to specify which columns to pass along the edge.
    **kwargs : keyword params
        Any other parameters to pass to the new DAG's constructor.

    Returns
    -------

    dag : :class:`skdag.DAG`
        A new DAG, containing a copy of each of the input DAGs, joined by the
        specified edges. Note that the original input dags are unmodified.

    Raises
    ------

    ValueError
        If the two DAGs share any step names.
    KeyError
        If an edge refers to a source step that is not in this DAG or a target step
        that is not in ``other``.

    Examples
    --------

    >>> from sklearn.decomposition import PCA
    >>> from sklearn.impute import SimpleImputer
    >>> from sklearn.linear_model import LogisticRegression
    >>> from sklearn.calibration import CalibratedClassifierCV
    >>> from skdag.dag import DAGBuilder
    >>> dag1 = (
    ...     DAGBuilder()
    ...     .add_step("impute", SimpleImputer())
    ...     .add_step("vitals", "passthrough", deps={"impute": slice(0, 4)})
    ...     .add_step("blood", PCA(n_components=2, random_state=0), deps={"impute": slice(4, 10)})
    ...     .add_step("lr", LogisticRegression(random_state=0), deps=["blood", "vitals"])
    ...     .make_dag()
    ... )
    >>> print(dag1.draw().strip())
    o    impute
    |\\
    o o    blood,vitals
    |/
    o    lr
    >>> dag2 = (
    ...     DAGBuilder()
    ...     .add_step(
    ...         "calib",
    ...         CalibratedClassifierCV(LogisticRegression(random_state=0), cv=5),
    ...     )
    ...     .make_dag()
    ... )
    >>> print(dag2.draw().strip())
    o    calib
    >>> dag3 = dag1.join(dag2, edges=[("blood", "calib"), ("vitals", "calib")])
    >>> print(dag3.draw().strip())
    o    impute
    |\\
    o o    blood,vitals
    |x|
    o o    calib,lr
    """
    if set(self.step_names) & set(other.step_names):
        raise ValueError("DAGs with overlapping step names cannot be combined.")

    newgraph = deepcopy(self.graph_).copy()

    # Bring *every* step of ``other`` into the new graph, not only the ones that
    # are targets of a joining edge. Each step is rebuilt as a fresh DAGStep with
    # its own ``deps`` dict, and node attributes are copied into a new dict, so
    # nothing below writes into ``other``.
    for name, attrs in other.graph_.nodes(data=True):
        old_step = attrs["step"]
        new_attrs = dict(attrs)
        new_attrs["step"] = DAGStep(
            name=old_step.name,
            estimator=old_step.estimator,
            deps=dict(old_step.deps),
            dataframe_columns=old_step.dataframe_columns,
            axis=old_step.axis,
        )
        newgraph.add_node(name, **new_attrs)
    # Keep the internal wiring of ``other`` as well.
    newgraph.add_edges_from(other.graph_.edges(data=True))

    for edge in edges:
        if len(edge) == 2:
            u, v, idx = *edge, None
        else:
            u, v, idx = edge

        if u not in self.graph_:
            raise KeyError(u)
        if v not in other.graph_:
            raise KeyError(v)

        # source node can no longer be a leaf
        ustep = newgraph.nodes[u]["step"]
        if ustep.is_leaf:
            ustep.is_leaf = False

        # Register the new dependency on the copied target step. If the same
        # (u, v) pair is given more than once, the first column spec wins.
        vstep = newgraph.nodes[v]["step"]
        if u not in vstep.deps:
            vstep.deps[u] = idx

        newgraph.add_edge(u, v)

    return DAG(newgraph, **kwargs)
def draw(
    self, filename=None, style=None, detailed=False, format=None, layout="dot"
):
    """
    Render a graphical view of the DAG.

    By default the rendered output is returned. However if an output file is
    provided then the output will be saved to file instead.

    Parameters
    ----------

    filename : str or path-like, optional
        The file to write the image to. If None, the rendered image is returned
        to the caller.
    style : str, optional, choice of ['light', 'dark']
        Draw the image in light or dark mode.
    detailed : bool, default = False
        If True, show extra details in the node labels such as the estimator
        signature.
    format : str, choice of ['svg', 'png', 'jpg', 'txt']
        The rendering format to use. May be omitted if the format can be inferred
        from the filename extension (compared case-insensitively). If neither
        ``filename`` nor ``format`` is given, 'svg' is used when running inside an
        IPython ``ZMQInteractiveShell`` and 'txt' otherwise.
    layout : str, default = 'dot'
        The program to use for generating a graph layout.

    See Also
    --------

    :meth:`skdag.dag.DAG.show`, for use in interactive notebooks.

    Returns
    -------

    output : str, bytes or None
        If a filename is provided the output is written to file and `None` is
        returned. Otherwise, the output is returned as a string (for textual formats
        like ascii or svg) or bytes.

    Raises
    ------

    ValueError
        If the requested (or inferred) format is not supported.
    """
    if filename is None and format is None:
        try:
            from IPython import get_ipython

            rich = type(get_ipython()).__name__ == "ZMQInteractiveShell"
        except (ImportError, NameError):
            rich = False

        format = "svg" if rich else "txt"

    if format is None:
        import os

        # os.fspath lets path-like objects (e.g. pathlib.Path) be used as well as
        # plain strings; lower-casing makes "out.PNG" resolve to "png".
        format = os.fspath(filename).rsplit(".", 1)[-1].lower()

    if format not in ["svg", "png", "jpg", "txt"]:
        raise ValueError(f"Unsupported file format '{format}'")

    render = DAGRenderer(self.graph_, detailed=detailed, style=style).draw(
        format=format, layout=layout
    )
    if filename is None:
        return render

    if isinstance(render, bytes):
        with open(filename, "wb") as fp:
            fp.write(render)
    else:
        # Fix the encoding so that non-ASCII content is written the same way on
        # every platform instead of depending on the locale default.
        with open(filename, "w", encoding="utf-8") as fp:
            fp.write(render)
def show(self, style=None, detailed=False, format=None, layout="dot"):
    """
    Display a graphical representation of the DAG in an interactive notebook
    environment.

    DAGs are shown automatically when displayed in a notebook; calling this method
    directly allows extra options to be passed to customise the appearance.

    Arguments are as for :meth:`.draw`. When ``format`` is omitted, 'svg' is used
    if ``_in_notebook()`` is true and 'txt' otherwise.

    Returns
    -------

    ``None``

    See Also
    --------

    :meth:`skdag.DAG.draw`
    """
    if format is None:
        if _in_notebook():
            format = "svg"
        else:
            format = "txt"

    rendered = self.draw(
        style=style, detailed=detailed, format=format, layout=layout
    )

    # Plain text goes to stdout; images are handed to IPython's display machinery.
    if format == "txt":
        print(rendered)
        return
    if format == "svg":
        from IPython.display import SVG, display

        display(SVG(rendered))
        return
    if format in ("jpg", "png"):
        from IPython.display import Image, display

        display(Image(rendered))
        return
    raise ValueError(f"'{format}' format not supported.")
def _repr_svg_(self):
    """Return the DAG rendered by :meth:`draw` in 'svg' format."""
    svg = self.draw(format="svg")
    return svg
def _repr_png_(self):
    """Return the DAG rendered by :meth:`draw` in 'png' format."""
    png = self.draw(format="png")
    return png
def _repr_jpeg_(self):
    """Return the DAG rendered by :meth:`draw` in 'jpg' format."""
    jpeg = self.draw(format="jpg")
    return jpeg
def _repr_html_(self):
    """Return the HTML representation, which is the 'svg' output of :meth:`draw`."""
    markup = self.draw(format="svg")
    return markup
def _repr_pretty_(self, p, cycle):
    """
    Write a text form of the DAG to the printer ``p``.

    ``repr(self)`` is written when ``cycle`` is true, ``str(self)`` otherwise.
    """
    text = repr(self) if cycle else str(self)
    p.text(text)
def _repr_mimebundle_(self, **kwargs):
    """
    Build a ``{mime type: rendered output}`` dict for the DAG.

    The optional ``include`` and ``exclude`` keyword arguments restrict which mime
    types are produced. Filtering happens before rendering, so only the formats
    that survive both filters are actually rendered.
    """
    # Renderers are collected unevaluated; nothing is drawn at this point.
    candidates = (
        ("image/svg+xml", self._repr_svg_),
        ("image/png", self._repr_png_),
        ("image/jpeg", self._repr_jpeg_),
        ("text/plain", self.__str__),
        ("text/html", self._repr_html_),
    )
    wanted = kwargs.get("include")
    unwanted = kwargs.get("exclude")

    selected = []
    for mime, renderer in candidates:
        if wanted and mime not in wanted:
            continue
        if unwanted and mime in unwanted:
            continue
        selected.append((mime, renderer))

    # Now render whatever is left.
    bundle = {}
    for mime, renderer in selected:
        bundle[mime] = renderer()
    return bundle
@property
def named_steps(self):
    """
    Access the steps by name.

    Read-only attribute to access any step by its given name. Keys are the step
    names and values are the corresponding entries from ``steps_``.
    """
    by_name = {}
    for name, estimator in self.steps_:
        by_name[name] = estimator
    # A Bunch is used (rather than a plain dict) to improve autocomplete.
    return Bunch(**by_name)
@property
def step_names(self):
    """List of the node names in ``graph_``."""
    return [name for name in self.graph_.nodes]
@property
def edges(self):
    """The ``edges`` attribute of ``graph_``."""
    graph = self.graph_
    return graph.edges
def _get_leaf_attr(self, attr):
    """
    Look up ``attr`` on the estimator(s) of the leaf steps.

    With exactly one leaf the attribute value is returned directly; otherwise a
    ``Bunch`` mapping each leaf's name to its estimator's attribute is returned.
    """
    leaves = self.leaves_
    if len(leaves) != 1:
        per_leaf = {}
        for leaf in leaves:
            per_leaf[leaf.name] = getattr(leaf.estimator, attr)
        return Bunch(**per_leaf)
    return getattr(leaves[0].estimator, attr)
@property
def _estimator_type(self):
    """The ``_estimator_type`` of the leaf estimator(s), via ``_get_leaf_attr``."""
    kind = self._get_leaf_attr("_estimator_type")
    return kind
@property
def classes_(self):
    """The classes labels. Only exist if the leaf steps are classifiers."""
    labels = self._get_leaf_attr("classes_")
    return labels
def __sklearn_is_fitted__(self):
    """
    Indicate whether DAG has been fit.

    Returns
    -------

    bool
        ``True`` if ``check_is_fitted`` passes for the estimator of every leaf
        step, ``False`` as soon as one of them raises ``NotFittedError``.
    """
    try:
        # check if the last steps of the DAG are fitted
        # we only check the last steps since if the last steps are fit, it
        # means the previous steps should also be fit. This is faster than
        # checking if every step of the DAG is fit.
        #
        # Go through the ``leaves_`` property rather than the ``_leaves`` cache:
        # the cache only exists after the property has been accessed once, and
        # reading it directly would raise AttributeError before that.
        # NOTE(review): a 'passthrough' leaf is handed to check_is_fitted as-is;
        # confirm that is handled as intended.
        for leaf in self.leaves_:
            check_is_fitted(leaf.estimator)
    except NotFittedError:
        return False
    return True
def _more_tags(self):
    """
    Derive estimator tags for the DAG from the tags of its steps.

    Returns a dict with:

    - ``allow_nan``: true only if *all* steps allow NaN.
    - ``pairwise``: true only if all *root* steps expect pairwise input.
    - ``multioutput``: true only if all *leaf* steps support multioutput.
    """

    def every(steps, tag):
        # True when each step's estimator reports a truthy value for ``tag``.
        return all(_safe_tags(step.estimator, tag) for step in steps)

    return {
        "allow_nan": every(self.nodes_, "allow_nan"),
        "pairwise": every(self.roots_, "pairwise"),
        "multioutput": every(self.leaves_, "multioutput"),
    }