深度马尔可夫模型

简介

我们将构建一个用于序列数据的深度概率模型:深度马尔可夫模型 (Deep Markov Model)。我们将要建模的特定数据集由多声部音乐片段组成。序列中的每个时间片跨越一个四分音符,并由一个 88 维二元向量表示,该向量编码了该时间步上的音符。

由于音乐(显然)在时间上是连贯的,我们需要一个能够表示观测数据中复杂时间依赖关系的模型。例如,考虑一个模型,其中某个时间步的音符与先前时间步的音符是独立的,这是不合适的。一种实现方法是构建一个隐变量模型,其中观测的变异性和时间结构由隐变量的动态过程控制。

这种想法的一个具体实现是马尔可夫模型,其中我们有一个隐变量链,链中的每个隐变量都以前一个隐变量为条件。这是一种强大的方法,但如果我们想用复杂(在本例中是未知的)的动态来表示复杂数据,我们希望我们的模型足够灵活,以适应潜在的高度非线性的动态。因此,引入深度马尔可夫模型:我们允许控制隐变量动态的转移概率以及控制观测如何由隐动态生成的发射概率通过(非线性)神经网络进行参数化。

我们将要实现的具体模型基于以下参考文献

[1] Structured Inference Networks for Nonlinear State Space Models,     Rahul G. Krishnan, Uri Shalit, David Sontag

请注意,虽然我们不假定本教程的读者已阅读参考文献,但如果想在其他时间序列模型的背景下更全面地讨论深度马尔可夫模型,这绝对是一个很好的参考来源。

我们已经描述了模型,那么如何训练它呢?我们将使用的推断策略是变分推断,这需要指定一个参数化的分布族来近似隐随机变量的后验分布。考虑到我们的模型和数据中固有的非线性和复杂时间依赖性,我们预计精确后验分布会非常复杂。因此,如果我们希望学习一个好的模型,我们将需要一个灵活的变分分布族。幸运的是,PyTorch 和 Pyro 一起提供了所有必要的组件。正如我们将看到的,组装它们将非常简单。让我们开始工作吧。

模型

描述模型高层结构的一种便捷方法是使用图形模型。

图 1:模型展开 T=3 个时间步。

在这里,我们假设观测序列长度为三,展开了模型:\(\{{\bf x}_1, {\bf x}_2, {\bf x}_3\}\)。与观测序列相对应,我们也有一个隐随机变量序列:\(\{{\bf z}_1, {\bf z}_2, {\bf z}_3\}\)。图表编码了模型的结构。相应的联合分布是

\[p({\bf x}_{123} , {\bf z}_{123})=p({\bf x}_1|{\bf z}_1)p({\bf x}_2|{\bf z}_2)p({\bf x}_3|{\bf z}_3)p({\bf z}_1)p({\bf z}_2|{\bf z}_1)p({\bf z}_3|{\bf z}_2)\]

\({\bf z}_t\) 为条件,每个观测 \({\bf x}_t\) 与其他观测独立。这可以从每个 \({\bf x}_t\) 只依赖于相应的隐变量 \({\bf z}_t\) 中看出,如图中向下的箭头所示。我们还可以看出模型的马尔可夫性质:每个隐变量 \({\bf z}_t\),以前一个隐变量 \({\bf z}_{t-1}\) 为条件时,与所有之前的隐变量 \(\{ {\bf z}_{t-2}, {\bf z}_{t-3}, ...\}\) 独立。这实际上意味着,在时间 \(t\) 关于系统状态所需知道的一切都封装在隐变量 \({\bf z}_{t}\) 中。

我们将假设观测似然,即控制观测的概率分布 \(p({{\bf x}_t}|{{\bf z}_t})\),由伯努利分布给出。这是一个合适的选择,因为我们的观测都是 0 或 1。对于控制隐变量动态的概率分布 \(p({\bf z}_t|{\bf z}_{t-1})\),我们选择具有对角协方差的(条件)高斯分布。这是合理的,因为我们假设隐空间是连续的。

实心黑色方块代表由神经网络参数化的非线性函数。这就是使其成为深度马尔可夫模型的原因。注意黑色方块出现在两个不同的地方:在隐变量对之间以及在隐变量和观测之间。连接隐变量的非线性函数(图 1 中的“Trans”)控制隐变量的动态。由于我们允许 \({\bf z}_{t}\) 的条件概率分布以复杂的方式依赖于 \({\bf z}_{t-1}\),我们将能够捕捉模型中复杂的动态。类似地,连接隐变量到观测的非线性函数(图 1 中的“Emit”)控制观测如何依赖于隐动态。

一些额外说明:- 我们可以根据手头的问题自由选择隐空间的维度:简单问题使用小隐空间,具有复杂动态的问题使用大隐空间。- 注意图 1 中的参数 \({\bf z}_0\)。从代码中将变得更清楚,这只是我们方便参数化第一个时间步(没有之前隐变量可作为条件)的概率分布 \(p({\bf z}_1)\) 的一种方式。

门控转移和发射器

事不宜迟,让我们开始编写一些代码。我们首先定义对应于图 1 中黑色方块的两个 PyTorch Module。首先是发射函数

class Emitter(nn.Module):
    """
    Parameterizes the bernoulli observation likelihood p(x_t | z_t)
    """
    def __init__(self, input_dim, z_dim, emission_dim):
        super().__init__()
        # initialize the three linear transformations used in the neural network
        self.lin_z_to_hidden = nn.Linear(z_dim, emission_dim)
        self.lin_hidden_to_hidden = nn.Linear(emission_dim, emission_dim)
        self.lin_hidden_to_input = nn.Linear(emission_dim, input_dim)
        # initialize the two non-linearities used in the neural network
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, z_t):
        """
        Given the latent z at a particular time step t we return the vector of
        probabilities `ps` that parameterizes the bernoulli distribution p(x_t|z_t)
        """
        h1 = self.relu(self.lin_z_to_hidden(z_t))
        h2 = self.relu(self.lin_hidden_to_hidden(h1))
        ps = self.sigmoid(self.lin_hidden_to_input(h2))
        return ps

