diff --git a/qlib/contrib/strategy/strategy.py b/qlib/contrib/strategy/strategy.py
index 74df39f3e3..550ff649db 100644
--- a/qlib/contrib/strategy/strategy.py
+++ b/qlib/contrib/strategy/strategy.py
@@ -7,7 +7,6 @@
 import pandas as pd
 
 from ..backtest.order import Order
-from ...utils import get_pre_trading_date
 from .order_generator import OrderGenWInteract
 
 
@@ -390,11 +389,11 @@ def filter_stock(l):
        current_stock_list = current_temp.get_stock_list()
        value = cash * self.risk_degree / len(buy) if len(buy) > 0 else 0
 
-        # open_cost should be considered in the real trading environment, while the backtest in evaluate.py does not consider it
-        # as the aim of demo is to accomplish same strategy as evaluate.py, so comment out this line
+        # open_cost should be considered in the real trading environment, but the backtest in evaluate.py does not
+        # consider it; since this demo aims to reproduce the same strategy as evaluate.py, the line below is commented out
         # value = value / (1+trade_exchange.open_cost) # set open_cost limit
         for code in buy:
-            # check is stock supended
+            # check whether the stock is suspended
             if not trade_exchange.is_stock_tradable(stock_id=code, trade_date=trade_date):
                 continue
             # buy order
diff --git a/qlib/model/base.py b/qlib/model/base.py
index 5a295787f7..3708298d5c 100644
--- a/qlib/model/base.py
+++ b/qlib/model/base.py
@@ -43,7 +43,8 @@ def fit(self, dataset: Dataset):
 
         # get weights
         try:
-            wdf_train, wdf_valid = dataset.prepare(["train", "valid"], col_set=["weight"], data_key=DataHandlerLP.DK_L)
+            wdf_train, wdf_valid = dataset.prepare(["train", "valid"], col_set=["weight"],
+                                                   data_key=DataHandlerLP.DK_L)
             w_train, w_valid = wdf_train["weight"], wdf_valid["weight"]
         except KeyError as e:
             w_train = pd.DataFrame(np.ones_like(y_train.values), index=y_train.index)
diff --git a/qlib/model/riskmodel/__init__.py b/qlib/model/riskmodel/__init__.py
new file mode 100644
index 0000000000..05af6b7d37
--- /dev/null
+++ b/qlib/model/riskmodel/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .base import RiskModel
+from .poet import POETCovEstimator
+from .shrink import ShrinkCovEstimator
+from .structured import StructuredCovEstimator
diff --git a/qlib/model/riskmodel/base.py b/qlib/model/riskmodel/base.py
new file mode 100644
index 0000000000..bb067e3d58
--- /dev/null
+++ b/qlib/model/riskmodel/base.py
@@ -0,0 +1,147 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import inspect
+import numpy as np
+import pandas as pd
+from typing import Union
+
+from qlib.model.base import BaseModel
+
+
+class RiskModel(BaseModel):
+    """Risk Model
+
+    A risk model is used to estimate the covariance matrix of stock returns.
+    """
+
+    MASK_NAN = "mask"
+    FILL_NAN = "fill"
+    IGNORE_NAN = "ignore"
+
+    def __init__(self, nan_option: str = "ignore", assume_centered: bool = False, scale_return: bool = True):
+        """
+        Args:
+            nan_option (str): nan handling option (`ignore`/`mask`/`fill`).
+            assume_centered (bool): whether the data is assumed to be centered.
+            scale_return (bool): whether to scale returns to percentages.
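+
+        Example (an illustrative sketch, not part of the original docstring;
+        `prices` is a hypothetical date-by-instrument price DataFrame):
+
+            >>> model = RiskModel(nan_option="mask")
+            >>> cov = model.predict(prices)                     # empirical covariance
+            >>> corr = model.predict(prices, return_corr=True)  # correlation matrix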
+ """ + # nan + assert nan_option in [ + self.MASK_NAN, + self.FILL_NAN, + self.IGNORE_NAN, + ], f"`nan_option={nan_option}` is not supported" + self.nan_option = nan_option + + self.assume_centered = assume_centered + self.scale_return = scale_return + + def predict( + self, + X: Union[pd.Series, pd.DataFrame, np.ndarray], + return_corr: bool = False, + is_price: bool = True, + return_decomposed_components=False, + ) -> Union[pd.DataFrame, np.ndarray, tuple]: + """ + Args: + X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance, + with variables as columns and observations as rows. + return_corr (bool): whether return the correlation matrix. + is_price (bool): whether `X` contains price (if not assume stock returns). + return_decomposed_components (bool): whether return decomposed components of the covariance matrix. + + Returns: + pd.DataFrame or np.ndarray: estimated covariance (or correlation). + """ + assert ( + not return_corr or not return_decomposed_components + ), "Can only return either correlation matrix or decomposed components." + + # transform input into 2D array + if not isinstance(X, (pd.Series, pd.DataFrame)): + columns = None + else: + if isinstance(X.index, pd.MultiIndex): + if isinstance(X, pd.DataFrame): + X = X.iloc[:, 0].unstack(level="instrument") # always use the first column + else: + X = X.unstack(level="instrument") + else: + # X is 2D DataFrame + pass + columns = X.columns # will be used to restore dataframe + X = X.values + + # calculate pct_change + if is_price: + X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows + + # scale return + if self.scale_return: + X *= 100 + + # handle nan and centered + X = self._preprocess(X) + + # return decomposed components if needed + if return_decomposed_components: + assert ( + "return_decomposed_components" in inspect.getfullargspec(self._predict).args + ), "This risk model does not support return decomposed components of the covariance matrix " + + F, cov_b, var_u = self._predict(X, return_decomposed_components=True) + return F, cov_b, var_u + + # estimate covariance + S = self._predict(X) + + # return correlation if needed + if return_corr: + vola = np.sqrt(np.diag(S)) + corr = S / np.outer(vola, vola) + if columns is None: + return corr + return pd.DataFrame(corr, index=columns, columns=columns) + + # return covariance + if columns is None: + return S + return pd.DataFrame(S, index=columns, columns=columns) + + def _predict(self, X: np.ndarray) -> np.ndarray: + """covariance estimation implementation + + This method should be overridden by child classes. + + By default, this method implements the empirical covariance estimation. + + Args: + X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows). + + Returns: + np.ndarray: covariance matrix. + """ + xTx = np.asarray(X.T.dot(X)) + N = len(X) + if isinstance(X, np.ma.MaskedArray): + M = 1 - X.mask + N = M.T.dot(M) # each pair has distinct number of samples + return xTx / N + + def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]: + """handle nan and centerize data + + Note: + if `nan_option='mask'` then the returned array will be `np.ma.MaskedArray`. 
+ """ + # handle nan + if self.nan_option == self.FILL_NAN: + X = np.nan_to_num(X) + elif self.nan_option == self.MASK_NAN: + X = np.ma.masked_invalid(X) + # centralize + if not self.assume_centered: + X = X - np.nanmean(X, axis=0) + return X diff --git a/qlib/model/riskmodel/poet.py b/qlib/model/riskmodel/poet.py new file mode 100644 index 0000000000..8403845558 --- /dev/null +++ b/qlib/model/riskmodel/poet.py @@ -0,0 +1,84 @@ +import numpy as np + +from qlib.model.riskmodel import RiskModel + + +class POETCovEstimator(RiskModel): + """Principal Orthogonal Complement Thresholding Estimator (POET) + + Reference: + [1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements. + Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603–680. https://doi.org/10.1111/rssb.12016 + [2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m + """ + + THRESH_SOFT = "soft" + THRESH_HARD = "hard" + THRESH_SCAD = "scad" + + def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = "soft", **kwargs): + """ + Args: + num_factors (int): number of factors (if set to zero, no factor model will be used). + thresh (float): the positive constant for thresholding. + thresh_method (str): thresholding method, which can be + - 'soft': soft thresholding. + - 'hard': hard thresholding. + - 'scad': scad thresholding. + kwargs: see `RiskModel` for more information. + """ + super().__init__(**kwargs) + + assert num_factors >= 0, "`num_factors` requires a positive integer" + self.num_factors = num_factors + + assert thresh >= 0, "`thresh` requires a positive float number" + self.thresh = thresh + + assert thresh_method in [ + self.THRESH_HARD, + self.THRESH_SOFT, + self.THRESH_SCAD, + ], "`thresh_method` should be `soft`/`hard`/`scad`" + self.thresh_method = thresh_method + + def _predict(self, X: np.ndarray) -> np.ndarray: + + Y = X.T # NOTE: to match POET's implementation + p, n = Y.shape + + if self.num_factors > 0: + Dd, V = np.linalg.eig(Y.T.dot(Y)) + V = V[:, np.argsort(Dd)] + F = V[:, -self.num_factors :][:, ::-1] * np.sqrt(n) + LamPCA = Y.dot(F) / n + uhat = np.asarray(Y - LamPCA.dot(F.T)) + Lowrank = np.asarray(LamPCA.dot(LamPCA.T)) + rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n) + else: + uhat = np.asarray(Y) + rate = np.sqrt(np.log(p) / n) + Lowrank = 0 + + lamb = rate * self.thresh + SuPCA = uhat.dot(uhat.T) / n + SuDiag = np.diag(np.diag(SuPCA)) + R = np.linalg.inv(SuDiag ** 0.5).dot(SuPCA).dot(np.linalg.inv(SuDiag ** 0.5)) + + if self.thresh_method == self.THRESH_HARD: + M = R * (np.abs(R) > lamb) + elif self.thresh_method == self.THRESH_SOFT: + res = np.abs(R) - lamb + res = (res + np.abs(res)) / 2 + M = np.sign(R) * res + else: + M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb) + M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7 + M3 = (np.abs(R) >= 3.7 * lamb) * R + M = M1 + M2 + M3 + + Rthresh = M - np.diag(np.diag(M)) + np.eye(p) + SigmaU = (SuDiag ** 0.5).dot(Rthresh).dot(SuDiag ** 0.5) + SigmaY = SigmaU + Lowrank + + return SigmaY diff --git a/qlib/model/riskmodel.py b/qlib/model/riskmodel/shrink.py similarity index 57% rename from qlib/model/riskmodel.py rename to qlib/model/riskmodel/shrink.py index 07a1e0c9f6..3cb2620d1b 100644 --- a/qlib/model/riskmodel.py +++ b/qlib/model/riskmodel/shrink.py @@ -1,133 +1,7 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. 
-
-import warnings
 import numpy as np
-import pandas as pd
-
 from typing import Union
-
-from qlib.model.base import BaseModel
-
-
-class RiskModel(BaseModel):
-    """Risk Model
-
-    A risk model is used to estimate the covariance matrix of stock returns.
-    """
-
-    MASK_NAN = "mask"
-    FILL_NAN = "fill"
-    IGNORE_NAN = "ignore"
-
-    def __init__(self, nan_option: str = "ignore", assume_centered: bool = False, scale_return: bool = True):
-        """
-        Args:
-            nan_option (str): nan handling option (`ignore`/`mask`/`fill`).
-            assume_centered (bool): whether the data is assumed to be centered.
-            scale_return (bool): whether scale returns as percentage.
-        """
-        # nan
-        assert nan_option in [
-            self.MASK_NAN,
-            self.FILL_NAN,
-            self.IGNORE_NAN,
-        ], f"`nan_option={nan_option}` is not supported"
-        self.nan_option = nan_option
-
-        self.assume_centered = assume_centered
-        self.scale_return = scale_return
-
-    def predict(
-        self, X: Union[pd.Series, pd.DataFrame, np.ndarray], return_corr: bool = False, is_price: bool = True
-    ) -> Union[pd.DataFrame, np.ndarray]:
-        """
-        Args:
-            X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance,
-                with variables as columns and observations as rows.
-            return_corr (bool): whether return the correlation matrix.
-            is_price (bool): whether `X` contains price (if not assume stock returns).
-
-        Returns:
-            pd.DataFrame or np.ndarray: estimated covariance (or correlation).
-        """
-        # transform input into 2D array
-        if not isinstance(X, (pd.Series, pd.DataFrame)):
-            columns = None
-        else:
-            if isinstance(X.index, pd.MultiIndex):
-                if isinstance(X, pd.DataFrame):
-                    X = X.iloc[:, 0].unstack(level="instrument")  # always use the first column
-                else:
-                    X = X.unstack(level="instrument")
-            else:
-                # X is 2D DataFrame
-                pass
-            columns = X.columns  # will be used to restore dataframe
-            X = X.values
-
-        # calculate pct_change
-        if is_price:
-            X = X[1:] / X[:-1] - 1  # NOTE: resulting `n - 1` rows
-
-        # scale return
-        if self.scale_return:
-            X *= 100
-
-        # handle nan and centered
-        X = self._preprocess(X)
-
-        # estimate covariance
-        S = self._predict(X)
-
-        # return correlation if needed
-        if return_corr:
-            vola = np.sqrt(np.diag(S))
-            corr = S / np.outer(vola, vola)
-            if columns is None:
-                return corr
-            return pd.DataFrame(corr, index=columns, columns=columns)
-
-        # return covariance
-        if columns is None:
-            return S
-        return pd.DataFrame(S, index=columns, columns=columns)
-
-    def _predict(self, X: np.ndarray) -> np.ndarray:
-        """covariance estimation implementation
-
-        This method should be overridden by child classes.
-
-        By default, this method implements the empirical covariance estimation.
-
-        Args:
-            X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows).
-
-        Returns:
-            np.ndarray: covariance matrix.
-        """
-        xTx = np.asarray(X.T.dot(X))
-        N = len(X)
-        if isinstance(X, np.ma.MaskedArray):
-            M = 1 - X.mask
-            N = M.T.dot(M)  # each pair has distinct number of samples
-        return xTx / N
-
-    def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]:
-        """handle nan and centerize data
-
-        Note:
-            if `nan_option='mask'` then the returned array will be `np.ma.MaskedArray`.
- """ - # handle nan - if self.nan_option == self.FILL_NAN: - X = np.nan_to_num(X) - elif self.nan_option == self.MASK_NAN: - X = np.ma.masked_invalid(X) - # centerize - if not self.assume_centered: - X = X - np.nanmean(X, axis=0) - return X +from qlib.model.riskmodel import RiskModel class ShrinkCovEstimator(RiskModel): @@ -162,8 +36,9 @@ class ShrinkCovEstimator(RiskModel): [3] Ledoit, O., & Wolf, M. (2003). Improved estimation of the covariance matrix of stock returns with an application to portfolio selection. Journal of Empirical Finance, 10(5), 603–621. https://doi.org/10.1016/S0927-5398(03)00007-0 - [4] Chen, Y., Wiesel, A., Eldar, Y. C., & Hero, A. O. (2010). Shrinkage algorithms for MMSE covariance estimation. - IEEE Transactions on Signal Processing, 58(10), 5016–5029. https://doi.org/10.1109/TSP.2010.2053029 + [4] Chen, Y., Wiesel, A., Eldar, Y. C., & Hero, A. O. (2010). Shrinkage algorithms for MMSE covariance + estimation. IEEE Transactions on Signal Processing, 58(10), 5016–5029. + https://doi.org/10.1109/TSP.2010.2053029 [5] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-00007f64e5b9/cov1para.m.zip [6] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-ffff-ffffde5e2d4e/covCor.m.zip [7] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-0000648dfc98/covMarket.m.zip @@ -384,84 +259,3 @@ def _get_shrink_param_lw_single_factor(self, X: np.ndarray, S: np.ndarray, F: np alpha = max(0, min(1, kappa / t)) return alpha - - -class POETCovEstimator(RiskModel): - """Principal Orthogonal Complement Thresholding Estimator (POET) - - Reference: - [1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements. - Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603–680. https://doi.org/10.1111/rssb.12016 - [2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m - """ - - THRESH_SOFT = "soft" - THRESH_HARD = "hard" - THRESH_SCAD = "scad" - - def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = "soft", **kwargs): - """ - Args: - num_factors (int): number of factors (if set to zero, no factor model will be used). - thresh (float): the positive constant for thresholding. - thresh_method (str): thresholding method, which can be - - 'soft': soft thresholding. - - 'hard': hard thresholding. - - 'scad': scad thresholding. - kwargs: see `RiskModel` for more information. 
- """ - super().__init__(**kwargs) - - assert num_factors >= 0, "`num_factors` requires a positive integer" - self.num_factors = num_factors - - assert thresh >= 0, "`thresh` requires a positive float number" - self.thresh = thresh - - assert thresh_method in [ - self.THRESH_HARD, - self.THRESH_SOFT, - self.THRESH_SCAD, - ], "`thresh_method` should be `soft`/`hard`/`scad`" - self.thresh_method = thresh_method - - def _predict(self, X: np.ndarray) -> np.ndarray: - - Y = X.T # NOTE: to match POET's implementation - p, n = Y.shape - - if self.num_factors > 0: - Dd, V = np.linalg.eig(Y.T.dot(Y)) - V = V[:, np.argsort(Dd)] - F = V[:, -self.num_factors :][:, ::-1] * np.sqrt(n) - LamPCA = Y.dot(F) / n - uhat = np.asarray(Y - LamPCA.dot(F.T)) - Lowrank = np.asarray(LamPCA.dot(LamPCA.T)) - rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n) - else: - uhat = np.asarray(Y) - rate = np.sqrt(np.log(p) / n) - Lowrank = 0 - - lamb = rate * self.thresh - SuPCA = uhat.dot(uhat.T) / n - SuDiag = np.diag(np.diag(SuPCA)) - R = np.linalg.inv(SuDiag ** 0.5).dot(SuPCA).dot(np.linalg.inv(SuDiag ** 0.5)) - - if self.thresh_method == self.THRESH_HARD: - M = R * (np.abs(R) > lamb) - elif self.thresh_method == self.THRESH_SOFT: - res = np.abs(R) - lamb - res = (res + np.abs(res)) / 2 - M = np.sign(R) * res - else: - M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb) - M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7 - M3 = (np.abs(R) >= 3.7 * lamb) * R - M = M1 + M2 + M3 - - Rthresh = M - np.diag(np.diag(M)) + np.eye(p) - SigmaU = (SuDiag ** 0.5).dot(Rthresh).dot(SuDiag ** 0.5) - SigmaY = SigmaU + Lowrank - - return SigmaY diff --git a/qlib/model/riskmodel/structured.py b/qlib/model/riskmodel/structured.py new file mode 100644 index 0000000000..878503401f --- /dev/null +++ b/qlib/model/riskmodel/structured.py @@ -0,0 +1,84 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import numpy as np +import pandas as pd +from typing import Union +from sklearn.decomposition import PCA, FactorAnalysis + +from qlib.model.riskmodel import RiskModel + + +class StructuredCovEstimator(RiskModel): + """Structured Covariance Estimator + + This estimator assumes observations can be predicted by multiple factors + X = FB + U + where `F` can be specified by explicit risk factors or latent factors. + + Therefore the structured covariance can be estimated by + cov(X) = F cov(B) F.T + cov(U) + + We use latent factor models to estimate the structured covariance. + Specifically, the following latent factor models are supported: + - `pca`: Principal Component Analysis + - `fa`: Factor Analysis + + Reference: [1] Fan, J., Liao, Y., & Liu, H. (2016). An overview of the estimation of large covariance and + precision matrices. Econometrics Journal, 19(1), C1–C32. https://doi.org/10.1111/ectj.12061 + """ + + FACTOR_MODEL_PCA = "pca" + FACTOR_MODEL_FA = "fa" + DEFAULT_NAN_OPTION = "fill" + + def __init__(self, factor_model: str = "pca", num_factors: int = 10, **kwargs): + """ + Args: + factor_model (str): the latent factor models used to estimate the structured covariance (`pca`/`fa`). + num_factors (int): number of components to keep. 
+            kwargs: see `RiskModel` for more information.
+        """
+        if "nan_option" in kwargs.keys():
+            assert kwargs["nan_option"] in [self.DEFAULT_NAN_OPTION], "nan_option={} is not supported".format(
+                kwargs["nan_option"]
+            )
+        else:
+            kwargs["nan_option"] = self.DEFAULT_NAN_OPTION
+
+        super().__init__(**kwargs)
+
+        assert factor_model in [
+            self.FACTOR_MODEL_PCA,
+            self.FACTOR_MODEL_FA,
+        ], "factor_model={} is not supported".format(factor_model)
+        self.solver = PCA if factor_model == self.FACTOR_MODEL_PCA else FactorAnalysis
+
+        self.num_factors = num_factors
+
+    def _predict(self, X: np.ndarray, return_decomposed_components=False) -> Union[np.ndarray, tuple]:
+        """
+        covariance estimation implementation
+
+        Args:
+            X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows).
+            return_decomposed_components (bool): whether to return the decomposed components of the covariance matrix.
+
+        Returns:
+            tuple or np.ndarray: decomposed covariance matrix or covariance matrix.
+        """
+
+        model = self.solver(self.num_factors, random_state=0).fit(X)
+
+        F = model.components_.T  # num_features x num_factors
+        B = model.transform(X)  # num_samples x num_factors
+        U = X - B @ F.T
+        cov_b = np.cov(B.T)  # num_factors x num_factors
+        var_u = np.var(U, axis=0)  # diagonal
+
+        if return_decomposed_components:
+            return F, cov_b, var_u
+
+        cov_x = F @ cov_b @ F.T + np.diag(var_u)
+
+        return cov_x
diff --git a/qlib/portfolio/__init__.py b/qlib/portfolio/__init__.py
index e69de29bb2..59e481eb93 100644
--- a/qlib/portfolio/__init__.py
+++ b/qlib/portfolio/__init__.py
@@ -0,0 +1,2 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
diff --git a/qlib/portfolio/optimizer/__init__.py b/qlib/portfolio/optimizer/__init__.py
new file mode 100644
index 0000000000..5080b9a469
--- /dev/null
+++ b/qlib/portfolio/optimizer/__init__.py
@@ -0,0 +1,6 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+from .base import BaseOptimizer
+from .optimizer import PortfolioOptimizer
+from .enhanced_indexing import EnhancedIndexingOptimizer
diff --git a/qlib/portfolio/optimizer/base.py b/qlib/portfolio/optimizer/base.py
new file mode 100644
index 0000000000..502443869d
--- /dev/null
+++ b/qlib/portfolio/optimizer/base.py
@@ -0,0 +1,13 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import abc
+
+
+class BaseOptimizer(abc.ABC):
+    """Construct a portfolio with an optimization-related method"""
+
+    @abc.abstractmethod
+    def __call__(self, *args, **kwargs) -> object:
+        """Generate an optimized portfolio allocation"""
+        pass
diff --git a/qlib/portfolio/optimizer/enhanced_indexing.py b/qlib/portfolio/optimizer/enhanced_indexing.py
new file mode 100644
index 0000000000..5a7a0804db
--- /dev/null
+++ b/qlib/portfolio/optimizer/enhanced_indexing.py
@@ -0,0 +1,143 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import numpy as np
+import cvxpy as cp
+import pandas as pd
+from typing import Union
+
+from qlib.portfolio.optimizer import BaseOptimizer
+
+
+class EnhancedIndexingOptimizer(BaseOptimizer):
+    """
+    Portfolio Optimizer with Enhanced Indexing
+
+    Note:
+        This optimizer always assumes full investment and no shorting.
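+
+    A usage sketch (illustrative only; `alpha`, `F`, `cov_b` and `var_u` are
+    hypothetical inputs shaped as returned by `StructuredCovEstimator`, and the
+    weight vectors are hypothetical holdings):
+
+        >>> opt = EnhancedIndexingOptimizer(lamb=10, bench_dev=0.01)
+        >>> w = opt(alpha, F, cov_b, var_u, w0=w_prev, w_bench=w_index)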
+ """ + + START_FROM_W0 = "w0" + START_FROM_BENCH = "benchmark" + + def __init__( + self, + lamb: float = 10, + delta: float = 0.4, + bench_dev: float = 0.01, + inds_dev: float = None, + scale_alpha: bool = True, + verbose: bool = False, + warm_start: str = None, + max_iters: int = 10000, + ): + """ + Args: + lamb (float): risk aversion parameter (larger `lamb` means less focus on return) + delta (float): turnover rate limit + bench_dev (float): benchmark deviation limit + inds_dev (float/None): industry deviation limit, set `inds_dev` to None to ignore industry specific + restriction + scale_alpha (bool): if to scale alpha to match the volatility of the covariance matrix + verbose (bool): if print detailed information about the solver + warm_start (str): whether try to warm start (`w0`/`benchmark`/``) + (https://www.cvxpy.org/tutorial/advanced/index.html#warm-start) + """ + + assert lamb >= 0, "risk aversion parameter `lamb` should be positive" + self.lamb = lamb + + assert delta >= 0, "turnover limit `delta` should be positive" + self.delta = delta + + assert bench_dev >= 0, "benchmark deviation limit `bench_dev` should be positive" + self.bench_dev = bench_dev + + assert inds_dev is None or inds_dev >= 0, "industry deviation limit `inds_dev` should be positive or None." + self.inds_dev = inds_dev + + assert warm_start in [ + None, + self.START_FROM_W0, + self.START_FROM_BENCH, + ], "illegal warm start option" + self.start_from_w0 = warm_start == self.START_FROM_W0 + self.start_from_bench = warm_start == self.START_FROM_BENCH + + self.scale_alpha = scale_alpha + self.verbose = verbose + self.max_iters = max_iters + + def __call__( + self, + u: Union[np.ndarray, pd.Series], + F: np.ndarray, + covB: np.ndarray, + varU: np.ndarray, + w0: np.ndarray, + w_bench: np.ndarray, + inds_onehot: np.ndarray = None, + ) -> Union[np.ndarray, pd.Series]: + """ + Args: + u (np.ndarray or pd.Series): expected returns (a.k.a., alpha) + F, covB, varU (np.ndarray): see StructuredCovEstimator + w0 (np.ndarray): initial weights (for turnover control) + w_bench (np.ndarray): benchmark weights + inds_onehot (np.ndarray): industry (onehot) + + Returns: + np.ndarray or pd.Series: optimized portfolio allocation + """ + assert inds_onehot is not None or self.inds_dev is None, "Industry onehot vector is required." 
+
+        # transform series into array
+        if isinstance(u, pd.Series):
+            u = u.values
+
+        # scale alpha to match volatility
+        if self.scale_alpha:
+            u = u / u.std()
+            x_variance = np.mean(np.diag(F @ covB @ F.T) + varU)
+            u *= x_variance ** 0.5
+
+        w = cp.Variable(len(u))  # num_assets
+        v = w @ F  # num_factors
+        ret = w @ u
+        risk = cp.quad_form(v, covB) + cp.sum(cp.multiply(varU, w ** 2))
+        obj = cp.Maximize(ret - self.lamb * risk)
+        d_bench = w - w_bench
+        cons = [
+            w >= 0,
+            cp.sum(w) == 1,
+            d_bench >= -self.bench_dev,
+            d_bench <= self.bench_dev,
+        ]
+
+        if self.inds_dev is not None:
+            d_inds = d_bench @ inds_onehot
+            cons.append(d_inds >= -self.inds_dev)
+            cons.append(d_inds <= self.inds_dev)
+
+        if w0 is not None:
+            turnover = cp.sum(cp.abs(w - w0))
+            cons.append(turnover <= self.delta)
+
+        warm_start = False
+        if self.start_from_w0:
+            if w0 is None:
+                print("Warning: tried to warm start from `w0`, but `w0` is None.")
+            else:
+                w.value = w0
+                warm_start = True
+        elif self.start_from_bench:
+            w.value = w_bench
+            warm_start = True
+
+        prob = cp.Problem(obj, cons)
+        prob.solve(solver=cp.SCS, verbose=self.verbose, warm_start=warm_start, max_iters=self.max_iters)
+
+        if prob.status != "optimal":
+            print("Warning: solver failed with status", prob.status)
+
+        return np.asarray(w.value)
diff --git a/qlib/portfolio/optimizer.py b/qlib/portfolio/optimizer/optimizer.py
similarity index 92%
rename from qlib/portfolio/optimizer.py
rename to qlib/portfolio/optimizer/optimizer.py
index 0e7d272545..54648a46ac 100644
--- a/qlib/portfolio/optimizer.py
+++ b/qlib/portfolio/optimizer/optimizer.py
@@ -1,15 +1,17 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.
+
 import warnings
 import numpy as np
 import pandas as pd
 import scipy.optimize as so
-
 from typing import Optional, Union, Callable, List
 
+from qlib.portfolio.optimizer import BaseOptimizer
+
 
-class PortfolioOptimizer:
+class PortfolioOptimizer(BaseOptimizer):
     """Portfolio Optimizer
 
     The following optimization algorithms are supported:
@@ -42,6 +44,7 @@ def __init__(
             lamb (float): risk aversion parameter (larger `lamb` means more focus on return)
             delta (float): turnover rate limit
             alpha (float): l2 norm regularizer
+            scale_alpha (bool): whether to scale alpha to match the volatility of the covariance matrix
             tol (float): tolerance for optimization termination
         """
         assert method in [self.OPT_GMV, self.OPT_MVO, self.OPT_RP, self.OPT_INV], f"method `{method}` is not supported"
@@ -57,6 +60,7 @@ def __init__(
 
         self.alpha = alpha
         self.tol = tol
+        self.scale_alpha = scale_alpha
 
     def __call__(
         self,
@@ -83,18 +87,18 @@ def __call__(
         if u is not None:
             assert len(u) == len(S), "`u` has mismatched shape"
             if isinstance(u, pd.Series):
-                assert all(u.index == index), "`u` has mismatched index"
+                assert u.index.equals(index), "`u` has mismatched index"
                 u = u.values
 
         # transform initial weights
         if w0 is not None:
             assert len(w0) == len(S), "`w0` has mismatched shape"
             if isinstance(w0, pd.Series):
-                assert all(w0.index == index), "`w0` has mismatched index"
+                assert w0.index.equals(index), "`w0` has mismatched index"
                 w0 = w0.values
 
         # scale alpha to match volatility
-        if u is not None:
+        if u is not None and self.scale_alpha:
             u = u / u.std()
             u *= np.mean(np.diag(S)) ** 0.5
 
@@ -173,7 +177,7 @@ def _optimize_rp(self, S: np.ndarray, w0: Optional[np.ndarray] = None) -> np.nda
         """
         return self._solve(len(S), self._get_objective_rp(S), *self._get_constrains(w0))
 
-    def _get_objective_gmv(self, S: np.ndarray) -> np.ndarray:
+    def _get_objective_gmv(self, S: np.ndarray) -> Callable:
         """global minimum variance optimization objective
 
         Optimization objective
@@ -185,7 +189,7 @@ def func(x):
 
         return func
 
-    def _get_objective_mvo(self, S: np.ndarray, u: np.ndarray = None) -> np.ndarray:
+    def _get_objective_mvo(self, S: np.ndarray, u: np.ndarray = None) -> Callable:
         """mean-variance optimization objective
 
         Optimization objective
@@ -199,7 +203,7 @@ def func(x):
 
         return func
 
-    def _get_objective_rp(self, S: np.ndarray) -> np.ndarray:
+    def _get_objective_rp(self, S: np.ndarray) -> Callable:
         """risk-parity optimization objective
 
         Optimization objective
@@ -247,7 +251,11 @@ def _solve(self, n: int, obj: Callable, bounds: so.Bounds, cons: List) -> np.nda
         # add l2 regularization
         wrapped_obj = obj
         if self.alpha > 0:
-            wrapped_obj = lambda x: obj(x) + self.alpha * np.sum(np.square(x))
+
+            def opt_obj(x):
+                return obj(x) + self.alpha * np.sum(np.square(x))
+
+            wrapped_obj = opt_obj
 
         # solve
         x0 = np.ones(n) / n  # init results
diff --git a/setup.py b/setup.py
index f759945fd5..83cf6e1b60 100644
--- a/setup.py
+++ b/setup.py
@@ -55,6 +55,7 @@
         "tornado",
         "joblib>=0.17.0",
         "ruamel.yaml>=0.16.12",
+        "scikit-learn>=0.22",
     ]
 
 # Numpy include
diff --git a/tests/test_structured_cov_estimator.py b/tests/test_structured_cov_estimator.py
new file mode 100644
index 0000000000..494962cc33
--- /dev/null
+++ b/tests/test_structured_cov_estimator.py
@@ -0,0 +1,111 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import unittest
+import numpy as np
+from scipy.linalg import sqrtm
+
+from qlib.model.riskmodel import StructuredCovEstimator
+
+
+class TestStructuredCovEstimator(unittest.TestCase):
+    def test_random_covariance(self):
+        # Try to estimate the covariance from a randomly generated matrix.
+        NUM_VARIABLE = 10
+        NUM_OBSERVATION = 200
+        EPS = 1e-6
+
+        estimator = StructuredCovEstimator(scale_return=False, assume_centered=True)
+
+        X = np.random.rand(NUM_OBSERVATION, NUM_VARIABLE)
+
+        est_cov = estimator.predict(X, is_price=False)
+        np_cov = np.cov(X.T)  # numpy assumes each row is a variable, while qlib assumes the opposite
+
+        delta = abs(est_cov - np_cov)
+        if_identical = (delta < EPS).all()
+
+        self.assertTrue(if_identical)
+
+    def test_nan_option_covariance(self):
+        # Test if nan_option is correctly passed.
+        NUM_VARIABLE = 10
+        NUM_OBSERVATION = 200
+        EPS = 1e-6
+
+        estimator = StructuredCovEstimator(scale_return=False, assume_centered=True, nan_option="fill")
+
+        X = np.random.rand(NUM_OBSERVATION, NUM_VARIABLE)
+
+        est_cov = estimator.predict(X, is_price=False)
+        np_cov = np.cov(X.T)  # numpy assumes each row is a variable, while qlib assumes the opposite
+
+        delta = abs(est_cov - np_cov)
+        if_identical = (delta < EPS).all()
+
+        self.assertTrue(if_identical)
+
+    def test_decompose_covariance(self):
+        # Test if return_decomposed_components is correctly passed.
+        NUM_VARIABLE = 10
+        NUM_OBSERVATION = 200
+
+        estimator = StructuredCovEstimator(scale_return=False, assume_centered=True, nan_option="fill")
+
+        X = np.random.rand(NUM_OBSERVATION, NUM_VARIABLE)
+
+        F, cov_b, var_u = estimator.predict(X, is_price=False, return_decomposed_components=True)
+
+        self.assertTrue(F is not None and cov_b is not None and var_u is not None)
+
+    def test_constructed_covariance(self):
+        # Try to estimate the covariance from a specially crafted matrix.
+        # There should be some significant correlation since X is specially crafted.
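+        # `X = Z @ sqrtm(cov)` makes the columns of X correlated; the loop below
+        # retries until `sqrtm` returns a real-valued square root.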
+        NUM_VARIABLE = 7
+        NUM_OBSERVATION = 500
+        EPS = 0.1
+
+        estimator = StructuredCovEstimator(scale_return=False, assume_centered=True, num_factors=NUM_VARIABLE - 1)
+
+        sqrt_cov = None
+        while sqrt_cov is None or (np.iscomplex(sqrt_cov)).any():
+            cov = np.random.rand(NUM_VARIABLE, NUM_VARIABLE)
+            for i in range(NUM_VARIABLE):
+                cov[i][i] = 1
+            sqrt_cov = sqrtm(cov)
+        X = np.random.rand(NUM_OBSERVATION, NUM_VARIABLE) @ sqrt_cov
+
+        est_cov = estimator.predict(X, is_price=False)
+        np_cov = np.cov(X.T)  # numpy assumes each row is a variable, while qlib assumes the opposite
+
+        delta = abs(est_cov - np_cov)
+        if_identical = (delta < EPS).all()
+
+        self.assertTrue(if_identical)
+
+    def test_decomposition(self):
+        # Try to estimate the covariance from a specially crafted matrix.
+        # The matrix is generated under the assumption that observations can be predicted by multiple factors.
+        NUM_VARIABLE = 30
+        NUM_OBSERVATION = 100
+        NUM_FACTOR = 10
+        EPS = 0.1
+
+        estimator = StructuredCovEstimator(scale_return=False, assume_centered=True, num_factors=NUM_FACTOR)
+
+        F = np.random.rand(NUM_VARIABLE, NUM_FACTOR)
+        B = np.random.rand(NUM_FACTOR, NUM_OBSERVATION)
+        U = np.random.rand(NUM_OBSERVATION, NUM_VARIABLE)
+        X = (F @ B).T + U
+
+        est_cov = estimator.predict(X, is_price=False)
+        np_cov = np.cov(X.T)  # numpy assumes each row is a variable, while qlib assumes the opposite
+
+        delta = abs(est_cov - np_cov)
+        if_identical = (delta < EPS).all()
+
+        self.assertTrue(if_identical)
+
+
+if __name__ == "__main__":
+    unittest.main()