pyro-ppl/pyro

[Feature request] Support mask in Timeseries Forecast .

Open

#2,708 opened on Dec 7, 2020

View on GitHub
 (8 comments) (1 reaction) (1 assignee)Python (8,211 stars) (981 forks)batch import
documentationenhancementhelp wanted

Description

Missing observation is very common in real work , but I doesn't found a way to write a correct model . And tutorial http://pyro.ai/examples/forecasting_iii.html do not cover this problem : nan in real data . For example : in 2010, there was only 48 station in 2011, 3 station was closed and 5 new opened, 50 stations . in 2012, 2 station was closed in 2011 reponed …

For another example : My data is salecount of various products in many stores . I have reshape the szie to torch.Size([44, 103, 671, 1]) , means: 44 stores, 103 products , 671 days salecount . Some stores may be closed in different days, so as products would be off.shelf by many reason .

random 10 products history salecount :

image

Create matrix must have nan values , and

  • We can’t fill them by 0 because they are different to true 0 .
  • We should not take nan values into count.
  • We can’t drop the nan when trainning , because that break timeseries order . These are real cases .

According to https://forum.pyro.ai/t/how-to-ignore-nan-values-when-do-hierachical-forecast/2412

I alter forecast_iii code to below, structure is totally same , I just change some names .
I add a mask to noise distribution , but without luck , it doesn't work in prediction .

import math
import torch
import pyro
import pyro.distributions as dist
import pyro.poutine as poutine
from pyro.contrib.examples.bart import load_bart_od
from pyro.contrib.forecast import ForecastingModel, Forecaster, eval_crps
from pyro.infer.reparam import LocScaleReparam, SymmetricStableReparam
from pyro.ops.tensor_utils import periodic_repeat
from pyro.ops.stats import quantile

from pyro.contrib.forecast.util import reshape_batch

@reshape_batch.register(dist.MaskedDistribution)
def _(d, batch_shape):
    base_dist = reshape_batch(d.base_dist, batch_shape)
    return dist.MaskedDistribution(base_dist, d._mask.reshape(base_dist.shape()) )


class Model2(ForecastingModel):
    def __init__(self, mask=None):
        super().__init__()
        self.mask = mask

    def model(self, zero_data, covariates):
        num_stores, num_products, duration, one = zero_data.shape

        # We construct plates once so we can reuse them later. We ensure they don't collide by
        # specifying different dim args for each: -3, -2, -1. Note the time_plate is dim=-1.
        stores_plate = pyro.plate("stores", num_stores, dim=-3)
        products_plate = pyro.plate("products", num_products, dim=-2)
        day_of_week_plate = pyro.plate("day_of_week", 7, dim=-1)

        # Let's model the time-dependent part with only O(num_stations * duration) 复杂度 many
        # parameters, rather than the full possible O(num_stations ** 2 * duration) data size.
        drift_stability = pyro.sample("drift_stability", dist.Uniform(1, 2))
        drift_scale = pyro.sample("drift_scale", dist.LogNormal(-20, 5))
        with stores_plate:
            with day_of_week_plate:
                stores_seasonal = pyro.sample("stores_seasonal", dist.Normal(0, 5))
        with products_plate:
            with day_of_week_plate:
                product_seasonal = pyro.sample("product_seasonal", dist.Normal(0, 5))
            with self.time_plate:
                with poutine.reparam(config={"drift": LocScaleReparam()}):
                    with poutine.reparam(config={"drift": SymmetricStableReparam()}):
                        drift = pyro.sample("drift",
                                            dist.Stable(drift_stability, 0, drift_scale))

        with stores_plate, products_plate:
            pairwise = pyro.sample("pairwise", dist.Normal(0, 1))

        # Outside of the time plate we can now form the prediction.
        seasonal = stores_seasonal + product_seasonal  # Note this broadcasts.
        seasonal = periodic_repeat(seasonal, duration, dim=-1)  # 组装时间周期
        motion = drift.cumsum(dim=-1)  # A Levy stable motion to model shocks.
        prediction = motion + seasonal + pairwise

        # We will decompose the noise scale parameter into
        # an origin-local and a destination-local component.
        with stores_plate:
            stores_scale = pyro.sample("stores_scale", dist.LogNormal(-5, 5))
        with products_plate:
            products_scale = pyro.sample("products_scale", dist.LogNormal(-5, 5))
        scale = stores_scale + products_scale

        # At this point our prediction and scale have shape (50, 50, duration) and (50, 50, 1)
        # respectively, but we want them to have shape (50, 50, duration, 1) to satisfy the
        # Forecaster requirements.
        scale = scale.unsqueeze(-1)
        prediction = prediction.unsqueeze(-1)

        # Finally we construct a noise distribution and call the .predict() method.
        # Note that predict must be called inside the origin and destination plates.
        noise_dist = dist.Normal(0, scale)

        if self.mask is not None:
            noise_dist = noise_dist.mask(self.mask)

        with stores_plate, products_plate:
            self.predict(noise_dist, prediction)


test_data = torch.Tensor(msc)

T2 = test_data.size(-2)    # end
T1 = T2 - 7 * 2  # train/test split
T0 = 0    # beginning: train on 90 days of data
covariates = torch.zeros(test_data.size(-2), 0)  # empty covariates

pyro.set_rng_seed(1)
pyro.clear_param_store()
mask = torch.isnan(test_data)

# test_data = torch.Tensor(msc)
test_data = torch.Tensor(np.nan_to_num(msc))

covariates = torch.zeros(test_data.size(-2), 0)  
forecaster = Forecaster(Model2(mask=mask[..., T0:T1, :]), test_data[..., T0:T1, :], covariates[T0:T1],
                        learning_rate=0.1, learning_rate_decay=1, num_steps=501, log_every=50)

samples = forecaster(test_data[..., T0:T1, :], covariates[T0:T2], num_samples=100)

###################################
# here tensor([], size=(44, 103, 0, 1))
# then below code would encounter error 
###################################

samples.clamp_(min=0)  # apply domain knowledge: the samples must be positive
p10, p50, p90 = quantile(samples[..., 0], (0.1, 0.5, 0.9))
crps = eval_crps(samples, test_data[..., T1:T2, :])
print(samples.shape, p10.shape)

Contributor guide

[Feature request] Support mask in Timeseries Forecast . · pyro-ppl/pyro#2708 | Good First Issue