Latent dynamical systems and expectation-maximization

Lab 6

Course: DS-GA 1018: Probabilistic Time Series Analysis
Date: October 20, 2025 (Monday)
Name:
NYU NetID:

Instructions: Submit your completed lab worksheet to Gradescope by 1:30 PM the Monday of the next lab.

You can find a Colab version of the required code, including plotting functions here.


This lab is meant as an introduction to expectation-maximization and Kalman filtering.

Students should:

  1. Be able to implement the expectation-maximization algorithm for latent space dynamical models.
  2. Be able to calculate the log posterior of an LDS model.
  3. Gain intuition for how iterations of the EM algorithm improve the quality of the estimate.
from typing import Any, Tuple

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
import pandas as pd
def plot_means_and_cov(means: np.ndarray, covariances: np.ndarray, ax: Any, color: str, label: str):
    """Plot the mean and covariance of our filtering/smoothing.

    Args:
        means: Means to plot.
        covariances: Covariances to plot.
        ax: Axis on which to plot.
        color: Color for plotting.
        label: Label for plotted points.

    Notes:
        Will plot the 68% contours from the covariances.
    """
    # Plot the trend line.
    ax.plot(means[:,0], means[:,1], '-', color=color, label=label)

    # Plot the ellipses for covariances. Assume they are diagonal (true for this lab).
    for i in range(len(means)):
        elip = Ellipse((means[i,0], means[i,1]), np.sqrt(covariances[i,0,0]), np.sqrt(covariances[i,1,1]), color=color, alpha=0.7)
        ax.add_patch(elip)