在构造函数中,我们定义了将在发射函数中使用的线性变换。注意 emission_dim 是神经网络中的隐藏单元数量。我们还定义了将要使用的非线性函数。前向调用定义了函数的计算流程。我们输入隐变量 \({\bf z}_{t}\),并进行一系列变换,直到获得一个长度为 88 的向量,该向量定义了我们的伯努利似然的发射概率。由于 sigmoid 函数的作用,ps 的每个元素都将在 0 到 1 之间,并定义一个有效的概率。总的来说,ps 的元素编码了给定系统状态(由 \({\bf z}_{t}\) 编码)时,我们在时间 \(t\) 期望观测到哪些音符。

现在我们定义门控转移函数

class GatedTransition(nn.Module):
    """
    Parameterizes the gaussian latent transition probability p(z_t | z_{t-1})
    See section 5 in the reference for comparison.
    """
    def __init__(self, z_dim, transition_dim):
        super().__init__()
        # initialize the six linear transformations used in the neural network
        self.lin_gate_z_to_hidden = nn.Linear(z_dim, transition_dim)
        self.lin_gate_hidden_to_z = nn.Linear(transition_dim, z_dim)
        self.lin_proposed_mean_z_to_hidden = nn.Linear(z_dim, transition_dim)
        self.lin_proposed_mean_hidden_to_z = nn.Linear(transition_dim, z_dim)
        self.lin_sig = nn.Linear(z_dim, z_dim)
        self.lin_z_to_loc = nn.Linear(z_dim, z_dim)
        # modify the default initialization of lin_z_to_loc
        # so that it's starts out as the identity function
        self.lin_z_to_loc.weight.data = torch.eye(z_dim)
        self.lin_z_to_loc.bias.data = torch.zeros(z_dim)
        # initialize the three non-linearities used in the neural network
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        self.softplus = nn.Softplus()

    def forward(self, z_t_1):
        """
        Given the latent z_{t-1} corresponding to the time step t-1
        we return the mean and scale vectors that parameterize the
        (diagonal) gaussian distribution p(z_t | z_{t-1})
        """
        # compute the gating function
        _gate = self.relu(self.lin_gate_z_to_hidden(z_t_1))
        gate = self.sigmoid(self.lin_gate_hidden_to_z(_gate))
        # compute the 'proposed mean'
        _proposed_mean = self.relu(self.lin_proposed_mean_z_to_hidden(z_t_1))
        proposed_mean = self.lin_proposed_mean_hidden_to_z(_proposed_mean)
        # assemble the actual mean used to sample z_t, which mixes
        # a linear transformation of z_{t-1} with the proposed mean
        # modulated by the gating function
        loc = (1 - gate) * self.lin_z_to_loc(z_t_1) + gate * proposed_mean
        # compute the scale used to sample z_t, using the proposed
        # mean from above as input. the softplus ensures that scale is positive
        scale = self.softplus(self.lin_sig(self.relu(proposed_mean)))
        # return loc, scale which can be fed into Normal
        return loc, scale

这反映了上面 Emitter 的结构,不同之处在于计算流程稍微复杂一些。这有两个原因。首先,GatedTransition 的输出需要定义一个有效的(对角)高斯分布。因此我们需要输出两个参数:均值 loc 和(平方根)协方差 scale。这两者都需要与隐空间具有相同的维度。其次,我们不想强制动态是非线性的。因此,我们的均值 loc 是两项之和,其中只有一项非线性地依赖于输入 z_t_1。通过这种方式,我们可以支持线性动态和非线性动态(或者实际上让隐空间的一部分动态是线性的,而其余动态是非线性的)。

模型 - 一个 Pyro 随机函数

到目前为止,我们所做的一切都是纯 PyTorch。要完成将模型转换为代码,我们需要引入 Pyro。基本上,我们需要实现图 1 中的随机节点(即圆圈)。为此,我们引入一个可调用的 model() 函数,其中包含 Pyro 原语 pyro.samplesample 语句将用于指定隐变量 \({\bf z}_{1:T}\) 的联合分布。此外,obs 参数可与 sample 语句一起使用,以指定观测 \({\bf x}_{1:T}\) 如何依赖于隐变量。在我们查看完整的 model() 代码之前,让我们看看一个包含主要逻辑的简化版本

def model(...):
    z_prev = self.z_0

    # sample the latents z and observed x's one time step at a time
    for t in range(1, T_max + 1):
        # the next two lines of code sample z_t ~ p(z_t | z_{t-1}).
        # first compute the parameters of the diagonal gaussian
        # distribution p(z_t | z_{t-1})
        z_loc, z_scale = self.trans(z_prev)
        # then sample z_t according to dist.Normal(z_loc, z_scale)
        z_t = pyro.sample("z_%d" % t, dist.Normal(z_loc, z_scale))

        # compute the probabilities that parameterize the bernoulli likelihood
        emission_probs_t = self.emitter(z_t)
        # the next statement instructs pyro to observe x_t according to the
        # bernoulli distribution p(x_t|z_t)
        pyro.sample("obs_x_%d" % t,
                    dist.Bernoulli(emission_probs_t),
                    obs=mini_batch[:, t - 1, :])
        # the latent sampled at this time step will be conditioned upon
        # in the next time step so keep track of it
        z_prev = z_t

我们首先需要采样 \({\bf z}_1\)。一旦采样了 \({\bf z}_1\),我们就可以采样 \({\bf z}_2 \sim p({\bf z}_2|{\bf z}_1)\),依此类推。这就是 for 循环中实现的逻辑。定义概率分布 \(p({\bf z}_t|{\bf z}_{t-1})\) 的参数 z_locz_scale 是使用 self.trans 计算的,后者是我们上面定义的 GatedTransition 模块的一个实例。对于第一个时间步 \(t=1\),我们以 self.z_0(一个可训练的 Parameter)为条件,而对于后续时间步,我们以前一次采样的隐变量为条件。注意,用户为每个随机变量 z_t 分配了一个唯一名称。

