Module mean_variance_hedge.dynamic_programming
Based on
Černý, Aleš, Dynamic Programming and Mean-Variance Hedging in Discrete Time (October 1, 2003). Applied Mathematical Finance, 2004, 11(1), 1-25, Available at SSRN: https://ssrn.com/abstract=561223 or http://dx.doi.org/10.2139/ssrn.561223
Expand source code
"""
Based on
Černý, Aleš, Dynamic Programming and Mean-Variance Hedging in Discrete Time
(October 1, 2003). Applied Mathematical Finance, 2004, 11(1), 1-25,
Available at SSRN: https://ssrn.com/abstract=561223
or http://dx.doi.org/10.2139/ssrn.561223
"""
import numpy as np
import matplotlib.pyplot as plt
import itertools
from scipy.stats import norm
from scipy.special import comb
def possible_nodes(log_ret_space, T, scale_factor):
"""
Inputs:
+ log_ret_space: array of log-returns
+ T: time at maturity
Outputs:
+ attainable_nodes: Attainable nodes (log-returns) as a dictionary indexed by time t = 0, ... , T.
The entries of attainable_nodes[t] would be the attainable log-returns at time t
Note: It is recommendeded to space log-returns equally so they recombine,
and scale log-returns so that they are integers
example: [-6, -4, -2, 0, 2, 4, 6]
"""
assert len(log_ret_space) > 0
log_ret_space2 = [int(round(x * scale_factor)) for x in log_ret_space]
attainable_nodes = {}
attainable_nodes[0] = [0]
for i in range(1, T + 1):
attainable_nodes[i] = set()
for x in log_ret_space2:
for val in attainable_nodes[i - 1]:
if val + x not in attainable_nodes[i]:
attainable_nodes[i].add(val + x)
return attainable_nodes
def calc_variance_optimal_measure(ret_space, rf, p_probs):
"""
Inputs:
+ ret_space: array of returns, i.e. exp(log_returns)
+ rf: risk-free return
+ p_probs: probabilitites under the physical measure
Outputs:
+ a, b, m: quantities required to calculate `q_probs`
+ q_probs: the probabilities under the variance optimal probabilities
"""
assert len(ret_space) > 0
assert len(ret_space) == len(p_probs)
assert abs(sum(p_probs) - 1) < 0.001
a = np.dot(p_probs, ret_space - rf) / np.dot(p_probs, (ret_space - rf) ** 2)
b = 1 - np.dot(p_probs, (ret_space - rf)) ** 2 / np.dot(p_probs, (ret_space - rf) ** 2)
m = (1 - a * (ret_space - rf) / b)
q_probs = m * p_probs
q_probs = q_probs / np.sum(q_probs) # ensure Q-probs sum to 1
return a, b, m, q_probs
def calc_mean_value_process(attainable_nodes, S0, K, rf, log_ret_space, T, scale_factor, q_probs):
"""
Inputs:
+ attainable_nodes: attainable log-returns indexed by time t, from the `possible_nodes` function
+ S0: Initial asset price, e.g. 100
+ K: Strike
+ scale_factor: factor to divide log-return indices by, e.g. 100
+ rf: risk-free return (discrete), e.g. (1.001)
+ log_ret_space: array of log-returns
+ T: time at maturity
+ q_probs: Variance-optimal probabilities
Output:
+ Hts: Mean-Value process Ht as dictionary, indexed by time t and log-returns
"""
N_STATES = len(log_ret_space)
log_ret_space2 = [round(x * scale_factor) for x in log_ret_space]
Hts = {}
Hts[T] = {}
for x in attainable_nodes[T]:
Sts = S0 * np.exp(x / scale_factor)
Hts[T][x] = np.maximum(Sts - K, 0)
# calculate node-values iteratively in reverse
for t in range(T - 1, -1, -1):
Hts[t] = {}
for x in attainable_nodes[t]:
Htp1 = [Hts[t + 1][x + log_ret_space2[j]] for j in range(N_STATES)]
Hts[t][x] = np.dot(q_probs, Htp1) / rf
return Hts
def calc_dynamic_deltas(attainable_nodes, Hts, S0, rf, log_ret_space, T, scale_factor, p_probs):
"""
Inputs:
+ attainable_nodes: attainable log-returns indexed by time t, from the `possible_nodes` function
+ Hts: Mean-Value process of the liability from the function `calc_mean_value_process`
+ S0: Initial asset price, e.g. 100
+ scale_factor: factor to divide log-return indices by, e.g. 100
+ rf: risk-free return (discrete), e.g. (1.001)
+ log_ret_space: array of log-returns
+ T: time at maturity
+ p_probs: physical probabilities under P
+ Hts: S0, rf, log_ret_space, T, scale_factor, q_probs
Outputs:
+ dynamic_delta: the locally optimal hedge as a nested dictionary, indexed by time t and log-returns
"""
log_ret_space2 = [int(round(x * scale_factor)) for x in log_ret_space]
N_STATES = len(log_ret_space)
ret_space = np.exp(log_ret_space)
ret_change = ret_space - rf
dynamic_delta = {}
for t in range(T - 1, -1, -1):
dynamic_delta[t] = {}
for x in attainable_nodes[t]:
St = S0 * np.exp(x / scale_factor)
ht = Hts[t][x]
ht_change = np.array([Hts[t + 1][x + log_ret_space2[j]] - rf * ht for j in range(N_STATES)])
cov = np.dot(p_probs, ht_change * ret_change)
qhedge_delta = cov / (St * np.dot(p_probs, (ret_change) ** 2))
dynamic_delta[t][x] = qhedge_delta
return dynamic_delta
def calc_expected_squared_replication_error(attainable_nodes, Hts, dynamic_delta, S0, rf, log_ret_space, T, scale_factor, p_probs):
"""
Inputs:
+ attainable_nodes: attainable log-returns indexed by time t, from the `possible_nodes` function
+ Hts: Mean-Value process of the liability from the function `calc_mean_value_process`
+ dynamic_delta: Deltas at each node from the function `calc_dynamic_deltas`
+ S0: Initial asset price, e.g. 100
+ scale_factor: factor to divide log-return indices by, e.g. 100
+ rf: risk-free return (discrete), e.g. (1.001)
+ log_ret_space: array of log-returns
+ T: time at maturity
+ p_probs: physical probabilities under P
+ Hts: S0, rf, log_ret_space, T, scale_factor, q_probs
Outputs:
+ expected_squared_replication_error: the ESRE as a dictionary, indexed by time t and log returns
"""
expected_squared_replication_error = {}
N_STATES = len(log_ret_space)
log_ret_space2 = [int(round(x * scale_factor)) for x in log_ret_space]
ret_space = np.exp(log_ret_space)
for t in range(T - 1, -1, -1):
expected_squared_replication_error[t] = {}
for x in attainable_nodes[t]:
St = S0 * np.exp(x / scale_factor)
ht = Hts[t][x]
htp1 = np.array([Hts[t + 1][x + log_ret_space2[i]] for i in range(N_STATES)])
error = rf * ht + dynamic_delta[t][x] * St * (ret_space - rf) - htp1
expected_squared_replication_error[t][x] = np.dot(p_probs, error ** 2)
return expected_squared_replication_error
def calc_squared_error_process(attainable_nodes, Hts, dynamic_delta, ERSEs, S0, rf, log_ret_space, T, scale_factor, p_probs, b):
"""
Inputs:
+ attainable_nodes: attainable log-returns indexed by time t, from the `possible_nodes` function
+ Hts: Mean-Value process of the liability from the function `calc_mean_value_process`
+ dynamic_delta: locally optimal deltas at each node from the function `calc_dynamic_deltas`
+ ERSEs: Expected Squared Replication Errors from `calc_expected_squared_replication_error`
+ S0: Initial asset price, e.g. 100
+ scale_factor: factor to divide log-return indices by, e.g. 100
+ rf: risk-free return (discrete), e.g. (1.001)
+ log_ret_space: array of log-returns
+ T: time at maturity
+ p_probs: physical probabilities under P
+ b: value from `calc_variance_optimal_measure`
+ Hts: S0, rf, log_ret_space, T, scale_factor, q_probs
Outputs:
+ squared_error_process: the Squared Error Process as a nested dictionary, indexed by time t and log returns
"""
squared_error_process = {}
squared_error_process[T] = {}
N_STATES = len(log_ret_space)
log_ret_space2 = [int(round(x * scale_factor)) for x in log_ret_space]
for x in attainable_nodes[T]:
squared_error_process[T][x] = 0
for t in range(T - 1, -1, -1):
squared_error_process[t] = {}
for x in attainable_nodes[t]:
# squared error values for t + 1
sep_tp1 = np.array([squared_error_process[t + 1][x + log_ret_space2[i]] for i in range(N_STATES)])
# conditional expectation of squared error process for t + 1 at time t
exp_t_sep_tp1 = np.dot(p_probs, sep_tp1)
kt = rf ** (2 *(T - (t + 1))) * b ** (T - (t + 1))
squared_error_process[t][x] = exp_t_sep_tp1 + kt * ERSEs[t][x]
return squared_error_process
Functions
def calc_dynamic_deltas(attainable_nodes, Hts, S0, rf, log_ret_space, T, scale_factor, p_probs)
-
Inputs:
-
attainable_nodes: attainable log-returns indexed by time t, from the
possible_nodes()
function -
Hts: Mean-Value process of the liability from the function
calc_mean_value_process()
-
S0: Initial asset price, e.g. 100
-
scale_factor: factor to divide log-return indices by, e.g. 100
-
rf: risk-free return (discrete), e.g. (1.001)
-
log_ret_space: array of log-returns
-
T: time at maturity
-
p_probs: physical probabilities under P
-
Hts: S0, rf, log_ret_space, T, scale_factor, q_probs
Outputs:
- dynamic_delta: the locally optimal hedge as a nested dictionary, indexed by time t and log-returns
Expand source code
def calc_dynamic_deltas(attainable_nodes, Hts, S0, rf, log_ret_space, T, scale_factor, p_probs): """ Inputs: + attainable_nodes: attainable log-returns indexed by time t, from the `possible_nodes` function + Hts: Mean-Value process of the liability from the function `calc_mean_value_process` + S0: Initial asset price, e.g. 100 + scale_factor: factor to divide log-return indices by, e.g. 100 + rf: risk-free return (discrete), e.g. (1.001) + log_ret_space: array of log-returns + T: time at maturity + p_probs: physical probabilities under P + Hts: S0, rf, log_ret_space, T, scale_factor, q_probs Outputs: + dynamic_delta: the locally optimal hedge as a nested dictionary, indexed by time t and log-returns """ log_ret_space2 = [int(round(x * scale_factor)) for x in log_ret_space] N_STATES = len(log_ret_space) ret_space = np.exp(log_ret_space) ret_change = ret_space - rf dynamic_delta = {} for t in range(T - 1, -1, -1): dynamic_delta[t] = {} for x in attainable_nodes[t]: St = S0 * np.exp(x / scale_factor) ht = Hts[t][x] ht_change = np.array([Hts[t + 1][x + log_ret_space2[j]] - rf * ht for j in range(N_STATES)]) cov = np.dot(p_probs, ht_change * ret_change) qhedge_delta = cov / (St * np.dot(p_probs, (ret_change) ** 2)) dynamic_delta[t][x] = qhedge_delta return dynamic_delta
-
def calc_expected_squared_replication_error(attainable_nodes, Hts, dynamic_delta, S0, rf, log_ret_space, T, scale_factor, p_probs)
-
Inputs:
-
attainable_nodes: attainable log-returns indexed by time t, from the
possible_nodes()
function -
Hts: Mean-Value process of the liability from the function
calc_mean_value_process()
-
dynamic_delta: Deltas at each node from the function
calc_dynamic_deltas()
-
S0: Initial asset price, e.g. 100
-
scale_factor: factor to divide log-return indices by, e.g. 100
-
rf: risk-free return (discrete), e.g. (1.001)
-
log_ret_space: array of log-returns
-
T: time at maturity
-
p_probs: physical probabilities under P
-
Hts: S0, rf, log_ret_space, T, scale_factor, q_probs
Outputs:
- expected_squared_replication_error: the ESRE as a dictionary, indexed by time t and log returns
Expand source code
def calc_expected_squared_replication_error(attainable_nodes, Hts, dynamic_delta, S0, rf, log_ret_space, T, scale_factor, p_probs): """ Inputs: + attainable_nodes: attainable log-returns indexed by time t, from the `possible_nodes` function + Hts: Mean-Value process of the liability from the function `calc_mean_value_process` + dynamic_delta: Deltas at each node from the function `calc_dynamic_deltas` + S0: Initial asset price, e.g. 100 + scale_factor: factor to divide log-return indices by, e.g. 100 + rf: risk-free return (discrete), e.g. (1.001) + log_ret_space: array of log-returns + T: time at maturity + p_probs: physical probabilities under P + Hts: S0, rf, log_ret_space, T, scale_factor, q_probs Outputs: + expected_squared_replication_error: the ESRE as a dictionary, indexed by time t and log returns """ expected_squared_replication_error = {} N_STATES = len(log_ret_space) log_ret_space2 = [int(round(x * scale_factor)) for x in log_ret_space] ret_space = np.exp(log_ret_space) for t in range(T - 1, -1, -1): expected_squared_replication_error[t] = {} for x in attainable_nodes[t]: St = S0 * np.exp(x / scale_factor) ht = Hts[t][x] htp1 = np.array([Hts[t + 1][x + log_ret_space2[i]] for i in range(N_STATES)]) error = rf * ht + dynamic_delta[t][x] * St * (ret_space - rf) - htp1 expected_squared_replication_error[t][x] = np.dot(p_probs, error ** 2) return expected_squared_replication_error
-
def calc_mean_value_process(attainable_nodes, S0, K, rf, log_ret_space, T, scale_factor, q_probs)
-
Inputs:
-
attainable_nodes: attainable log-returns indexed by time t, from the
possible_nodes()
function -
S0: Initial asset price, e.g. 100
-
K: Strike
-
scale_factor: factor to divide log-return indices by, e.g. 100
-
rf: risk-free return (discrete), e.g. (1.001)
-
log_ret_space: array of log-returns
-
T: time at maturity
-
q_probs: Variance-optimal probabilities
Output:
- Hts: Mean-Value process Ht as dictionary, indexed by time t and log-returns
Expand source code
def calc_mean_value_process(attainable_nodes, S0, K, rf, log_ret_space, T, scale_factor, q_probs): """ Inputs: + attainable_nodes: attainable log-returns indexed by time t, from the `possible_nodes` function + S0: Initial asset price, e.g. 100 + K: Strike + scale_factor: factor to divide log-return indices by, e.g. 100 + rf: risk-free return (discrete), e.g. (1.001) + log_ret_space: array of log-returns + T: time at maturity + q_probs: Variance-optimal probabilities Output: + Hts: Mean-Value process Ht as dictionary, indexed by time t and log-returns """ N_STATES = len(log_ret_space) log_ret_space2 = [round(x * scale_factor) for x in log_ret_space] Hts = {} Hts[T] = {} for x in attainable_nodes[T]: Sts = S0 * np.exp(x / scale_factor) Hts[T][x] = np.maximum(Sts - K, 0) # calculate node-values iteratively in reverse for t in range(T - 1, -1, -1): Hts[t] = {} for x in attainable_nodes[t]: Htp1 = [Hts[t + 1][x + log_ret_space2[j]] for j in range(N_STATES)] Hts[t][x] = np.dot(q_probs, Htp1) / rf return Hts
-
def calc_squared_error_process(attainable_nodes, Hts, dynamic_delta, ERSEs, S0, rf, log_ret_space, T, scale_factor, p_probs, b)
-
Inputs:
-
attainable_nodes: attainable log-returns indexed by time t, from the
possible_nodes()
function -
Hts: Mean-Value process of the liability from the function
calc_mean_value_process()
-
dynamic_delta: locally optimal deltas at each node from the function
calc_dynamic_deltas()
-
ERSEs: Expected Squared Replication Errors from
calc_expected_squared_replication_error()
-
S0: Initial asset price, e.g. 100
-
scale_factor: factor to divide log-return indices by, e.g. 100
-
rf: risk-free return (discrete), e.g. (1.001)
-
log_ret_space: array of log-returns
-
T: time at maturity
-
p_probs: physical probabilities under P
-
b: value from
calc_variance_optimal_measure()
-
Hts: S0, rf, log_ret_space, T, scale_factor, q_probs
Outputs:
- squared_error_process: the Squared Error Process as a nested dictionary, indexed by time t and log returns
Expand source code
def calc_squared_error_process(attainable_nodes, Hts, dynamic_delta, ERSEs, S0, rf, log_ret_space, T, scale_factor, p_probs, b): """ Inputs: + attainable_nodes: attainable log-returns indexed by time t, from the `possible_nodes` function + Hts: Mean-Value process of the liability from the function `calc_mean_value_process` + dynamic_delta: locally optimal deltas at each node from the function `calc_dynamic_deltas` + ERSEs: Expected Squared Replication Errors from `calc_expected_squared_replication_error` + S0: Initial asset price, e.g. 100 + scale_factor: factor to divide log-return indices by, e.g. 100 + rf: risk-free return (discrete), e.g. (1.001) + log_ret_space: array of log-returns + T: time at maturity + p_probs: physical probabilities under P + b: value from `calc_variance_optimal_measure` + Hts: S0, rf, log_ret_space, T, scale_factor, q_probs Outputs: + squared_error_process: the Squared Error Process as a nested dictionary, indexed by time t and log returns """ squared_error_process = {} squared_error_process[T] = {} N_STATES = len(log_ret_space) log_ret_space2 = [int(round(x * scale_factor)) for x in log_ret_space] for x in attainable_nodes[T]: squared_error_process[T][x] = 0 for t in range(T - 1, -1, -1): squared_error_process[t] = {} for x in attainable_nodes[t]: # squared error values for t + 1 sep_tp1 = np.array([squared_error_process[t + 1][x + log_ret_space2[i]] for i in range(N_STATES)]) # conditional expectation of squared error process for t + 1 at time t exp_t_sep_tp1 = np.dot(p_probs, sep_tp1) kt = rf ** (2 *(T - (t + 1))) * b ** (T - (t + 1)) squared_error_process[t][x] = exp_t_sep_tp1 + kt * ERSEs[t][x] return squared_error_process
-
def calc_variance_optimal_measure(ret_space, rf, p_probs)
-
Inputs: + ret_space: array of returns, i.e. exp(log_returns) + rf: risk-free return + p_probs: probabilitites under the physical measure
Outputs:
- a, b, m: quantities required to calculate
q_probs
- q_probs: the probabilities under the variance optimal probabilities
Expand source code
def calc_variance_optimal_measure(ret_space, rf, p_probs): """ Inputs: + ret_space: array of returns, i.e. exp(log_returns) + rf: risk-free return + p_probs: probabilitites under the physical measure Outputs: + a, b, m: quantities required to calculate `q_probs` + q_probs: the probabilities under the variance optimal probabilities """ assert len(ret_space) > 0 assert len(ret_space) == len(p_probs) assert abs(sum(p_probs) - 1) < 0.001 a = np.dot(p_probs, ret_space - rf) / np.dot(p_probs, (ret_space - rf) ** 2) b = 1 - np.dot(p_probs, (ret_space - rf)) ** 2 / np.dot(p_probs, (ret_space - rf) ** 2) m = (1 - a * (ret_space - rf) / b) q_probs = m * p_probs q_probs = q_probs / np.sum(q_probs) # ensure Q-probs sum to 1 return a, b, m, q_probs
- a, b, m: quantities required to calculate
def possible_nodes(log_ret_space, T, scale_factor)
-
Inputs:
- log_ret_space: array of log-returns
- T: time at maturity
Outputs:
- attainable_nodes: Attainable nodes (log-returns) as a dictionary indexed by time t = 0, … , T. The entries of attainable_nodes[t] would be the attainable log-returns at time t
Note: It is recommendeded to space log-returns equally so they recombine, and scale log-returns so that they are integers example: [-6, -4, -2, 0, 2, 4, 6]
Expand source code
def possible_nodes(log_ret_space, T, scale_factor): """ Inputs: + log_ret_space: array of log-returns + T: time at maturity Outputs: + attainable_nodes: Attainable nodes (log-returns) as a dictionary indexed by time t = 0, ... , T. The entries of attainable_nodes[t] would be the attainable log-returns at time t Note: It is recommendeded to space log-returns equally so they recombine, and scale log-returns so that they are integers example: [-6, -4, -2, 0, 2, 4, 6] """ assert len(log_ret_space) > 0 log_ret_space2 = [int(round(x * scale_factor)) for x in log_ret_space] attainable_nodes = {} attainable_nodes[0] = [0] for i in range(1, T + 1): attainable_nodes[i] = set() for x in log_ret_space2: for val in attainable_nodes[i - 1]: if val + x not in attainable_nodes[i]: attainable_nodes[i].add(val + x) return attainable_nodes