Source code for metaforecast.ensembles.ade

from typing import List, Optional, Tuple, Union

import lightgbm as lgb
import pandas as pd
from mlforecast import MLForecast
from sklearn.multioutput import MultiOutputRegressor as MIMO
from statsforecast import StatsForecast

from metaforecast.ensembles.base import BaseADE
from metaforecast.utils.normalization import Normalizations

# A pair of frames, e.g. the (features, targets) split of the meta-dataset.
DataFrameTuple = Tuple[pd.DataFrame, pd.DataFrame]
# Either a single frame or a pair of frames (see ``DataFrameTuple``).
DataFrameLike = Union[pd.DataFrame, Tuple[pd.DataFrame, pd.DataFrame]]


class ADE(BaseADE):
    """Arbitrated Dynamic Ensemble

    Dynamic ensemble approach where ensemble members are weighted based on a
    meta-model that forecasts their error.

    The ADE approach dynamically combines forecasts from multiple models by
    learning their error patterns. It's particularly effective when dealing
    with heterogeneous time series and when the relative performance of
    individual models varies over time.

    Parameters
    ----------
    freq : str
        String denoting the sampling frequency of the time series
        (e.g. "MS").

    trim_ratio : float, optional
        Ratio (0-1) of ensemble members to keep in the ensemble.
        (1-trim_ratio) of models will not be used during inference based on
        validation accuracy. Defaults to 1, which means all ensemble members
        are used.

    meta_lags : list of int, optional
        List of lags to be used in the training of the meta-model. Follows
        the structure of mlforecast. Example: [1,2,3,8]. If None, the lags
        ``1, 2, ..., WINDOW_SIZE_BY_FREQ[freq]`` are used.

    trim_by_uid : bool, optional
        Whether to trim the ensemble by unique_id (True) or dataset (False).
        Defaults to True, but this can become computationally demanding for
        datasets with a large number of time series.

    meta_model : object, optional
        Learning algorithm to use in the meta-level to forecast the error of
        ensemble members. If None (default), a new multi-output linear-tree
        LGBM regressor with a default configuration is created for this
        instance.

    References
    ----------
    Cerqueira, V., Torgo, L., Pinto, F., & Soares, C. (2019).
    Arbitrage of forecasting experts. Machine Learning, 108, 913-944.

    Examples
    --------
    >>> from datasetsforecast.m3 import M3
    >>> from neuralforecast import NeuralForecast
    >>> from neuralforecast.models import NHITS, NBEATS, MLP
    >>>
    >>> from metaforecast.ensembles import ADE
    >>>
    >>> df, *_ = M3.load('.', group='Monthly')
    >>>
    >>> # ensemble members setup
    >>> CONFIG = {'input_size': 12,
    >>>           'h': 12,
    >>>           'accelerator': 'cpu',
    >>>           'max_steps': 10, }
    >>>
    >>> models = [
    >>>     NBEATS(**CONFIG, stack_types=3 * ["identity"]),
    >>>     NHITS(**CONFIG),
    >>>     MLP(**CONFIG),
    >>>     MLP(num_layers=3, **CONFIG),
    >>> ]
    >>>
    >>> nf = NeuralForecast(models=models, freq='M')
    >>>
    >>> # cv to build meta-data
    >>> n_windows = df['unique_id'].value_counts().min()
    >>> n_windows = int(n_windows // 2)
    >>> fcst_cv = nf.cross_validation(df=df, n_windows=n_windows, step_size=1)
    >>> fcst_cv = fcst_cv.reset_index()
    >>> fcst_cv = fcst_cv.groupby(['unique_id', 'cutoff']).head(1).drop(columns='cutoff')
    >>>
    >>> # fitting combination rule
    >>> ensemble = ADE(freq='ME', meta_lags=list(range(1,7)), trim_ratio=0.6)
    >>> ensemble.fit(fcst_cv)
    >>>
    >>> # re-fitting models
    >>> nf.fit(df=df)
    >>>
    >>> # forecasting and combining
    >>> fcst = nf.predict()
    >>> fcst_ensemble = ensemble.predict(fcst.reset_index(), train=df, h=12)

    """

    # Configuration of the default LightGBM meta-learner.
    _LGB_PARS = {"verbosity": -1, "n_jobs": 1, "linear_tree": True}
    # Extra keyword arguments forwarded to ``MLForecast.preprocess``.
    _MLF_PREPROCESS_PARS = {"static_features": []}

    def __init__(
        self,
        freq: str,
        meta_lags: Optional[List[int]] = None,
        trim_ratio: float = 1,
        trim_by_uid: bool = True,
        meta_model=None,
    ):
        # Build the default meta-model per instance. A default created in the
        # signature would be a single object shared (and refit) by every
        # ensemble that relies on it.
        if meta_model is None:
            meta_model = MIMO(lgb.LGBMRegressor(**self._LGB_PARS))

        self.frequency = freq
        window_size = self.WINDOW_SIZE_BY_FREQ[self.frequency]

        super().__init__(
            window_size=window_size,
            trim_ratio=trim_ratio,
            trim_by_uid=trim_by_uid,
            meta_model=meta_model,
        )

        self.model_names = None

        if meta_lags is None:
            # Default: every lag from 1 up to the frequency's window size.
            self.meta_lags = list(range(1, window_size + 1))
        else:
            self.meta_lags = meta_lags

        self.lag_names = [f"lag{i}" for i in self.meta_lags]

        # MLForecast is used only as a lag-feature builder (no models).
        self.meta_mlf = MLForecast(models=[], freq=self.frequency, lags=self.meta_lags)
        self.meta_df = None
        self.raw_meta_data = None
        self.insample_scores = None
        self.use_window = False
        self.weights = None

    # pylint: disable=arguments-differ
    def fit(self, insample_fcst: pd.DataFrame, **kwargs):
        """fit

        Parameters
        ----------
        insample_fcst : pd.DataFrame
            Forecast and actual values dataset formatted like mlforecast
            cross-validation output. Contains either:

            - In-sample forecasts (predictions on training data)
            - Cross-validation results (out-of-sample predictions)

            Expected columns:

            - unique_id (or other id_col): Identifier for each series
            - ds (or other time_col): Timestamp
            - y (or other target_col): Actual values
            - *model_name*: Predictions by a model with name *model_name*

        **kwargs
            Accepted for interface compatibility; not used.

        Returns
        -------
        self
            self, with a fitted self.meta_model

        """
        self._fit(insample_fcst)
        return self

    def _fit(self, insample_fcst):
        """Build the meta-dataset of member errors and fit the meta-model.

        Sets ``model_names`` (when not already set), ``insample_scores``,
        ``raw_meta_data`` and ``meta_df``, then fits ``meta_model`` in place.
        """
        if self.model_names is None:
            # Every column that is neither metadata nor the horizon index is
            # taken to be an ensemble member.
            excluded = self.METADATA + ["h"]
            self.model_names = [
                x for x in insample_fcst.columns.to_list() if x not in excluded
            ]

        self._set_n_models()

        in_sample_loss_df = self._get_insample_loss(insample_fcst)

        self.insample_scores = self.evaluate_base_fcst(
            insample_fcst=insample_fcst, use_window=self.use_window
        )

        self.raw_meta_data = self.meta_mlf.preprocess(
            in_sample_loss_df, **self._MLF_PREPROCESS_PARS
        )

        self.meta_df = self._process_meta_data(self.raw_meta_data)

        x, y = self.meta_df

        # Fill gaps in the targets before fitting the meta-model.
        if y.isna().any().any():
            y = y.ffill().bfill()

        self.meta_model.fit(x, y)

    # pylint: disable=arguments-differ
    def predict(self, fcst: pd.DataFrame, train: pd.DataFrame, h: int, **kwargs):
        """Combine ensemble member forecasts using the meta-model.

        Parameters
        ----------
        fcst : pd.DataFrame
            Forecasts from individual ensemble members.
            Expected columns: ['unique_id', 'ds', 'model_name1', 'model_name2', ...]

        train : pd.DataFrame
            Training dataset used to compute recent lags for meta-model input.
            Expected columns: ['unique_id', 'ds', 'y']

        h : int
            Forecast horizon (number of future periods to predict)

        **kwargs
            Accepted for interface compatibility; not used.

        Returns
        -------
        pd.Series
            Combined ensemble forecasts for h periods ahead.

        """
        self._assert_fcst(fcst)

        ade_fcst = self._predict(preds=fcst, train=train, h=h)
        ade_fcst.name = self.alias

        return ade_fcst

    # pylint: disable=arguments-differ
    def update_weights(self, fcst: pd.DataFrame, **kwargs):
        """Update performance statistics of ensemble members based on recent forecasts.

        Used to identify and retain top-performing models for ensemble
        trimming. Updates internal statistics tracking each model's forecast
        accuracy.

        Parameters
        ----------
        fcst: pd.DataFrame
            Recent forecasts and actual values for ensemble members.
            Expected columns:

            - unique_id: Series identifier
            - ds: Timestamp
            - model_name: Predictions with model with name "model_name"
            - y: Actual values

        Notes
        -----
        Not implemented yet

        """
        raise NotImplementedError

    def _predict(self, preds: pd.DataFrame, train: pd.DataFrame, h: int):
        """Compute per-series weights and apply them to ``preds`` row by row.

        Stores the weights in ``self.weights`` and returns the combined
        forecast produced by ``_weighted_average`` for every row of ``preds``.
        """
        df_ext = train.merge(preds, on=["unique_id", "ds"], how="outer")
        # Explicit copy so the assignment below acts on an independent frame.
        df_ext = df_ext[self.METADATA].copy()
        # Rows that come only from ``preds`` have no actual value; a
        # placeholder is written so they carry a non-missing target.
        # NOTE(review): presumably needed so preprocess keeps these rows — confirm.
        df_ext["y"] = df_ext["y"].fillna(value=-1)

        meta_dataset = self.meta_mlf.preprocess(df_ext, **self._MLF_PREPROCESS_PARS)

        self.weights = self._weights_by_uid(meta_dataset, h=h)

        fcst = preds.apply(lambda x: self._weighted_average(x, self.weights), axis=1)

        return fcst

    def _get_insample_loss(self, insample_fcst: pd.DataFrame):
        """Compute in-sample (CV) point-wise loss.

        Parameters
        ----------
        insample_fcst : pd.DataFrame
            Validation predictions for each model and actual values (y).

        Returns
        -------
        pd.DataFrame
            Point-wise error (prediction minus actual) of each ensemble
            member across the validation set. When an "h" column is present
            only rows with ``h == 1`` are kept and the column is dropped.

        """
        in_sample_loss = []

        for _, uid_df in insample_fcst.groupby("unique_id"):
            # Work on a private copy of the group; the input is not modified.
            uid_df = uid_df.copy()
            for mod in self.model_names:
                uid_df[mod] = uid_df[mod] - uid_df["y"]

            in_sample_loss.append(uid_df)

        in_sample_loss_df = pd.concat(in_sample_loss)

        # first h forward
        # could average all horizons
        if "h" in in_sample_loss_df.columns:
            in_sample_loss_df = in_sample_loss_df.query("h==1").drop(columns=["h"])

        return in_sample_loss_df

    # pylint: disable=invalid-name
    def _process_meta_data(
        self, meta_data: pd.DataFrame, return_X_y: bool = True
    ) -> DataFrameLike:
        """Select the lag features and member-error columns of ``meta_data``.

        Returns ``(X_meta, Y_meta)`` when ``return_X_y`` is True, otherwise a
        single frame with the lag columns followed by the model columns.
        """
        lag_locs = meta_data.columns.str.startswith("lag")
        lag_cols = meta_data.columns[lag_locs].to_list()

        if return_X_y:
            # pylint: disable=invalid-name
            X_meta, Y_meta = meta_data[lag_cols], meta_data[self.model_names]
            return X_meta, Y_meta

        meta_df = meta_data[lag_cols + self.model_names]

        return meta_df

    # pylint: disable=arguments-differ
    def _weights_by_uid(self, df: pd.DataFrame, h: int, **kwargs):
        """Compute the combination weights of every series in ``df``.

        Parameters
        ----------
        df : pd.DataFrame
            Preprocessed meta-dataset holding the lag features per series.

        h : int
            Forecast horizon; used to locate the row whose lags feed the
            meta-model.

        Returns
        -------
        pd.DataFrame
            One row per unique_id (index named "unique_id"), one column per
            ensemble member.

        """
        # Only the ranking that will be used is computed; the per-series
        # ranking costs one top-k evaluation per row of the scores.
        top_by_uid = None
        top_overall = None
        if self.trim_by_uid:
            top_by_uid = self.insample_scores.apply(self._get_top_k, axis=1)
        else:
            top_overall = self._get_top_k(self.insample_scores.mean())

        uid_weights = {}
        for uid, meta_uid_df in df.groupby("unique_id"):
            if h > 1:
                # Skip the last h-1 rows and take the one right before them.
                lags = meta_uid_df.head(-(h - 1)).tail(1)[self.lag_names]
            else:
                # head(-0) would be empty, hence the separate branch.
                lags = meta_uid_df.tail(1)[self.lag_names]

            meta_pred = self.meta_model.predict(lags)
            meta_pred = pd.DataFrame(meta_pred, columns=self.model_names)

            weights = self._weights_from_errors(meta_pred)

            if self.trim_by_uid:
                poor_models = [x not in top_by_uid[uid] for x in weights.index]
            else:
                poor_models = [x not in top_overall for x in weights.index]

            # Trimmed members get zero weight; the rest are renormalized.
            weights[poor_models] = 0
            weights /= weights.sum()

            uid_weights[uid] = weights

        weights_df = pd.DataFrame(uid_weights).T
        weights_df.index.name = "unique_id"

        return weights_df

    def _reweight_by_redundancy(self):
        raise NotImplementedError

    # pylint: disable=arguments-renamed
    @staticmethod
    def _weights_from_errors(meta_predictions: pd.DataFrame) -> pd.Series:
        """Turn predicted member errors into weights.

        The absolute predicted errors are negated and passed row-wise through
        ``Normalizations.normalize_and_proportion``; the first row of the
        result is returned.
        """
        e_hat = meta_predictions.abs()

        weights = e_hat.apply(
            func=lambda x: Normalizations.normalize_and_proportion(-x), axis=1
        )

        weight_s = weights.iloc[0]

        return weight_s