在给定时间步采样了 \({\bf z}_t\) 后,我们需要观测数据点 \({\bf x}_t\)。因此,我们将 z_t 通过上面定义的 Emitter 模块实例 self.emitter,以获得 emission_probs_t。连同 sample 语句中的 dist.Bernoulli() 参数,这些概率完全指定了观测似然。最后,我们还使用 sampleobs 参数指定观测数据 \({\bf x}_t\) 的切片:mini_batch[:, t - 1, :]

这完全指定了我们的模型,并将其封装在一个可传递给 Pyro 的可调用对象中。在我们继续之前,让我们看看 model() 的完整版本,并回顾我们在初次查看时忽略的一些细节。

def model(self, mini_batch, mini_batch_reversed, mini_batch_mask,
          mini_batch_seq_lengths, annealing_factor=1.0):

    # this is the number of time steps we need to process in the mini-batch
    T_max = mini_batch.size(1)

    # register all PyTorch (sub)modules with pyro
    # this needs to happen in both the model and guide
    pyro.module("dmm", self)

    # set z_prev = z_0 to setup the recursive conditioning in p(z_t | z_{t-1})
    z_prev = self.z_0.expand(mini_batch.size(0), self.z_0.size(0))

    # we enclose all the sample statements in the model in a plate.
    # this marks that each datapoint is conditionally independent of the others
    with pyro.plate("z_minibatch", len(mini_batch)):
        # sample the latents z and observed x's one time step at a time
        for t in range(1, T_max + 1):
            # the next chunk of code samples z_t ~ p(z_t | z_{t-1})
            # note that (both here and elsewhere) we use poutine.scale to take care
            # of KL annealing. we use the mask() method to deal with raggedness
            # in the observed data (i.e. different sequences in the mini-batch
            # have different lengths)

            # first compute the parameters of the diagonal gaussian
            # distribution p(z_t | z_{t-1})
            z_loc, z_scale = self.trans(z_prev)

            # then sample z_t according to dist.Normal(z_loc, z_scale).
            # note that we use the reshape method so that the univariate
            # Normal distribution is treated as a multivariate Normal
            # distribution with a diagonal covariance.
            with poutine.scale(None, annealing_factor):
                z_t = pyro.sample("z_%d" % t,
                                  dist.Normal(z_loc, z_scale)
                                      .mask(mini_batch_mask[:, t - 1:t])
                                      .to_event(1))

            # compute the probabilities that parameterize the bernoulli likelihood
            emission_probs_t = self.emitter(z_t)
            # the next statement instructs pyro to observe x_t according to the
            # bernoulli distribution p(x_t|z_t)
            pyro.sample("obs_x_%d" % t,
                        dist.Bernoulli(emission_probs_t)
                            .mask(mini_batch_mask[:, t - 1:t])
                            .to_event(1),
                        obs=mini_batch[:, t - 1, :])
            # the latent sampled at this time step will be conditioned upon
            # in the next time step so keep track of it
            z_prev = z_t

首先要注意的是 model() 接受许多参数。现在让我们只关注 mini_batchmini_batch_maskmini_batch 是一个三维张量,第一维是批次维度,第二维是时间维度,最后一维是特征维度(在本例中是 88 维)。为了加速代码,每当我们运行 model 时,我们将处理整个 mini-batch 的序列(即,我们将利用向量化)。

这是合理的,因为我们的模型是隐含地定义在单个观测序列上的。一组序列的概率就是各个序列概率的乘积。换句话说,给定模型参数,这些序列是条件独立的。

这种向量化引入了一些复杂性,因为序列长度可能不同。这就是 mini_batch_mask 的作用。mini_batch_mask 是一个二维的 0/1 掩码,维度为 mini_batch_size x T_max,其中 T_max 是 mini-batch 中任何序列的最大长度。这编码了 mini_batch 的哪些部分是有效观测。

所以我们做的第一件事是获取 T_max:我们必须将模型展开至少这么多的时间步。请注意,这将导致大量的“浪费”计算,因为有些序列会比 T_max 短,但这对于向量化带来的巨大加速来说是值得的。我们只需要确保没有“浪费”的计算“污染”我们的模型计算。我们通过将适合时间步 \(t\) 的掩码传递给 mask 方法(该方法作用于需要掩码的分布)来实现这一点。

最后,行 pyro.module("dmm", self) 等同于模型中每个参数的一系列 pyro.param 语句。这让 Pyro 知道哪些参数是模型的一部分。就像 sample 语句一样,我们给模块一个唯一的名称。这个名称将被并入模型中 Parameters 的名称中。关于 KL 退火因子的讨论留待以后。

推断

至此,我们已经完全指定了模型。下一步是准备进行推断。如简介中所述,我们的推断策略将是变分推断(参见 SVI 第一部分 了解简介)。所以我们接下来的任务是构建一个适用于在深度马尔可夫模型中进行推断的变分分布族。然而,在此刻值得强调的是,我们实现 model() 的方式并不将我们绑定到变分推断。原则上我们可以使用任何推断策略可用在 Pyro 中。例如,在这个特定背景下,人们可以想象使用 Sequential Monte Carlo 的某种变体(尽管 Pyro 目前还不支持)。

指引函数 (Guide)

指引函数(即变分分布)的目的是提供对精确后验分布 \(p({\bf z}_{1:T}|{\bf x}_{1:T})\) 的(参数化)近似。实际上,这里有一个隐含的假设我们应该明确说明,所以让我们退一步。假设我们的数据集 \(\mathcal{D}\) 包含 \(N\) 个序列 \(\{ {\bf x}_{1:T_1}^1, {\bf x}_{1:T_2}^2, ..., {\bf x}_{1:T_N}^N \}\)。那么我们实际感兴趣的后验分布是 \(p({\bf z}_{1:T_1}^1, {\bf z}_{1:T_2}^2, ..., {\bf z}_{1:T_N}^N | \mathcal{D})\),也就是说,我们想推断所有 \(N\) 个序列的隐变量。即使对于小的 \(N\),这也是一个非常高维的分布,需要非常多的参数来指定。特别是如果我们直接以这种形式参数化后验,所需的参数数量将随 \(N\)(至少)线性增长。避免参数数量随数据集大小这种讨厌的增长的一种方法是摊销(参见 SVI 第二部分 中的类似讨论)。

