#! /usr/bin/env python import sys import re usage = "\nusage: ./hmm.py [weather|phone] [data]\nexample: ./hmm.py weather foggy-1000.txt (to test weather model on foggy-1000.txt)" # This section defines states and probabilities used by the HMMs. # # This assignment has parts concerning two different HMMs: the weather # model specified in the HMM Tutorial, and the speech HMM for phones from # problem 15.12 in the Russell and Norvig book. # # Note that in the phone HMM, 'none' is always observed in the 'final' state # and the 'final' state transitions back to 'final' with probability 1. # (This fact was not included in the book). ### Weather Model ### # state map weatherStateMap = {'sunny' : 0, 'rainy' : 1, 'foggy' : 2} weatherStateIndex = {0 : 'sunny', 1 : 'rainy', 2 : 'foggy'} # observation map weatherObsMap = {'no' : 0, 'yes' : 1} weatherObsIndex = {0 : 'no', 1 : 'yes'} # prior probability on weather states # P(sunny) = 0.5 P(rainy) = 0.25 P(foggy) = 0.25 weatherProb = [0.5, 0.25, 0.25] # transition probabilities # tomorrrow # today sunny rainy foggy # sunny 0.8 0.05 0.15 # rainy 0.2 0.6 0.2 # foggy 0.2 0.3 0.5 weatherTProb = [ [0.8, 0.05, 0.15], [0.2, 0.6, 0.2], [0.2, 0.3, 0.5] ] # conditional probabilities of evidence (observations) given weather # sunny rainy foggy # P(umbrella=no|weather) 0.9 0.2 0.7 # P(umbrella=yes|weather) 0.1 0.8 0.3 weatherEProb = [ [0.9, 0.2, 0.7], [0.1, 0.8, 0.3] ] ### Phone Model ### # state map phoneStateMap = {'onset' : 0, 'mid' : 1, 'end' : 2, 'final' : 3} phoneStateIndex = {0 : 'onset', 1 : 'mid', 2 : 'end', 3 : 'final'} # observation map phoneObsMap = {'C1':0,'C2':1,'C3':2,'C4':3,'C5':4,'C6':5,'C7':6,'none':7} phoneObsIndex = {0:'C1',1:'C2',2:'C3',3:'C4',4:'C5',5:'C6',6:'C7',7:'none'} # probabilities phoneProb = [ 1.0, 0, 0, 0 ] phoneEProb = [ \ [0.5, 0, 0, 0], \ [0.2, 0, 0, 0], \ [0.3, 0.2, 0, 0], \ [0, 0.7, 0.1, 0], \ [0, 0.1, 0, 0], \ [0, 0, 0.5, 0], \ [0, 0, 0.4, 0], \ [0, 0, 0, 1.0] \ ] phoneTProb = [ \ [ 0.3, 0.7, 0, 0 ], \ [ 0, 0.9, 0.1, 0 ], \ [ 0, 0, 0.4, 0.6 ], \ [ 0, 0, 0, 1.0 ] \ ] # Using the prior probabilities and state map, return: # P(state) def getStatePriorProb(prob, stateMap, state): return prob[stateMap[state]] # Using the transition probabilities and state map, return: # P(next state | current state) def getNextStateProb(tprob, stateMap, current, next): return tprob[stateMap[current]][stateMap[next]] # Using the observation probabilities, state map, and observation map, return: # P(observation | state) def getObservationProb(eprob, stateMap, obsMap, state, obs): return eprob[obsMap[obs]][stateMap[state]] # Normalize a probability distribution def normalize(pdist): s = sum(pdist) for i in range(0,len(pdist)): pdist[i] = pdist[i] / s return pdist # Filtering. # Input: The HMM (state and observation maps, and probabilities) # A list of T observations: E(0), E(1), ..., E(T-1) # (ie whether the umbrella was seen [yes, no, ...]) # # Output: The posterior probability distribution over the most recent state # given all of the observations: P(X(T-1)|E(0), ..., E(T-1)). def filter( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, observations): # intialize probability distribution to prior pdist = prob # update for subsequent times for k in range(0,len(observations)): pdist_new = [] for i in range(0,len(stateMap)): prob_i = 0.0 for j in range(0,len(stateMap)): prob_i = prob_i + tprob[j][i] * pdist[j] prob_i = prob_i * eprob[obsMap[observations[k]]][i] pdist_new.append(prob_i) pdist = normalize(pdist_new) return pdist # Prediction. # Input: The HMM (state and observation maps, and probabilities) # A list of T observations: E(0), E(1), ..., E(T-1) # # Output: The posterior probability distribution over the next state # given all of the observations: P(X(T)|E(0), ..., E(T-1)). def predict( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, observations): # compute P(X(T-1)|E(0), ..., E(T-1)) pdist = filter( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, observations) # predict next timestep pdist_next = [] for i in range(0,len(stateMap)): prob_i = 0.0 for j in range(0,len(stateMap)): prob_i = prob_i + tprob[j][i] * pdist[j] pdist_next.append(prob_i) return pdist_next # Smoothing. # Input: The HMM (state and observation maps, and probabilities) # A list of T observations: E(0), E(1), ..., E(T-1) # # Ouptut: The posterior probability distribution over each state given all # of the observations: P(X(k)|E(0), ..., E(T-1) for 0 <= k <= T-1. # # These distributions should be returned as a list of lists. def smooth( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, observations): # compute forward messages fv = prob forward = [] for k in range(0,len(observations)): fv_next = [] for i in range(0,len(stateMap)): prob_i = 0.0 for j in range(0,len(stateMap)): prob_i = prob_i + tprob[j][i] * fv[j] prob_i = prob_i * eprob[obsMap[observations[k]]][i] fv_next.append(prob_i) fv = normalize(fv_next) forward.append(fv) # compute backward messages sv = [] b = [] for i in range(0,len(stateMap)): b.append(1) for k in range(len(observations)-1,-1,-1): sv_curr = [] for i in range(0,len(stateMap)): sv_curr.append(forward[k][i]*b[i]) sv.append(normalize(sv_curr)) b_prev = [] for i in range(0,len(stateMap)): prob_i = 0.0 for j in range(0,len(stateMap)): prob_i = \ prob_i + eprob[obsMap[observations[k]]][j] * b[j] * tprob[i][j] b_prev.append(prob_i) b = normalize(b_prev) sv.reverse() return sv # Viterbi algorithm. # Input: The HMM (state and observation maps, and probabilities) # A list of T observations: E(0), E(1), ..., E(T-1) # # Output: A list containing the most likely sequence of states. # (ie [sunny, foggy, rainy, sunny, ...]) def viterbi( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, observations): # intialize probability distribution to prior pdist = prob # update max probability paths, keeping back links to best path back_pointers = [] for k in range(0,len(observations)): pdist_new = [] prev_pointer = [] for i in range(0,len(stateMap)): prob_i = 0.0 best_j = 0 for j in range(0,len(stateMap)): prob_i_from_j = tprob[j][i] * pdist[j] if (prob_i_from_j > prob_i): prob_i = prob_i_from_j best_j = j prob_i = prob_i * eprob[obsMap[observations[k]]][i] pdist_new.append(prob_i) prev_pointer.append(best_j) pdist = normalize(pdist_new) back_pointers.append(prev_pointer) # extract the best path n = len(observations) - 1 s_prob = 0 s = 0 for i in range(0,len(stateMap)): if (pdist[i] > s_prob): s_prob = pdist[i] s = i seq = [] for k in range(n,-1,-1): seq.append(stateIndex[s]) s = back_pointers[k][s] seq.reverse() return seq # Functions for testing. # You should not change any of these functions. def loadData(filename): input = open(filename, 'r') input.readline() data = [] for i in input.readlines(): x = i.split() y = x[0].split(",") data.append(y) return data def accuracy(a,b): total = float(max(len(a),len(b))) c = 0 for i in range(min(len(a),len(b))): if a[i] == b[i]: c = c + 1 return c/total def test(stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, data): observations = [] classes = [] for c,o in data: observations.append(o) classes.append(c) n_obs_short = 10 obs_short = observations[0:n_obs_short] classes_short = classes[0:n_obs_short] print 'Short observation sequence:' print ' ', obs_short # test filtering result_filter = filter( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, obs_short) print '\nFiltering - distribution over most recent state:' for i in range(0,len(result_filter)): print ' ', stateIndex[i], '%1.3f' % result_filter[i], # test prediction result_predict = predict( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, obs_short) print '\n\nPrediction - distribution over next state:' for i in range(0,len(result_filter)): print ' ', stateIndex[i], '%1.3f' % result_predict[i], # test smoothing result_smooth = smooth( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, obs_short) print '\n\nSmoothing - distribution over state at each point in time:' for t in range(0,len(result_smooth)): result_t = result_smooth[t] print ' ', 'time', t, for i in range(0,len(result_t)): print ' ', stateIndex[i], '%1.3f' % result_t[i], print ' ' # test viterbi result_viterbi = viterbi( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, obs_short) print '\nViterbi - predicted state sequence:\n ', result_viterbi print 'Viterbi - actual state sequence:\n ', classes_short print 'The accuracy of your viterbi classifier on the short data set is', \ accuracy(classes_short, result_viterbi) result_viterbi_full = viterbi( \ stateMap, stateIndex, obsMap, obsIndex, prob, tprob, eprob, observations) print 'The accuracy of your viterbi classifier on the entire data set is', \ accuracy(classes, result_viterbi_full) if __name__ == '__main__': type = None filename = None if len(sys.argv) > 1: type = sys.argv[1] if len(sys.argv) > 2: filename = sys.argv[2] if filename: data = loadData(filename) else: print usage exit(0) if (type == 'weather'): test( \ weatherStateMap, \ weatherStateIndex, \ weatherObsMap, \ weatherObsIndex, \ weatherProb, \ weatherTProb, \ weatherEProb, \ data) elif (type == 'phone'): test( \ phoneStateMap, \ phoneStateIndex, \ phoneObsMap, \ phoneObsIndex, \ phoneProb, \ phoneTProb, \ phoneEProb, \ data) else: print usage exit(0)