class MLForecastADE(ADE):
    """Arbitrated Dynamic Ensemble (ADE) implemented for MLForecast.

    Creates a dynamic ensemble where member weights are determined by a
    meta-model that predicts each model's forecast error. The meta-model
    adapts weights based on recent performance and input patterns.

    Implementation follows the MLForecast conventions and models.
    To use ADE with statsforecast or neuralforecast, see :class:`ADE`.

    References
    ----------
    Cerqueira, V., Torgo, L., Pinto, F., & Soares, C. (2019).
    "Arbitrage of forecasting experts."
    Machine Learning, 108, 913-944.

    Examples
    --------
    >>> from datasetsforecast.m3 import M3
    >>> from mlforecast import MLForecast
    >>> from sklearn.linear_model import RidgeCV, LassoCV, ElasticNetCV
    >>> from sklearn.tree import DecisionTreeRegressor
    >>> from sklearn.neighbors import KNeighborsRegressor
    >>>
    >>> from metaforecast.ensembles import MLForecastADE
    >>>
    >>> df, *_ = M3.load('.', group='Monthly')
    >>>
    >>> # ensemble members setup
    >>> models_ml = {
    >>>     'Ridge': RidgeCV(),
    >>>     'Lasso': LassoCV(),
    >>>     'Elastic-net': ElasticNetCV(),
    >>>     'DT': DecisionTreeRegressor(max_depth=5),
    >>>     'DStump': DecisionTreeRegressor(max_depth=1),
    >>>     'KNN': KNeighborsRegressor(n_neighbors=30),
    >>> }
    >>>
    >>> mlf = MLForecast(models=models_ml, freq='ME', lags=range(1, 7))
    >>>
    >>> # make sure fitted=True so we have samples for meta-learning
    >>> mlf.fit(df=df, fitted=True)
    >>>
    >>> # fitting combination rule
    >>> ensemble = MLForecastADE(mlf=mlf, trim_ratio=0.5)
    >>> ensemble.fit()
    >>>
    >>> fcst = ensemble.predict(train=df, h=12)

    """

    def __init__(
        self,
        mlf: MLForecast,
        sf: Optional[StatsForecast] = None,
        trim_ratio: float = 1,
        meta_model=None,
    ):
        """Initialize the Arbitrated Dynamic Ensemble with MLForecast models.

        Parameters
        ----------
        mlf : MLForecast
            Fitted MLForecast object containing ensemble members.
            Must have been fit with ``fitted=True`` so that its fitted values
            are available to build the meta-dataset for error prediction.

        sf : StatsForecast, optional
            StatsForecast object containing classical forecasting models
            to be included in the ensemble alongside MLForecast models.

        trim_ratio : float, default=1.0
            Proportion of ensemble members to retain, between 0 and 1.
            Models are selected based on validation accuracy:

            - 1.0 keeps all models
            - 0.5 keeps top 50% of models
            - Lower values create a more selective ensemble

        meta_model : object, optional
            Model used to predict ensemble members' errors.
            If None, defaults to LightGBM with linear trees and default
            parameters; a new instance is created for each ensemble.

        Notes
        -----
        The meta_model should support scikit-learn's fit/predict API.

        """
        # Build the default here, not in the signature, so ensembles never
        # share (and refit) one estimator object, and so that ``None`` works
        # as documented.
        if meta_model is None:
            meta_model = MIMO(lgb.LGBMRegressor(**ADE._LGB_PARS))

        self.mlf = mlf
        self.sf = sf
        self.frequency = self.mlf.ts.freq

        # The meta-model reuses the frequency and lags of the MLForecast object.
        super().__init__(
            freq=self.frequency,
            trim_ratio=trim_ratio,
            meta_model=meta_model,
            meta_lags=self.mlf.ts.lags,
        )

    # pylint: disable=arguments-differ
    def fit(self, **kwargs):
        """Train the meta-model using ensemble members' in-sample predictions.

        Uses the fitted values stored on the MLForecast object (merged with
        the StatsForecast fitted values when ``sf`` was given) to:

        1. Compute forecast errors for each ensemble member
        2. Train the meta-model to predict these errors
        3. Score the ensemble members on these predictions, which drives
           model selection when trim_ratio < 1

        Parameters
        ----------
        **kwargs
            Accepted for interface compatibility; not used.

        Returns
        -------
        self
            self, with a fitted self.meta_model

        """
        insample_fcst = self.mlf.fcst_fitted_values_

        if self.sf is not None:
            # Run a one-step forecast with fitted=True so that the fitted
            # values can be retrieved right after.
            self.sf.forecast(fitted=True, h=1)
            insample_fcst_sf = self.sf.forecast_fitted_values()
            # "y" is already present in the MLForecast frame.
            insample_fcst = insample_fcst.merge(
                insample_fcst_sf.drop(columns="y"), on=self.METADATA_NO_T
            )

        self._fit(insample_fcst)
        return self

    # pylint: disable=arguments-differ
    def predict(self, train: pd.DataFrame, h: int, **kwargs):
        """Generate ensemble forecasts using weighted model combinations.

        Parameters
        ----------
        train : pd.DataFrame
            Training dataset used to compute recent lags for meta-model input.
            Expected columns:

            - unique_id: Series identifier
            - ds: Timestamp
            - y: Target variable

        h : int
            Forecast horizon (number of periods to predict ahead)

        **kwargs
            Accepted for interface compatibility; not used.

        Returns
        -------
        pd.Series
            Combined ensemble predictions.

        """
        base_fcst = self.mlf.predict(h=h)

        if self.sf is not None:
            base_fcst_sf = self.sf.predict(h=h)
            base_fcst = base_fcst.merge(base_fcst_sf, on=self.METADATA_NO_T)

        fcst = self._predict(preds=base_fcst, train=train, h=h)

        return fcst

    def update_weights(self, fcst: pd.DataFrame, **kwargs):
        """Update performance statistics of ensemble members.

        See :meth:`ADE.update_weights` for full documentation.

        Notes
        -----
        Not implemented yet

        """
        raise NotImplementedError

    def _reweight_by_redundancy(self):
        raise NotImplementedError