Author: Andrei Stefan
Date: 13-11-2023
Required files: data/data_adjusted.csv, data/data_no_features.csv, state_files/all_possible_states.txt, state_files/states_for_q4.txt, state_files/states_for_q6.txt
Output files: no output files
This file contains the code to reproduce the results of running the analysis for questions 4, 5, and 6. The corresponding figures and tables are: Table E.1, Table F.1, Table G.1, Figure 4.4, Figure 4.5, Figure 4.6, Figure 4.7.
# import all required packages
import csv
import math
import matplotlib.pylab as plt
import matplotlib.ticker as mtick
import numpy as np
import os
import random
import scipy
from collections import Counter
Define global variables that hold the number of times each action was done in each reduced state, the mean reward overall, and the number of times each action was done in each state without the person's situation.
state_action_times_reduced = {}
mean_reward_overall = 0
state_action_times_no_features = {}
Define a helper function for converting strings to booleans.
def string_to_bool(s):
"""
Function that turns a "True" or "False" string into an integer flag.
Args: s - the string.
Returns: 1 if the string is "True", 0 if the string is "False".
"""
if s == "False":
return 0
elif s == "True":
return 1
Define a helper function to process the raw reward, which can be either a 0 or a list of four values that make up the reward.
def reward_function(reward):
"""
Function that helps parse the reward.
Args: reward - the reward as a string.
Returns: the parsed reward.
"""
# if the reward is a 0
if reward == 0:
# then return 0
return 0
# if there is a newline in the string
if "\n" in reward:
# then remove it
reward = reward[:-1]
# if the reward is a 0, but as a string
if reward == "0":
# then return 0
return 0
# otherwise, the reward is not 0 and we need to process it
# it might be enclosed in quotes and will always have four numbers enclosed in square brackets
elif '\"' in reward or "\'" in reward or "[" in reward:
# so, while there are either quotes or square brackets, remove the first and last character
# e.g. it could start off as "[0, 1, 2, 3]"
# removing the first and last character once leaves [0, 1, 2, 3]
# removing them again leaves 0, 1, 2, 3
while '\"' in reward or "\'" in reward or "[" in reward:
reward = reward[1:-1]
# split the reward
split = reward.split(", ")
# extract reward components
split[0] = int(split[0])
split[1] = int(split[1])
split[2] = int(split[2])
split[3] = int(split[3])
# return the reward after applying the function
return (split[0] + 0.8 * split[1] + 0.2 * split[2] + split[3]) / 3
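As a quick illustration of the reward parsing (the example string below is hypothetical, but it follows the format described in the comments above):
# illustrative example only: an end-of-dialogue reward stored as a quoted list
example_reward = '"[1, 0, 1, 0]"'
# the quotes and brackets are stripped, then (1 + 0.8 * 0 + 0.2 * 1 + 0) / 3 is returned
print(reward_function(example_reward))  # approximately 0.4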
Calculate how the data is distributed across the reduced states and the states without a person's situation, which is needed for the imputation. Corresponds to Table C.1 in the manuscript.
def data_distribution(filename):
"""
Function that calculates how many times each action was done in each reduced state
Args: filename - the name of the file with the data.
Returns: a dict containing the number of times each action was done in each reduced state,
the mean reward of all end states,
a dict which contains the end state rewards at each time step from 2 to 6.
"""
# initialise an empty list for holding the rewards
rewards = []
# initialise an empty dict for holding the number of times an action was done in a state
state_action_times = {}
# initialise an empty dict for holding the end state rewards in each time step
rewards_per_timestep = {}
# initialise all time steps 2 - 6 to an empty list in the dict
for i in range(2,7):
rewards_per_timestep[i] = []
# open the file
with open(filename) as f:
# read all lines in the file
lines = f.readlines()
# loop over the lines, skipping the header
for line in lines[1:]:
# split the line
split = line.split("\",")
# get the state before
state_before = split[0][1:]
# get the action
action = split[1].split(",\"")[0]
# get the state after
state_after = split[1].split(",\"")[1]
# get the reward
reward = split[2]
# process the reward by applying the reward function to it
reward_processed = reward_function(reward)
# split the state_after
split2 = state_after[1:-1].split(", ")
# extract the data from the state after
plans = int(split2[0])
a1 = string_to_bool(split2[3])
a2 = string_to_bool(split2[4])
a3 = string_to_bool(split2[5])
a4 = string_to_bool(split2[6])
# calculate the time step which this state corresponds to by looking at how many actions have been done
time_step = plans + a1 + a2 + a3 + a4
# if the original reward was a list (so if it was a reward at the end of the dialogue)
if "[" in reward:
# then add it to the list of rewards
rewards.append(reward_processed)
# and add it to the list of rewards for this corresponding time step
rewards_per_timestep[time_step].append(reward_processed)
# if there is no entry for this action being done in this state_before
if (state_before, action) not in state_action_times:
# then make an entry and record that the action was done once
state_action_times[(state_before, action)] = 1
# otherwise, increment the count by 1
else:
state_action_times[(state_before, action)] += 1
# loop over the number of times each action was done in each state to reduce the state
for (state, action), times in state_action_times.items():
# split the state
split = state[1:-1].split(", ")
# keep only the confidence and perceived usefulness from the original state
reduced_state = f"[{split[1]}, {split[2]}]"
# if there is no entry for the reduced state and action in the state_action_times_reduced dict
if (reduced_state, action) not in state_action_times_reduced:
# then make an entry and record how many times the action was done so far
state_action_times_reduced[(reduced_state, action)] = times
# otherwise, increment the count by the number of times this current item in the dict was done
else:
state_action_times_reduced[(reduced_state, action)] += times
# create the state without the person's situation
state_no_features = f"[{split[0]}, {split[3]}, {split[4]}, {split[5]}, {split[6]}]"
# if there is no entry for the state without the person's situation and the action in the dict
if (state_no_features, action) not in state_action_times_no_features:
# then make an entry and record how many times the action was done so far
state_action_times_no_features[(state_no_features, action)] = times
# otherwise, increment the count by the number of times this current item in the dict was done
else:
state_action_times_no_features[(state_no_features, action)] += times
# we also need to add the actions missed for the states without the person's situation
# open the file which contains all the possible states without the person's situation
with open("state_files/states_for_q6.txt") as f:
# read all the lines
states = f.readlines()
# remove the endlines at the end of each line
states = [state[:-1] for state in states]
# loop over the states
for state in states:
# initialise a list which will hold the possible actions in this state
possible_actions = []
# split the state
split = state[1:-1].split(", ")
# extract the data from the state
plans = int(split[0])
a1 = split[1]
a2 = split[2]
a3 = split[3]
a4 = split[4]
# note that the requirements below are not 100% correct in terms of what action can be done in each state
# but it is fine if we count that impossible actions were never done, since these will never be looked up later
# because the only way we access this dictionary is to look up how many times an action that was actually done in a state was done
# and for actions that are not possible in a state, that accessing can never happen
# if there were less than 2 plans done, changes to plan can still be done
if plans < 2:
possible_actions.append("changes_to_plan")
# if any of the other actions was not done, it can still be done
if a1 == "False":
possible_actions.append("explain_planning")
if a2 == "False":
possible_actions.append("identify_barriers")
if a3 == "False":
possible_actions.append("deal_with_barriers")
if a4 == "False":
possible_actions.append("show_testimonials")
# loop over all the actions in the possible actions list
for action in possible_actions:
# if this pair is not in the dict
if (state, action) not in state_action_times_no_features:
# then add it and indicate that the action was done 0 times in the state
state_action_times_no_features[(state, action)] = 0
# make sure that the global variable is changed
global mean_reward_overall
# set the mean reward overall to the mean of the rewards at the end of the dialogue
mean_reward_overall = np.mean(rewards)
# return the number of times each action was done in each reduced state, the mean reward overall, and the rewards per time step
return state_action_times_reduced, mean_reward_overall, rewards_per_timestep
# call the function to make sure that the global variable is updated
state_action_times_reduced, mean_reward_overall, rewards_per_timestep = data_distribution("../../data/data_adjusted.csv")
# create a new dict which contains the mean reward of each time step
rewards_per_timestep_mean = {k: np.mean(v) for k, v in rewards_per_timestep.items()}
# print the number of times each action was done in each reduced state
print(state_action_times_reduced)
{("['0', '0']", 'show_testimonials'): 27, ("['0', '0']", 'changes_to_plan'): 51, ("['0', '0']", 'explain_planning'): 25, ("['0', '0']", 'identify_barriers'): 22, ("['0', '0']", 'deal_with_barriers'): 23, ("['1', '0']", 'show_testimonials'): 11, ("['1', '0']", 'changes_to_plan'): 23, ("['1', '0']", 'identify_barriers'): 17, ("['1', '0']", 'deal_with_barriers'): 10, ("['1', '0']", 'explain_planning'): 14, ("['1', '1']", 'show_testimonials'): 41, ("['1', '1']", 'changes_to_plan'): 81, ("['1', '1']", 'identify_barriers'): 33, ("['1', '1']", 'deal_with_barriers'): 41, ("['1', '1']", 'explain_planning'): 39, ("['0', '1']", 'show_testimonials'): 11, ("['0', '1']", 'changes_to_plan'): 18, ("['0', '1']", 'explain_planning'): 8, ("['0', '1']", 'identify_barriers'): 13, ("['0', '1']", 'deal_with_barriers'): 11}
Define the value iteration function and its helper functions.
def value_iteration(file, features=[], not_all_features=False, states_file="state_files/states_for_q4.txt",
no_features=False):
"""
Function for executing the value iteration algorithm.
Args: file - the name of the file,
features - a list of features used for the states,
not_all_features - a boolean indicating that only a subset of the three features should be used,
states_file - the name of the file which contains all the states for the Q-tables,
no_features - a boolean indicating whether none of the three features should be used.
Returns: the final Q-table, the reward and transition dicts filled in and processed, and the list of states which
correspond to the rows of the Q-table.
"""
# open the states file
with open(states_file) as f:
# read all lines in the file
states = f.readlines()
# remove the newline at the end of each line
states = [state[:-1] for state in states]
# if we should not be using all three features
if not_all_features:
# then adjust states to only include features given
# create a new list which will hold the adjusted states
adjusted_states = []
# loop over all the original states
for state in states:
# split the state
split = state[1:-1].split(", ")
# extract the data from the state
plans = split[0]
c = split[1]
pu = split[2]
a = split[3]
a1 = split[4]
a2 = split[5]
a3 = split[6]
a4 = split[7]
# start creating the adjusted state
adjusted_state = f"[{plans}, "
# add the features which are in the list of features provided
if "confidence" in features:
adjusted_state += f"{c}, "
if "perceived_usefulness" in features:
adjusted_state += f"{pu}, "
if "attitude" in features:
adjusted_state += f"{a}, "
# finish creating the adjusted state
adjusted_state += f"{a1}, {a2}, {a3}, {a4}]"
# add the adjusted states to the list of adjusted states
adjusted_states.append(adjusted_state)
# empty the original list of states
states = []
# loop over the list of adjusted states and add them to the states list
for adjusted_state in adjusted_states:
if adjusted_state not in states:
states.append(adjusted_state)
# enumerate all possible actions
actions = ["changes_to_plan", "explain_planning", "identify_barriers", "deal_with_barriers",
"show_testimonials"]
# get the number of states
num_states = len(states)
# set epsilon to a small number
epsilon = 0.0001
# initialise two Q-tables with num_states rows and 5 columns, both filled with zeroes
q_n = np.zeros((num_states, 5))
q_n_1 = np.zeros((num_states, 5))
# set delta to a number larger than epsilon to ensure the loop starts
delta = 1
# set the discount factor to 0.85
gamma = 0.85
# set the iteration number to 0
n = 0
# initialise empty raw transition and raw reward dicts
transition_dict_raw = {}
reward_dict_raw = {}
# fill in the raw dicts
fill_in_reward_and_transition(file, transition_dict_raw, reward_dict_raw)
# process the transitions
transition_dict = process_transition(transition_dict_raw, no_features=no_features)
# process the rewards
reward_dict = process_reward(reward_dict_raw, no_features=no_features, states_file=states_file)
# loop while there is a difference larger than 0.0001 between iterations
while delta > epsilon and delta != 0:
# set the difference to 0 by default
delta = 0
# loop over states
for s in states:
# loop over actions
for a in actions:
# check if there is an average reward for this state-action pair
if (s, a) in reward_dict:
# if there is, get it from the dict
reward = reward_dict.get((s, a))
else:
# if there is not, set the reward to 0
reward = 0
# fill in the state-action location of the Q-table with the value for this state-action pair
# which is equal to the reward for the state-action pair + the sum term multiplied by the discount factor
q_n[states.index(s)][actions.index(a)] = reward + gamma * sum_term(s, a, states, q_n_1,
transition_dict)
# calculate the absolute difference (always positive) between this Q-table and the previous Q-table
# if this difference is more than delta (which is 0 at this point), overwrite the delta with this value
delta = max(delta,
abs(q_n_1[states.index(s)][actions.index(a)] - q_n[states.index(s)][actions.index(a)]))
# save a copy of the current Q-table
q_n_1 = np.copy(q_n)
# increment the iteration number
n += 1
# return the final Q-table, the reward and transition dicts, and the states list
return q_n, reward_dict, transition_dict, states
def fill_in_reward_and_transition(file, transition_dict_raw, reward_dict_raw):
"""
Function for filling in the initial transition and reward dicts, which contain the state to which a state-action pair transitions,
and the raw rewards (a 0 or a list of 4 numbers) respectively.
Args: file - the name of the file,
transition_dict_raw - an empty dict,
reward_dict_raw - an empty dict.
Returns: none, the dicts are filled inplace.
"""
# loop over the file
for index, line in enumerate(open(file)):
# ignore the header and empty lines
if "state_before" not in line and len(line) != 0:
# remove the newline at the end of the line if there is one
if "\n" in line:
line = line[:-1]
# split the line
split = line.split("\",")
# get the state before
state_before = split[0][1:]
# get the action
action = split[1].split(",")[0]
# get the state after
state_after = split[1].split(",\"")[1]
# split the line differently
split_2 = line.split("]")
# get the reward
reward = split_2[2].split("\",")[1]
# if the reward was a list (starting with "["), then we removed the "]" at the end
if "[" in reward:
# so add it back
reward = f"{reward[1:]}]"
# if the state_before-action pair is not in the raw transition dict
if not (state_before, action) in transition_dict_raw:
# then create a new list containing the state_after
transition_dict_raw[(state_before, action)] = [state_after]
else:
# otherwise, there is already a list there, so append the state after to it
transition_dict_raw[(state_before, action)].append(state_after)
# if the state_before-action pair is not in the raw reward dict
if not (state_before, action) in reward_dict_raw:
# then create a new list containing the reward
reward_dict_raw[(state_before, action)] = [reward]
else:
# otherwise, there is already a list there, so append the reward to it
reward_dict_raw[(state_before, action)].append(reward)
def process_transition(transition_dict_raw, no_features=False):
"""
Function for processing the raw transition dict. It creates a new dict that contains state_before-action-state_after triples that
have a corresponding probability - the probability of going to the state_after when taking the action in the given state_before.
!! Note that this function is very different from the one used in the G-algorithm, because it imputes data where necessary.
Args: transition_dict_raw - a dict filled in by fill_in_reward_and_transition,
no_features - a boolean indicating whether the states do not include the person's situation.
Returns: a filled in transition dict.
"""
# initialise an empty transition dict
transition_dict = {}
# loop over the raw transition dict
for (state_before, action), states_after in transition_dict_raw.items():
# count how many times each state after occurs for this state_before-action pair
state_after_frequencies = Counter(states_after)
# initialise an empty list that will hold the states that were not reached by this state_before-action pair
missed_states_after = []
# split the state before
split = state_before[1:-1].split(", ")
# extract the data
plans = int(split[0])
# depending on how the state is formatted, different pieces of data are at different positions
if no_features:
a1 = split[1]
else:
a1 = split[3]
if no_features:
a2 = split[2]
else:
a2 = split[4]
if no_features:
a3 = split[3]
else:
a3 = split[5]
if no_features:
a4 = split[4]
else:
a4 = split[6]
# determine how the state changes after taking the action
if action == "changes_to_plan":
plans += 1
if action == "explain_planning":
a1 = "True"
if action == "identify_barriers":
a2 = "True"
if action == "deal_with_barriers":
a3 = "True"
if action == "show_testimonials":
a4 = "True"
# if we are working with full states
if not no_features:
# loop over 2 values for confidence
for c in ["'0'", "'1'"]:
# loop over 2 values for perceived usefulness
for pu in ["'0'", "'1'"]:
# create each possible state after the action
possible_state_after = f"[{plans}, {c}, {pu}, {a1}, {a2}, {a3}, {a4}]"
# if the possible state after is not in the list of states after
# that this state_before-action pair has reached
if possible_state_after not in states_after:
# then set the frequency for this possible state after to 0
state_after_frequencies[possible_state_after] = 0
# and add it to the list of missed states
missed_states_after.append(possible_state_after)
# loop over the states after that were reached
for state_after in states_after:
# split the state before
split = state_before[1:-1].split(", ")
# if we have states without the person's situation
if no_features:
# check how many samples there are for this state_before-action pair
state_action_time = state_action_times_no_features[(state_before, action)]
# if there are less than 8 (explanation for the number in the thesis manuscript)
if state_action_time < 8:
# then adjust the probability by imputing samples
# e.g. if there were 3 samples, the new probability is 3/8 * the old probability + 5/8 * 1/4 (the probability of a random next state)
prob = (state_action_time / 8) * (state_after_frequencies[state_after] / len(states_after)) + (
((8 - state_action_time) / 8) * 1 / 4)
# otherwise, we have at least 8 samples
else:
# so calculate the probability of reaching this state after normally
prob = states_after.count(state_after) / len(states_after)
# if there is no entry for the state_before-action-state_after triple in the transition dict
if not (state_before, action, state_after) in transition_dict:
# then add it and set its value to the probability
transition_dict[(state_before, action, state_after)] = prob
# otherwise, if we have full states
else:
# extract the confidence and perceived usefulness
c = split[1]
pu = split[2]
# create the reduced state
reduced_state = f"[{c}, {pu}]"
# check how many reduced_state-action samples there are in the data
state_action_time = state_action_times_reduced[(reduced_state, action)]
# if there are less than 25 (explanation for the number in the thesis manuscript)
if state_action_time < 25:
# then adjust the probability as above, but replacing the 8 with 25
prob = (state_action_time / 25) * (state_after_frequencies[state_after] / len(states_after)) + (
((25 - state_action_time) / 25) * 1 / 4)
# otherwise, we have at least 25 samples
else:
# so calculate the probability of reaching this state after normally
prob = states_after.count(state_after) / len(states_after)
# if there is no entry for the state_before-action-state_after triple in the transition dict
if not (state_before, action, state_after) in transition_dict:
# then add it and set its value to the probability
transition_dict[(state_before, action, state_after)] = prob
# next, loop over the possible states after that this state_before-action pair did not reach
for state_after in missed_states_after:
# split the state before
split = state_before[1:-1].split(", ")
# extract the confidence and perceived usefulness
c = split[1]
pu = split[2]
# create the reduced state
reduced_state = f"[{c}, {pu}]"
# check how many reduced_state-action samples there are in the data
state_action_time = state_action_times_reduced[(reduced_state, action)]
# if there are less than 25 (explanation for the number in the thesis manuscript)
if state_action_time < 25:
# then adjust the probability
prob = (state_action_time / 25) * (state_after_frequencies[state_after] / len(states_after)) + (
((25 - state_action_time) / 25) * 1 / 4)
# otherwise, we have at least 25 samples
else:
# so calculate the probability of reaching this state after normally
prob = states_after.count(state_after) / len(states_after)
# if there is no entry for the state_before-action-state_after triple in the transition dict
if not (state_before, action, state_after) in transition_dict:
# then add it and set its value to the probability
transition_dict[(state_before, action, state_after)] = prob
# return the transition dict
return transition_dict
def process_reward(reward_dict_raw, no_features=False, states_file="state_files/states_for_q4.txt"):
"""
Function for processing the raw reward dict. It creates a new dict that contains the average reward for every
state_before-action pair.
!! Note that this function is very different from the one used in the G-algorithm, because it imputes data where necessary.
Args: reward_dict_raw - a dict filled in by fill_in_reward_and_transition,
no_features - a boolean indicating whether the states do not include the person's situation,
states_file - the name of the file which contains all the states for the Q-tables.
Returns: a filled in reward dict.
"""
# initialise an empty reward dict
reward_dict = {}
# open the file
with open(states_file) as f:
# read all the lines
states = f.readlines()
# remove the newlines
states = [state[:-1] for state in states]
# loop over the states
for state in states:
# initialise an empty list for holding the possible actions
possible_actions = []
# split the state
split = state[1:-1].split(", ")
# extract the data
plans = int(split[0])
# depending on how the state is formatted, different pieces of data are at different positions
if no_features:
a1 = split[1]
else:
a1 = split[3]
if no_features:
a2 = split[2]
else:
a2 = split[4]
if no_features:
a3 = split[3]
else:
a3 = split[5]
if no_features:
a4 = split[4]
else:
a4 = split[6]
# calculate how many actions have been done already
n = string_to_bool(a1) + string_to_bool(a2) + string_to_bool(a3) + string_to_bool(a4)
# determine the possible actions
if plans < 2 and plans <= n:
possible_actions.append("changes_to_plan")
if a1 == "False":
possible_actions.append("explain_planning")
if a2 == "False":
possible_actions.append("identify_barriers")
if a3 == "False" and a2 == "True":
possible_actions.append("deal_with_barriers")
if a4 == "False":
possible_actions.append("show_testimonials")
# loop over the possible actions
for action in possible_actions:
# if there is no entry for the state-possible action pair
if (state, action) not in reward_dict_raw:
# add an entry with the reward 0
reward_dict_raw[(state, action)] = ['0']
# loop over the raw reward dict
for (state_before, action), rewards in reward_dict_raw.items():
# process the rewards
parsed_rewards = [reward_function(x) for x in rewards]
# split the state before
split = state_before[1:-1].split(", ")
# if we have states without the person's situation
if no_features:
# check how many samples there are for this state_before-action pair
state_action_time = state_action_times_no_features[(state_before, action)]
# if there are less than 8 (explanation for the number in the thesis manuscript)
if state_action_time < 8:
# then adjust the reward by imputing samples
# e.g. if there were 3 samples, the new reward is 3/8 * the old mean reward + 5/8 * the mean reward overall
final_reward = (state_action_time / 8) * np.average(parsed_rewards) + (
(8 - state_action_time) / 8) * mean_reward_overall
# otherwise, we have at least 8 samples
else:
# so calculate the reward normally
final_reward = np.average(parsed_rewards)
# if there is no entry for the state_before-action pair in the reward dict
if not (state_before, action) in reward_dict:
# then add it and set its value to the reward
reward_dict[(state_before, action)] = final_reward
# otherwise, we have full states
else:
# extract the confidence and perceived usefulness
c = split[1]
pu = split[2]
# create the reduced state
reduced_state = f"[{c}, {pu}]"
# check how many reduced_state-action samples there are in the data
state_action_time = state_action_times_reduced[(reduced_state, action)]
# if there are less than 25 (explanation for the number in the thesis manuscript)
if state_action_time < 25:
# then adjust the reward as above, but replacing the 8 with a 25
final_reward = (state_action_time / 25) * np.average(parsed_rewards) + (
(25 - state_action_time) / 25) * mean_reward_overall
# otherwise, we have at least 25 samples
else:
# so calculate the reward normally
final_reward = np.average(parsed_rewards)
# if there is no entry for the state_before-action pair in the reward dict
if not (state_before, action) in reward_dict:
# then add it and set its value to the reward
reward_dict[(state_before, action)] = final_reward
# return the reward dict
return reward_dict
def max_action(q_n_1, state_after, states):
"""
Function for calculating the maximum value in a Q-table row.
Args: q_n_1 - a Q-table,
state_after - a string representation of a state,
states - a list of states, in the same order as the Q-table.
Returns: the maximum value in the corresponding row.
"""
# get the row of the Q-table by first getting the index of the state_after in the states list, which
# is also the index of the Q-table row which corresponds to this state, since the Q-table rows are
# in the same order as the states list
row = q_n_1[states.index(state_after)]
# return the maximum value in the row
return np.max(row)
def sum_term(state_before, action, states, q_n_1, transition_dict):
"""
Function for calculating the sum term of the Bellman equation: Sum_{s' in S} p(s'|s,a) * max_{a' in A} Q_{n-1}(s', a').
Args: state_before - the state from which the action is taken,
action - the action taken,
states - the list of all states,
q_n_1 - the Q-table for which to compute the sum,
transition_dict - the transition dict.
Returns: the value of the sum.
"""
# initialise the total value as 0
value_total = 0
# loop over all possible states after
for state_after in states:
# if a transition from the state_before-action pair to this state_after exists
if (state_before, action, state_after) in transition_dict:
# then the value is the probability of transitioning to it times the maximum possible value
# that can be gained in the state_after (by taking the best action)
value = transition_dict.get((state_before, action, state_after)) * max_action(q_n_1, state_after, states)
else:
# otherwise, the transition does not exist, so the value is 0
value = 0
# add the value of this possible state after to the total
value_total += value
# return the total
return value_total
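To make the imputation used in process_transition and process_reward concrete, here is a minimal sketch; the helper blended_probability below is hypothetical, only mirrors the blending formula from the comments above, and is not part of the analysis code.
def blended_probability(observed_prob, n_samples, threshold=25, uniform_prob=1 / 4):
    """Illustrative only: shrink an observed probability towards a uniform guess when data is scarce."""
    # with at least `threshold` samples, trust the observed probability as-is
    if n_samples >= threshold:
        return observed_prob
    # otherwise, blend it with a uniform guess over the four possible next states
    return (n_samples / threshold) * observed_prob + ((threshold - n_samples) / threshold) * uniform_prob

print(blended_probability(0.5, 3))   # 3/25 * 0.5 + 22/25 * 0.25 = 0.28 (up to floating point)
print(blended_probability(0.5, 30))  # enough samples, so the observed 0.5 is kept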
Define a helper function that determines how many actions (or time steps) have happened to reach the current state, and another helper function that checks whether a state has the highest values for confidence and perceived usefulness.
def state_reachable_after(state):
"""
Function to check after how many steps we can reach a state.
Args: state - the state for which to check.
Returns: the number of time steps to reach this state.
"""
# split the state
split = state[1:-1].split(", ")
plans = int(split[0])
# extract the data
a1 = string_to_bool(split[3])
a2 = string_to_bool(split[4])
a3 = string_to_bool(split[5])
a4 = string_to_bool(split[6])
# sum up the actions done and return
return plans + a1 + a2 + a3 + a4
def state_good(state):
"""
Function to determine if a state is an end state (both confidence and perceived usefulness high).
Args: state - the state for which to check.
Returns: True if the state is an end state, and False otherwise.
"""
# split the state
split = state[1:-1].split(", ")
# get the confidence and perceived usefulness
c = split[1][1:-1]
pu = split[2][1:-1]
# if they are both 1
if c == "1" and pu == "1":
# return True
return True
# otherwise, return False
else:
return False
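As a small sanity check of these two helpers (the state strings below are hypothetical examples in the format used throughout this notebook):
print(state_reachable_after("[1, '0', '1', True, False, False, False]"))  # 1 plan + 1 action -> 2
print(state_good("[1, '0', '1', True, False, False, False]"))             # confidence is '0' -> False
print(state_good("[2, '1', '1', True, True, False, False]"))              # both '1' -> True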
Define a helper function that removes the file adjusted.csv, which is created for the reduced states.
def remove_new_file():
"""
Function to remove the newly created file called "adjusted.csv".
Args: none.
Returns: none.
"""
# set the name of the file
name = "adjusted.csv"
# if it exists
if os.path.exists(name):
# remove it
os.remove(name)
Define a helper function for selecting the states at the beginning and end of the dialogue.
def get_start_and_end_states():
"""
Function to help easily get the start and end states.
Args: none - implicitly uses the data_adjusted.csv as the file for the data.
Returns: the starting states, the rewards in end states, the end states without the person's situation, and all the end states.
"""
# initialise 2 lists for holding the start and end states
start_states = []
all_end_states = []
# initialise 2 dicts for holding the rewards of all end states and
# the rewards of all end states without the person's situation (same rewards, just states formatted differently)
end_states_rewards = {}
end_states_no_features_rewards = {}
# open the file
with open("../../data/data_adjusted.csv") as f:
# read all lines
lines = f.readlines()
# initialise 2 variables which indicate if we started processing a person and if we finished processing a person
got_start = False
got_end = False
# loop over all lines except the header
for line in lines[1:]:
# if we didn't start processing a person
if not got_start:
# then check if its reward is just a 0
if line[-2] == "0":
# if it is, then split it
split = line.split("\",")
# get the state before
start_state = split[0][1:]
# add it to the list of start states
start_states.append(start_state)
# set got_start to true so we don't look at any other samples from this person until the last one
got_start = True
# also set got_end to false to indicate that we haven't yet found this person's last sample
got_end = False
# if we didn't finish processing a person
if not got_end:
# check if the line has a " before the endline (this means it is a list enclosed in quotes, so it is the last sample)
if line[-2] == "\"":
# split the line
split = line.split(",\"[")
# get the state after
end_state = f"[{split[1][:-1]}"
# get the reward
reward = f"[{split[2][:-2]}"
# if there is no entry for this end state in end_states_rewards
if end_state not in end_states_rewards:
# then create a new list with the reward of this state as the first item
end_states_rewards[end_state] = [reward_function(reward)]
# otherwise, append the reward to the list
else:
end_states_rewards[end_state].append(reward_function(reward))
# add the end state to the list of end states
all_end_states.append(end_state)
# split the end state
split = end_state[1:-1].split(", ")
# extract the data from it
plans = int(split[0])
a1 = split[3]
a2 = split[4]
a3 = split[5]
a4 = split[6]
# adjust the end state to only include the actions done
end_state = f"[{plans}, {a1}, {a2}, {a3}, {a4}]"
# if there is no entry for this adjusted end state in end_states_no_features_rewards
if end_state not in end_states_no_features_rewards:
# then create a new list with the reward of this state as the first item
end_states_no_features_rewards[end_state] = [reward_function(reward)]
# otherwise, append the reward to the list
else:
end_states_no_features_rewards[end_state].append(reward_function(reward))
# set got_end to True and got_start to False, so we know that the next sample is a start sample that we should save
got_end = True
got_start = False
# calculate the means of all rewards for the end states and the end states without the person's situation
end_states_rewards = {k: np.mean(v) for k, v in end_states_rewards.items()}
end_states_no_features_rewards = {k: np.mean(v) for k, v in end_states_no_features_rewards.items()}
# return the start states, the mean rewards of the end states, the mean rewards of the
# end states without the person's situation, and all the end states
return start_states, end_states_rewards, end_states_no_features_rewards, all_end_states
Define helper functions for adjusting states to only contain the specified features and nothing else.
def adjust_state(state, features):
"""
Function to adjust a state to only include the features given.
Args: state - the state to adjust as a string,
features - the list of features to include in the state.
Returns: the adjusted state.
"""
# split the state
split = state.split(", ")
# extract the confidence and perceived usefulness from the state
c = split[1]
pu = split[2]
# start building the new state
state = "["
# check what features are provided and add them to the new state
if "confidence" in features:
state += f"{c}, "
if "perceived_usefulness" in features:
state += f"{pu}, "
# close the state
state += "]"
# correction for the situations where confidence or perceived usefulness are the last feature to be added,
# in which case the state would end with a ", ]"
if ", ]" in state:
state = state[:-3] + "]"
# return the new state
return state
def adjust_states(original, new, features):
"""
Function to adjust all states in a given file to only include the features given and save them to a new file.
Args: original - the name of the file with states to adjust,
new - the name of the file to save the adjusted states to,
features - the list of features to include in the state.
Returns: none.
"""
# open the original file in read mode
with open(original, 'r', newline='') as input_file:
# open the new file in write mode
with open(new, 'w', newline='') as file:
# initialise a csv writer
writer = csv.writer(file)
# write the header of the file
writer.writerow(["state_before", "action", "state_after", "reward"])
# loop over all lines in the original file, except the first one which is the header
for line in input_file.readlines()[1:]:
# remove the newline at the end of the line
if "\n" in line:
line = line[:-1]
# split the line
split = line.split("\",")
# extract the state before, action, state after, and reward from the line
state_before = split[0][1:]
action = split[1].split(",")[0]
state_after = split[1].split(",\"")[1]
split_2 = line.split("]")
reward = split_2[2].split("\",")[1]
# make corrections to the reward extracted previously,
# based on what kind of reward we have - it can be just a 0 or a list of the form [..., ..., ..., ...]
if "[" in reward:
# if the reward is a list, then we removed the ] previously, so add it back
reward = f"{reward[1:]}]"
# if the reward has a \r (carriage return) at the end, remove it
elif "\r" in reward:
reward = reward[:-1]
# create new states before and after which only include the specified features
new_state_before = adjust_state(state_before, features)
new_state_after = adjust_state(state_after, features)
# write the new row in the new file
writer.writerow([f"{new_state_before}", f"{action}", f"{new_state_after}", f"{reward}"])
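As a small usage sketch of adjust_state (the state string below is a hypothetical example in the same format as above):
# keep only confidence and perceived usefulness
print(adjust_state("[0, '1', '0', False, False, False, False]", ["confidence", "perceived_usefulness"]))  # ['1', '0']
# keep only confidence
print(adjust_state("[0, '1', '0', False, False, False, False]", ["confidence"]))  # ['1']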
Define a helper function that can simulate a given policy.
def simulate_policy(policy, possible_actions_policy, states, transition_dict, simulate_no_f=False, simulate_rewards=False):
"""
Function to help simulate policies.
Args: policy - a dict containing the policy to simulate,
possible_actions_policy - a dict containing the possible actions to take in each state for which there is not a fixed policy,
states - a list containing the states to use in the simulation,
transition_dict - the transition dict of the policy,
simulate_no_f - a boolean indicating if we are simulating with states that do not include the person's situation,
simulate_rewards - a boolean indicating if we should simulate rewards (and then add people in the starting states differently).
Returns: a dict containing the rewards per time step when simulating rewards.
"""
# get the end states
start_states, end_states, end_states_no_features, all_end_states = get_start_and_end_states()
# enumerate the values for confidence and perceived usefulness
confidence = ["0", "1"]
perceived_usefulness = ["0", "1"]
# number of people per initial state
# 100 people per state, since there may be more than one optimal policy
n = 100
# initialise 3 dicts which will hold
# the number of states in each of the time steps
distribution = {}
# the states in which people ended and corresponding time step
ended = {}
# the rewards gained in each time step
rewards = {}
# loop over time steps 0 to 6
for i in range(7):
# loop over the possible states
for state in states:
# if the state is reachable after i time steps
if state_reachable_after(state) == i:
# then initialise the distribution and ended counts to 0 for that state and time step
distribution[(i, state)] = 0
ended[(i, state)] = 0
# initialise an empty list of rewards for each time step
rewards[i] = []
# if we are simulating rewards
if simulate_rewards:
# then for each starting state
for state in start_states:
# add one person in the starting state
distribution[(0, state)] += 1
# otherwise, we are simulating transitions
else:
# loop over 2 values for confidence
for c in confidence:
# loop over 2 values for perceived usefulness
for pu in perceived_usefulness:
# create all possible starting states
state = f"[0, '{c}', '{pu}', False, False, False, False]"
# and add 100 people to each
distribution[(0, state)] = n
# set the random seed
random.seed(2023)
# start the simulation
# loop over 6 time steps
for i in range(6):
# loop over the distribution
for k_distribution, v_distribution in distribution.items():
# if there are people in this state
if v_distribution != 0:
# get the state
state = k_distribution[1]
# initialise a variable that helps determine if we found a suitable action
got_action = False
# if we are simulating states that do not include the person's situation
if simulate_no_f:
# then the state we need to check in the policy is different from the full state
# so, split the state
split = state[1:-1].split(", ")
# and create the reduced state
policy_state = f"[{split[0]}, {split[3]}, {split[4]}, {split[5]}, {split[6]}]"
# otherwise, the state stays the same
else:
policy_state = state
# check if the state is reachable at the current time step and has no fixed policy, and either it is not a good state or it is too early to end (time step at most 1)
if state_reachable_after(state) == i and policy_state not in policy and not state_good(state) \
or state_reachable_after(state) == i and policy_state not in policy and state_good(state) and i <= 1:
# then we need to pick an action uniformly at random
# split the state
split = state[1:-1].split(", ")
# determine if the action should be changes to plan
if n == 1 and split[0] == 0 or split[0] == 1 and n == 3:
action = "changes_to_plan"
got_action = True
# otherwise pick a random action
else:
action = random.choice(possible_actions_policy[state])
got_action = True
# otherwise, if the state can be reached in the current time step and it is in the policy and it is not an end state
# or if the state can be reached in the current time step and it is in the policy and we cannot finish (time step is less than 2)
elif (state_reachable_after(state) == i and policy_state in policy and not state_good(state)) or (
state_reachable_after(state) == i and policy_state in policy and i <= 1):
# then we have a fixed action given by the policy
action = policy[policy_state]
got_action = True
# if we found the action to be done
if got_action:
# then we need to check if we have a transition
# initialise a variable that will help determine if we found a transition
found_transition = False
# loop over the transition dict
for k_transition, v_transition in transition_dict.items():
# check if there is an entry which starts in the current state and takes the action
if k_transition[0] == state and k_transition[1] == action:
# if there is, then we found the transition
found_transition = True
# get the state that we transition to
next_state = k_transition[2]
# calculate how many people will transition to the next state
# v_transition is the probability of moving to the next state
# v_distribution is the number of people we have in the current state
number_people = v_transition * v_distribution
# if there are more than 0 people transitioning
if number_people > 0:
# then add them to the distribution at the next time step
distribution[(i + 1, next_state)] += number_people
# then check if we are transitioning to a good state and if those people can end (time step at least 2)
if (state_good(next_state) and i + 1 >= 2):
# if that is the case, add them to the ended dict
ended[(i + 1, next_state)] += number_people
# and also add that many entries to the rewards dict (so that we can compute the mean later)
for _ in range(round(number_people)):
# if the state we are transitioning to is a state for which we know the reward
if next_state in end_states:
# then add that reward
rewards[i + 1].append(end_states[next_state])
# otherwise, add the mean reward of the next time step
else:
rewards[i + 1].append(rewards_per_timestep_mean[i + 1])
# if we did not find a transition, then we transition to all possible next states with 1/4 probability for each
if not found_transition:
# split the state
split = state[1:-1].split(", ")
# loop over 2 values for confidence
for c in ["'0'", "'1'"]:
# loop over 2 values for perceived usefulness
for pu in ["'0'", "'1'"]:
# generate the possible next state depending on the action we are taking
if action == "changes_to_plan":
possible_next_state = f"[{int(split[0]) + 1}, {c}, {pu}, {split[3]}, {split[4]}, {split[5]}, {split[6]}]"
if action == "explain_planning":
possible_next_state = f"[{int(split[0])}, {c}, {pu}, True, {split[4]}, {split[5]}, {split[6]}]"
if action == "identify_barriers":
possible_next_state = f"[{int(split[0])}, {c}, {pu}, {split[3]}, True, {split[5]}, {split[6]}]"
if action == "deal_with_barriers":
possible_next_state = f"[{int(split[0])}, {c}, {pu}, {split[3]}, {split[4]}, True, {split[6]}]"
if action == "show_testimonials":
possible_next_state = f"[{int(split[0])}, {c}, {pu}, {split[3]}, {split[4]}, {split[5]}, True]"
# calculate how many people are moving to the next state
# 1/4 is the probability of moving to the next state
# v_distribution is the number of people we have in the current state
number_people = 1 / 4 * v_distribution
# if there are more than 0 people transitioning
if number_people > 0:
# then add them to the distribution at the next time step
distribution[(i + 1, possible_next_state)] += number_people
# then check if we are transitioning to a good state and if those people can end (time step at least 2)
if (state_good(possible_next_state) and i + 1 >= 2):
# if that is the case, add them to the ended dict
ended[(i + 1, possible_next_state)] += number_people
# and also add that many entries to the rewards dict (so that we can compute the mean later)
for _ in range(round(number_people)):
# if the state we are transitioning to is a state for which we know the reward
if possible_next_state in end_states:
# then add that reward
rewards[i + 1].append(end_states[possible_next_state])
# otherwise, add the mean reward of the next time step
else:
rewards[i + 1].append(rewards_per_timestep_mean[i + 1])
# initialise a dict which will hold the distribution with compressed states (only confidence and perceived usefulness)
distribution_compressed = {}
# calculate the mean number of time steps needed to end the simulation
# initialise the sum to 0
s = 0
# initialise the total number of people to 0
total = 0
# loop over the people who ended
for k, v in ended.items():
# if there are more than 0 people
if v != 0:
# add time step times the number of people who ended in that time step to the sum
s += k[0] * v
# add the number of people to the total
total += v
# if we are not simulating rewards, then print the average number of time steps needed to end, rounded up to the nearest integer
if not simulate_rewards:
print("Average number of time steps to reach the good state", math.ceil(s / total))
# loop over the distribution dict
for k, v in distribution.items():
# get the time step
time_step = k[0]
# adjust the state to only include confidence and perceived usefulness
state = adjust_state(k[1], features=["confidence", "perceived_usefulness"])
# if there is no entry for this time step and state in the compressed dict
if (time_step, state) not in distribution_compressed:
# then make an entry and add the number of people
distribution_compressed[(time_step, state)] = v
# otherwise, there is already an entry so add the number of people to the previous value
else:
distribution_compressed[(time_step, state)] += v
# initialise the graph X axis
X = []
# add the possible states to the X axis
for c in confidence:
for pu in perceived_usefulness:
state = f"['{c}', '{pu}']"
X.append(state)
# also add an entry for the people who ended in the good state
X.append("Ended")
# calculate how many people there are in each state in each time step
time_0 = []
time_1 = []
time_2 = []
time_3 = []
time_4 = []
time_5 = []
time_6 = []
for k, v in distribution_compressed.items():
if k[0] == 0:
time_0.append(v)
if k[0] == 1:
time_1.append(v)
if k[0] == 2:
time_2.append(v)
if k[0] == 3:
time_3.append(v)
if k[0] == 4:
time_4.append(v)
if k[0] == 5:
time_5.append(v)
if k[0] == 6:
time_6.append(v)
# calculate how many people ended at each time step
ended_at_time_0 = []
ended_at_time_1 = []
ended_at_time_2 = []
ended_at_time_3 = []
ended_at_time_4 = []
ended_at_time_5 = []
ended_at_time_6 = []
for k, v in ended.items():
if k[0] == 0:
ended_at_time_0.append(v)
if k[0] == 1:
ended_at_time_1.append(v)
if k[0] == 2:
ended_at_time_2.append(v)
if k[0] == 3:
ended_at_time_3.append(v)
if k[0] == 4:
ended_at_time_4.append(v)
if k[0] == 5:
ended_at_time_5.append(v)
if k[0] == 6:
ended_at_time_6.append(v)
# at each time step, add the previous one too to present an incrementally increasing graph
end_0 = sum(ended_at_time_0)
end_1 = end_0 + sum(ended_at_time_1)
end_2 = end_1 + sum(ended_at_time_2)
end_3 = end_2 + sum(ended_at_time_3)
end_4 = end_3 + sum(
ended_at_time_4)
end_5 = end_4 + sum(ended_at_time_5)
end_6 = end_5 + sum(ended_at_time_6)
time_0.append(end_0)
time_1.append(end_1)
time_2.append(end_2)
time_3.append(end_3)
time_4.append(end_4)
time_5.append(end_5)
time_6.append(end_6)
# when we are not simulating rewards
if not simulate_rewards:
# print how many people ended in a good state
print(f"{round(time_6[-1]/4)}% of people ended the simulation in a good state.")
# plot the results
X_axis = np.arange(len(X))
plt.ylim([0, 100])
plt.rcParams['font.size'] = 15
plt.xlabel('xlabel', fontsize=15)
plt.ylabel('ylabel', fontsize=15)
plt.yticks(fontsize=15)
plt.xticks(fontsize=15)
plt.rcParams["figure.figsize"] = (15,7)
plt.bar(X_axis - 0.3, [i / 4 for i in time_0], 0.1, label='0', color="#caf0f8", hatch="/")
plt.bar(X_axis - 0.2, [i / 4 for i in time_1], 0.1, label='1', color="#a5e6f3", hatch="\\")
plt.bar(X_axis - 0.1, [i / 4 for i in time_2], 0.1, label='2', color="#90e0ef", hatch=".")
plt.bar(X_axis - 0, [i / 4 for i in time_3], 0.1, label='3', color="#48cae4", hatch="|")
plt.bar(X_axis + 0.1, [i / 4 for i in time_4], 0.1, label='4', color="#00b4d8", hatch="-")
plt.bar(X_axis + 0.2, [i / 4 for i in time_5], 0.1, label='5', color="#0096c7", hatch="x")
plt.bar(X_axis + 0.3, [i / 4 for i in time_6], 0.1, label='6', color="#0086b3", hatch="o")
plt.gca().yaxis.set_major_formatter(mtick.PercentFormatter())
plt.xticks(X_axis, X)
plt.xlabel("States")
plt.ylabel("Percentage of people")
plt.legend()
plt.tight_layout()
# only show the plot when we are simulating transitions
if not simulate_rewards:
plt.show()
plt.close()
remove_new_file()
# if we are simulating rewards, return the rewards dict
if simulate_rewards:
return rewards
In Q4, we are computing the optimal policy, using full states. There are cases where, for a state, there might be multiple best Q-values to pick from, and cases where the optimal policy is to pick an action uniformly at random. Since we will need to pick an action when we simulate the policy, we also compute a list of possible actions that can be done in such states. Every time the simulation has to pick an action, it checks this list and picks one.
def q4(filename, min_action=False):
"""
Function to make the computations necessary to answer Q4.
Args: filename - the name of the file containing the data,
min_action - a boolean indicating whether or not to pick the worst action (used to compute the worst policy).
Returns: the policy as a dict, whether or not there are multiple policies, the transition dict, and a dict with the possible actions
in the states where there is no fixed policy.
"""
# get the Q-table via value iteration
q_n, reward_dict, transition_dict, states = value_iteration(filename)
# enumerate the possible actions
actions = ["changes_to_plan", "explain_planning", "identify_barriers", "deal_with_barriers",
"show_testimonials"]
# initialise an empty dict which will hold the policy
policy = {}
# initialise a variable to indicate if there are multiple optimal policies
multiple_optimal_policies = False
# when there are multiple optimal policies, we should know what the options are so we can easily pick one at random
# initialise a dict that will hold the possible actions for the states for which there is no fixed policy
possible_actions_policy = {}
# set the random seed so that repeated runs produce the same result
random.seed(2023)
# loop over the Q-table
for i, row in enumerate(q_n):
# get the row as a list of values
as_list = row.tolist()
# get the states from the list of states
state = states[i]
# split the state
split = state[1:-1].split(", ")
# extract the data
plans = int(split[0])
a1 = split[3]
a2 = split[4]
a3 = split[5]
a4 = split[6]
# initialise two lists for the possible q values and corresponding actions to select
possible_q_values = []
possible_actions = []
# calculate how many actions have been done already
n = string_to_bool(a1) + string_to_bool(a2) + string_to_bool(a3) + string_to_bool(a4)
# determine if changes to plans should be done
if n == 1 and plans == 0 or plans == 1 and n == 3:
possible_q_values = [as_list[0]]
possible_actions = ["changes_to_plan"]
# determine if dealing with barriers should be done
elif n >= 1 and plans == 1 and a2 == True and a3 == False:
possible_q_values = [as_list[3]]
possible_actions = ["deal_with_barriers"]
# if we do not have an action that should be done by default
else:
# set a boolean that will help determine if the previous action was changes to plan
previous_action_was_changes_to_plan = False
# if there was one changes to plans and no other action
if plans == 1 and n != 0:
# loop over two values for confidence
for c in ["'0'", "'1'"]:
# loop over two values for perceived usefulness
for pu in ["'0'", "'1'"]:
# create the possible state before the current one
possible_previous_state = f"[0, {c}, {pu}, {a1}, {a2}, {a3}, {a4}]"
# if the policy for the possible previous state was making changes to the plan
if possible_previous_state in policy and policy[possible_previous_state] == "changes_to_plan":
# then we got here by making a change to the plan, so we cannot do it again
previous_action_was_changes_to_plan = True
# determine what actions are possible
if plans < 2 and plans <= n and not previous_action_was_changes_to_plan:
possible_q_values.append(as_list[0])
possible_actions.append("changes_to_plan")
if a1 == "False":
possible_q_values.append(as_list[1])
possible_actions.append("explain_planning")
if a2 == "False":
possible_q_values.append(as_list[2])
possible_actions.append("identify_barriers")
if a3 == "False" and a2 == "True":
possible_q_values.append(as_list[3])
possible_actions.append("deal_with_barriers")
if a4 == "False":
possible_q_values.append(as_list[4])
possible_actions.append("show_testimonials")
# if we are computing the worst policy
if min_action:
# then select the smallest Q-value
max_value = min(possible_q_values)
# otherwise, select the largest Q-value
else:
max_value = max(possible_q_values)
# check how many times the value appears in the row of the Q-table
num_max = as_list.count(max_value)
# if the value appears only once
if num_max == 1:
# select the action at its corresponding index as the one to be done in the policy
ind_max = as_list.index(max_value)
policy[states[i]] = actions[ind_max]
# if we are computing a regular policy and we have multiple largest values that are not zero
elif min_action == False and num_max != 1 and max_value != 0:
# then get their indices and save them as possible actions
indices = [i for i, x in enumerate(as_list) if x == max_value]
possible_actions_policy[states[i]] = [actions[i] for i in indices]
# if we are computing the worst policy and there are multiple smallest values
elif min_action == True and num_max != 1:
# get the indices for these values
indices = [i for i, x in enumerate(as_list) if x == max_value]
# and the corresponding actions
possible_actions_according_to_indices = [actions[i] for i in indices]
# then, take the union of the possible actions that can be done and the actions which have the smallest corresponding Q-value
# to ensure that the action we pick is one we can actually do
final_possible_actions = list(set(possible_actions) & set(possible_actions_according_to_indices))
# if we are left with one action
if len(final_possible_actions) == 1:
# then pick it
policy[states[i]] = final_possible_actions[0]
# otherwise, there are multiple actions
else:
# so add them to the dict of possible actions for this state
possible_actions_policy[states[i]] = final_possible_actions
# return the policy, whether or not there are multiple policies, the transition dict, the possible actions dict
return policy, multiple_optimal_policies, transition_dict, possible_actions_policy
# compute the optimal policy and the related outputs
policy, multiple_optimal_policies, transition_dict, possible_actions_policy = q4("../../data/data_adjusted.csv")
# print the policy
for k,v in policy.items():
print(k,v)
[0, '0', '0', True, False, False, False] changes_to_plan [0, '0', '1', True, False, False, False] changes_to_plan [0, '1', '0', True, False, False, False] changes_to_plan [0, '1', '1', True, False, False, False] changes_to_plan [0, '0', '0', False, True, False, False] changes_to_plan [0, '0', '1', False, True, False, False] changes_to_plan [0, '1', '0', False, True, False, False] changes_to_plan [0, '1', '1', False, True, False, False] changes_to_plan [0, '0', '0', False, False, False, True] changes_to_plan [0, '0', '1', False, False, False, True] changes_to_plan [0, '1', '0', False, False, False, True] changes_to_plan [0, '1', '1', False, False, False, True] changes_to_plan [0, '0', '0', False, False, False, False] identify_barriers [0, '0', '1', False, False, False, False] explain_planning [0, '1', '0', False, False, False, False] explain_planning [0, '1', '1', False, False, False, False] explain_planning [1, '0', '0', True, True, True, True] changes_to_plan [1, '0', '1', True, True, True, True] changes_to_plan [1, '1', '0', True, True, True, True] changes_to_plan [1, '1', '1', True, True, True, True] changes_to_plan [1, '0', '1', True, True, True, False] changes_to_plan [1, '1', '0', True, True, True, False] changes_to_plan [1, '0', '1', True, True, False, True] changes_to_plan [1, '1', '0', True, True, False, True] changes_to_plan [1, '0', '0', True, True, False, False] deal_with_barriers [1, '0', '1', True, True, False, False] deal_with_barriers [1, '1', '0', True, True, False, False] deal_with_barriers [1, '1', '1', True, True, False, False] deal_with_barriers [1, '0', '0', True, False, False, True] identify_barriers [1, '0', '1', True, False, False, True] identify_barriers [1, '1', '0', True, False, False, True] identify_barriers [1, '1', '1', True, False, False, True] identify_barriers [1, '0', '0', True, False, False, False] identify_barriers [1, '0', '1', True, False, False, False] show_testimonials [1, '1', '0', True, False, False, False] show_testimonials [1, '1', '1', True, False, False, False] show_testimonials [1, '0', '1', False, True, True, True] changes_to_plan [1, '1', '0', False, True, True, True] changes_to_plan [1, '0', '0', False, True, True, False] explain_planning [1, '0', '1', False, True, True, False] explain_planning [1, '1', '0', False, True, True, False] show_testimonials [1, '1', '1', False, True, True, False] explain_planning [1, '0', '0', False, True, False, True] deal_with_barriers [1, '0', '1', False, True, False, True] explain_planning [1, '1', '0', False, True, False, True] deal_with_barriers [1, '1', '1', False, True, False, True] deal_with_barriers [1, '0', '0', False, True, False, False] deal_with_barriers [1, '0', '1', False, True, False, False] deal_with_barriers [1, '1', '0', False, True, False, False] deal_with_barriers [1, '1', '1', False, True, False, False] deal_with_barriers [1, '0', '0', False, False, False, True] identify_barriers [1, '0', '1', False, False, False, True] explain_planning [1, '1', '0', False, False, False, True] explain_planning [1, '1', '1', False, False, False, True] explain_planning [1, '0', '0', False, False, False, False] identify_barriers [1, '0', '1', False, False, False, False] explain_planning [1, '1', '0', False, False, False, False] show_testimonials [1, '1', '1', False, False, False, False] identify_barriers [2, '0', '0', True, True, True, False] show_testimonials [2, '0', '1', True, True, True, False] show_testimonials [2, '1', '0', True, True, True, False] show_testimonials [2, '0', '0', True, True, False, 
True] deal_with_barriers [2, '0', '1', True, True, False, True] deal_with_barriers [2, '1', '0', True, True, False, True] deal_with_barriers [2, '0', '0', True, True, False, False] deal_with_barriers [2, '1', '0', True, True, False, False] deal_with_barriers [2, '0', '0', True, False, False, True] identify_barriers [2, '0', '1', True, False, False, True] identify_barriers [2, '1', '0', True, False, False, True] identify_barriers [2, '0', '0', True, False, False, False] identify_barriers [2, '0', '1', True, False, False, False] show_testimonials [2, '1', '0', True, False, False, False] show_testimonials [2, '0', '0', False, True, True, True] explain_planning [2, '0', '1', False, True, True, True] explain_planning [2, '1', '0', False, True, True, True] explain_planning [2, '0', '0', False, True, True, False] explain_planning [2, '0', '1', False, True, True, False] explain_planning [2, '1', '0', False, True, True, False] show_testimonials [2, '0', '0', False, True, False, True] deal_with_barriers [2, '0', '1', False, True, False, True] explain_planning [2, '1', '0', False, True, False, True] deal_with_barriers [2, '0', '0', False, True, False, False] deal_with_barriers [2, '0', '1', False, True, False, False] explain_planning [2, '1', '0', False, True, False, False] deal_with_barriers [2, '0', '0', False, False, False, True] identify_barriers [2, '0', '1', False, False, False, True] explain_planning [2, '1', '0', False, False, False, True] explain_planning
In Q5, we simulate the optimal policy from Q4. We place 100 people in each of the 4 possible starting states and simulate how they transition from one time step to the next by following the optimal policy and the transition function learned from the data. In cases where no such transition was observed in the data, we send 1/4 of the people to each of the 4 possible next states.
Additionally, we compute the average number of time steps needed to reach the state with the highest possible confidence and perceived usefulness (rounded up to represent whole actions needed to bring people to this state), and the percentage of people who end the conversation in this state after the 6 time steps.
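The following is a minimal, self-contained sketch of one simulation step under the scheme described above. It is not the simulate_policy function used later in this notebook: the name step_population, the transition_probs structure, and the example states are illustrative assumptions chosen to mirror the description.
def step_population(population, policy, transition_probs, fallback_states):
    """
    Sketch of one simulation step: move people one time step forward by following a policy.
    Args: population - a dict mapping a state to the number of people currently in it,
          policy - a dict mapping a state to the action chosen in that state,
          transition_probs - a dict mapping (state, action) to a dict {next_state: probability},
          fallback_states - the 4 possible next states used when no transition was observed.
    Returns: a dict with the number of people in each state after the step.
    """
    new_population = {}
    for state, count in population.items():
        # follow the policy for this state
        action = policy[state]
        if (state, action) in transition_probs:
            # use the transition function learned from the data
            for next_state, prob in transition_probs[(state, action)].items():
                new_population[next_state] = new_population.get(next_state, 0) + count * prob
        else:
            # no observed transition: send 1/4 of the people to each possible next state
            for next_state in fallback_states:
                new_population[next_state] = new_population.get(next_state, 0) + count / 4
    return new_population
# example usage with made-up states (illustrative only); the real simulation starts with
# 100 people in each of the 4 possible starting states and repeats this step up to time step 6
example_population = {"s0": 100}
example_policy = {"s0": "explain_planning"}
print(step_population(example_population, example_policy, {}, ["s1", "s2", "s3", "s4"]))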
def q5(filename, simulate_rewards=False):
"""
Function to make the computations necessary to answer Q5.
Args: filename - the name of the file containing the data,
simulate_rewards - a boolean indicating if we should simulate rewards (and then add people in the starting states differently).
Returns: a dict containing the rewards per time step when simulating rewards.
"""
# set the name of the file for the states
name = "state_files/all_possible_states.txt"
# open the file
with open(name) as f:
# read all the lines
states = f.readlines()
# remove the newlines at the end of the file
states = [state[:-1] for state in states]
# get the optimal policy
policy, multiple_optimal_policies, transition_dict, possible_actions_policy = q4(filename)
# loop over the states
for state in states:
# if we do not have a policy for the state
if state not in policy:
# split the state
split = state[1:-1].split(", ")
# initialise an empty list that will hold the possible actions that can be done in this state
possible_actions = []
# extract the data from the state
plans = int(split[0])
a1 = split[3]
a2 = split[4]
a3 = split[5]
a4 = split[6]
# calculate how many actions have been done
n = string_to_bool(a1) + string_to_bool(a2) + string_to_bool(a3) + string_to_bool(a4)
# determine if changes to plans should be done
if (n == 1 and plans == 0) or (plans == 1 and n == 3):
possible_actions = ["changes_to_plan"]
# otherwise
else:
# initialise a variable that will help determine if the previous action was making changes to the plan
previous_action_was_changes_to_plan = False
# if we have made one change to the plan and done at least one other action
if plans == 1 and n != 0:
# loop over 2 values for confidence
for c in ["'0'", "'1'"]:
# loop over 2 values for perceived usefulness
for pu in ["'0'", "'1'"]:
# generate the previous possible state
possible_previous_state = f"[0, {c}, {pu}, {a1}, {a2}, {a3}, {a4}]"
# if the previous possible state is in the policy and its action is making changes to the plan
if possible_previous_state in policy and policy[
possible_previous_state] == "changes_to_plan":
# then we got here by making a change to the plan so we cannot do another one
previous_action_was_changes_to_plan = True
# determine which actions are possible
if plans < 2 and plans <= n and not previous_action_was_changes_to_plan:
possible_actions.append("changes_to_plan")
if a1 == "False":
possible_actions.append("explain_planning")
if a2 == "False":
possible_actions.append("identify_barriers")
if a3 == "False" and a2 == "True":
possible_actions.append("deal_with_barriers")
if a4 == "False":
possible_actions.append("show_testimonials")
# if there is at least one possible action
if len(possible_actions) > 0:
# add the list of possible actions to the dict
possible_actions_policy[state] = possible_actions
# simulate the policy
return simulate_policy(policy, possible_actions_policy, states, transition_dict, simulate_rewards=simulate_rewards)
q5("../../data/data_adjusted.csv")
print("States are formatted as [confidence, perceived usefulness]")
Average number of time steps to reach the good state 3 82% of people ended the simulation in a good state.
States are formatted as [confidence, perceived usefulness]
In Q6, we compare the optimal policy with two other policies: the worst policy, obtained by taking the worst possible action in each state, and an optimal policy obtained from a model where states only include the actions done so far and not the person's situation.
We first run simulations like the one in Q5 to see how the new policies compare to the original one. We then adjust the simulations to reflect the distribution of people across states in the data (i.e. if the data has one person in a state, the simulation also puts one person in that same state). In this second simulation, we are interested in the rewards at each time step, as well as the rewards discounted over future time steps.
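As an illustration of the discounting used in this second simulation (the actual computation happens in q6 below), a reward obtained at time step t is multiplied by the discount factor raised to the power t - 2, so rewards at time step 2 are not discounted. The reward values in this snippet are made up purely for the example; the real rewards come from the simulations, and the discount factors compared in q6 are 1, 0.85, 0.7 and 0.5.
# illustrative reward values only
discount_factor = 0.85
example_rewards_per_timestep = {2: 6.0, 3: 5.0, 4: 5.5, 5: 4.0, 6: 3.0}
# rewards at time step 2 are not discounted, time step 3 is multiplied by the discount factor,
# time step 4 by the discount factor squared, and so on
discounted = {t: r * discount_factor ** (t - 2) for t, r in example_rewards_per_timestep.items()}
print(discounted)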
Define a function for simulating the worst policy.
def simulate_policy_worst(filename, simulate_rewards=False):
"""
Function to simulate the worst policy, used in Q6.
Args: filename - the name of the file containing the data
simulate_rewards - a boolean indicating if we should simulate rewards (and then add people in the starting states differently).
Returns: a dict containing the rewards per time step when simulating rewards.
"""
# set the name of the file for the states
name = "state_files/all_possible_states.txt"
# open the file
with open(name) as f:
# read all the lines
states = f.readlines()
# remove the newlines at the end of the file
states = [state[:-1] for state in states]
# get the worst policy
worst_policy, multiple_optimal_policies, transition_dict_worst, possible_actions_policy_worst = q4(filename,
min_action=True)
# loop over the states
for state in states:
# if we do not have a policy for the state
if state not in worst_policy:
# split the state
split = state[1:-1].split(", ")
# initialise an empty list that will hold the possible actions that can be done in this state
possible_actions = []
# extract the data from the state
plans = int(split[0])
a1 = split[3]
a2 = split[4]
a3 = split[5]
a4 = split[6]
# calculate how many actions have been done
n = string_to_bool(a1) + string_to_bool(a2) + string_to_bool(a3) + string_to_bool(a4)
# determine if changes to plans should be done
if (n == 1 and plans == 0) or (plans == 1 and n == 3):
possible_actions = ["changes_to_plan"]
# otherwise
else:
# initialise a variable that will help determine if the previous action was making changes to the plan
previous_action_was_changes_to_plan = False
# if we have made one change to the plan and done at least one other action
if plans == 1 and n != 0:
# loop over 2 values for confidence
for c in ["'0'", "'1'"]:
# loop over 2 values for perceived usefulness
for pu in ["'0'", "'1'"]:
# generate the previous possible state
possible_previous_state = f"[0, {c}, {pu}, {a1}, {a2}, {a3}, {a4}]"
# if the previous possible state is in the policy and its action is making changes to the plan
if possible_previous_state in worst_policy and worst_policy[
possible_previous_state] == "changes_to_plan":
# then we got here by making a change to the plan so we cannot do another one
previous_action_was_changes_to_plan = True
# determine which actions are possible
if plans < 2 and plans <= n and not previous_action_was_changes_to_plan:
possible_actions.append("changes_to_plan")
if a1 == "False":
possible_actions.append("explain_planning")
if a2 == "False":
possible_actions.append("identify_barriers")
if a3 == "False" and a2 == "True":
possible_actions.append("deal_with_barriers")
if a4 == "False":
possible_actions.append("show_testimonials")
# if there is at least one possible action
if len(possible_actions) > 0:
# add the list of possible actions to the dict
possible_actions_policy_worst[state] = possible_actions
# if we are not simulating rewards, print the worst policy
if not simulate_rewards:
print("Worst policy (Table F.1)")
for k,v in worst_policy.items():
print(k,v)
# simulate the policy
return simulate_policy(worst_policy, possible_actions_policy_worst, states, transition_dict_worst, simulate_rewards=simulate_rewards)
simulate_policy_worst("../../data/data_adjusted.csv")
print("States are formatted as [confidence, perceived usefulness]")
Worst policy (Table F.1) [0, '0', '0', True, False, False, False] changes_to_plan [0, '0', '1', True, False, False, False] changes_to_plan [0, '1', '0', True, False, False, False] changes_to_plan [0, '1', '1', True, False, False, False] changes_to_plan [0, '0', '0', False, True, False, False] changes_to_plan [0, '0', '1', False, True, False, False] changes_to_plan [0, '1', '0', False, True, False, False] changes_to_plan [0, '1', '1', False, True, False, False] changes_to_plan [0, '0', '0', False, False, False, True] changes_to_plan [0, '0', '1', False, False, False, True] changes_to_plan [0, '1', '0', False, False, False, True] changes_to_plan [0, '1', '1', False, False, False, True] changes_to_plan [0, '0', '0', False, False, False, False] changes_to_plan [0, '0', '1', False, False, False, False] changes_to_plan [0, '1', '0', False, False, False, False] changes_to_plan [0, '1', '1', False, False, False, False] changes_to_plan [1, '0', '0', True, True, True, True] changes_to_plan [1, '0', '1', True, True, True, True] changes_to_plan [1, '1', '0', True, True, True, True] changes_to_plan [1, '1', '1', True, True, True, True] changes_to_plan [1, '0', '0', True, True, True, False] changes_to_plan [1, '0', '1', True, True, True, False] changes_to_plan [1, '1', '0', True, True, True, False] changes_to_plan [1, '1', '1', True, True, True, False] changes_to_plan [1, '0', '0', True, True, False, True] changes_to_plan [1, '0', '1', True, True, False, True] changes_to_plan [1, '1', '0', True, True, False, True] changes_to_plan [1, '1', '1', True, True, False, True] changes_to_plan [1, '0', '1', True, True, False, False] changes_to_plan [1, '1', '0', True, True, False, False] changes_to_plan [1, '0', '0', True, False, False, True] changes_to_plan [1, '0', '1', True, False, False, True] changes_to_plan [1, '1', '0', True, False, False, True] changes_to_plan [1, '1', '1', True, False, False, True] changes_to_plan [1, '0', '0', True, False, False, False] show_testimonials [1, '0', '1', True, False, False, False] identify_barriers [1, '1', '0', True, False, False, False] identify_barriers [1, '1', '1', True, False, False, False] identify_barriers [1, '0', '0', False, True, True, True] changes_to_plan [1, '0', '1', False, True, True, True] changes_to_plan [1, '1', '0', False, True, True, True] changes_to_plan [1, '1', '1', False, True, True, True] changes_to_plan [1, '0', '0', False, True, True, False] show_testimonials [1, '0', '1', False, True, True, False] changes_to_plan [1, '1', '0', False, True, True, False] changes_to_plan [1, '1', '1', False, True, True, False] changes_to_plan [1, '0', '1', False, True, False, True] changes_to_plan [1, '1', '0', False, True, False, True] changes_to_plan [1, '0', '1', False, True, False, False] show_testimonials [1, '1', '0', False, True, False, False] explain_planning [1, '0', '0', False, False, False, True] explain_planning [1, '0', '1', False, False, False, True] identify_barriers [1, '1', '0', False, False, False, True] identify_barriers [1, '1', '1', False, False, False, True] identify_barriers [1, '0', '1', False, False, False, False] identify_barriers [1, '1', '0', False, False, False, False] identify_barriers [2, '0', '0', True, True, True, False] show_testimonials [2, '0', '1', True, True, True, False] show_testimonials [2, '1', '0', True, True, True, False] show_testimonials [2, '1', '1', True, True, True, False] show_testimonials [2, '0', '0', True, True, False, True] deal_with_barriers [2, '0', '1', True, True, False, True] deal_with_barriers [2, '1', 
'0', True, True, False, True] deal_with_barriers [2, '1', '1', True, True, False, True] deal_with_barriers [2, '0', '0', True, True, False, False] show_testimonials [2, '1', '0', True, True, False, False] show_testimonials [2, '0', '0', True, False, False, True] identify_barriers [2, '0', '1', True, False, False, True] identify_barriers [2, '1', '0', True, False, False, True] identify_barriers [2, '1', '1', True, False, False, True] identify_barriers [2, '0', '0', True, False, False, False] show_testimonials [2, '0', '1', True, False, False, False] identify_barriers [2, '1', '0', True, False, False, False] identify_barriers [2, '0', '0', False, True, True, True] explain_planning [2, '0', '1', False, True, True, True] explain_planning [2, '1', '0', False, True, True, True] explain_planning [2, '1', '1', False, True, True, True] explain_planning [2, '0', '0', False, True, True, False] show_testimonials [2, '0', '1', False, True, True, False] show_testimonials [2, '1', '0', False, True, True, False] explain_planning [2, '0', '0', False, True, False, True] explain_planning [2, '0', '1', False, True, False, True] deal_with_barriers [2, '1', '0', False, True, False, True] explain_planning [2, '1', '0', False, True, False, False] explain_planning [2, '0', '0', False, False, False, True] explain_planning [2, '0', '1', False, False, False, True] identify_barriers [2, '1', '0', False, False, False, True] identify_barriers Average number of time steps to reach the good state 4 72% of people ended the simulation in a good state.
States are formatted as [confidence, perceived usefulness]
Define a function for simulating the optimal policy without the person's situation.
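For reference, a full state has the form [number of plan changes, confidence, perceived usefulness, a1, a2, a3, a4], while the states used here drop the two entries describing the person's situation, leaving [number of plan changes, a1, a2, a3, a4]. The helper below is hypothetical (it is not used in this notebook) and only illustrates that mapping on the string representation used throughout.
def drop_person_situation(state):
    # hypothetical helper, for illustration only
    # full states look like "[plans, 'confidence', 'perceived usefulness', a1, a2, a3, a4]"
    split = state[1:-1].split(", ")
    # keep the number of plan changes and the four action flags, dropping confidence and perceived usefulness
    reduced = [split[0]] + split[3:]
    return "[" + ", ".join(reduced) + "]"
print(drop_person_situation("[1, '0', '1', True, False, False, False]"))
# prints: [1, True, False, False, False]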
def simulate_policy_no_features(filename, simulate_rewards=False):
"""
Function to simulate the optimal policy for states that do not include the person's situation, used in Q6.
Args: filename - the name of the file containing the data
simulate_rewards - a boolean indicating if we should simulate rewards (and then add people in the starting states differently).
Returns: a dict containing the rewards per time step when simulating rewards.
"""
# get Q-table for states that do not include the person's situation
q_n, reward_dict_no_features, transition_dict_no_features, states = value_iteration("../../data/data_no_features.csv",
states_file="state_files/states_for_q6.txt",
no_features=True)
# enumerate all possible actions
actions = ["changes_to_plan", "explain_planning", "identify_barriers", "deal_with_barriers",
"show_testimonials"]
# initialise a dict which will hold the policy
policy_no_features = {}
# initialise a dict which will hold the possible actions for states where there is no fixed policy
possible_actions_policy_no_features = {}
# loop over the Q-table
for i, row in enumerate(q_n):
# get the row as a list of values
as_list = row.tolist()
# get the states from the list of states
state = states[i]
# split the state
split = state[1:-1].split(", ")
# extract the data
plans = int(split[0])
a1 = split[1]
a2 = split[2]
a3 = split[3]
a4 = split[4]
# initialise a list for the possible Q-values
possible_q_values = []
# calculate how many actions have been done already
n = string_to_bool(a1) + string_to_bool(a2) + string_to_bool(a3) + string_to_bool(a4)
# determine if changes to plans should be done
if (n == 1 and plans == 0) or (plans == 1 and n == 3):
possible_q_values = [as_list[0]]
# if we do not have an action that should be done by default
else:
# set a boolean that will help determine if the previous action was changes to plan
previous_action_was_changes_to_plan = False
# if we have made one change to the plan and done at least one other action
if plans == 1 and n != 0:
# get the previous state
possible_previous_state = f"[0, {a1}, {a2}, {a3}, {a4}]"
# if the policy for the possible previous state was making changes to the plan
if possible_previous_state in policy_no_features and policy_no_features[
possible_previous_state] == "changes_to_plan":
# then we got here by making a change to the plan, so we cannot do it again
previous_action_was_changes_to_plan = True
# determine the possible actions
if plans < 2 and plans <= n and not previous_action_was_changes_to_plan:
possible_q_values.append(as_list[0])
if a1 == "False":
possible_q_values.append(as_list[1])
if a2 == "False":
possible_q_values.append(as_list[2])
if a3 == "False" and a2 == "True":
possible_q_values.append(as_list[3])
if a4 == "False":
possible_q_values.append(as_list[4])
# get the action with the largest Q-value
max_value = max(possible_q_values)
# count how many times the action with the largest Q-value appears
num_max = as_list.count(max_value)
# if it appears only once
if num_max == 1:
# select it for the policy
ind_max = as_list.index(max_value)
policy_no_features[states[i]] = actions[ind_max]
# otherwise, if there are multiple possible actions and the largest Q-value is not 0
elif num_max != 1 and max_value != 0:
# then get their indices and save them as possible actions
indices = [i for i, x in enumerate(as_list) if x == max_value]
possible_actions_policy_no_features[states[i]] = [actions[i] for i in indices]
# loop over the states
for state in states:
# if we do not have a policy for the state
if state not in policy_no_features:
# split the state
split = state[1:-1].split(", ")
# initialise an empty list that will hold the possible actions that can be done in this state
possible_actions = []
# extract the data
plans = int(split[0])
a1 = split[1]
a2 = split[2]
a3 = split[3]
a4 = split[4]
# calculate how many actions have been done already
n = string_to_bool(a1) + string_to_bool(a2) + string_to_bool(a3) + string_to_bool(a4)
# determine if changes to plans should be done
if (n == 1 and plans == 0) or (plans == 1 and n == 3):
possible_actions = ["changes_to_plan"]
# otherwise
else:
# initialise a variable that will help determine if the previous action was making changes to the plan
previous_action_was_changes_to_plan = False
# if we have made one change to the plan and done at least one other action
if plans == 1 and n != 0:
# get the previous state
possible_previous_state = f"[0, {a1}, {a2}, {a3}, {a4}]"
# if the policy for the possible previous state was making changes to the plan
if possible_previous_state in policy_no_features and policy_no_features[
possible_previous_state] == "changes_to_plan":
# then we got here by making a change to the plan, so we cannot do it again
previous_action_was_changes_to_plan = True
# determine which actions are possible
if plans < 2 and plans <= n and not previous_action_was_changes_to_plan:
possible_actions.append("changes_to_plan")
if a1 == "False":
possible_actions.append("explain_planning")
if a2 == "False":
possible_actions.append("identify_barriers")
if a3 == "False" and a2 == "True":
possible_actions.append("deal_with_barriers")
if a4 == "False":
possible_actions.append("show_testimonials")
# if there is at least one possible action
if len(possible_actions) > 0:
# add the list of possible actions to the dict
possible_actions_policy_no_features[state] = possible_actions
# we need the transition dict from the original model, because we simulate with the full underlying states
# and only pick which action to do based on the policy for the state without the person's situation
policy, multiple_optimal_policies_1, transition_dict, possible_actions_policy = q4(filename)
# set the name of the file for the states
name = "state_files/all_possible_states.txt"
# open the file
with open(name) as f:
# read all the lines
states = f.readlines()
# remove the newlines at the end of the file
states = [state[:-1] for state in states]
# if we are not simulating rewards, print the optimal policy without the person's situation
if not simulate_rewards:
print("Optimal policy without the person's situation (Table G.1)")
for k,v in policy_no_features.items():
print(k,v)
# simulate the policy
return simulate_policy(policy_no_features, possible_actions_policy_no_features, states, transition_dict, simulate_no_f=True, simulate_rewards=simulate_rewards)
simulate_policy_no_features("../../data/data_adjusted.csv")
print("States are formatted as [confidence, perceived usefulness]")
Optimal policy without the person's situation (Table G.1) [0, True, False, False, False] changes_to_plan [0, False, True, False, False] changes_to_plan [0, False, False, False, True] changes_to_plan [0, False, False, False, False] changes_to_plan [1, True, True, True, True] changes_to_plan [1, True, True, True, False] changes_to_plan [1, True, True, False, True] changes_to_plan [1, True, False, False, True] changes_to_plan [1, True, False, False, False] show_testimonials [1, False, True, True, True] changes_to_plan [1, False, True, True, False] changes_to_plan [1, False, True, False, False] deal_with_barriers [1, False, False, False, True] explain_planning [1, False, False, False, False] identify_barriers [2, True, True, True, False] show_testimonials [2, True, True, False, True] deal_with_barriers [2, True, False, False, True] identify_barriers [2, False, True, True, True] explain_planning [2, False, True, True, False] explain_planning Average number of time steps to reach the good state 3 74% of people ended the simulation in a good state.
States are formatted as [confidence, perceived usefulness]
Define the main function for Q6.
def q6(filename):
"""
Function to make the computations necessary to answer Q6.
Args: filename - the name of the file containing the data.
Returns: none, but displays a plot.
"""
# get the rewards per time step of the three simulations
rewards_optimal = q5(filename, simulate_rewards=True)
rewards_worst = simulate_policy_worst(filename, simulate_rewards=True)
rewards_no_features = simulate_policy_no_features(filename, simulate_rewards=True)
# initialise lists which will hold the final rewards
rewards_final_optimal = []
rewards_final_worst = []
rewards_final_no_features = []
# initialise lists which will hold the lengths of the errorbars
yerr_optimal = []
yerr_worst = []
yerr_no_features = []
# enumerate the discount factors to check
discount_factors = [1, 0.85, 0.7, 0.5]
# loop over the discount factors
for discount_factor in discount_factors:
# for each policy, initialise
# a dict which will hold the discounted reward in each time step
rewards_discounted_optimal = {}
# and a list which will hold all the discounted rewards
rewards_discounted_optimal_list = []
rewards_discounted_worst = {}
rewards_discounted_worst_list = []
rewards_discounted_no_features = {}
rewards_discounted_no_features_list = []
# loop over time steps 2-6, since those have rewards
for i in range(2, 7):
# initialise an empty list for each time step and for each policy
rewards_discounted_optimal[i] = []
rewards_discounted_worst[i] = []
rewards_discounted_no_features[i] = []
# loop over the rewards of the optimal policy
for k, rewards in rewards_optimal.items():
# if the time step is at least the second one
if k > 1:
# loop over the rewards gained
for reward in rewards:
# discount the reward by multiplying it with the discount factor raised to the power of (current time step - 2)
# so rewards in time step 2 are not discounted
# rewards in time step 3 are multiplied by the discount factor
# rewards in time step 4 are multiplied by the discount factor squared, and so on
rewards_discounted_optimal[k].append(reward * discount_factor ** (k-2))
rewards_discounted_optimal_list.append(reward * discount_factor ** (k-2))
# calculate the 95% credible interval of the discounted rewards
mean, variance, std = scipy.stats.bayes_mvs(rewards_discounted_optimal_list, 0.95)
# get the endpoints of the interval
interval = mean[1]
lowest = interval[0]
highest = interval[1]
# get the length of the interval
diff = abs(highest - lowest)
# add half of the length of the error bar to the list, since it will display this length upwards and
# downwards from the mean, thereby displaying the entire length of the interval
yerr_optimal.append(diff / 2)
# start computing the final reward, by setting it to 0
reward_final_optimal = 0
# loop over the discounted rewards
for k, v in rewards_discounted_optimal.items():
# if there is at least one reward in this time step
if len(v) > 0:
# add the mean of this time step to the final optimal reward
reward_final_optimal += np.mean(v)
# do the same for the rewards of the worst policy
for k, rewards in rewards_worst.items():
if k > 1:
for reward in rewards:
rewards_discounted_worst[k].append(reward * discount_factor ** (k-2))
rewards_discounted_worst_list.append(reward * discount_factor ** (k-2))
mean, variance, std = scipy.stats.bayes_mvs(rewards_discounted_worst_list, 0.95)
interval = mean[1]
lowest = interval[0]
highest = interval[1]
diff = abs(highest - lowest)
yerr_worst.append(diff / 2)
reward_final_worst = 0
for k, v in rewards_discounted_worst.items():
if len(v) > 0:
reward_final_worst += np.mean(v)
# do the same for the rewards of the optimal policy with states which do not include the person's situation
for k, rewards in rewards_no_features.items():
if k > 1:
for reward in rewards:
rewards_discounted_no_features[k].append(reward * discount_factor ** (k-2))
rewards_discounted_no_features_list.append(reward * discount_factor ** (k-2))
mean, variance, std = scipy.stats.bayes_mvs(rewards_discounted_no_features_list, 0.95)
interval = mean[1]
lowest = interval[0]
highest = interval[1]
diff = abs(highest - lowest)
yerr_no_features.append(diff / 2)
reward_final_no_features = 0
for k, v in rewards_discounted_no_features.items():
if len(v) > 0:
reward_final_no_features += np.mean(v)
# divide the final rewards by 6 to get the average across the 6 time steps
rewards_final_optimal.append(reward_final_optimal/6)
rewards_final_worst.append(reward_final_worst/6)
rewards_final_no_features.append(reward_final_no_features/6)
# plot the results
X = discount_factors
X_axis = np.arange(len(X))
plt.ylim([0, 10])
plt.plot(X_axis - 0.005, rewards_final_optimal, marker=".", label='Optimal policy', color="green")
plt.errorbar(X_axis - 0.005, rewards_final_optimal, linestyle='', yerr=yerr_optimal, color="green")
plt.plot(X_axis, rewards_final_worst, marker="*", label='Worst policy', color="purple")
plt.errorbar(X_axis, rewards_final_worst, linestyle='', yerr=yerr_worst, color="purple")
plt.plot(X_axis + 0.005, rewards_final_no_features, marker="x",
label='Optimal policy without the person\'s situation', color="orange")
plt.errorbar(X_axis + 0.005, rewards_final_no_features, linestyle='', yerr=yerr_no_features, color="orange")
plt.xticks(X_axis, X)
plt.xlabel("Discount factor")
plt.ylabel("Discounted mean reward at \n the end of the dialogue")
plt.legend(loc='upper right', bbox_to_anchor=(1.1, 1.3))
plt.show()
q6("../../data/data_adjusted.csv")