旁注:摊销

其工作原理如下。我们不为数据集中的每个序列引入变分参数,而是学习一个单一的参数化函数 \(f({\bf x}_{1:T})\),并使用形式为 \(\prod_{n=1}^N q({\bf z}_{1:T_n}^n | f({\bf x}_{1:T_n}^n))\) 的变分分布。函数 \(f(\cdot)\)(它基本上将给定的观测序列映射到为该序列定制的一组变分参数)需要足够丰富以准确捕捉后验,但现在我们可以处理大型数据集而无需引入数量庞大的变分参数。

因此我们的任务是构建函数 \(f(\cdot)\)。由于在这种情况下我们需要支持变长序列,所以 \(f(\cdot)\) 自然地会包含一个 RNN。在我们详细了解构成 \(f(\cdot)\) 的各个组成部分之前,让我们先看看一个编码基本结构的计算图

图 2:Guide 展开 T=3 个时间步。

在图的底部,我们有三个观测值的序列。这些观测值将被一个 RNN 消耗,该 RNN 从右到左读取观测值,并输出三个隐藏状态 \(\{ {\bf h}_1, {\bf h}_2,{\bf h}_3\}\)。请注意,此计算是在我们采样任何隐变量之前完成的。接下来,每个隐藏状态将输入到一个 Combiner 模块,其任务是输出条件分布 \(q({\bf z}_t | {\bf z}_{t-1}, {\bf x}_{t:T})\) 的均值和协方差,我们将其视为对角高斯分布。(就像在模型中一样,Guide 中 \({\bf z}_{1:T}\) 的条件结构是我们在时间上向前采样 \({\bf z}_t\)。)除了 RNN 隐藏状态,Combiner 还将前一个时间步的隐随机变量作为输入,但 \(t=1\) 除外,此时它将可训练的(变分)参数 \({\bf z}_0^{\rm{q}}\) 作为输入。

旁注:Guide 结构

为什么我们将 RNN 设置为从右到左消耗观测值?为什么不从左到右?通过这种选择,我们的条件分布 \(q({\bf z}_t |...)\) 取决于两件事

  • 前一个时间步的隐变量 \({\bf z}_{t-1}\);以及

  • 观测值 \({\bf x}_{t:T}\),即当前观测值以及所有未来观测值

我们可以自由做出其他选择;唯一的要求是 Guide 是一个经过适当归一化的分布,并且与自动微分兼容。这个特定的选择是受到真实后验分布依赖结构的启发:详细讨论请参见参考文献 [1]。简而言之,虽然我们可以例如以前一个观测序列为条件,但由于模型的马尔可夫结构,我们需要知道的关于之前观测 \({\bf x}_{1:t-1}\) 的所有信息都封装在 \({\bf z}_{t-1}\) 中。我们可以以更多的信息为条件,但没有必要;这样做可能会稀释学习信号。因此,对于这个特定的模型,从右到左运行 RNN 是最自然的选择。

让我们详细看看组成部分。首先是 Combiner 模块

class Combiner(nn.Module):
    """
    Parameterizes q(z_t | z_{t-1}, x_{t:T}), which is the basic building block
    of the guide (i.e. the variational distribution). The dependence on x_{t:T} is
    through the hidden state of the RNN (see the pytorch module `rnn` below)
    """
    def __init__(self, z_dim, rnn_dim):
        super().__init__()
        # initialize the three linear transformations used in the neural network
        self.lin_z_to_hidden = nn.Linear(z_dim, rnn_dim)
        self.lin_hidden_to_loc = nn.Linear(rnn_dim, z_dim)
        self.lin_hidden_to_scale = nn.Linear(rnn_dim, z_dim)
        # initialize the two non-linearities used in the neural network
        self.tanh = nn.Tanh()
        self.softplus = nn.Softplus()

    def forward(self, z_t_1, h_rnn):
        """
        Given the latent z at at a particular time step t-1 as well as the hidden
        state of the RNN h(x_{t:T}) we return the mean and scale vectors that
        parameterize the (diagonal) gaussian distribution q(z_t | z_{t-1}, x_{t:T})
        """
        # combine the rnn hidden state with a transformed version of z_t_1
        h_combined = 0.5 * (self.tanh(self.lin_z_to_hidden(z_t_1)) + h_rnn)
        # use the combined hidden state to compute the mean used to sample z_t
        loc = self.lin_hidden_to_loc(h_combined)
        # use the combined hidden state to compute the scale used to sample z_t
        scale = self.softplus(self.lin_hidden_to_scale(h_combined))
        # return loc, scale which can be fed into Normal
        return loc, scale

这个模块的总体结构与模型中的 EmitterGatedTransition 相同。唯一值得注意的是,由于 Combiner 需要在每个时间步消耗两个输入,因此它在计算输出之前将输入转换为一个单一的组合隐藏状态 h_combined

除了 RNN,我们现在已经拥有构建 Guide 分布所需的所有组成部分。幸运的是,PyTorch 提供了出色的内置 RNN 模块,因此我们在这方面不需要做太多工作。我们稍后会看到在哪里实例化 RNN。现在,让我们直接跳到随机函数 guide() 的定义。

def guide(self, mini_batch, mini_batch_reversed, mini_batch_mask,
          mini_batch_seq_lengths, annealing_factor=1.0):

    # this is the number of time steps we need to process in the mini-batch
    T_max = mini_batch.size(1)
    # register all PyTorch (sub)modules with pyro
    pyro.module("dmm", self)

    # if on gpu we need the fully broadcast view of the rnn initial state
    # to be in contiguous gpu memory
    h_0_contig = self.h_0.expand(1, mini_batch.size(0),
                                 self.rnn.hidden_size).contiguous()
    # push the observed x's through the rnn;
    # rnn_output contains the hidden state at each time step
    rnn_output, _ = self.rnn(mini_batch_reversed, h_0_contig)
    # reverse the time-ordering in the hidden state and un-pack it
    rnn_output = poly.pad_and_reverse(rnn_output, mini_batch_seq_lengths)
    # set z_prev = z_q_0 to setup the recursive conditioning in q(z_t |...)
    z_prev = self.z_q_0.expand(mini_batch.size(0), self.z_q_0.size(0))

    # we enclose all the sample statements in the guide in a plate.
    # this marks that each datapoint is conditionally independent of the others.
    with pyro.plate("z_minibatch", len(mini_batch)):
        # sample the latents z one time step at a time
        for t in range(1, T_max + 1):
            # the next two lines assemble the distribution q(z_t | z_{t-1}, x_{t:T})
            z_loc, z_scale = self.combiner(z_prev, rnn_output[:, t - 1, :])
            z_dist = dist.Normal(z_loc, z_scale)

            # sample z_t from the distribution z_dist
            with pyro.poutine.scale(None, annealing_factor):
                z_t = pyro.sample("z_%d" % t,
                                  z_dist.mask(mini_batch_mask[:, t - 1:t])
                                        .to_event(1))
            # the latent sampled at this time step will be conditioned
            # upon in the next time step so keep track of it
            z_prev = z_t

