# CS640 Homework 5: HMM, MDP and RL

This assignment can be divided into three parts, and each part contains some coding tasks:
1. Hidden Markov Model
 - Complete a model
 - Implement the **Forward Procedure**, **Backward Procedure**, and the **Viterbi Algorithm**
2. Markov Decision Process
 - Implement **value iteration** and **policy iteration**
3. Reinforcelemnt Learning
 - Implement **Q-learning**

Do **not** include any extra outputs in your final answer. If you use print() function for debugging, please run your code without them again before submitting.

### Collaboration
All questions must be answered **independently**.

## Instructions

### General Instructions
In an ipython notebook, to run code in a cell or to render [Markdown](https://en.wikipedia.org/wiki/Markdown)+[LaTeX](https://en.wikipedia.org/wiki/LaTeX) press `Ctrl+Enter` or `[>|]`(like "play") button above. To edit any code or text cell (double) click on its content. To change cell type, choose "Markdown" or "Code" in the drop-down menu above.

Most of the written questions are followed up a cell for you enter your answers. Please enter your answers in a new line below the **Answer** mark. If you do not see such cell, please insert one by yourself. Your answers and the questions should **not** be in the same cell.

### Instructions on Math
Some questions require you to enter math expressions. To enter your solutions, put down your derivations into the corresponding cells below using LaTeX. Show all steps when proving statements. If you are not familiar with LaTeX, you should look at some tutorials and at the examples listed below between \$..\$. The [OEIS website](https://oeis.org/wiki/List_of_LaTeX_mathematical_symbols) can also be helpful.

Alternatively, you can scan your work from paper and insert the image(s) in a text cell.

## Submission
Once you are ready, save the note book as PDF file (File -> Print -> Save as PDF) and submit via Gradescope.

Please select pages to match the questions on Gradescope. **You may be subject to a 5% penalty if you do not do so**.

## Q1: Hidden Markov Model

In this problem, you need to first complete a model by filling some missing values according to the existing ones, and then implement and run **Forward Procedure**, **Backward Procedure**, and the **Viterbi Algorithm**.

### Problem Setup

Consider an HMM with the following properties (unknown values are marked with "?").

States: $S = \{S_{1}, S_{2}, S_{3}, S_{4}\}$

Initial state probability distribution: $\pi = \{\pi_{1} = 0.3, \pi_{2} = 0.1, \pi_{3} = ?, \pi_{4} = 0.5\}$

Transition probability matrix A, where each entry $a_{ij}$ is the probability of moving from state $S_{i}$ to state $S_{j}$:

$$\begin{bmatrix}
? & 0.5 & 0.4 & 0\\
? & 0.3 & 0.4 & 0.1\\
? & 0.1 & 0.2 & 0.7 \\
? & 0.2 & 0.2 & 0.2
\end{bmatrix}$$

Symbols probability matrix $B$, where each entry $b_{jk} = b_{j}(k) = P[V_{k} \; | \; S_{j}]$ is the probability of yielding $V_{k}$ in state $S_{j}$:

$$\begin{bmatrix}
0.5 & 0.4 & ? \\
? & 0.3 & 0.1 \\
0.8 & ? & 0.1 \\
? & 0.3 & 0.7
\end{bmatrix}$$

Let $\lambda = (A, B, \pi)$.

Suppose we observe a sequence $O = V_{1}V_{3}V_{2}V_{1}$.

### Q1.1

Complete the model by filling the unknown values in the following code block and run the cell. Make sure the outputs are all **True**.



In [None]:
import numpy as np

M = 3 # vocabulary size
N = 4 # state number
V = [None, 0, 1, 2] # one-based, V[0] = None to align the indices

# fill in the blank values for the following three arrays
pi = np.array([0.3, 0.1, , 0.5])
A = np.array([[, 0.5, 0.4, 0], [, 0.3, 0.4, 0.1], [, 0.1, 0.2, 0.7], [, 0.2, 0.2, 0.2]])
B = np.array([[0.5, 0.4, ], [, 0.3, 0.1], [0.8, , 0.1], [, 0.3, 0.7]])

O = [V[1], V[3], V[2], V[1]]

print("Sum of pi is one? " + str(np.linalg.norm(pi.sum() - 1) < 1e-9))
print("Sum of each row of A is one? " + str(np.linalg.norm(A.sum(axis = 1) - 1) < 1e-9))
print("Sum of each row of B is one? " + str(np.linalg.norm(B.sum(axis = 1) - 1) < 1e-9))

### Q1.2

What is $P(O | \lambda)$? Answer the question by finishing implementing the **Forward Procedure** and the **Backward Procedure** below and run the cells.

***Hints***
- The two procedures should yield the same final answer.
- The forward procedure code prints the $\alpha$ values as a table while the backward procedure code prints the $\beta$ values as a table. Think of how the values are supposed to change over the steps/rows.

### **Forward Procedure**

In [None]:
T = len(O)
alpha = np.zeros((T, N))

# initialization
for i in range(N): # complete this loop
 ################### start of your code ###################

 #################### end of your code ####################

# inductive steps
for t in range(1, T): # complete this loop
 ################### start of your code ###################

 #################### end of your code ####################
print("alpha values:")
print(alpha)
print()

# final answer
################### start of your code ###################
answer = 0 # this variable should store your final answer

#################### end of your code ####################
print("P(O | lambda) = " + str(answer))

### **Backward Procedure**

In [None]:
T = len(O)
beta = np.zeros((T, N))

# initialization
for i in range(N): # complete this loop
 ################### start of your code ###################

 #################### end of your code ####################

# inductive steps
for t in range(T - 2, -1, -1): # complete this loop
 ################### start of your code ###################

 #################### end of your code ####################
print("beta values:")
print(beta)
print()

# final answer
# complete this part
################### start of your code ###################
answer = 0 # this variable should store your final answer

#################### end of your code ####################
print("P(O | lambda) = " + str(answer))

### Q1.3

What is the most likely state sequence for the observed output sequence $O = V_{1}V_{3}V_{2}V_{1}$? Answer the question by finish implementing the **Viterbi Algorithm** in the following code block and run the cell.

In [None]:
T = len(O)
delta = np.zeros((T, N)) # stores the optimal probabilities
psi = np.zeros((T, N), dtype = np.int8) # stores the corresponding sources

# initialization
for i in range(N): # complete this loop
 ################### start of your code ###################

 #################### end of your code ####################

# inductive steps
for t in range(1, T): # complete this loop
 ################### start of your code ###################

 #################### end of your code ####################
print("delta values:")
print(delta)
print()
print("psi values:")
print(psi)
print()

# backtrack for the optimal path
################### start of your code ###################
optimal_path = [] # this variable should store your final answer

################### start of your code ###################
print("Optimal path = " + str(optimal_path))

## Q2: Markov Decision Process

This problem asks you to implement both the **value iteration** and **policy iteration**. Two simple test examples are provided for you to debug your code, and at the end some more complicated cases are randomly generated to test your answers.



### Simple Test Examples

The next block contains both the packages you will need and two simple test cases. Please run the cell before working on the implementation.

Note that the imported packages are sufficient for this problem. You can add some more if you think they can help, but obviously, you should **not** use any MDP packages.

In [None]:
import numpy as np
import sys
from itertools import product
import matplotlib.pyplot as plt

# a small MDP
states = [0, 1, 2]
actions = [0, 1] # 0 : stay, 1 : jump
jump_probabilities = np.matrix([[0.1, 0.2, 0.7],
 [0.5, 0, 0.5],
 [0.6, 0.4, 0]])
for i in range(len(states)):
 jump_probabilities[i, :] /= jump_probabilities[i, :].sum()

rewards_stay = np.array([0, 8, 5])
rewards_jump = np.matrix([[-5, 5, 7],
 [2, -4, 0],
 [-3, 3, -3]])

T = np.zeros((len(states), len(actions), len(states)))
R = np.zeros((len(states), len(actions), len(states)))
for s in states:
 T[s, 0, s], R[s, 0, s] = 1, rewards_stay[s]
 T[s, 1, :], R[s, 1, :] = jump_probabilities[s, :], rewards_jump[s, :]

example_1 = (states, actions, T, R)

# a larger MDP
states = [0, 1, 2, 3, 4, 5, 6, 7]
actions = [0, 1, 2, 3, 4]
T = np.zeros((len(states), len(actions), len(states)))
R = np.zeros((len(states), len(actions), len(states)))
for a in actions:
 T[:, a, :] = np.random.RandomState(4).uniform(0, 10, (len(states), len(states)))

 # randomly delete 20% of the edges
 tuples = list(product(states, actions, states))
 np.random.RandomState(6).shuffle(tuples)
 for t in tuples[:len(tuples) // 5]:
 T[t[0], t[1], t[2]] = 0

 # normalizing
 for s in states:
 T[s, a, :] /= T[s, a, :].sum()
 R[:, a, :] = np.random.RandomState(8).uniform(-10, 10, (len(states), len(states)))

example_2 = (states, actions, T, R)

### Q2.1: Value Iteration

Implement value iteration by finishing the following function, and then run the cell.

***Hint***

- The provided code produces a figure of state values for each of the two test cases. Think of how the values are supposed to change over time steps.

In [None]:
def value_iteration(states, actions, T, R, gamma = 0.95, tolerance = 1e-2, max_steps = 1000):
 Vs = [] # all state values
 Vs.append(np.zeros(len(states))) # initial state values
 steps, convergent = 0, False
 while not convergent and steps < max_steps: # complete this loop
 ########################### start of your code #########################
 # compute new state values as an array, and append the array to the list Vs)

 ############################ end of your code ##########################
 convergent = np.linalg.norm(Vs[-1] - Vs[-2]) < tolerance
 steps += 1
 if steps == max_steps:
 print("Max iterations reached before convergence.")
 sys.exit(1)
 ########################### start of your code #########################
 # compute Q from the last V array
 # extract the optimal policy and name it "policy" to return

 policy = # store your optimal policy here
 ############################ end of your code ##########################
 return np.array(Vs), policy, steps

states, actions, T, R = example_1
gamma, tolerance, max_steps = 0.95, 1e-2, 1000
Vs_1, policy_1, steps_1 = value_iteration(states, actions, T, R, gamma, tolerance, max_steps)

states, actions, T, R = example_2
gamma, tolerance, max_steps = 0.95, 1e-2, 1000
Vs_2, policy_2, steps_2 = value_iteration(states, actions, T, R, gamma, tolerance, max_steps)

fig, axes = plt.subplots(1, 2, figsize = (8, 4))
for i in range(Vs_1.shape[1]):
 axes[0].plot(Vs_1[:, i], label = "state " + str(i))
axes[0].set_title("Example 1 State Values", fontsize = 12)
axes[0].set_xlabel("Steps", fontsize = 8)
axes[0].legend()
axes[0].grid(True)
for i in range(Vs_2.shape[1]):
 axes[1].plot(Vs_2[:, i], label = "state " + str(i))
axes[1].set_title("Example 2 State Values", fontsize = 12)
axes[1].set_xlabel("Steps", fontsize = 8)
axes[1].legend()
axes[1].grid(True)
fig.tight_layout()
plt.show()

print("Optimal policy for example 1: " + str(policy_1))
print("Optimal policy for example 2: " + str(policy_2))

### Q2.2: Policy Iteration

Implement policy iteration by finishing the following function, and then run the cell.

***Hints***

- The provided code prints step-wise state values and policies, again think of how the state values are supposed to change.
- Note that the optimal policy from the two iteration procedures should match.

In [None]:
def policy_iteration(states, actions, T, R, gamma = 0.95, tolerance = 1e-2, max_steps = 1000):
 policy_list = [] # all policies explored
 initial_policy = np.array([np.random.choice(actions) for s in states]) # random policy
 policy_list.append(initial_policy)
 Vs = [] # all state values
 Vs = [np.zeros(len(states))] # initial state values
 steps, convergent = 0, False
 while not convergent and steps < max_steps:
 ########################### start of your code #########################
 # 1. Evaluate the current policy, and append the state values to the list Vs

 # 2. Extract the new policy, and append the new policy to the list policy_list

 policy_list.append() # append the new policy as an array here
 ############################ end of your code ##########################
 steps += 1
 convergent = (policy_list[-1] == policy_list[-2]).all()
 if steps == max_steps:
 print("Max iterations reached before convergence.")
 sys.exit(1)
 return Vs, policy_list, steps

print("Example MDP 1")
states, actions, T, R = example_1
gamma, tolerance, max_steps = 0.95, 1e-2, 1000
Vs, policy_list, steps = policy_iteration(states, actions, T, R, gamma, tolerance, max_steps)
optimal_policy_1 = policy_list[-1]
for i in range(steps):
 print("Step " + str(i))
 print("state values: " + str(Vs[i]))
 print("policy: " + str(policy_list[i]))
 print()
print()
print("Example MDP 2")
states, actions, T, R = example_2
gamma, tolerance, max_steps = 0.95, 1e-2, 1000
Vs, policy_list, steps = policy_iteration(states, actions, T, R, gamma, tolerance, max_steps)
for i in range(steps):
 print("Step " + str(i))
 print("state values: " + str(Vs[i]))
 print("policy: " + str(policy_list[i]))
 print()
optimal_policy_2 = policy_list[-1]

print("Optimal policy for example 1: " + str(optimal_policy_1))
print("Optimal policy for example 2: " + str(optimal_policy_2))

### Q2.3: More Tests

The following block tests both of your implementations with even more random MDPs. Simply run the cell.

***Hint***
- The code produces step counts for the two iteration procedures. Think of how they should look like.

In [None]:
steps_list_vi, steps_list_pi = [], []
for i in range(20):
 states = [j for j in range(np.random.randint(5, 40))]
 actions = [j for j in range(np.random.randint(2, states[-1]))]
 T = np.zeros((len(states), len(actions), len(states)))
 R = np.zeros((len(states), len(actions), len(states)))
 for a in actions:
 T[:, a, :] = np.random.uniform(0, 10, (len(states), len(states)))

 # randomly delete 20% of the edges
 tuples = list(product(states, actions, states))
 np.random.shuffle(tuples)
 for t in tuples[:len(tuples) // np.random.randint(2, 20)]:
 T[t[0], t[1], t[2]] = 0

 for s in states:
 T[s, a, :] /= T[s, a, :].sum()
 R[:, a, :] = np.random.uniform(-10, 10, (len(states), len(states)))
 Vs, policy, steps_v = value_iteration(states, actions, T, R)
 Vs, policy_list, steps_p = policy_iteration(states, actions, T, R)
 steps_list_vi.append(steps_v)
 steps_list_pi.append(steps_p)
print("Numbers of steps in value iteration: " + str(steps_list_vi))
print("Numbers of steps in policy iteration: " + str(steps_list_pi))

## Q3: Reinforcement Learning

In this problem, you will be asked to implement the **$\epsilon$-greedy Q-learning** algorithm. But first, you will need to set up the environment with the *gym* package.

**Installation**

If you have not done this yet, install gym with atari using the following command.

In [None]:
!pip install cmake 'gym[atari]'
!pip install gym[toy_text]

**Visualization on Google-Colab**

This part is necessary for viewing the environment on Google-Colab. There may be other ways if you run this notebook on your own machine.

If you choose to work on Google-Colab, run the following two cells and ignore errors if you see any.

In [None]:
!apt-get install -y xvfb python-opengl > /dev/null 2>&1
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt install chromium-browser xvfb

In [None]:
import matplotlib.pyplot as plt
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
display = Display(visible = 0, size = (400, 300))
display.start()

**Environment**

We will use the "Taxi-v3" environment for this task. In this environment, an agent attempts to pickup a customer and then drive to a specific location. The following block helps you understand a bit more about this environment. Feel free to modify anything inside.

It is strongly recommended that you read the description of this environment [here](https://github.com/openai/gym/blob/master/gym/envs/toy_text/taxi.py).

In [None]:
import matplotlib.pyplot as plt
import gym

env = gym.make("Taxi-v3", new_step_api = True, render_mode = "rgb_array").env

env.reset()

prev_screen = env.render()[0]
plt.imshow(prev_screen)

As described in the source code, there are 500 states and 6 actions in this environment.

Initially, the passenger and the destination can only spawn at two distinct color tiles.

Each state is encoded with the following information: taxi coordinate, passenger locations, and destination location.

Passenger locations are:
- 0: R(ed)
- 1: G(reen)
- 2: Y(ellow)
- 3: B(lue)
- 4: in taxi

Destination locations are:
- 0: R(ed)
- 1: G(reen)
- 2: Y(ellow)
- 3: B(lue)

Actions are:
- 0: move south
- 1: move north
- 2: move east
- 3: move west
- 4: pickup passenger
- 5: drop off passenger

Rewards are:
- -1 per step unless other reward is triggered
- +20 delivering passenger
- -10 executing "pickup" and "drop-off" actions illegally

The environment includes a dictionary P storing the reward for each (state, action) pair. The information stored in P has the following structure: {state: {action: [(probability, nextstate, reward, terminated)]}}. The following code the information in the 10th state.

In [None]:
env.P[10] # shows the rewards in the 10th state

Note that all probabilities are 1 in this environment.

### Q2.1: $\epsilon$-greedy Q-learning

**Implementation**

Implement **$\epsilon$-greedy Q-learning** by modifying the specified part in the following block. Currently it is only taking a random action.

In [None]:
def Q_learning(env, episodes = 100000, alpha = 0.1, gamma = 0.6, epsilon = 0.1):
 Q_values = np.zeros([env.observation_space.n, env.action_space.n])
 rewards_list = [0] * episodes
 for episode in trange(episodes, desc = "Training progress"):
 state = env.reset()
 terminated = False
 while not terminated:
 ########################################################################
 # TO DO: implement Q-learning update
 action = env.action_space.sample()
 next_state, reward, terminated, _, info = env.step(action) # you shouldn't change this line
 ############################# End of your code #########################
 rewards_list[episode] += reward
 state = next_state
 return Q_values, rewards_list

**Training and Evaluating**

Run the following block to train and evaluate your implementation.

The training episode number is large on purpose: using a small value may cause the agent to stuck during test time. But since a large episode value means longer training time, during debugging, you can **create a separate cell** and call the function with a smaller value. Furthermore, you are free to experiment with other hyper-parameters. For example, you can experiment with some more hyperparameter settings.

**Before submitting, make sure that you have successfully finish running the following cell.**

In [None]:
import numpy as np
import gym
from tqdm import trange

env = gym.make("Taxi-v3", new_step_api = True).env
env.reset()
Q_values, rewards_list = Q_learning(env)

episodes = 1000
steps_count, failures_count = 0, 0

for episode in trange(episodes, desc = "Test progress"):
 state = env.reset()
 terminated = False
 while not terminated:
 action = np.argmax(Q_values[state])
 state, reward, terminated, _, info = env.step(action)
 steps_count += 1
 failures_count += reward == -10

print("Average steps per episode: " + str(np.round(steps_count / episodes, 2))) # should be less than 20
print("Number of failures: " + str(failures_count)) # should be very very low

# the curve should be overall increasing
fig, ax = plt.subplots()
ax.plot(rewards_list)
ax.set_title("Training Reward Plot")
ax.set_xlabel("Episode")
ax.set_ylabel("Reward")
plt.show()
