"""Generic renewal processes."""
# Copyright (c) 2022, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# SPDX-License-Identifier: Apache-2.0 (see LICENSE.txt)
# This file is part of ReLife, an open source Python library for asset
# management based on reliability theory and lifetime data analysis.
import numpy as np
from typing import Tuple
from .model import LifetimeModel, AbsolutelyContinuousLifetimeModel
from .reward import Reward
from .discounting import Discount, ExponentialDiscounting
from .data import RenewalData, RenewalRewardData
from .utils import args_size, args_take, args_ndim
def _check_absolutely_continuous(model: LifetimeModel) -> None:
if not isinstance(model, AbsolutelyContinuousLifetimeModel):
raise NotImplementedError("The lifetime model must be absolutely continuous")
def _check_exponential_discounting(discount: Discount) -> None:
if not isinstance(discount, ExponentialDiscounting):
raise NotImplementedError("The discount function must be exponential")
def renewal_equation_solver(
F: np.ndarray, Fm: np.ndarray, y: np.ndarray, D: np.ndarray = None
) -> np.ndarray:
r"""Renewal equation solver.
Parameters
----------
F : ndarray
The cumulative distribution function evaluated at each point of the
timeline.
Fm : ndarray
The cumulative distribution function evaluated at each midpoint of the
timeline.
y : ndarray
A given function evaluated at each point of the timeline.
D : ndarray, optional
Discount function value at each point of the timeline, by default None.
Returns
-------
ndarray
Renewal function evaluated at each point of the timeline.
Notes
-----
The timeline must start at 0.
    Solve the renewal equation for :math:`z`, such that for :math:`t\geq 0`:
.. math::
\begin{aligned} z(t) & = y(t) + \int_0^t z(t-x) D(x) \mathrm{d}F(x) \\
& = y(t) + z \ast F(t)
\end{aligned}
where :math:`F` is the cumulative distribution function, :math:`y` a
function and :math:`D` the exponential discounting factor.
References
----------
.. [1] Dragomir, S. S. (2011). Approximating the Riemann–Stieltjes integral
by a trapezoidal quadrature rule with applications. Mathematical and
computer modelling, 54(1-2), 243-260.
.. [2] Tortorella, M. (2005). Numerical solutions of renewal-type integral
equations. INFORMS Journal on Computing, 17(1), 66-74.
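    Examples
    --------
    A minimal numerical sketch (the exponential lifetimes, rate and timeline
    below are illustrative choices, not part of the API). With :math:`y = F`,
    the solution is the renewal function; for a Poisson process it equals
    :math:`\lambda t`.

    >>> import numpy as np
    >>> lam = 0.5
    >>> t = np.linspace(0, 10, 1001)
    >>> tm = 0.5 * (t[1:] + t[:-1])  # midpoints of the timeline
    >>> F = 1 - np.exp(-lam * t)
    >>> Fm = 1 - np.exp(-lam * tm)
    >>> m = renewal_equation_solver(F, Fm, F)
    >>> err = np.abs(m - lam * t).max()  # small, up to quadrature error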
"""
if D is None:
D = np.ones_like(F)
z = np.empty(y.shape)
u = D * np.insert(F[..., 1:] - Fm, 0, 1, axis=-1)
v = D[..., :-1] * np.insert(np.diff(Fm), 0, 1, axis=-1)
q0 = 1 / (1 - D[..., 0] * Fm[..., 0])
z[..., 0] = y[..., 0]
z[..., 1] = q0 * (y[..., 1] + z[..., 0] * u[..., 1])
for n in range(2, F.shape[-1]):
z[..., n] = q0 * (
y[..., n]
+ z[..., 0] * u[..., n]
+ np.sum(z[..., 1:n][..., ::-1] * v[..., 1:n], axis=-1)
)
return z
def delayed_renewal(
z: np.ndarray, F1: np.ndarray, F1m: np.ndarray, y1: np.ndarray, D: np.ndarray = None
) -> np.ndarray:
r"""Add delay for the first renewal to a solution of a renewal equation.
Parameters
----------
z : ndarray
The solution of a renewal equation at each point of the timeline.
F1 : ndarray
The cumulative distribution function of the first renewal evaluated at
each point of the timeline.
F1m : ndarray
The cumulative distribution function of the first renewal evaluated at
each midpoint of the timeline.
    y1 : ndarray
A given function related to the first renewal evaluated at each point of
the timeline.
D : ndarray, optional
Discount function value at each point of the timeline, by default None.
Returns
-------
ndarray
Delayed solution of the renewal equation at each point of the timeline.
Notes
-----
The solution of the renewal equation is delayed by computing:
.. math::
z_1(t) = y_1(t) + \int_0^t z(t-x) D(x) \mathrm{d}F_1(x)
where :math:`z` is a solution of a renewal equation, :math:`D` the
exponential discounting factor, :math:`F_1` is the cumulative distribution
    function of the first renewal and :math:`y_1` a function related to the first
renewal.
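    Examples
    --------
    A minimal self-consistency sketch (illustrative exponential setup): when
    the first renewal follows the same law as the others (``F1 = F``,
    ``y1 = y``), the delayed solution coincides with the plain one.

    >>> import numpy as np
    >>> lam = 0.5
    >>> t = np.linspace(0, 10, 1001)
    >>> tm = 0.5 * (t[1:] + t[:-1])
    >>> F = 1 - np.exp(-lam * t)
    >>> Fm = 1 - np.exp(-lam * tm)
    >>> z = renewal_equation_solver(F, Fm, F)
    >>> z1 = delayed_renewal(z, F, Fm, F)  # recovers z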
"""
if D is None:
D = np.ones_like(F1)
z1 = np.empty(y1.shape)
u1 = D * np.insert(F1[..., 1:] - F1m, 0, 1, axis=-1)
v1 = D[..., :-1] * np.insert(np.diff(F1m), 0, 1, axis=-1)
z1[..., 0] = y1[..., 0]
z1[..., 1] = (
y1[..., 1] + z[..., 0] * u1[..., 1] + z[..., 1] * D[..., 0] * F1m[..., 0]
)
for n in range(2, F1.shape[-1]):
z1[..., n] = (
y1[..., n]
+ z[..., 0] * u1[..., n]
+ z[..., n] * D[..., 0] * F1m[..., 0]
+ np.sum(z[..., 1:n][..., ::-1] * v1[..., 1:n], axis=-1)
)
return z1
class RenewalProcess:
"""Renewal process."""
def __init__(self, model: LifetimeModel, model1: LifetimeModel = None) -> None:
"""Creates a renewal process.
Parameters
----------
model : LifetimeModel
A lifetime model representing the durations between events.
model1 : LifetimeModel, optional
A lifetime model for the first renewal (delayed renewal process), by
default None.
"""
self.model = model
self.model1 = model1
    def renewal_function(
self,
t: np.ndarray,
model_args: Tuple[np.ndarray, ...] = (),
model1_args: Tuple[np.ndarray, ...] = (),
) -> np.ndarray:
r"""The renewal function.
Parameters
----------
t : 1D array
Timeline.
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
model1_args : Tuple[ndarray,...], optional
Extra arguments required by the lifetime model of the first renewal,
by default ().
Returns
-------
ndarray
The renewal function evaluated at each point of the timeline.
Notes
-----
The expected total number of renewals is computed by solving the
renewal equation:
.. math::
m(t) = F_1(t) + \int_0^t m(t-x) \mathrm{d}F(x)
where:
- :math:`m` is the renewal function,
- :math:`F` is the cumulative distribution function of the underlying
lifetime model,
- :math:`F_1` is the cumulative distribution function of the underlying
          lifetime model for the first renewal in the case of a delayed renewal
process.
References
----------
.. [1] Rausand, M., Barros, A., & Hoyland, A. (2020). System Reliability
Theory: Models, Statistical Methods, and Applications. John Wiley &
Sons.
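        Examples
        --------
        A minimal sketch, assuming the package exposes an ``Exponential``
        lifetime distribution with a ``rate`` parameter (the import path below
        is an assumption); for exponential lifetimes the renewal function is
        exactly :math:`\lambda t`.

        >>> import numpy as np
        >>> from relife import Exponential, RenewalProcess
        >>> rp = RenewalProcess(Exponential(0.5))
        >>> t = np.linspace(0, 10, 1001)
        >>> m = rp.renewal_function(t)  # close to 0.5 * t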
"""
tm = 0.5 * (t[1:] + t[:-1])
F = self.model.cdf(t, *model_args)
Fm = self.model.cdf(tm, *model_args)
if self.model1 is not None:
y = self.model1.cdf(t, *model1_args)
else:
y = F
return renewal_equation_solver(F, Fm, y)
    def renewal_density(
self,
t: np.ndarray,
model_args: Tuple[np.ndarray, ...] = (),
model1_args: Tuple[np.ndarray, ...] = (),
) -> np.ndarray:
r"""The renewal density.
Parameters
----------
t : 1D array
Timeline.
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
model1_args : Tuple[ndarray,...], optional
Extra arguments required by the lifetime model of the first renewal,
by default ().
Returns
-------
ndarray
Renewal density evaluated at each point of the timeline.
Raises
------
NotImplementedError
If the lifetime model is not absolutely continuous.
Notes
-----
The renewal density is the derivative of the renewal function with
respect to time. It is computed by solving the renewal equation:
.. math::
\mu(t) = f_1(t) + \int_0^t \mu(t-x) \mathrm{d}F(x)
where:
        - :math:`\mu` is the renewal density,
- :math:`F` is the cumulative distribution function of the underlying
lifetime model,
- :math:`f_1` is the probability density function of the underlying
          lifetime model for the first renewal in the case of a delayed renewal
process.
References
----------
.. [1] Rausand, M., Barros, A., & Hoyland, A. (2020). System Reliability
Theory: Models, Statistical Methods, and Applications. John Wiley &
Sons.
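        Examples
        --------
        A minimal sketch, under the same ``Exponential`` assumption as in
        :meth:`renewal_function`; exponential lifetimes yield a constant
        renewal density equal to the rate.

        >>> import numpy as np
        >>> from relife import Exponential, RenewalProcess
        >>> t = np.linspace(0, 10, 1001)
        >>> mu = RenewalProcess(Exponential(0.5)).renewal_density(t)  # ~0.5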
"""
if self.model1 is not None:
_check_absolutely_continuous(self.model1)
y = self.model1.pdf(t, *model1_args)
else:
_check_absolutely_continuous(self.model)
y = self.model.pdf(t, *model_args)
tm = 0.5 * (t[1:] + t[:-1])
F = self.model.cdf(t, *model_args)
Fm = self.model.cdf(tm, *model_args)
return renewal_equation_solver(F, Fm, y)
    def sample(
self,
T: float,
model_args: Tuple[np.ndarray, ...] = (),
model1_args: Tuple[np.ndarray, ...] = (),
n_samples: int = 1,
random_state: int = None,
) -> RenewalData:
"""Renewal data sampling.
Parameters
----------
T : float
Time at the end of the observation.
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
model1_args : Tuple[ndarray,...], optional
Extra arguments required by the lifetime model of the first renewal,
by default ().
n_samples : int, optional
Number of samples, by default 1.
random_state : int, optional
Random seed, by default None.
Returns
-------
RenewalData
Samples of replacement times and durations.
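        Examples
        --------
        A minimal sketch, under the same ``Exponential`` assumption as above;
        the returned object carries the ``times``, ``durations``, ``indices``
        and ``samples`` arrays used throughout this module.

        >>> from relife import Exponential, RenewalProcess
        >>> rp = RenewalProcess(Exponential(0.5))
        >>> data = rp.sample(100.0, n_samples=10, random_state=42)
        >>> bool(data.times.max() >= 100.0)  # each path runs until T is crossed
        True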
"""
n_indices = max(1, args_size(*model_args, *model1_args))
if self.model1 is not None:
data = self._sample_init(
T, self.model1, model1_args, n_indices, n_samples, random_state
)
else:
data = self._sample_init(
T, self.model, model_args, n_indices, n_samples, random_state
)
return self._sample_loop(data, self.model, model_args)
@classmethod
def _sample_init(
cls,
T: float,
model: LifetimeModel,
model_args: Tuple[np.ndarray, ...],
n_indices: int,
n_samples: int,
random_state: int = None,
) -> RenewalData:
"""Initializing a RenewalData sample.
Creates a RenewalData instance with the first renewal times and durations.
Parameters
----------
T : float
Time at the end of observation.
model : LifetimeModel
A lifetime model representing the durations between events.
model_args : Tuple[ndarray,...]
Extra arguments required by the lifetime model.
n_indices : int
Number of assets.
n_samples : int
Number of samples.
random_state : int, optional
Random seed, by default None.
Returns
-------
RenewalData
The renewal data sample with the first renewal times and durations.
"""
indices = np.repeat(np.arange(n_indices), n_samples)
samples = np.tile(np.arange(n_samples), n_indices)
        # Without extra args, draw one lifetime per (asset, sample) pair;
        # with args, rvs broadcasts over the per-index expanded args.
        size = n_indices * n_samples if len(model_args) == 0 else 1
times = model.rvs(
*args_take(indices, *model_args), size=size, random_state=random_state
).ravel()
durations = times.copy()
return RenewalData(T, n_indices, n_samples, indices, samples, times, durations)
@classmethod
def _sample_loop(
cls, data: RenewalData, model: LifetimeModel, model_args: Tuple[np.ndarray, ...]
) -> RenewalData:
"""Sampling loop on renewals until the end time is reached.
Parameters
----------
data : RenewalData
An initial RenewalData instance.
model : LifetimeModel
A lifetime model representing the durations between events.
model_args : Tuple[ndarray,...]
Extra arguments required by the lifetime model.
Returns
-------
RenewalData
The renewal data sample with times and durations.
"""
T = data.T
ind_T = np.nonzero(data.times < T)
times = data.times[ind_T]
indices = data.indices[ind_T]
samples = data.samples[ind_T]
        while len(ind_T[0]) > 0:
size = indices.size if len(model_args) == 0 else 1
durations = model.rvs(*args_take(indices, *model_args), size=size).ravel()
times = times + durations
data.times = np.concatenate((data.times, times))
data.durations = np.concatenate((data.durations, durations))
data.indices = np.concatenate((data.indices, indices))
data.samples = np.concatenate((data.samples, samples))
ind_T = np.nonzero(times < T)
times = times[ind_T]
indices = indices[ind_T]
samples = samples[ind_T]
return data
class RenewalRewardProcess(RenewalProcess):
"""Renewal reward process."""
def __init__(
self,
model: LifetimeModel,
reward: Reward,
model1: LifetimeModel = None,
reward1: Reward = None,
discount: Discount = ExponentialDiscounting(),
):
"""Creates a renewal reward process.
Parameters
----------
model : LifetimeModel
A lifetime model representing the durations between events.
reward : Reward
A reward associated to the interarrival time.
model1 : LifetimeModel, optional
A lifetime model for the first renewal (delayed renewal process), by
default None.
reward1 : Reward, optional
            A reward associated to the first renewal, by default None.
        discount : Discount, optional
            A discount function related to the rewards, by default
            ExponentialDiscounting().
"""
super().__init__(model, model1)
self.reward = reward
self.reward1 = reward1
self.discount = discount
    def expected_total_reward(
self,
t: np.ndarray,
model_args: Tuple[np.ndarray, ...] = (),
reward_args: Tuple[np.ndarray, ...] = (),
model1_args: Tuple[np.ndarray, ...] = (),
reward1_args: Tuple[np.ndarray, ...] = (),
discount_args: Tuple[np.ndarray, ...] = (),
) -> np.ndarray:
r"""The expected total reward.
Parameters
----------
t : 1D array
Timeline.
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
reward_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward, by default ().
model1_args : Tuple[ndarray,...], optional
Extra arguments required by the lifetime model of the first renewal,
by default ().
        reward1_args : Tuple[ndarray,...], optional
            Extra arguments required by the associated reward of the first
            renewal, by default ().
discount_args : Tuple[ndarray,...], optional
Extra arguments required by the discount function, by default ().
Returns
-------
ndarray
Expected total reward of process evaluated at each point of the
timeline.
Raises
------
NotImplementedError
If the discount function is not exponential.
Notes
-----
The renewal equation solved by the expected reward is:
.. math::
z(t) = \int_0^t E[Y | X = x] D(x) \mathrm{d}F(x) + \int_0^t z(t-x)
D(x)\mathrm{d}F(x)
where:
- :math:`z` is the expected total reward,
- :math:`F` is the cumulative distribution function of the underlying
lifetime model,
- :math:`X` the interarrival random variable,
- :math:`Y` the associated reward,
- :math:`D` the exponential discount factor.
If the renewal reward process is delayed, the expected total reward is
modified as:
.. math::
z_1(t) = \int_0^t E[Y_1 | X_1 = x] D(x) \mathrm{d}F_1(x) + \int_0^t
z(t-x) D(x) \mathrm{d}F_1(x)
where:
- :math:`z_1` is the expected total reward with delay,
- :math:`F_1` is the cumulative distribution function of the lifetime
model for the first renewal,
- :math:`X_1` the interarrival random variable of the first renewal,
        - :math:`Y_1` the associated reward of the first renewal.
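        Examples
        --------
        A minimal sketch with a hypothetical unit-cost reward, written against
        the two ``Reward`` methods this module relies on
        (``conditional_expectation`` and ``sample``), plus the ``Exponential``
        assumption used above.

        >>> import numpy as np
        >>> from relife import Exponential, RenewalRewardProcess
        >>> from relife.reward import Reward
        >>> class UnitCost(Reward):
        ...     def conditional_expectation(self, x, *args):
        ...         return np.ones_like(x)
        ...     def sample(self, x, *args):
        ...         return np.ones_like(x)
        >>> rrp = RenewalRewardProcess(Exponential(0.5), UnitCost())
        >>> t = np.linspace(0, 10, 101)
        >>> z = rrp.expected_total_reward(t, discount_args=(0.04,))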
"""
_check_exponential_discounting(self.discount)
tm = 0.5 * (t[1:] + t[:-1])
F = self.model.cdf(t, *model_args)
Fm = self.model.cdf(tm, *model_args)
D = self.discount.factor(t, *discount_args)
y = self._reward_partial_expectation(
t,
self.model,
self.reward,
self.discount,
model_args,
reward_args,
discount_args,
)
z = renewal_equation_solver(F, Fm, y, D)
if self.model1 is not None:
F1 = self.model1.cdf(t, *model1_args)
F1m = self.model1.cdf(tm, *model1_args)
y1 = self._reward_partial_expectation(
t,
self.model1,
self.reward1,
self.discount,
model1_args,
reward1_args,
discount_args,
)
z = delayed_renewal(z, F1, F1m, y1, D)
return z
    def expected_equivalent_annual_worth(
self,
t: np.ndarray,
model_args: Tuple[np.ndarray, ...] = (),
reward_args: Tuple[np.ndarray, ...] = (),
model1_args: Tuple[np.ndarray, ...] = (),
reward1_args: Tuple[np.ndarray, ...] = (),
discount_args: Tuple[np.ndarray, ...] = (),
) -> np.ndarray:
"""Expected equivalent annual worth.
Gives the equivalent annual worth of the expected total reward of the
process at each point of the timeline.
Parameters
----------
t : 1D array
Timeline.
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
reward_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward, by default ().
model1_args : Tuple[ndarray,...], optional
Extra arguments required by the lifetime model of the first renewal,
by default ().
reward1_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward of the first
renewal, by default ().
discount_args : Tuple[ndarray,...], optional
Extra arguments required by the discount function, by default ().
Returns
-------
ndarray
The expected equivalent annual worth evaluated at each point of the
timeline.
Notes
-----
The equivalent annual worth at time :math:`t` is equal to the expected
total reward :math:`z` divided by the annuity factor :math:`AF(t)`.
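        With the default exponential discounting at rate ``rate``, the annuity
        factor is ``AF(t) = (1 - exp(-rate * t)) / rate``, which reduces to
        ``AF(t) = t`` as the rate goes to zero.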
"""
z = self.expected_total_reward(
t, model_args, reward_args, model1_args, reward1_args, discount_args
)
af = self.discount.annuity_factor(t, *discount_args)
mask = af == 0
af = np.ma.masked_where(mask, af)
q = z / af
if self.model1 is not None:
q0 = self.reward1.conditional_expectation(
0, *reward1_args
) * self.model1.pdf(0, *model1_args)
else:
q0 = self.reward.conditional_expectation(0, *reward_args) * self.model.pdf(
0, *model_args
)
return np.where(mask, q0, q)
    def asymptotic_expected_total_reward(
self,
model_args: Tuple[np.ndarray, ...] = (),
reward_args: Tuple[np.ndarray, ...] = (),
model1_args: Tuple[np.ndarray, ...] = (),
reward1_args: Tuple[np.ndarray, ...] = (),
discount_args: Tuple[np.ndarray, ...] = (),
) -> np.ndarray:
r"""Asymptotic expected total reward.
Parameters
----------
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
reward_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward, by default ().
model1_args : Tuple[ndarray,...], optional
Extra arguments required by the lifetime model of the first renewal,
by default ().
reward1_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward of the first
renewal, by default ().
discount_args : Tuple[ndarray,...], optional
Extra arguments required by the discount function, by default ().
Returns
-------
ndarray
            The asymptotic expected total reward of the process.
Raises
------
NotImplementedError
If the discount function is not exponential.
Notes
-----
The asymptotic expected total reward is:
.. math::
z^\infty = \lim_{t\to \infty} z(t) = \dfrac{E[Y D(X)]}{1-E[D(X)]}
where:
- :math:`X` the interarrival random variable,
- :math:`Y` the associated reward,
- :math:`D` the exponential discount factor.
If the renewal reward process is delayed, the asymptotic expected total
reward is modified as:
.. math::
z_1^\infty = E[Y_1 D(X_1)] + z^\infty E[D(X_1)]
where:
- :math:`X_1` the interarrival random variable of the first renewal,
        - :math:`Y_1` the associated reward of the first renewal.
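        Examples
        --------
        A closed-form sanity check, reusing the hypothetical ``Exponential``
        and ``UnitCost`` setup sketched in :meth:`expected_total_reward`: with
        lifetime rate :math:`\lambda`, unit rewards and discount rate
        :math:`\rho`, :math:`E[D(X)] = \lambda/(\lambda+\rho)`, hence
        :math:`z^\infty = \lambda/\rho`.

        >>> import numpy as np
        >>> z_inf = rrp.asymptotic_expected_total_reward(
        ...     discount_args=(np.array([0.04]),)
        ... )  # close to 0.5 / 0.04 = 12.5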
"""
_check_exponential_discounting(self.discount)
        # default to a zero rate (no discounting) when discount_args is empty
        rate = discount_args[0] if len(discount_args) > 0 else 0
mask = rate <= 0
rate = np.ma.MaskedArray(rate, mask)
ndim = args_ndim(*model_args, *reward_args, rate)
f = lambda x: self.discount.factor(x, rate)
y = lambda x: self.discount.factor(
x, rate
) * self.reward.conditional_expectation(x, *reward_args)
lf = self.model.ls_integrate(f, 0, np.inf, *model_args, ndim=ndim)
ly = self.model.ls_integrate(y, 0, np.inf, *model_args, ndim=ndim)
z = ly / (1 - lf)
if self.model1 is not None:
ndim1 = args_ndim(*model1_args, *reward1_args, rate)
            y1 = lambda x: self.discount.factor(
                x, rate
            ) * self.reward1.conditional_expectation(x, *reward1_args)
lf1 = self.model1.ls_integrate(f, 0, np.inf, *model1_args, ndim=ndim1)
ly1 = self.model1.ls_integrate(y1, 0, np.inf, *model1_args, ndim=ndim1)
z = ly1 + z * lf1
return np.where(mask, np.inf, z)
    def asymptotic_expected_equivalent_annual_worth(
self,
model_args: Tuple[np.ndarray, ...] = (),
reward_args: Tuple[np.ndarray, ...] = (),
model1_args: Tuple[np.ndarray, ...] = (),
reward1_args: Tuple[np.ndarray, ...] = (),
discount_args: Tuple[np.ndarray, ...] = (),
) -> np.ndarray:
"""Asymptotic expected equivalent annual worth.
Parameters
----------
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
reward_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward, by default ().
model1_args : Tuple[ndarray,...], optional
Extra arguments required by the lifetime model of the first renewal,
by default ().
reward1_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward of the first
renewal, by default ().
discount_args : Tuple[ndarray,...], optional
Extra arguments required by the discount function, by default ().
Returns
-------
ndarray
The asymptotic expected equivalent annual worth of the process.
Raises
------
NotImplementedError
If the discount function is not exponential.
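        Notes
        -----
        Asymptotically, the expected equivalent annual worth is the discount
        rate times the asymptotic expected total reward. When the discount
        rate is zero it reduces to the renewal reward theorem value
        :math:`E[Y]/E[X]`; in the exponential/unit-cost sketch of
        :meth:`expected_total_reward` it equals the lifetime rate.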
"""
_check_exponential_discounting(self.discount)
        # default to a zero rate (no discounting) when discount_args is empty
        rate = discount_args[0] if len(discount_args) > 0 else 0
mask = rate <= 0
rate = np.ma.MaskedArray(rate, mask)
ndim = args_ndim(*model_args, *reward_args)
q = rate * self.asymptotic_expected_total_reward(
model_args, reward_args, model1_args, reward1_args, discount_args
)
q0 = self.model.ls_integrate(
lambda x: self.reward.conditional_expectation(x, *reward_args),
0,
np.inf,
*model_args,
ndim=ndim
) / self.model.mean(*model_args)
return np.where(mask, q0, q)
    def sample(
self,
T: float,
model_args: Tuple[np.ndarray, ...] = (),
reward_args: Tuple[np.ndarray, ...] = (),
model1_args: Tuple[np.ndarray, ...] = (),
reward1_args: Tuple[np.ndarray, ...] = (),
discount_args: Tuple[np.ndarray, ...] = (),
n_samples: int = 1,
random_state: int = None,
) -> RenewalRewardData:
"""Renewal reward data sampling.
Parameters
----------
T : float
Time at the end of the observation.
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
reward_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward, by default ().
model1_args : Tuple[ndarray,...], optional
Extra arguments required by the lifetime model of the first renewal,
by default ().
reward1_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward of the first
renewal, by default ().
discount_args : Tuple[ndarray,...], optional
Extra arguments required by the discount function, by default ().
n_samples : int, optional
Number of samples, by default 1.
random_state : int, optional
Random seed, by default None.
Returns
-------
RenewalRewardData
Samples of replacement times, durations and rewards.
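        Examples
        --------
        A minimal sketch, reusing the hypothetical ``Exponential`` and
        ``UnitCost`` setup of :meth:`expected_total_reward`; each sampled
        reward is discounted back to time 0.

        >>> data = rrp.sample(100.0, discount_args=(0.04,), n_samples=10, random_state=42)
        >>> len(data.rewards) == len(data.times)
        True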
"""
n_indices = max(
1, args_size(*model_args, *reward_args, *model1_args, *reward1_args)
)
if self.model1 is not None:
data = self._sample_init(
T,
self.model1,
self.reward1,
self.discount,
model1_args,
reward1_args,
discount_args,
n_indices,
n_samples,
random_state,
)
else:
data = self._sample_init(
T,
self.model,
self.reward,
self.discount,
model_args,
reward_args,
discount_args,
n_indices,
n_samples,
random_state,
)
return self._sample_loop(
data,
self.model,
self.reward,
self.discount,
model_args,
reward_args,
discount_args,
)
@classmethod
def _reward_partial_expectation(
cls,
t: np.ndarray,
model: LifetimeModel,
reward: Reward,
discount: Discount,
model_args: Tuple[np.ndarray, ...] = (),
reward_args: Tuple[np.ndarray, ...] = (),
discount_args: Tuple[np.ndarray, ...] = (),
) -> np.ndarray:
r"""Reward partial expectation.
Parameters
----------
t : float or 1D array
Timeline.
model : LifetimeModel
A lifetime model representing the durations between events.
reward : Reward
A reward associated to the interarrival time.
discount : Discount
A discount function related to the rewards.
model_args : Tuple[ndarray,...], optional
Extra arguments required by the underlying lifetime model, by
default ().
reward_args : Tuple[ndarray,...], optional
Extra arguments required by the associated reward, by default ().
discount_args : Tuple[ndarray,...], optional
Extra arguments required by the discount function, by default ().
Returns
-------
ndarray
Reward partial expectation at each point of the timeline.
Notes
-----
        The reward partial expectation is defined by:
.. math::
y(t) = \int_0^t E[Y | X = x] D(x) \mathrm{d}F(x)
where:
- :math:`F` is the cumulative distribution function of the underlying
lifetime model,
- :math:`X` the interarrival random variable,
- :math:`Y` the associated reward,
- :math:`D` the exponential discount factor.
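        For the exponential/unit-cost sketch used in this class's examples,
        the integral has the closed form
        :math:`y(t) = \frac{\lambda}{\lambda+\rho}\bigl(1 - e^{-(\lambda+\rho)t}\bigr)`,
        which can serve as a check of the quadrature.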
"""
func = lambda x: reward.conditional_expectation(
x, *reward_args
) * discount.factor(x, *discount_args)
ndim = args_ndim(t, *model_args, *reward_args, *discount_args)
return model.ls_integrate(func, 0, t, *model_args, ndim=ndim)
@classmethod
def _sample_init(
cls,
T: float,
model: LifetimeModel,
reward: Reward,
discount: Discount,
model_args: Tuple[np.ndarray, ...],
reward_args: Tuple[np.ndarray, ...],
discount_args: Tuple[np.ndarray, ...],
n_indices: int,
n_samples: int,
random_state: int = None,
) -> RenewalRewardData:
"""Initializing a RenewalRewardData sample.
Creates a RenewalRewardData instance with the first renewal times,
durations and rewards.
Parameters
----------
T : float
Time at the end of the observation.
model : LifetimeModel
A lifetime model representing the durations between events.
reward : Reward
A reward associated to the interarrival time.
discount : Discount
A discount function related to the rewards.
model_args : Tuple[ndarray,...]
Extra arguments required by the underlying lifetime model.
reward_args : Tuple[ndarray,...]
Extra arguments required by the associated reward.
discount_args : Tuple[ndarray,...]
Extra arguments required by the discount function.
n_indices : int
Number of assets.
n_samples : int
Number of samples.
random_state : int, optional
Random seed, by default None.
Returns
-------
RenewalRewardData
The renewal reward data sample with the first renewal times,
durations and rewards.
"""
data = super()._sample_init(
T, model, model_args, n_indices, n_samples, random_state
)
rewards = (
np.array(
reward.sample(
data.durations.reshape(-1, 1),
*args_take(data.indices, *reward_args)
).swapaxes(-2, -1)
* discount.factor(data.times, *discount_args),
ndmin=3,
)
.sum(axis=0)
.ravel()
)
return RenewalRewardData(*data.astuple(), rewards)
@classmethod
def _sample_loop(
cls,
data: RenewalRewardData,
model: LifetimeModel,
reward: Reward,
discount: Discount,
model_args: Tuple[np.ndarray, ...],
reward_args: Tuple[np.ndarray, ...],
discount_args: Tuple[np.ndarray, ...],
) -> RenewalRewardData:
"""Sampling loop on renewals until the end time is reached.
Parameters
----------
data : RenewalRewardData
An initial RenewalRewardData instance.
model : LifetimeModel
A lifetime model representing the durations between events.
reward : Reward
A reward associated to the interarrival time.
discount : Discount
A discount function related to the rewards.
model_args : Tuple[ndarray,...]
Extra arguments required by the underlying lifetime model.
reward_args : Tuple[ndarray,...]
Extra arguments required by the associated reward.
discount_args : Tuple[ndarray,...]
Extra arguments required by the discount function.
Returns
-------
RenewalRewardData
The renewal reward data sample with times, durations and rewards.
"""
data = super()._sample_loop(data, model, model_args)
        ind = data.rewards.size  # number of renewals whose rewards are already computed
indices = data.indices[ind:]
times = data.times[ind:]
durations = data.durations[ind:]
rewards = (
np.array(
reward.sample(
durations.reshape(-1, 1), *args_take(indices, *reward_args)
).swapaxes(-2, -1)
* discount.factor(times, *discount_args),
ndmin=3,
)
.sum(axis=0)
.ravel()
)
data.rewards = np.concatenate((data.rewards, rewards))
return data