guide() 的高层结构与 model() 非常相似。首先请注意,模型和 Guide 接受相同的参数:这是 Pyro 中模型/Guide 对的一般要求。与模型中一样,调用了 pyro.module 来向 Pyro 注册所有参数。此外,for 循环的结构与 model() 中的循环相同,不同之处在于 Guide 只需采样隐变量(没有带有 obs 关键字的 sample 语句)。最后,请注意 Guide 中隐变量的名称与模型中的名称完全匹配。这就是 Pyro 知道如何正确对齐随机变量的方式。

RNN 的逻辑对于 PyTorch 用户来说应该很熟悉,但让我们快速回顾一下。首先,我们准备 RNN 的初始状态 h_0。然后我们通过其前向调用来调用 RNN;结果张量 rnn_output 包含了整个 mini-batch 的隐藏状态。请注意,由于我们希望 RNN 从右到左消耗观测值,因此 RNN 的输入是 mini_batch_reversed,它是 mini_batch 的一个副本,其中所有序列都按反向时间顺序排列。此外,mini_batch_reversed 已用 PyTorch 的 rnn.pack_padded_sequence 包装,以便 RNN 可以处理变长序列。由于我们在隐空间中按正常时间顺序进行采样,我们使用辅助函数 pad_and_reverse 来反转 rnn_output 中的隐藏状态序列,以便我们可以将正确对齐和排序的 RNN 隐藏状态馈送给 Combiner。此辅助函数还会解包 rnn_output,使其不再是 PyTorch rnn.pack_padded_sequence 的形式。

将模型和 Guide 打包为 PyTorch Module

在此节点,我们准备继续进行推断。但在这样做之前,让我们快速回顾一下如何将模型和 Guide 打包为一个单一的 PyTorch Module。这通常是一个好的实践,特别是对于较大的模型。

class DMM(nn.Module):
    """
    This PyTorch Module encapsulates the model as well as the
    variational distribution (the guide) for the Deep Markov Model
    """
    def __init__(self, input_dim=88, z_dim=100, emission_dim=100,
                 transition_dim=200, rnn_dim=600, rnn_dropout_rate=0.0,
                 num_iafs=0, iaf_dim=50, use_cuda=False):
        super().__init__()
        # instantiate pytorch modules used in the model and guide below
        self.emitter = Emitter(input_dim, z_dim, emission_dim)
        self.trans = GatedTransition(z_dim, transition_dim)
        self.combiner = Combiner(z_dim, rnn_dim)
        self.rnn = nn.RNN(input_size=input_dim, hidden_size=rnn_dim,
                          nonlinearity='relu', batch_first=True,
                          bidirectional=False, num_layers=1, dropout=rnn_dropout_rate)

        # define a (trainable) parameters z_0 and z_q_0 that help define
        # the probability distributions p(z_1) and q(z_1)
        # (since for t = 1 there are no previous latents to condition on)
        self.z_0 = nn.Parameter(torch.zeros(z_dim))
        self.z_q_0 = nn.Parameter(torch.zeros(z_dim))
        # define a (trainable) parameter for the initial hidden state of the rnn
        self.h_0 = nn.Parameter(torch.zeros(1, 1, rnn_dim))

        self.use_cuda = use_cuda
        # if on gpu cuda-ize all pytorch (sub)modules
        if use_cuda:
            self.cuda()

    # the model p(x_{1:T} | z_{1:T}) p(z_{1:T})
    def model(...):

        # ... as above ...

    # the guide q(z_{1:T} | x_{1:T}) (i.e. the variational distribution)
    def guide(...):

        # ... as above ...

既然我们已经介绍了 modelguide,我们这里的重点是构造函数。首先,我们实例化在模型和 Guide 中使用的四个 PyTorch 模块。模型端:EmitterGatedTransition。Guide 端:Combiner 和 RNN。

接下来,我们定义 PyTorch Parameter 用于 RNN 的初始状态,以及 z_0z_q_0,它们分别被馈送到 self.transself.combiner,以代替不存在的随机变量 \(\bf z_0\)

这里要强调的关键是,所有这些 ModuleParameter 都是 DMM 的属性(DMM 本身继承自 nn.Module)。这带来的结果是它们都会自动注册到模块中。因此,例如,当我们对 DMM 的实例调用 parameters() 时,PyTorch 会知道返回所有相关的参数。这也意味着,当我们在 model()guide() 中调用 pyro.module("dmm", self) 时,模型和 Guide 的所有参数都会注册到 Pyro 中。最后,这意味着如果我们在 GPU 上运行,调用 cuda() 会将所有参数移动到 GPU 内存中。

随机变分推断

有了模型和 Guide,我们终于准备好进行推断了。在我们查看完整实验脚本中涉及的完整逻辑之前,让我们先看看如何进行单个梯度步长。首先,我们实例化一个 DMM 实例并设置一个优化器。

# instantiate the dmm
dmm = DMM(input_dim, z_dim, emission_dim, transition_dim, rnn_dim,
          args.rnn_dropout_rate, args.num_iafs, args.iaf_dim, args.cuda)

# setup optimizer
adam_params = {"lr": args.learning_rate, "betas": (args.beta1, args.beta2),
               "clip_norm": args.clip_norm, "lrd": args.lr_decay,
               "weight_decay": args.weight_decay}