def sample_lds(n_timesteps: int, transition_matrix: np.ndarray, transition_covariance: np.ndarray,
               observation_matrix: np.ndarray, observation_covariance: np.ndarray, mu_zero: np.ndarray,
               cov_zero: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Sample from a latent dynamical system with the given parameters.

    Args:
        n_timesteps: Number of timesteps of data to sample.
        transition_matrix: Transition matrix between latent states. This is A
        transition_covariance: Covariance of latent state noise. This is Q
        observation_matrix: Observation matrix from latent state to observation. This is C.
        observation_covariance: Covariance of observation noise. This is R.
        mu_zero: Mean of initial latent state.
        cov_zero: Covariance of initial latent state.

    Returns:
        Latent and observed states from sampling.
    """
    # In our model we assume there is no initial observation.
    latent_state = np.zeros([n_timesteps+1, 2])
    observed_state = np.zeros([n_timesteps, 2])

    latent_state[0] = np.random.multivariate_normal(mu_zero, cov_zero)

    # Iterate through latent states.
    for t in range(n_timesteps):
        latent_state[t+1] = (
            np.dot(transition_matrix, latent_state[t]) +
            np.random.multivariate_normal(np.zeros(2), transition_covariance)
        )
        observed_state[t] = (
            np.dot(observation_matrix, latent_state[t+1]) +
            np.random.multivariate_normal(np.zeros(2), observation_covariance)
        )
    return latent_state, observed_state

Part I: Expectation-Maximization for Latent Dynamical System

To start this lab, we’ll return to our same Kalman filtering / smoothing modeling from last week and fold in our new expectation-maximization algorithm. I have supplied you with a function for sampling a latent space and a set of observations that we will use to generate our data. You will have to import your solutions from last week into the class (with a small modification to the smoothing function).

The expectation-maximization equations were provided in this week’s lecture, and there are test functions for each of the equations to make sure that your implementation is correct.

class KalmanFilter:
    """Class that implements the Kalman Filter for our LDS model.

    Args:
        sigma_w: Standard deviation of latent space noise.
        sigma_v: Standard deviation of observation noise.
        a: Magnitude of latent space transition matrix.
        c: Magnitude of the observation matrix.
        dim_z: Dimension of latent space.
        dim_x: Dimension of observation space.
        sigma_w_zero: Initial standard deviation of the zero state.
        mu_zero: Initial mean of the zero state.
    """
    def __init__(self, sigma_w: float, sigma_v: float, a: float, c: float, dim_z: int,
                 dim_x: int, sigma_w_zero: float, mu_zero: np.ndarray):
        """Initialize our class."""
        # Save a few variables for bookkeeping
        self.dim_x = dim_x
        self.dim_z = dim_z

        # Used in the EM function.
        self.smooth_matrices = None # The F_t matrices from the notes. Will need to be saved.

        # TODO: Implement the transition, observation, and noise covariance matrices.
        self.transition_covariance = # TODO
        self.observation_covariance = # TODO
        self.transition_matrix = # TODO
        self.observation_matrix = # TODO

        # TODO: Implement the initial covariance and mean of the zero state.
        self.mu_zero = # TODO
        self.cov_zero = # TODO

    def filter(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Calculate the filtered mean and covariances of the latent space.

        Args:
            data: Observations with shape [n_observations, dim_x]

        Returns:
            Filtered mean and covariance for the latent space. The first dimension
            of both should by n_observations+1 since the initial latent state has no
            paired observation.
        """

        # Make sure the dimensions match and create some placeholders for the outputs.
        n_observations, dim_x = data.shape
        assert dim_x == self.dim_x
        filtered_means = np.zeros((n_observations + 1, self.dim_z))
        filtered_covs = np.zeros((n_observations + 1, self.dim_z, self.dim_z))

        # TODO: Implement filtering.

        return filtered_means, filtered_covs

    def smooth(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Calculate the smoothed mean and covariances of the latent space.

        Args:
            data: Observations with shape [n_observations, dim_x]

        Returns:
            Smoothed mean and covariance for the latent space. The first dimension
            of both should by n_observations+1 since the initial latent state has no
            paired observation.

        Notes:

        """
        # Validate the data dimensions.
        n_observations, dim_x = data.shape
        assert dim_x == self.dim_x

        # Run the forward path
        filtered_means, filtered_covs = self.filter(data)

        # Create holders for outputs
        smoothed_means = np.zeros((n_observations + 1, self.dim_z))
        smoothed_covs = np.zeros((n_observations + 1, self.dim_z, self.dim_z))
        self.smooth_matrices = np.zeros((n_observations + 1, self.dim_z, self.dim_z))

        # TODO: Implement smoothing.

        # Did you remember to save the F_t calculations for EM?
        assert np.sum(np.abs(self.smooth_matrices)) > 0
        # Did you remember to set the last F_t calculation? We don't need it for EM,
        # but we do need it for completeness.
        assert np.sum(np.abs(self.smooth_matrices[-1])) > 0

        return smoothed_means, smoothed_covs

    def expectation_maximization(self, data: np.ndarray, n_iter: int):
        """Run the expectation-maximization algorithm on our LDS parameters.

        Args:
            data: Observations with shape [n_observations, dim_x].
            n_iter: The number of iterations of the expectation-maximization algorithm to run.

        Notes:
            Updates the internal representations of the LDS parameters but has no outputs.
        """
        # Validate the data dimensions.
        n_observations, dim_x = data.shape
        assert dim_x == self.dim_x

        # TODO: Implement EM Iterations.
        for iter_num in range(n_iter):

            ### Expectation Step ###
            # Get the smoothed state for the current model parameters.
            smoothed_state_means, smoothed_state_covariances = # TODO

            # Solve for E[z_t], E[z_t z_{t-1}^T], E[z_t z_t^T] for use in EM.
            expect_zt = # TODO
            expect_zt_zt = # TODO
            expect_zt_zt_minus = # TODO

            ### Maximization Step ###
            # Update equation for initial state mean and covariance.
            self.mu_zero = # TODO
            self.cov_zero = # TODO

            # Update equation for transition matrix and noise covariance.
            self.transition_matrix = # TODO
            self.transition_covariance = # TODO

            # Update equation for observation matrix and noise covariance.
            self.observation_matrix = # TODO
            self.observation_covariance = # TODO

Let’s test our implementation:

# Let's run some tests to make sure that your implementation of the expectation-maximization equations is correct.
kf_test = KalmanFilter(sigma_w=0.9, sigma_v=0.98, a=0.9, c=1.2, dim_z=2, dim_x=2, sigma_w_zero=2.0, mu_zero=np.array([1.0, 1.2]))
kf_test.expectation_maximization(np.array([[0.0, 0.05], [0.1, 0.2],[0.3,0.4]]), 1)

# Start with the updated initial states
np.testing.assert_almost_equal(kf_test.mu_zero, np.array([0.3185428, 0.4209496]))
np.testing.assert_almost_equal(kf_test.cov_zero, np.array([[1.1333618, 0.0], [0.0, 1.1333618]]))
np.testing.assert_almost_equal(kf_test.transition_matrix, np.array([[0.362484, 0.0247164], [0.0265544, 0.3868536]]))
np.testing.assert_almost_equal(kf_test.transition_covariance, np.array([[0.3320965, 0.012569],[0.012569, 0.3414906]]))
np.testing.assert_almost_equal(kf_test.observation_matrix, np.array([[0.0492384, 0.0705896], [0.076319, 0.1109947]]))
np.testing.assert_almost_equal(kf_test.observation_covariance, np.array([[0.0298151, 0.0411604],[0.0411604, 0.0588818]]))

# Make sure everything continues working for two iterations of EM
kf_test = KalmanFilter(sigma_w=0.9, sigma_v=0.98, a=0.9, c=1.2, dim_z=2, dim_x=2, sigma_w_zero=2.0, mu_zero=np.array([1.0, 1.2]))
kf_test.expectation_maximization(np.array([[0.0, 0.05], [0.1, 0.2],[0.3,0.4]]), 2)

# Start with the updated initial states
np.testing.assert_almost_equal(kf_test.mu_zero, np.array([0.4486251, 0.6279772]))
np.testing.assert_almost_equal(kf_test.cov_zero, np.array([[1.1107989, -0.0337335], [-0.0337335, 1.0827763]]))
np.testing.assert_almost_equal(kf_test.transition_matrix, np.array([[0.3938727, 0.0722462], [0.0729125, 0.4567033]]))
np.testing.assert_almost_equal(kf_test.transition_covariance, np.array([[0.3326792, 0.013621], [0.013621, 0.3433377]]))
np.testing.assert_almost_equal(kf_test.observation_matrix, np.array([[0.0538247, 0.078607], [0.0894437, 0.1314]]))
np.testing.assert_almost_equal(kf_test.observation_covariance, np.array([[0.0278726, 0.0375556], [0.0375556, 0.0522982]]))

Part II: Where’s the rat?

Your friend in the biology department has been working for months to observe how different music affects a rat’s behavior. They played Beethoven to their favorite rat for over an hour and they swear the rat was dancing a beautiful ballet. Unfortunately the optical data they captured tells a very different story: it just looks like a jumbled mess. They think the camera had some alignment issues. They turn to you, an expert in time-series analysis, to help them with their problem. Can you reconstruct the rat’s beautiful dance?

Let’s take a look at the data your friend collected.

# We start by generating the data our friend collected.
n_timesteps = 100
np.random.seed(1)

transition_matrix = np.array([[1.01, -0.2],[0.2,0.95]])
transition_covariance = 2.0*np.eye(2)

observation_matrix = np.array([[1.0, 0.0],[0.2,1.0]])
observation_covariance = 30.0*np.eye(2)

mu_zero = np.array([1.0,1.2])
cov_zero = 0.1*np.eye(2)

latent_state, observed_state = sample_lds(n_timesteps, transition_matrix, transition_covariance, observation_matrix, observation_covariance, mu_zero, cov_zero)
t_observed = np.arange(len(latent_state))

# Let's plot what our friend collected.
fig, ax = plt.subplots(1, 1, figsize=(10,10), dpi=100)
fontsize = 15
plt.scatter(observed_state[:,0],observed_state[:,1], c=t_observed[1:], cmap='autumn', label='Observed Rat Positions')
cbar = plt.colorbar()
cbar.ax.set_ylabel('Time Index', fontsize=fontsize, rotation=270)
plt.xlabel('X-Position', fontsize=fontsize)
plt.ylabel('Y-Position', fontsize=fontsize)
plt.legend(fontsize=fontsize)
plt.show()

You see where your friend is coming from, but it’s really hard to discern an obvious pattern. You decide to model the problem as an LDS where the rat’s dance is the latent state.

# We'll give you a smart initialization to your EM algorithm for free.
kf = KalmanFilter(sigma_w=3.0, sigma_v=2.0, a=1.0, c=1.0, dim_z=2, dim_x=2, sigma_w_zero=1.0, mu_zero=np.array([1.0, 1.2]))

# TODO: Get smooth latent states with the initial parameter choices.
smooth_latent_means, smooth_latent_covariances = # TODO

fig, ax = plt.subplots(1, 1, figsize=(10,10), dpi=100)
fontsize = 15
plt.scatter(observed_state[:,0],observed_state[:,1], c=t_observed[1:], cmap='autumn', label='Observed Rat Positions')
cbar = plt.colorbar()
cbar.ax.set_ylabel('Time Index', fontsize=fontsize, rotation=270)
plt.xlabel('X-Position', fontsize=fontsize)
plt.ylabel('Y-Position', fontsize=fontsize)
plot_means_and_cov(smooth_latent_means, smooth_latent_covariances, ax=ax, color='#a1dab4', label='Estimated Latent Rat State')
plt.legend(fontsize=fontsize)
plt.show()

Question: Looking at the plot above with initial parameter estimates, describe what pattern (if any) you can see in the estimated latent trajectory (green line). Does it look like a smooth dance, or does it follow the scattered observations too closely?

The estimated latent trajectory follows the noisy observations too closely and doesn’t reveal a clear, smooth dance pattern. The green line jumps around with the scattered data points rather than smoothing through them to reveal an underlying structure. This suggests the initial parameters don’t properly account for how much observation noise there is.

Well that doesn’t inspire confidence. Let’s try again, this time with 20 iterations of EM.

# We'll give you a smart initialization to your EM algorithm for free.
kf = KalmanFilter(sigma_w=3.0, sigma_v=2.0, a=1.0, c=1.0, dim_z=2, dim_x=2, sigma_w_zero=1.0, mu_zero=np.array([1.0, 1.2]))

# TODO: Run 20 iterations of expectation-maximization and extract the smooth latent states with the optimal parameters.
smooth_latent_means, smooth_latent_covariances = # TODO (don't forget to do some EM first)

fig, ax = plt.subplots(1, 1, figsize=(10,10), dpi=100)
fontsize = 15
plt.scatter(observed_state[:,0],observed_state[:,1], c=t_observed[1:], cmap='autumn', label='Observed Rat Positions')
cbar = plt.colorbar()
cbar.ax.set_ylabel('Time Index', fontsize=fontsize, rotation=270)
plt.xlabel('X-Position', fontsize=fontsize)
plt.ylabel('Y-Position', fontsize=fontsize)
plot_means_and_cov(smooth_latent_means, smooth_latent_covariances, ax=ax, color='#a1dab4', label='Estimated Latent Rat State')
plt.legend(fontsize=fontsize)
plt.show()

There it is! A beautiful dance. This rat was really feeling inspired by the song. For fun let’s cheat and see how well we did of reconstructing the true latent states.

fig, ax = plt.subplots(1, 1, figsize=(10,10), dpi=100)
fontsize = 15
plt.scatter(observed_state[:,0],observed_state[:,1], c=t_observed[1:], cmap='autumn', label='Observed Rat Positions')
cbar = plt.colorbar()
cbar.ax.set_ylabel('Time Index', fontsize=fontsize, rotation=270)
plt.xlabel('X-Position', fontsize=fontsize)
plt.ylabel('Y-Position', fontsize=fontsize)
plot_means_and_cov(smooth_latent_means, smooth_latent_covariances, ax=ax, color='#a1dab4', label='Estimated Latent Rat State')
plt.plot(latent_state[:,0], latent_state[:,1], '-', c='k', label='True Latent Rat State')
plt.legend(fontsize=fontsize)
plt.show()

Question: Now that we can see the true latent trajectory (black line) compared to our EM estimate (green line):

  1. Describe the overall shape revealed by the true trajectory. Does the EM estimate follow this shape?

  2. Are there any regions where the green estimated trajectory deviates noticeably from the black true trajectory? If so, where?

  1. The true trajectory reveals a spiral - the rat was indeed dancing! The EM estimate (green line) follows this spiral shape very closely, successfully recovering the rat’s dance from the noisy observations.

  2. The estimated trajectory follows the true trajectory quite well throughout. Any small deviations are mostly near the beginning and end of the sequence, where the smoother has less surrounding data to work with.