[Feature request] Support mask in Timeseries Forecast .
#2,708 opened on Dec 7, 2020
Description
Missing observation is very common in real work , but I doesn't found a way to write a correct model . And tutorial http://pyro.ai/examples/forecasting_iii.html do not cover this problem : nan in real data . For example : in 2010, there was only 48 station in 2011, 3 station was closed and 5 new opened, 50 stations . in 2012, 2 station was closed in 2011 reponed …
For another example : My data is salecount of various products in many stores . I have reshape the szie to torch.Size([44, 103, 671, 1]) , means: 44 stores, 103 products , 671 days salecount . Some stores may be closed in different days, so as products would be off.shelf by many reason .
random 10 products history salecount :

Create matrix must have nan values , and
- We can’t fill them by 0 because they are different to true 0 .
- We should not take nan values into count.
- We can’t drop the nan when trainning , because that break timeseries order . These are real cases .
According to https://forum.pyro.ai/t/how-to-ignore-nan-values-when-do-hierachical-forecast/2412
I alter forecast_iii code to below, structure is totally same , I just change some names .
I add a mask to noise distribution , but without luck , it doesn't work in prediction .
import math
import torch
import pyro
import pyro.distributions as dist
import pyro.poutine as poutine
from pyro.contrib.examples.bart import load_bart_od
from pyro.contrib.forecast import ForecastingModel, Forecaster, eval_crps
from pyro.infer.reparam import LocScaleReparam, SymmetricStableReparam
from pyro.ops.tensor_utils import periodic_repeat
from pyro.ops.stats import quantile
from pyro.contrib.forecast.util import reshape_batch
@reshape_batch.register(dist.MaskedDistribution)
def _(d, batch_shape):
base_dist = reshape_batch(d.base_dist, batch_shape)
return dist.MaskedDistribution(base_dist, d._mask.reshape(base_dist.shape()) )
class Model2(ForecastingModel):
def __init__(self, mask=None):
super().__init__()
self.mask = mask
def model(self, zero_data, covariates):
num_stores, num_products, duration, one = zero_data.shape
# We construct plates once so we can reuse them later. We ensure they don't collide by
# specifying different dim args for each: -3, -2, -1. Note the time_plate is dim=-1.
stores_plate = pyro.plate("stores", num_stores, dim=-3)
products_plate = pyro.plate("products", num_products, dim=-2)
day_of_week_plate = pyro.plate("day_of_week", 7, dim=-1)
# Let's model the time-dependent part with only O(num_stations * duration) 复杂度 many
# parameters, rather than the full possible O(num_stations ** 2 * duration) data size.
drift_stability = pyro.sample("drift_stability", dist.Uniform(1, 2))
drift_scale = pyro.sample("drift_scale", dist.LogNormal(-20, 5))
with stores_plate:
with day_of_week_plate:
stores_seasonal = pyro.sample("stores_seasonal", dist.Normal(0, 5))
with products_plate:
with day_of_week_plate:
product_seasonal = pyro.sample("product_seasonal", dist.Normal(0, 5))
with self.time_plate:
with poutine.reparam(config={"drift": LocScaleReparam()}):
with poutine.reparam(config={"drift": SymmetricStableReparam()}):
drift = pyro.sample("drift",
dist.Stable(drift_stability, 0, drift_scale))
with stores_plate, products_plate:
pairwise = pyro.sample("pairwise", dist.Normal(0, 1))
# Outside of the time plate we can now form the prediction.
seasonal = stores_seasonal + product_seasonal # Note this broadcasts.
seasonal = periodic_repeat(seasonal, duration, dim=-1) # 组装时间周期
motion = drift.cumsum(dim=-1) # A Levy stable motion to model shocks.
prediction = motion + seasonal + pairwise
# We will decompose the noise scale parameter into
# an origin-local and a destination-local component.
with stores_plate:
stores_scale = pyro.sample("stores_scale", dist.LogNormal(-5, 5))
with products_plate:
products_scale = pyro.sample("products_scale", dist.LogNormal(-5, 5))
scale = stores_scale + products_scale
# At this point our prediction and scale have shape (50, 50, duration) and (50, 50, 1)
# respectively, but we want them to have shape (50, 50, duration, 1) to satisfy the
# Forecaster requirements.
scale = scale.unsqueeze(-1)
prediction = prediction.unsqueeze(-1)
# Finally we construct a noise distribution and call the .predict() method.
# Note that predict must be called inside the origin and destination plates.
noise_dist = dist.Normal(0, scale)
if self.mask is not None:
noise_dist = noise_dist.mask(self.mask)
with stores_plate, products_plate:
self.predict(noise_dist, prediction)
test_data = torch.Tensor(msc)
T2 = test_data.size(-2) # end
T1 = T2 - 7 * 2 # train/test split
T0 = 0 # beginning: train on 90 days of data
covariates = torch.zeros(test_data.size(-2), 0) # empty covariates
pyro.set_rng_seed(1)
pyro.clear_param_store()
mask = torch.isnan(test_data)
# test_data = torch.Tensor(msc)
test_data = torch.Tensor(np.nan_to_num(msc))
covariates = torch.zeros(test_data.size(-2), 0)
forecaster = Forecaster(Model2(mask=mask[..., T0:T1, :]), test_data[..., T0:T1, :], covariates[T0:T1],
learning_rate=0.1, learning_rate_decay=1, num_steps=501, log_every=50)
samples = forecaster(test_data[..., T0:T1, :], covariates[T0:T2], num_samples=100)
###################################
# here tensor([], size=(44, 103, 0, 1))
# then below code would encounter error
###################################
samples.clamp_(min=0) # apply domain knowledge: the samples must be positive
p10, p50, p90 = quantile(samples[..., 0], (0.1, 0.5, 0.9))
crps = eval_crps(samples, test_data[..., T1:T2, :])
print(samples.shape, p10.shape)