optimizer = ClippedAdam(adam_params)

这里我们使用了包含梯度裁剪的 Adam 优化器的实现。这减轻了训练循环神经网络时可能出现的一些问题(例如梯度消失/爆炸)。接下来我们设置推断算法。

# setup inference algorithm
svi = SVI(dmm.model, dmm.guide, optimizer, Trace_ELBO())

推断算法 SVI 使用随机梯度估计器对目标函数进行梯度步长,在本例中目标函数是 ELBO(证据下界)。正如名称所示,ELBO 是对数证据 \(\log p(\mathcal{D})\) 的下界。当我们采取最大化 ELBO 的梯度步长时,我们将 Guide \(q(\cdot)\) 移动得更接近精确后验。

参数 Trace_ELBO() 构建了一个梯度估计器版本,该版本无需访问模型和 Guide 的依赖结构。由于模型中的所有隐变量都是可重参数化的,这是我们用例中合适的梯度估计器。(它也是默认选项。)

假设我们已经准备好了 dmm.modeldmm.guide 的各种参数,通过调用以下函数即可完成梯度步长

svi.step(mini_batch, ...)

就这么简单!

嗯,不完全是。这将是我们推断算法的主要步骤,但我们仍然需要实现一个完整的训练循环,包括 mini-batch 的准备、评估等等。这种逻辑对于任何深度学习者来说都很熟悉,但让我们看看它在 PyTorch/Pyro 中是什么样子。

优化的“黑魔法”

实际上,在我们深入训练核心之前,让我们花点时间思考一下我们设置的优化问题。我们将一个具有高维隐空间的非线性模型中的贝叶斯推断(一个困难的问题)替换为一个特定的优化问题。别自欺欺人了,这个优化问题也相当困难。为什么?让我们看看一些原因

  • 我们正在优化的参数空间维度非常高(它包括我们定义的所有神经网络中的所有权重)。

  • 我们的目标函数(ELBO)无法解析计算。因此我们的参数更新将遵循带有噪声的蒙特卡洛梯度估计。

  • 数据子采样是随机性的另一个来源:即使我们想,通常也无法对整个数据集上定义的 ELBO 进行梯度步长(实际上在我们的特定情况下数据集并没有那么大,但我们姑且忽略这一点)。

  • 考虑到循环中所有的神经网络和非线性,我们的(随机)损失表面非常复杂。

结论是,如果我们想找到 ELBO 的合理(局部)最优解,最好在决定如何进行优化时多加小心。现在不是讨论所有可能采取的不同策略的时间或场合,但强调学习超参数(学习率、mini-batch 大小等)的好坏选择是多么具有决定性作用非常重要。

在继续之前,让我们更详细地讨论一种我们正在使用的特定优化策略:KL 退火。在我们的例子中,ELBO 是两项之和:期望对数似然项(衡量模型拟合度)和 KL 散度项之和(用于正则化近似后验分布)。

\(\rm{ELBO} = \mathbb{E}_{q({\bf z}_{1:T})}[\log p({\bf x}_{1:T}|{\bf z}_{1:T})] - \mathbb{E}_{q({\bf z}_{1:T})}[ \log q({\bf z}_{1:T}) - \log p({\bf z}_{1:T})]\)

后一项可以是一个非常强的正则化项,并且在训练的早期阶段,它倾向于偏爱包含许多糟糕局部最优解的损失表面区域。一种避免这些糟糕局部最优解的策略(参考文献 [1] 中也采用了该策略)是通过将 KL 散度项乘以一个介于零和一之间的标量 annealing_factor 来对其进行退火。

\(\mathbb{E}_{q({\bf z}_{1:T})}[\log p({\bf x}_{1:T}|{\bf z}_{1:T})] - \text{退火因子} \times \mathbb{E}_{q({\bf z}_{1:T})}[ \log q({\bf z}_{1:T}) - \log p({\bf z}_{1:T})]\)

想法是,在训练过程中,annealing_factor 从接近零的初始值缓慢上升到 1.0 的最终值。退火策略是任意的;下面我们将使用一个简单的线性策略。在代码方面,为了通过适当的退火因子对对数似然进行缩放,我们将模型和 Guide 中的每个隐变量采样语句都包含在 pyro.poutine.scale 上下文中。

最后,我们应该提到,此处描述的 DMM 实现与参考文献 [1] 中使用的主要区别在于,他们利用了两个高斯分布之间 KL 散度的解析公式(而我们依赖于蒙特卡洛估计)。这导致 ELBO 的梯度估计具有较低方差,从而使训练更容易一些。即使不进行这种解析替换,我们仍然可以训练模型,但由于方差较高,训练可能需要更长的时间。要使用解析 KL 散度,请使用 TraceMeanField_ELBO

数据加载、训练和评估

首先我们加载数据。训练数据集中共有 229 个序列,每个序列的平均长度约为 60 个时间步。

jsb_file_loc = "./data/jsb_processed.pkl"
data = pickle.load(open(jsb_file_loc, "rb"))
training_seq_lengths = data['train']['sequence_lengths']
training_data_sequences = data['train']['sequences']
test_seq_lengths = data['test']['sequence_lengths']
test_data_sequences = data['test']['sequences']
val_seq_lengths = data['valid']['sequence_lengths']
val_data_sequences = data['valid']['sequences']
N_train_data = len(training_seq_lengths)
N_train_time_slices = np.sum(training_seq_lengths)
N_mini_batches = int(N_train_data / args.mini_batch_size +
                     int(N_train_data % args.mini_batch_size > 0))

对于此数据集,我们通常使用 mini_batch_size 为 20,这样每个 epoch 将有 12 个 mini-batch。接下来我们定义函数 process_minibatch,该函数准备一个 mini-batch 用于训练并进行梯度步长

