Source code for metaforecast.ensembles.mlewa

import typing

import numpy as np
import pandas as pd

from metaforecast.ensembles.base import Mixture

RowIdentifierType = typing.Union[int, typing.Hashable]


[docs] class MLewa(Mixture): """Dynamic ensemble using exponentially weighted averaging (EWA). Implementation inspired by R's opera package, this class combines forecasts using online learning with exponential weights. Weights are updated based on recent performance to adapt to changing patterns. See Also -------- Mixture : Parent class implementing core ensemble functionality opera : R package with original implementation Notes ----- This implementation follows the EWA algorithm described in [1] and [2] References ----------- [1] Cesa-Bianchi, N., & Lugosi, G. (2006). "Prediction, learning, and games." Cambridge University Press. [2] Gaillard, P., & Goude, Y. (2015). "Forecasting electricity consumption by aggregating experts." In Modeling and Stochastic Learning for Forecasting in High Dimensions (pp. 95-115). Springer, Cham. [3] Cerqueira, V., Torgo, L., Pinto, F., & Soares, C. (2019). "Arbitrage of forecasting experts." Machine Learning, 108, 913-944. Examples --------- >>> from datasetsforecast.m3 import M3 >>> from neuralforecast import NeuralForecast >>> from neuralforecast.models import NHITS, NBEATS, MLP >>> from metaforecast.ensembles import MLewa >>> >>> df, *_ = M3.load('.', group='Monthly') >>> >>> CONFIG = {'input_size': 12, >>> 'h': 12, >>> 'accelerator': 'cpu', >>> 'max_steps': 10, } >>> >>> models = [ >>> NBEATS(**CONFIG, stack_types=3 * ["identity"]), >>> NHITS(**CONFIG), >>> MLP(**CONFIG), >>> MLP(num_layers=3, **CONFIG), >>> ] >>> >>> nf = NeuralForecast(models=models, freq='M') >>> >>> # cv to build meta-data >>> n_windows = df['unique_id'].value_counts().min() >>> n_windows = int(n_windows // 2) >>> fcst_cv = nf.cross_validation(df=df, n_windows=n_windows, step_size=1) >>> fcst_cv = fcst_cv.reset_index() >>> fcst_cv = fcst_cv.groupby(['unique_id', 'cutoff']).head(1).drop(columns='cutoff') >>> >>> # fitting combination rule >>> ensemble = MLewa(loss_type='square', gradient=True, trim_ratio=.8) >>> ensemble.fit(fcst_cv) >>> >>> # re-fitting models >>> nf.fit(df=df) >>> >>> # forecasting and combining >>> fcst = nf.predict() >>> fcst_ensemble = ensemble.predict(fcst.reset_index()) """
[docs] def __init__( self, loss_type: str, gradient: bool, weight_by_uid: bool = False, trim_ratio: float = 1, ): """Initialize online ensemble with exponential weighting strategy. Parameters ---------- loss_type : {'square', 'pinball', 'percentage', 'absolute', 'log'} Loss function for evaluating and weighting ensemble members: - square: Mean squared error - pinball: Quantile loss - percentage: Mean absolute percentage error - absolute: Mean absolute error - log: Log loss gradient : bool, default=False If True, use gradient for weight updates weight_by_uid : bool, default=True Whether to compute weights separately for each series: - True: Individual weights per series (may be computationally intensive) - False: Global weights across all series trim_ratio : float, default=1.0 Proportion of models to retain in ensemble, between 0 and 1: - 1.0: Keep all models - 0.5: Keep top 50% of models Models are selected based on validation performance """ super().__init__( loss_type=loss_type, gradient=gradient, trim_ratio=trim_ratio, weight_by_uid=weight_by_uid, ) self.alias = "MLewa"
def _update_mixture(self, fcst: pd.DataFrame, y: np.ndarray, **kwargs): """_update_mixture Updating the weights of the ensemble :param fcst: predictions of the ensemble members (columns) in different time steps (rows) :type fcst: pd.DataFrame :param y: actual values of the time series :type y: np.ndarray :return: self """ for i, fc in fcst.iterrows(): w = self._weights_from_regret(iteration=i) self.weights[i], self.ensemble_fcst[i] = self._calc_ensemble_fcst(fc, w) loss_experts = self._calc_loss( fcst=fc, y=y[i], fcst_c=self.ensemble_fcst[i] ) loss_mixture = self._calc_loss( fcst=self.ensemble_fcst[i], y=y[i], fcst_c=self.ensemble_fcst[i], ) regret_i = loss_mixture - loss_experts # update regret for mod in self.regret: self.regret[mod] += regret_i[mod] n = len(self.model_names) eta_update = np.sqrt( np.log(n) / (np.log(n) / self.eta[i] ** 2 + regret_i**2) ) self.eta[int(str(i)) + 1] = eta_update def _weights_from_regret(self, iteration: RowIdentifierType = -1, **kwargs): """_weights_from_regret Updating the weights based on regret minimization """ curr_regret = np.array(list(self.regret.values())) if np.max(curr_regret) > 0: w = self._truncate_loss(np.exp(self.eta[iteration] * curr_regret)) w /= np.sum(w) else: w = np.ones_like(curr_regret) / len(curr_regret) w = pd.Series(w, index=self.model_names) return w def _initialize_params(self, fcst: pd.DataFrame): n_row, n_col = fcst.shape[0], len(self.model_names) self.eta = np.full(shape=(n_row + 1, n_col), fill_value=np.exp(350)) self.regret = {k: 0 for k in self.model_names} self.weights = np.zeros((n_row, n_col)) self.ensemble_fcst = np.zeros(n_row)
[docs] def update_weights(self, fcst: pd.DataFrame): raise NotImplementedError
@staticmethod def _truncate_loss(x): return np.clip(x, np.exp(-700), np.exp(700))