"""
Logic regarding return and time decay attribution for sample weights from chapter 4.
"""
import numpy as np
import pandas as pd
from mlfinpy.sampling.concurrent import (
get_av_uniqueness_from_triple_barrier,
num_concurrent_events,
)
from mlfinpy.util.multiprocess import mp_pandas_obj
def _apply_weight_by_return(
label_endtime: pd.Series,
num_conc_events: pd.Series,
close_series: pd.Series,
molecule: np.ndarray,
) -> pd.Series:
"""
Determination of Sample Weight by Absolute Return Attribution
Derives sample weights based on concurrency and return. Works on a set of
datetime index values (molecule). This allows the program to parallelize the processing.
Parameters
----------
label_endtime : pd.Series
Label endtime series (t1 for triple barrier events).
num_conc_events : pd.Series
Number of concurrent labels (output from num_concurrent_events function).
close_series : pd.Series
Close prices series.
molecule : np.ndarray
A set of datetime index values for processing.
Returns
-------
pd.Series
Sample weights based on number return and concurrency for molecule
Notes
-----
Reference: Advances in Financial Machine Learning, Snippet 4.10, page 69.
"""
ret = np.log(close_series).diff() # Log-returns, so that they are additive
weights = pd.Series(index=molecule, dtype="float64")
for t_in, t_out in label_endtime.loc[weights.index].items():
# Weights depend on returns and label concurrency
weights.loc[t_in] = (ret.loc[t_in:t_out] / num_conc_events.loc[t_in:t_out]).sum()
return weights.abs()
# [docs] -- Sphinx "view source" link label left over from HTML extraction; not part of the module.
def get_weights_by_return(
    triple_barrier_events: pd.DataFrame,
    close_series: pd.Series,
    num_threads: int = 5,
    verbose: bool = True,
) -> pd.Series:
    """
    Compute sample weights by absolute return attribution.

    Orchestrates two ``mp_pandas_obj`` jobs: the first counts the labels
    concurrent at every bar, the second attributes absolute log-returns to each
    event given that concurrency. The resulting weights are rescaled so that
    they sum to the number of events.

    Parameters
    ----------
    triple_barrier_events : pd.DataFrame
        Events from ``labeling.get_events()``; must contain a ``t1`` column and
        no NaN values (neither in the data nor in the index).
    close_series : pd.Series
        Close prices series.
    num_threads : int, optional
        The number of threads concurrently used by the function. Default is 5.
    verbose : bool, optional
        Flag to report progress on asynch jobs. Default is True.

    Returns
    -------
    pd.Series
        Sample weights based on return and concurrency, scaled to sum to the
        number of events.

    Raises
    ------
    AssertionError
        If ``triple_barrier_events`` contains NaN values or NaN index entries.

    Notes
    -----
    Reference: Advances in Financial Machine Learning, Snippet 4.10, page 69.
    """
    events_have_nan = triple_barrier_events.isnull().values.any()
    index_has_nan = triple_barrier_events.index.isnull().any()
    assert not (events_have_nan or index_has_nan), "NaN values in ``triple_barrier_events``, delete nans"

    event_starts = triple_barrier_events.index
    event_ends = triple_barrier_events["t1"]

    # Step 1: number of labels alive at each bar.
    concurrency = mp_pandas_obj(
        num_concurrent_events,
        ("molecule", event_starts),
        num_threads,
        close_series_index=close_series.index,
        label_endtime=event_ends,
        verbose=verbose,
    )
    # The combined result may repeat timestamps (presumably where molecules
    # overlap); keep the last occurrence, align to the price index and treat
    # bars without any count as zero concurrency.
    is_last_occurrence = ~concurrency.index.duplicated(keep="last")
    concurrency = concurrency.loc[is_last_occurrence].reindex(close_series.index).fillna(0)

    # Step 2: absolute return attribution per event.
    sample_weights = mp_pandas_obj(
        _apply_weight_by_return,
        ("molecule", event_starts),
        num_threads,
        label_endtime=event_ends,
        num_conc_events=concurrency,
        close_series=close_series,
        verbose=verbose,
    )

    # Rescale so the weights add up to the number of observations.
    scale = sample_weights.shape[0] / sample_weights.sum()
    return sample_weights * scale
# [docs] -- Sphinx "view source" link label left over from HTML extraction; not part of the module.
def get_weights_by_time_decay(
    triple_barrier_events: pd.DataFrame,
    close_series: pd.Series,
    num_threads: int = 5,
    decay: float = 1,
    verbose: bool = True,
) -> pd.Series:
    """
    Compute sample weights using piecewise-linear time-decay factors.

    The decay is applied to the cumulative average uniqueness of the events
    (sorted by time) rather than to chronological time: the newest observation
    receives weight 1 and, for ``decay`` >= 0, the oldest receives weight
    ``decay``.

    Parameters
    ----------
    triple_barrier_events : pd.DataFrame
        Events from ``labeling.get_events()``; must not contain NaN values
        (neither in the data nor in the index).
    close_series : pd.Series
        Close prices.
    num_threads : int, optional
        The number of threads concurrently used by the function. Default is 5.
    decay : float, optional
        Decay factor, must be greater than -1. Default is 1.

        - ``decay`` = 1 means there is no time decay.
        - 0 < ``decay`` < 1 means that weights decay linearly over time, but
          every observation still receives a strictly positive weight,
          regardless of how old it is.
        - ``decay`` = 0 means that weights converge linearly to zero as they
          become older.
        - -1 < ``decay`` < 0 means that the oldest ``|decay|`` fraction of the
          observations (measured in cumulative uniqueness) receives zero weight,
          i.e. they are erased from memory.
    verbose : bool, optional
        Flag to report progress on asynch jobs. Default is True.

    Returns
    -------
    pd.Series
        Sample weights based on time-decay factors, indexed by event start time
        in ascending order.

    Raises
    ------
    AssertionError
        If ``triple_barrier_events`` contains NaN values or NaN index entries.
    ValueError
        If ``decay`` is not greater than -1. At ``decay`` = -1 the slope formula
        divides by zero (yielding NaN weights) and below -1 the slope changes
        sign, so no meaningful weights exist.

    Notes
    -----
    Reference: Advances in Financial Machine Learning, Snippet 4.11, page 70.
    """
    assert (
        bool(triple_barrier_events.isnull().values.any()) is False
        and bool(triple_barrier_events.index.isnull().any()) is False
    ), "NaN values in triple_barrier_events, delete nans"

    # ``not decay > -1`` (rather than ``decay <= -1``) also rejects NaN.
    if not decay > -1:
        raise ValueError(f"decay must be greater than -1, got {decay!r}")

    # Apply piecewise-linear decay to observed uniqueness
    # Newest observation gets weight=1, oldest observation gets weight=decay
    av_uniqueness = get_av_uniqueness_from_triple_barrier(triple_barrier_events, close_series, num_threads, verbose)
    cum_uniqueness = av_uniqueness["tW"].sort_index().cumsum()
    total_uniqueness = cum_uniqueness.iloc[-1]

    if decay >= 0:
        slope = (1 - decay) / total_uniqueness
    else:
        # The line crosses zero at ``|decay| * total_uniqueness``.
        slope = 1 / ((decay + 1) * total_uniqueness)
    const = 1 - slope * total_uniqueness

    # Weights can't be negative
    return (const + slope * cum_uniqueness).clip(lower=0)