def process_minibatch(epoch, which_mini_batch, shuffled_indices):
    if args.annealing_epochs > 0 and epoch < args.annealing_epochs:
        # compute the KL annealing factor appropriate
        # for the current mini-batch in the current epoch
        min_af = args.minimum_annealing_factor
        annealing_factor = min_af + (1.0 - min_af) * \
            (float(which_mini_batch + epoch * N_mini_batches + 1) /
             float(args.annealing_epochs * N_mini_batches))
    else:
        # by default the KL annealing factor is unity
        annealing_factor = 1.0

    # compute which sequences in the training set we should grab
    mini_batch_start = (which_mini_batch * args.mini_batch_size)
    mini_batch_end = np.min([(which_mini_batch + 1) * args.mini_batch_size,
                             N_train_data])
    mini_batch_indices = shuffled_indices[mini_batch_start:mini_batch_end]
    # grab the fully prepped mini-batch using the helper function in the data loader
    mini_batch, mini_batch_reversed, mini_batch_mask, mini_batch_seq_lengths \
        = poly.get_mini_batch(mini_batch_indices, training_data_sequences,
                              training_seq_lengths, cuda=args.cuda)
    # do an actual gradient step
    loss = svi.step(mini_batch, mini_batch_reversed, mini_batch_mask,
                     mini_batch_seq_lengths, annealing_factor)
    # keep track of the training loss
    return loss

我们首先计算适用于 mini-batch 的 KL 退火因子(根据前面描述的线性策略)。然后我们计算 mini-batch 的索引,并将其传递给辅助函数 get_mini_batch。此辅助函数负责许多不同的事情

  • 它按序列长度对每个 mini-batch 进行排序

  • 它调用另一个辅助函数以获取按时间顺序反转的 mini-batch 的副本

  • 它将每个反转的 mini-batch 打包到 rnn.pack_padded_sequence 中,然后即可由 RNN 接收

  • 如果在 GPU 上运行,它会将所有张量移动到 CUDA

  • 它调用另一个辅助函数以获取 mini-batch 的适当 0/1 掩码

然后我们将 get_mini_batch() 的所有返回值输送到 elbo.step(...) 中。回想一下,这些参数在构建 elbo 中的梯度估计器时,将进一步输送到 model(...)guide(...)。最后,我们返回一个浮点数,它是该 mini-batch 损失的噪声估计。

现在我们拥有了训练循环主要部分所需的所有要素

times = [time.time()]
for epoch in range(args.num_epochs):
    # accumulator for our estimate of the negative log likelihood
    # (or rather -elbo) for this epoch
    epoch_nll = 0.0
    # prepare mini-batch subsampling indices for this epoch
    shuffled_indices = np.arange(N_train_data)
    np.random.shuffle(shuffled_indices)

    # process each mini-batch; this is where we take gradient steps
    for which_mini_batch in range(N_mini_batches):
        epoch_nll += process_minibatch(epoch, which_mini_batch, shuffled_indices)

    # report training diagnostics
    times.append(time.time())
    epoch_time = times[-1] - times[-2]
    log("[training epoch %04d]  %.4f \t\t\t\t(dt = %.3f sec)" %
        (epoch, epoch_nll / N_train_time_slices, epoch_time))

在每个 epoch 开始时,我们打乱指向训练数据的索引。然后我们处理每个 mini-batch,直到遍历整个训练集,并在此过程中累积训练损失。最后,我们报告一些诊断信息。注意,我们将损失按训练集中总的时间片数进行归一化(这允许我们与参考文献 [1] 进行比较)。

评估

这个训练循环仍然缺少任何类型的评估诊断。让我们解决这个问题。首先,我们需要准备用于评估的验证集和测试集数据。由于验证集和测试集数据足够小,可以轻松装入内存,我们将按批次处理每个数据集(即,我们不会将数据集分成 mini-batch)。[旁注:此时读者可能会问,为什么我们不对训练集做同样的事情。原因是数据子采样带来的额外随机性在优化过程中通常是有利的:特别是它可以帮助我们避免局部最优解。] 实际上,为了获得噪声较小的 ELBO 估计,我们将计算多样本估计。最简单的方法如下

val_loss = svi.evaluate_loss(val_batch, ..., num_particles=5)

然而,这将涉及一个包含五次迭代的显式 for 循环。对于我们的特定模型,我们可以做得更好,并向量化整个计算。目前在 Pyro 中实现此功能的唯一方法是显式地将数据复制 n_eval_samples 次。这是我们遵循的策略

# package repeated copies of val/test data for faster evaluation
# (i.e. set us up for vectorization)
def rep(x):
    return np.repeat(x, n_eval_samples, axis=0)

# get the validation/test data ready for the dmm: pack into sequences, etc.
val_seq_lengths = rep(val_seq_lengths)
test_seq_lengths = rep(test_seq_lengths)
val_batch, val_batch_reversed, val_batch_mask, val_seq_lengths = poly.get_mini_batch(
    np.arange(n_eval_samples * val_data_sequences.shape[0]), rep(val_data_sequences),
    val_seq_lengths, cuda=args.cuda)
test_batch, test_batch_reversed, test_batch_mask, test_seq_lengths = \
    poly.get_mini_batch(np.arange(n_eval_samples * test_data_sequences.shape[0]),
                        rep(test_data_sequences),
                        test_seq_lengths, cuda=args.cuda)

现在测试集和验证集数据已完全准备好,我们定义执行评估的辅助函数

def do_evaluation():
    # put the RNN into evaluation mode (i.e. turn off drop-out if applicable)
    dmm.rnn.eval()

    # compute the validation and test loss
    val_nll = svi.evaluate_loss(val_batch, val_batch_reversed, val_batch_mask,
                                 val_seq_lengths) / np.sum(val_seq_lengths)
    test_nll = svi.evaluate_loss(test_batch, test_batch_reversed, test_batch_mask,
                                  test_seq_lengths) / np.sum(test_seq_lengths)

    # put the RNN back into training mode (i.e. turn on drop-out if applicable)
    dmm.rnn.train()
    return val_nll, test_nll

我们只需调用 elboevaluate_loss 方法,该方法接受与 step() 相同的参数,即传递给模型和 Guide 的参数。请注意,我们必须将 RNN 置于评估模式并在评估后恢复,以考虑 dropout。现在我们可以将 do_evaluation() 放入训练循环中;详情请参阅源代码

结果

让我们确保我们的实现给出了合理的结果。我们可以使用参考文献 [1] 中报告的数字作为健全性检查。对于相同的数据集和类似的模型/Guide 设置(隐空间的维度、RNN 中的隐藏单元数量等),他们在测试集上报告的归一化负对数似然(NLL)为 6.93(越低越好\()^{\S}\))。这与我们的结果 6.87 相比。这些数字非常接近,这是令人放心的。似乎,至少对于这个数据集,不使用 KL 散度的解析表达式不会降低学习模型的质量(尽管,如上所述,训练可能会稍微长一些)。

图 3:示例训练运行中测试集 NLL 随训练进展而变化。

在图中,我们展示了单个示例运行(学习率相对保守)在训练过程中测试集 NLL 的变化。大部分进展发生在最初的 3000 个 epoch 左右,如果训练时间更长,可能会有一些微小的改进。在 GeForce GTX 1080 上,5000 个 epoch 大约需要 20 小时。

num_iafs

测试集 NLL

0

6.87

1

6.82

2

6.80

最后,我们还报告了混合了 Normalizing Flows 的 Guide 的结果(详细信息将在下一节中找到)。

\({ \S\;}\) 实际上,他们似乎对相同的模型/Guide 报告了两个数字——6.93 和 7.03——并且不完全清楚这两个报告的数字有何不同。

改进、优化及其他

逆自回归流

概率编程语言的一大优点是它鼓励模块化。让我们在 DMM 的背景下展示一个例子。我们将通过在变分分布中添加 Normalizing Flows 来使其更加丰富(讨论请参见参考文献 [2])。这只会花费我们额外的四行代码!

首先,在 DMM 构造函数中,我们添加

iafs = [AffineAutoregressive(AutoRegressiveNN(z_dim, [iaf_dim])) for _ in range(num_iafs)]
self.iafs = nn.ModuleList(iafs)

这实例化了 num_iafsAffineAutoregressive 类型的双射变换(参见参考文献 [3,4]);每个 normalizing flow 将有 iaf_dim 个隐藏单元。然后我们将 normalizing flows 打包到 nn.ModuleList 中;这只是 PyTorch 中打包 nn.Module 列表的方式。接下来,在 Guide 中,我们添加以下代码行

if self.iafs.__len__() > 0:
    z_dist = TransformedDistribution(z_dist, self.iafs)

在这里,我们使用基分布 z_dist(在本例中是条件高斯分布),并使用 TransformedDistribution 构造将其转换为一个非高斯分布,该分布通过构造比基分布更丰富。瞧!

检查点

如果我们想从训练循环中的灾难性故障中恢复,我们需要跟踪两种状态。第一种是模型和 Guide 的各种参数。第二种是优化器的状态(例如,在 Adam 中,这包括每个参数最近梯度估计的运行平均值)。

在 Pyro 中,所有参数都可以在 ParamStore 中找到。然而,PyTorch 也通过 nn.Moduleparameters() 方法为我们跟踪这些参数。因此,保存模型和 Guide 参数的一种简单方法是利用 dmmstate_dict() 方法以及 torch.save();见下文。如果我们在循环中有 AffineAutoregressive,这实际上是我们唯一可用的选项。这是因为 AffineAutoregressive 模块包含 PyTorch 术语中的“持久缓冲区”。这些是携带状态但不是 Parameter 的东西。nn.Modulestate_dict()load_state_dict() 方法知道如何正确处理缓冲区。

要保存优化器的状态,我们必须使用 pyro.optim.PyroOptim 内部的功能。回想一下,典型的用户在使用 Pyro 时从不直接与 PyTorch Optimizers 交互;由于参数可以在任意概率程序中动态创建,Pyro 需要为我们管理 Optimizers。在我们的例子中,保存优化器状态就像调用 optimizer.save() 一样简单。加载逻辑完全类似。因此,我们保存和加载检查点的整个逻辑只需要几行代码

# saves the model and optimizer states to disk
def save_checkpoint():
    log("saving model to %s..." % args.save_model)
    torch.save(dmm.state_dict(), args.save_model)
    log("saving optimizer states to %s..." % args.save_opt)
    optimizer.save(args.save_opt)
    log("done saving model and optimizer checkpoints to disk.")

# loads the model and optimizer states from disk
def load_checkpoint():
    assert exists(args.load_opt) and exists(args.load_model), \
        "--load-model and/or --load-opt misspecified"
    log("loading model from %s..." % args.load_model)
    dmm.load_state_dict(torch.load(args.load_model))
    log("loading optimizer states from %s..." % args.load_opt)
    optimizer.load(args.load_opt)
    log("done loading model and optimizer states.")

一些最后的评论

深度马尔可夫模型是一个相对复杂的模型。既然我们已经努力实现了一个针对多声部音乐数据集量身定制的深度马尔可夫模型版本,我们应该问问自己还能做什么。如果我们拿到一个不同的序列数据集怎么办?我们必须从头开始吗?

完全不用!概率编程的优点在于它支持(并鼓励)模块化的建模和推断方法。将我们的多声部音乐模型适配到具有连续观测值的数据集就像改变观测似然一样简单。绝大部分代码可以不做任何改动地复用。这意味着只需额外做一点工作,本教程中的代码就可以重新用于支持各种不同的模型。

查看 Github 上的完整代码。

参考文献

[1] Structured Inference Networks for Nonlinear State Space Models,     Rahul G. Krishnan, Uri Shalit, David Sontag

[2] Variational Inference with Normalizing Flows,      Danilo Jimenez Rezende, Shakir Mohamed

[3] Improving Variational Inference with Inverse Autoregressive Flow,      Diederik P. Kingma, Tim Salimans, Rafal Jozefowicz, Xi Chen, Ilya Sutskever, Max Welling

[4] MADE: Masked Autoencoder for Distribution Estimation,      Mathieu Germain, Karol Gregor, Iain Murray, Hugo Larochelle

[5] Modeling Temporal Dependencies in High-Dimensional Sequences:      Application to Polyphonic Music Generation and Transcription,      Boulanger-Lewandowski, N., Bengio, Y. and Vincent, P.