Author: Andrei Stefan
Date: 13-11-2023
Required files: state_files/states_for_g.txt, data/anonymised_data/anonymised_data_final_without_id.csv
Output files: data/data_adjusted.csv, also creates files in the python_code/files_for_g folder that are automatically deleted at the end of the process
This file contains the code to reproduce the results of running the G-algorithm. This corresponds to Q1 and is shown in Table D.1.
# import all required packages
import csv
import math
import numpy as np
import os
import pandas as pd
from collections import Counter
from scipy import stats
Define helper functions for adjusting states to only contain the specified features and nothing else.
def adjust_state(state, features):
    """
    Build a reduced copy of a state string that keeps only the given features.
    Args: state - the state to adjust, as a string of comma-separated values,
          features - the list of feature names to include in the state.
    Returns: the adjusted state as a bracketed string, e.g. "['1', '3']".
    """
    # the state is a comma-separated list; positions 1, 2, and 3 hold the
    # confidence, perceived usefulness, and attitude values respectively
    parts = state.split(", ")
    positions = {"confidence": 1, "perceived_usefulness": 2, "attitude": 3}
    # collect the values of the requested features, preserving their canonical order
    kept = [parts[positions[name]]
            for name in ("confidence", "perceived_usefulness", "attitude")
            if name in features]
    # join the kept values back into a bracketed state string
    # (joining also avoids the dangling ", ]" the piecewise build had to correct)
    return f"[{', '.join(kept)}]"
def adjust_states(original: str, new: str, features: list) -> None:
    """
    Function to adjust all states in a given file to only include the features
    given and save them to a new file.
    Args: original - the name of the file with states to adjust,
          new - the name of the file to save the adjusted states to,
          features - the list of features to include in the state.
    Returns: none.
    """
    # open the original file in read mode
    with open(original, 'r', newline='') as input_file:
        # open the new file in write mode
        with open(new, 'w', newline='') as file:
            # initialise a csv writer
            writer = csv.writer(file)
            # write the header of the file
            writer.writerow(["state_before", "action", "state_after", "reward"])
            # loop over all lines in the original file, except the first one which is the header
            for line in input_file.readlines()[1:]:
                # remove the newline at the end of the line
                if "\n" in line:
                    line = line[:-1]
                # split the line on the quote-comma separators of the csv row;
                # rows look like: "[state]",action,"[state]",reward
                split = line.split("\",")
                # the state before is the first piece, minus its opening quote
                state_before = split[0][1:]
                # the action is the start of the second piece, up to the next comma
                action = split[1].split(",")[0]
                # the state after is the rest of the second piece, after its opening quote
                state_after = split[1].split(",\"")[1]
                # split the line on "]" instead to reach the reward at the end of the row
                split_2 = line.split("]")
                reward = split_2[2].split("\",")[1]
                # make corrections to the reward extracted previously,
                # based on what kind of reward we have - it can be just a 0 or a list of the form [..., ..., ..., ...]
                if "[" in reward:
                    # if the reward is a list, then we removed the ] previously, so add it back
                    reward = f"{reward[1:]}]"
                # if the reward has a \r (carriage return) at the end, remove it
                elif "\r" in reward:
                    reward = reward[:-1]
                # create new states before and after which only include the specified features
                new_state_before = adjust_state(state_before, features)
                new_state_after = adjust_state(state_after, features)
                # write the new row in the new file
                writer.writerow([f"{new_state_before}", f"{action}", f"{new_state_after}", f"{reward}"])
Define helper functions for determining the percentiles of the features, based on the raw data.
def get_values(number_values):
    """
    Function which returns a list of percentile splits, depending on the number of values.
    Args: number_values - an integer representing the number of values a feature should have (2, 3, or 4).
    Returns: the list of percentile split labels, e.g. ["0-50", "50-100"] for 2 values.
    Raises: ValueError if number_values is not 2, 3, or 4
            (the original code raised an opaque UnboundLocalError in that case).
    """
    # map the number of values to the corresponding equal-width percentile splits
    splits = {
        2: ["0-50", "50-100"],
        3: ["0-33", "33-66", "66-100"],
        4: ["0-25", "25-50", "50-75", "75-100"],
    }
    try:
        # return a fresh list so callers can't mutate the shared table
        return list(splits[number_values])
    except KeyError:
        raise ValueError(f"unsupported number of values: {number_values}") from None
def calculate_percentiles(df, number_values):
    """
    Function which calculates the percentile bounds for all three features, based on the
    raw values that appear in the data.
    Args: df - dataframe containing the data (columns "state_before", "state_after", "reward"),
          number_values - a dict containing the number of values each feature should have;
                          missing features default to 2 (the dict is filled in in place,
                          matching the original behaviour).
    Returns: three lists of [lower, upper] bounds (one pair per percentile split) for
             confidence, perceived usefulness, and attitude respectively.
    """
    # local import: literal_eval safely replaces the original eval() call on data strings
    from ast import literal_eval

    # lists to hold every confidence, perceived usefulness, and attitude value in the data
    list_of_c = []
    list_of_pu = []
    list_of_a = []
    # if the number_values dict does not specify a number of values for a feature, we do not
    # strictly need its percentiles, but calculating them anyway is simpler, so default to 2
    number_values.setdefault("confidence", 2)
    number_values.setdefault("perceived_usefulness", 2)
    number_values.setdefault("attitude", 2)
    # loop over the dataframe
    for index, row in df.iterrows():
        # the state before of every sample is always counted;
        # positions 1-3 hold confidence, perceived usefulness, and attitude
        split = row['state_before'][1:-1].split(", ")
        list_of_c.append(split[1])
        list_of_pu.append(split[2])
        list_of_a.append(split[3])
        # each state after is the next sample's state before, so counting it always would
        # duplicate values; only count the state after for the last sample of a person,
        # which is recognisable by its reward being a list and not "0"
        if row['reward'] != "0":
            split = row['state_after'][1:-1].split(", ")
            list_of_c.append(split[1])
            list_of_pu.append(split[2])
            list_of_a.append(split[3])
    # convert the collected values to sorted integers; literal_eval handles both plain
    # numbers ("3") and quoted numbers ("'3'") exactly like the original eval did,
    # without the arbitrary-code-execution risk
    list_of_c = sorted(int(literal_eval(i)) for i in list_of_c)
    list_of_pu = sorted(int(literal_eval(i)) for i in list_of_pu)
    list_of_a = sorted(int(literal_eval(i)) for i in list_of_a)

    def _bounds_for(values, splits):
        # split the sorted values into `splits` (2, 3, or 4) roughly equal-length chunks,
        # keeping the first and last value of each chunk as that percentile's bounds
        # e.g. for a 0th to 50th percentile chunk [0,1,1,1,2,3,5,6], keep [0, 6]
        bounds = [[chunk[0], chunk[-1]] for chunk in np.array_split(values, splits)]
        # resolve overlaps: if a chunk's upper bound equals the next chunk's lower bound,
        # lower it by one so a boundary value always belongs to the higher chunk
        # e.g. [0, 7] followed by [7, 10] becomes [0, 6] followed by [7, 10]
        for i, bound in enumerate(bounds[:-1]):
            if bound[1] == bounds[i + 1][0]:
                bound[1] -= 1
        return bounds

    # return the bounds for the three features
    return (_bounds_for(list_of_c, number_values["confidence"]),
            _bounds_for(list_of_pu, number_values["perceived_usefulness"]),
            _bounds_for(list_of_a, number_values["attitude"]))
def get_values_for_features(df, number_values, features, percentiles):
    """
    Function that helps determine what raw values correspond to which percentile split.
    Args: df - dataframe containing the data,
          number_values - a dict containing the number of values each feature should have,
          features - a list of features,
          percentiles - a list of corresponding percentiles for each of the features.
    Returns: a dict containing, for each feature, what raw values correspond to which percentile split.
    """
    # get the bounds for all features and key them by feature name
    bounds_c, bounds_pu, bounds_a = calculate_percentiles(df, number_values)
    bounds = {
        "confidence": bounds_c,
        "perceived_usefulness": bounds_pu,
        "attitude": bounds_a,
    }
    # each percentile label identifies one split index within a feature's bounds:
    # the first split (index 0) for 4, 3, or 2 values, the second (index 1), and so on
    split_index = {
        "0-25": 0, "0-33": 0, "0-50": 0,
        "25-50": 1, "33-66": 1, "50-100": 1,
        "50-75": 2, "66-100": 2,
        "75-100": 3,
    }
    # for each feature, enumerate every raw value inside its split's bounds,
    # formatted as quoted strings to match how values appear in the states
    values_for_features = {}
    for feature, label in zip(features, percentiles):
        if label in split_index:
            # +1 because range excludes its upper end
            low, high = bounds[feature][split_index[label]]
            values_for_features[feature] = [f"'{i}'" for i in range(low, high + 1)]
    # return the dict which has the values each percentile split of the feature can have
    return values_for_features
Define a helper function for creating a new file with states which only contain data of the specified features and within the specified percentiles of each feature. For example, it can create a file in which states are only made up from confidence, and the samples kept are the ones where confidence has values from the 33rd to 66th percentile for confidence.
def create_new_file(name: str, df, number_values: dict, features: list, percentiles: list) -> None:
    """
    Function that creates a new file where the states of the samples are adjusted to only contain the specified features, and,
    for each feature, only contain values in the corresponding specified percentiles.
    Args: name - the name of the new file,
          df - dataframe containing the data,
          number_values - a dict containing the number of values each feature should have,
          features - a list of features,
          percentiles - a list of corresponding percentiles for each of the features.
    Returns: none.
    """
    # get the values that each feature can have
    values_for_features = get_values_for_features(df, number_values, features, percentiles)
    # open the new file in write mode
    with open(name, 'w', newline='') as file:
        # initialise a csv writer
        writer = csv.writer(file)
        # write the header
        writer.writerow(["state_before", "action", "state_after", "reward"])
        # loop over the dataframe
        for index, row in df.iterrows():
            # initialise three variables which indicate if there is a restriction on which values a feature can have
            c_requirement = False
            pu_requirement = False
            a_requirement = False
            # if the value is in values_for_features, then it has a restriction
            # we only care about the restrictions on the features given as a parameter
            # so, if we only care about the values of confidence, for example,
            # we don't need to check what values perceived usefulness and attitude have
            # NOTE: the keys of values_for_features are always a subset of the features parameter,
            # so the *_before/*_after flags set here are always defined when read further down
            if "confidence" in values_for_features:
                # so, initialise two more variables to check if the confidence before and after meet the restriction
                c_before = False
                c_after = False
                # and set the requirement to true
                c_requirement = True
            if "perceived_usefulness" in values_for_features:
                pu_before = False
                pu_after = False
                pu_requirement = True
            if "attitude" in values_for_features:
                a_before = False
                a_after = False
                a_requirement = True
            # get the state before
            state_before = row['state_before']
            # split the state before
            split = state_before[1:-1].split(", ")
            # extract the data from the state before
            # (positions: plans, confidence, perceived usefulness, attitude, then four action-related values)
            plans = split[0]
            c = split[1]
            pu = split[2]
            a = split[3]
            a1 = split[4]
            a2 = split[5]
            a3 = split[6]
            a4 = split[7]
            # start creating a new state before
            adjusted_state_before = f"[{plans}, "
            # only add a feature if it is present in the features given as a parameter
            # so, if confidence is in the features
            if "confidence" in features:
                # then add it to the state
                adjusted_state_before += f"{c}, "
                # then check if we have a restriction on the confidence
                if c_requirement:
                    # and if the value of the confidence is within the bounds of the restriction
                    if c in values_for_features["confidence"]:
                        # then set the c_before variable to true, to indicate that the value of the confidence in the state before is within the bounds
                        c_before = True
            if "perceived_usefulness" in features:
                adjusted_state_before += f"{pu}, "
                if pu_requirement:
                    if pu in values_for_features["perceived_usefulness"]:
                        pu_before = True
            if "attitude" in features:
                adjusted_state_before += f"{a}, "
                if a_requirement:
                    if a in values_for_features["attitude"]:
                        a_before = True
            # add the rest of the state variables to the new state before
            adjusted_state_before += f"{a1}, {a2}, {a3}, {a4}]"
            # repeat for the state after
            # get the state after
            state_after = row['state_after']
            # split the state after
            split = state_after[1:-1].split(", ")
            # extract the data from the state after
            plans = split[0]
            c = split[1]
            pu = split[2]
            a = split[3]
            a1 = split[4]
            a2 = split[5]
            a3 = split[6]
            a4 = split[7]
            # start creating a new state after
            adjusted_state_after = f"[{plans}, "
            # check if confidence is a feature we should add
            if "confidence" in features:
                # then add it to the state
                adjusted_state_after += f"{c}, "
                # then check if we have a requirement for confidence
                if c_requirement:
                    # and if the value of the confidence is within the bounds of the restriction
                    if c in values_for_features["confidence"]:
                        # then set the c_after variable to true, to indicate that the value of the confidence in the state after is within the bounds
                        c_after = True
            if "perceived_usefulness" in features:
                adjusted_state_after += f"{pu}, "
                if pu_requirement:
                    if pu in values_for_features["perceived_usefulness"]:
                        pu_after = True
            if "attitude" in features:
                adjusted_state_after += f"{a}, "
                if a_requirement:
                    if a in values_for_features["attitude"]:
                        a_after = True
            # add the rest of the state variables to the new state after
            adjusted_state_after += f"{a1}, {a2}, {a3}, {a4}]"
            # check if there was a requirement
            if c_requirement:
                # if there was and both the values before and after are within the required values
                if c_before and c_after:
                    # then the requirement is satisfied
                    c_satisfied = True
                else:
                    # otherwise, it is not
                    c_satisfied = False
            else:
                # for simplicity, if there was no requirement, we can say that it is satisfied
                c_satisfied = True
            if pu_requirement:
                if pu_before and pu_after:
                    pu_satisfied = True
                else:
                    pu_satisfied = False
            else:
                pu_satisfied = True
            if a_requirement:
                if a_before and a_after:
                    a_satisfied = True
                else:
                    a_satisfied = False
            else:
                a_satisfied = True
            # if all three requirements are satisfied (or if they don't exist (see previous comment))
            if c_satisfied and pu_satisfied and a_satisfied:
                # write the row containing the adjusted states before and after to the new file
                writer.writerow(
                    [f"{adjusted_state_before}", row["action"], f"{adjusted_state_after}", row["reward"]])
Also define a function to help remove the files created by the one above.
def remove_new_files():
    """
    Function that removes the intermediate files created by the G-algorithm.
    Args: none.
    Returns: none.
    """
    # get the path to the directory named "files_for_g"
    directory = os.path.join(os.getcwd(), "files_for_g")
    # loop over all files in the directory
    for filename in os.listdir(directory):
        # match against the file name only: the original code matched the full path,
        # which would wrongly delete every file whenever a parent directory happened
        # to contain "adjusted" or "g_iteration_" in its name
        if "g_iteration_" in filename or "adjusted" in filename:
            # remove the file, using its full path
            os.remove(os.path.join(directory, filename))
Define a helper function to help process the raw reward, which could either be a 0 or a list with four values which make up the reward.
def reward_function(reward):
    """
    Function that parses a raw reward into a single number.
    Args: reward - the reward, either the integer 0, the string "0", or a string of the
          form "[r1, r2, r3, r4]" (possibly wrapped in extra quotes and/or followed by
          a newline).
    Returns: 0 for a zero reward, otherwise the weighted average
             (r1 + 0.8 * r2 + 0.2 * r3 + r4) / 3.
    Raises: ValueError if the reward is neither 0 nor a list of four integers
            (the original code silently fell through and returned None in that case).
    """
    # the reward may already be an integer 0
    if reward == 0:
        return 0
    # remove surrounding whitespace (including "\n" and "\r\n", which the original
    # only partially handled), any enclosing quotes, and the square brackets
    # e.g. '"[0, 1, 2, 3]"\n' -> '[0, 1, 2, 3]' -> '0, 1, 2, 3'
    stripped = reward.strip().strip("\"'").strip("[]")
    # the reward may be a 0, but as a string
    if stripped == "0":
        return 0
    # otherwise it must be a list of four integer components
    components = [int(part) for part in stripped.split(", ")]
    if len(components) != 4:
        raise ValueError(f"malformed reward: {reward!r}")
    # combine the four components with the fixed weights and normalise
    return (components[0] + 0.8 * components[1] + 0.2 * components[2] + components[3]) / 3
Define a helper function to speed up the run time of the G-algorithm by checking if all the rewards of the provided file are zero. In such a case, the final Q-table will be all zeroes, so we can skip the processing.
def check_file_all_rewards_zero(name):
    """
    Function that helps speed up the G-algorithm by checking if all the rewards in an input file are zero.
    Args: name - the name of the file.
    Returns: a boolean that is True if all the rewards are zero and False otherwise.
    """
    # open the file (streaming line by line instead of reading it all into memory)
    with open(name) as f:
        # skip the first line, which is the header (None guards against an empty file)
        next(f, None)
        # loop over the remaining lines
        for line in f:
            # the reward is the last comma-separated field of the line
            reward = line.split(",")[-1]
            # if the reward is something other than a 0 or a 0 with a newline after,
            # we can stop immediately instead of scanning the rest of the file
            if reward != "0\n" and reward != "0":
                return False
    # every reward was zero
    return True
Define several helper functions for value iteration, and the value iteration algorithm.
def fill_in_reward_and_transition(file, transition_dict_raw, reward_dict_raw):
    """
    Function for filling in the initial transition and reward dicts, which contain the state to which a state-action pair transitions,
    and the raw rewards (a 0 or a list of 4 numbers) respectively.
    Args: file - the name of the file,
          transition_dict_raw - an empty dict, keyed by (state_before, action), mapping to lists of states_after,
          reward_dict_raw - an empty dict, keyed by (state_before, action), mapping to lists of raw reward strings.
    Returns: none, the dicts are filled inplace.
    """
    # use a context manager so the file handle is closed
    # (the original open() without close leaked the handle)
    with open(file) as input_file:
        # loop over the file line by line
        for line in input_file:
            # ignore the header and blank lines
            # (the original len(line) != 0 check never filtered blank lines, since they still contain "\n")
            if "state_before" in line or not line.strip():
                continue
            # remove the newline at the end of the line
            line = line.rstrip("\n")
            # split the line on the quote-comma separators of the csv row;
            # rows look like: "[state]",action,"[state]",reward
            split = line.split("\",")
            # the state before is the first piece, minus its opening quote
            state_before = split[0][1:]
            # the action is the start of the second piece, up to the next comma
            action = split[1].split(",")[0]
            # the state after is the rest of the second piece, after its opening quote
            state_after = split[1].split(",\"")[1]
            # split the line on "]" instead to reach the reward at the end of the row
            reward = line.split("]")[2].split("\",")[1]
            # if the reward was a list (starting with "["), then we removed the "]" at the end, so add it back
            if "[" in reward:
                reward = f"{reward[1:]}]"
            # append the state after and the reward to the lists for this state_before-action
            # pair, creating the lists on first sight of the pair
            transition_dict_raw.setdefault((state_before, action), []).append(state_after)
            reward_dict_raw.setdefault((state_before, action), []).append(reward)
def process_transition(transition_dict_raw):
    """
    Function for processing the raw transition dict. It creates a new dict that contains state_before-action-state_after triples that
    have a corresponding probability - the probability of going to the state_after when taking the action in the given state_before.
    Args: transition_dict_raw - a dict filled in by fill_in_reward_and_transition.
    Returns: a filled in transition dict.
    """
    # create an empty dict
    transition_dict = {}
    # loop over the raw transition dict
    # its key is the state_before-action pair, and the value is a list of the states_after
    # that this state_before-action pair can transition to
    for (state_before, action), states_after in transition_dict_raw.items():
        total = len(states_after)
        # count each distinct state_after once with Counter (O(n)) instead of calling
        # list.count inside the loop for every occurrence (O(n^2)); the probability is
        # the occurrence count divided by the total number of observed transitions
        for state_after, count in Counter(states_after).items():
            transition_dict[(state_before, action, state_after)] = count / total
    # return the transition dict
    return transition_dict
def process_reward(reward_dict_raw):
    """
    Function for processing the raw reward dict. It creates a new dict that contains the average reward for every
    state_before-action pair.
    Args: reward_dict_raw - a dict filled in by fill_in_reward_and_transition.
    Returns: a filled in reward dict.
    """
    # each state_before-action pair appears exactly once as a key in the raw dict,
    # so the result can be built directly with a comprehension:
    # parse every raw reward in the pair's list with reward_function,
    # then average the parsed values for that pair
    return {
        pair: np.average([reward_function(raw) for raw in raw_rewards])
        for pair, raw_rewards in reward_dict_raw.items()
    }
def max_action(q_n_1, state_after, states):
    """
    Function for calculating the maximum value in a Q-table row.
    Args: q_n_1 - a Q-table,
          state_after - a string representation of a state,
          states - a list of states, in the same order as the Q-table.
    Returns: the maximum value in the corresponding row.
    """
    # the Q-table rows are ordered like the states list, so the position of
    # state_after in the list is also the index of its row in the Q-table
    row_index = states.index(state_after)
    # the best achievable value from this state is the largest entry in its row
    return np.max(q_n_1[row_index])
def sum_term(state_before, action, states, q_n_1, transition_dict):
    """
    Function for calculating the sum term of the Bellman equation: Sum_{s' in S} p(s'|s,a) * max_{a' in A} Q_{n-1}(s', a').
    Args: state_before - the state from which the action is taken,
          action - the action taken,
          states - the list of all states,
          q_n_1 - the Q-table for which to compute the sum,
          transition_dict - the transition dict.
    Returns: the value of the sum.
    """
    # states with no recorded transition from this state_before-action pair contribute 0,
    # so only the states present in the transition dict need to be summed: each one
    # contributes its transition probability times the best value obtainable in it
    return sum(
        transition_dict[(state_before, action, state_after)]
        * max_action(q_n_1, state_after, states)
        for state_after in states
        if (state_before, action, state_after) in transition_dict
    )
def value_iteration(file, features=[], not_all_features=False, states_file="state_files/states_for_g.txt",
                    no_features=False):
    """
    Function for executing the value iteration algorithm.
    Args: file - the name of the file,
          features - a list of features used for the states,
          not_all_features - boolean indicating whether or not all three features should be used,
          states_file - the name of the file which contains all the states for the Q-tables,
          no_features - a boolean indicating whether or not we shouldn't use any of the three features.
    Returns: the final Q-table, the reward and transition dicts filled in and processed, and the list of states which
             correspond to the rows of the Q-table.
    """
    # NOTE(review): features=[] is a mutable default argument; it is never mutated in this
    # function so it is harmless here, but features=None would be the safer convention.
    # NOTE(review): the no_features parameter is accepted but never read inside this
    # function - confirm whether external callers rely on passing it.
    # open the states file
    with open(states_file) as f:
        # read all lines in the file
        states = f.readlines()
    # remove the newline at the end of each line
    states = [state[:-1] for state in states]
    # if we should not be using all three features
    if not_all_features:
        # then adjust states to only include features given
        # create a new list which will hold the adjusted states
        adjusted_states = []
        # loop over all the original states
        for state in states:
            # split the state
            split = state[1:-1].split(", ")
            # extract the data from the state
            # (positions: plans, confidence, perceived usefulness, attitude, then four action-related values)
            plans = split[0]
            c = split[1]
            pu = split[2]
            a = split[3]
            a1 = split[4]
            a2 = split[5]
            a3 = split[6]
            a4 = split[7]
            # start creating the adjusted state
            adjusted_state = f"[{plans}, "
            # add the features which are in the list of features provided
            if "confidence" in features:
                adjusted_state += f"{c}, "
            if "perceived_usefulness" in features:
                adjusted_state += f"{pu}, "
            if "attitude" in features:
                adjusted_state += f"{a}, "
            # finish creating the adjusted state
            adjusted_state += f"{a1}, {a2}, {a3}, {a4}]"
            # add the adjusted states to the list of adjusted states
            adjusted_states.append(adjusted_state)
        # empty the original list of states
        states = []
        # loop over the list of adjusted states and add them to the states list,
        # skipping duplicates (dropping features can make several states identical)
        for adjusted_state in adjusted_states:
            if adjusted_state not in states:
                states.append(adjusted_state)
    # enumerate all possible actions
    actions = ["changes_to_plan", "explain_planning", "identify_barriers", "deal_with_barriers",
               "show_testimonials"]
    # get the number of states
    num_states = len(states)
    # set epsilon to a small number (the convergence threshold)
    epsilon = 0.0001
    # initialise two Q-tables with num_states rows and 5 columns, both filled with zeroes
    q_n = np.zeros((num_states, 5))
    q_n_1 = np.zeros((num_states, 5))
    # set delta to a number larger than epsilon to ensure the loop starts
    delta = 1
    # set the discount factor to 0.85
    gamma = 0.85
    # set the iteration number to 0
    n = 0
    # initialise empty raw transition and raw reward dicts
    transition_dict_raw = {}
    reward_dict_raw = {}
    # fill in the raw dicts
    fill_in_reward_and_transition(file, transition_dict_raw, reward_dict_raw)
    # process the transitions
    transition_dict = process_transition(transition_dict_raw)
    # process the rewards
    reward_dict = process_reward(reward_dict_raw)
    # loop while there is a difference larger than 0.0001 between iterations
    while delta > epsilon and delta != 0:
        # set the difference to 0 by default
        delta = 0
        # loop over states
        for s in states:
            # loop over actions
            for a in actions:
                # check if there is an average reward for this state-action pair
                if (s, a) in reward_dict:
                    # if there is, get it from the dict
                    reward = reward_dict.get((s, a))
                else:
                    # if there is not, set the reward to 0
                    reward = 0
                # fill in the state-action location of the Q-table with the value for this state-action pair
                # which is equal to the reward for the state-action pair + the sum term multiplied by the discount factor
                q_n[states.index(s)][actions.index(a)] = reward + gamma * sum_term(s, a, states, q_n_1,
                                                                                   transition_dict)
                # calculate the absolute difference (always positive) between this Q-table and the previous Q-table
                # if this difference is more than delta (which is 0 at this point), overwrite the delta with this value
                delta = max(delta,
                            abs(q_n_1[states.index(s)][actions.index(a)] - q_n[states.index(s)][actions.index(a)]))
        # save a copy of the current Q-table
        q_n_1 = np.copy(q_n)
        # increment the iteration number
        n += 1
    # return the final Q-table, the reward and transition dicts, and the states list
    return q_n, reward_dict, transition_dict, states
In Q1, we are using the G-algorithm to determine which two features to keep in the final model. We want to also check how many values these features need to have.
def g_algorithm_iteration_q(df, features, previous_features=None, previous_features_number_values=None):
    """
    Function for executing one iteration of the G-algorithm for Q-values.
    Args: df - a dataframe containing the data,
    features - a list of features that can be picked,
    previous_features - a list of features that were used previously (defaults to an empty list),
    previous_features_number_values - a list of corresponding numbers of values for the features that were used previously (defaults to an empty list).
    Returns: the feature selected in this iteration and the corresponding number of values for the feature,
    or the string "None" if none of the p-values were significant.
    """
    # default to fresh empty lists here instead of using mutable default arguments,
    # so the same list object is never shared between calls
    if previous_features is None:
        previous_features = []
    if previous_features_number_values is None:
        previous_features_number_values = []
    # initialise an empty dict that will hold the p-values
    p_values = {}
    # deduce the number of the current iteration, based on how many features were picked previously
    iteration = len(previous_features)
    # initialise an empty dict that will hold the previous features and their number of values
    previous_dict = {}
    # loop over all feature-value pairs
    for pf, pv in zip(previous_features, previous_features_number_values):
        # add the possible percentile splits to the dict
        previous_dict[pf] = get_values(pv)
    # if there are 2 or more features in the dict (this can only happen when selecting 3 or more features)
    if len(previous_dict) >= 2:
        # then compute all possible combinations of features and corresponding percentile splits
        # e.g. if we already selected confidence with 2 values and attitude with 3 values
        # then the dict has confidence - [0-50, 50-100] and attitude - [0-33, 33-66, 66-100]
        # so all the combinations are:
        # confidence 0-50, attitude 0-33
        # confidence 0-50, attitude 33-66
        # confidence 0-50, attitude 66-100
        # confidence 50-100, attitude 0-33
        # confidence 50-100, attitude 33-66
        # confidence 50-100, attitude 66-100
        # and the reversed order:
        # attitude 0-33, confidence 0-50
        # attitude 0-33, confidence 50-100
        # attitude 33-66, confidence 0-50
        # attitude 33-66, confidence 50-100
        # attitude 66-100, confidence 0-50
        # attitude 66-100, confidence 50-100
        combinations = []
        for k in previous_dict:
            for val in previous_dict[k]:
                for k2 in previous_dict:
                    if k2 != k:
                        for val2 in previous_dict[k2]:
                            combinations.append([(k, val), (k2, val2)])
        # initialise a list that will hold tuples of the combinations
        as_tuples = []
        # loop over all the combinations
        for combination in combinations:
            # sort the combination, meaning that all the combinations will have the same first feature and same second feature,
            # instead of sometimes having confidence first and sometimes having attitude first in the previous example
            combination.sort()
            # add the combinations to the tuples list
            # meaning that we added each combination twice
            as_tuples.append(tuple(combination))
        # keep only unique combinations (remove duplicates) by converting the list to a dict, then back to a list
        unique = list(dict.fromkeys(as_tuples))
    # if there are not two or more features in the list (always happens in our case of selecting 2 features)
    else:
        # initialise an empty list of combinations
        unique = []
        # loop over the features and their list of percentile splits in the dict
        for k, v in previous_dict.items():
            # for each percentile split in the corresponding list
            for val in v:
                # add a feature-percentile split pair to the unique list
                unique.append((k, val))
    # if the iteration is not the first one then we have features picked previously
    if iteration != 0:
        # loop over the feature-percentile split pairs
        for feature_split in unique:
            # loop over each feature that can be picked
            for feature in features:
                # each feature can have 2, 3, or 4 values
                number_values = [2, 3, 4]
                # initialise dicts for holding the Q-tables for the 2, 3, and 4 values
                q_tables_2 = {}
                q_tables_3 = {}
                q_tables_4 = {}
                # loop over the number of possible values
                for number_value in number_values:
                    # get the percentile splits for this number of values
                    percentile_splits = get_values(number_value)
                    # for each possible percentile split
                    for percentile_split in percentile_splits:
                        # initialise the corresponding dict with an empty list
                        # for the feature-split-percentile split triple
                        # e.g. if the previous feature was confidence with 2 values
                        # and we are now checking for confidence in the 0-50 split for attitude with 3 values and the 66-100 split for the attitude
                        # then q_tables_3 is the dict where we are initialising the empty list
                        # and the key for it is ((confidence,0-50),66-100)
                        # this is needed to fill in all possible previous and current splits to be able to compare them later
                        if number_value == 2:
                            q_tables_2[(feature_split, percentile_split)] = []
                        if number_value == 3:
                            q_tables_3[(feature_split, percentile_split)] = []
                        if number_value == 4:
                            q_tables_4[(feature_split, percentile_split)] = []
                        # give a unique name for the file to avoid using the wrong file
                        name = f"files_for_g/g_iteration_{iteration}_{feature_split}_{iteration}_{feature}_{percentile_split}.csv"
                        # create a feature list with the current feature we are checking (e.g. attitude in our running example)
                        f = [feature]
                        # create a corresponding list with its percentile split (e.g. 66-100 in our running example)
                        ps = [percentile_split]
                        # add the feature and split of the previously picked feature to the corresponding new lists
                        if type(feature_split[0]) is tuple:
                            for tup in feature_split:
                                f.append(tup[0])
                                ps.append(tup[1])
                        else:
                            f.append(feature_split[0])
                            ps.append(feature_split[1])
                        # create a dict which will hold the number of values for each previously picked feature and the current feature
                        nv = {}
                        # fill in the dict with the previous features and their number of values
                        for pf, pv in zip(previous_features, previous_features_number_values):
                            nv[pf] = pv
                        # add the current feature and its number of values to the dict
                        nv[feature] = number_value
                        # create a new file from the dataframe containing all the data, the number of values for each feature dict,
                        # and the two features and percentile splits lists we have created
                        # so, thus far, we have been specifying what features were picked previously and how many features they all had,
                        # as well as what specific percentile splits we are checking for each of the features
                        # and we now create a file that only contains those features and the values that are in the percentile splits
                        # in our running example, we are creating the file which has confidence and attitude,
                        # and confidence is in the 0-50 percentile split, and attitude is in the 66-100 split
                        create_new_file(name, df, number_values=nv, features=f, percentiles=ps)
                        # check if all rewards in the newly created file are 0
                        all_rewards_zero = check_file_all_rewards_zero(name)
                        # if all the rewards are 0
                        if all_rewards_zero:
                            # then open the states file to get the number of states and create a Q-table with that many rows and 5 columns full of zeroes
                            # (the file handle is named sf so the features list f above is not shadowed)
                            with open("state_files/states_for_g.txt") as sf:
                                states = sf.readlines()
                                num_states = len(states)
                                q_n = np.zeros((num_states, 5))
                        # otherwise, run the value iteration algorithm on the newly created file to get the Q-table
                        else:
                            q_n, reward_dict, transition_dict, states = value_iteration(name, f, not_all_features=True, states_file="state_files/states_for_g.txt")
                        # depending on the number of values, add the Q-table to the dict of the corresponding number of values
                        # under the key for the current previous feature and corresponding split and the current split
                        # this is the same key for which we created an empty list above,
                        # so ((confidence,0-50),66-100) in our running example
                        if number_value == 2:
                            for i, row in enumerate(q_n):
                                q_tables_2[(feature_split, percentile_split)].append(row)
                        if number_value == 3:
                            for i, row in enumerate(q_n):
                                q_tables_3[(feature_split, percentile_split)].append(row)
                        if number_value == 4:
                            for i, row in enumerate(q_n):
                                q_tables_4[(feature_split, percentile_split)].append(row)
                # when we have processed the Q-tables for all the possible numbers of values for the feature we are now trying to pick
                # then we can compute ANOVAs to see if the created Q-tables differ from each other
                # the keys are composed as explained previously, first the previously picked feature and its percentile split as a tuple
                # and then the percentile split for the current feature
                anova_result_2 = stats.f_oneway(q_tables_2[(feature_split, get_values(2)[0])],
                                                q_tables_2[(feature_split, get_values(2)[1])])[1]
                # if the value of the ANOVA is not NaN, then add it to the dict of p-values under the key
                # composed from the feature we are now trying to pick and the number of values corresponding to it
                # NOTE(review): the same (feature, n) key is rewritten for every feature_split, so only the
                # last feature_split's ANOVA result is kept for each key — presumably intended; verify
                for val in anova_result_2:
                    if not math.isnan(val):
                        p_values[(feature, 2)] = anova_result_2
                anova_result_3 = stats.f_oneway(q_tables_3[(feature_split, get_values(3)[0])],
                                                q_tables_3[(feature_split, get_values(3)[1])],
                                                q_tables_3[(feature_split, get_values(3)[2])])[1]
                for val in anova_result_3:
                    if not math.isnan(val):
                        p_values[(feature, 3)] = anova_result_3
                anova_result_4 = stats.f_oneway(q_tables_4[(feature_split, get_values(4)[0])],
                                                q_tables_4[(feature_split, get_values(4)[1])],
                                                q_tables_4[(feature_split, get_values(4)[2])],
                                                q_tables_4[(feature_split, get_values(4)[3])])[1]
                for val in anova_result_4:
                    if not math.isnan(val):
                        p_values[(feature, 4)] = anova_result_4
    # when the iteration is the first one, we have no previous feature to worry about
    # the process is very similar otherwise
    else:
        # loop over all possible features we can pick
        for feature in features:
            # define how many values they can get
            number_values = [2, 3, 4]
            # initialise empty dicts for the Q-tables
            q_tables_2 = {}
            q_tables_3 = {}
            q_tables_4 = {}
            # loop over the possible numbers of values
            for number_value in number_values:
                # get the percentile splits for this number of values
                percentile_splits = get_values(number_value)
                # loop over each percentile split and initialise empty lists in the corresponding Q-table dict
                for percentile_split in percentile_splits:
                    if number_value == 2:
                        q_tables_2[percentile_split] = []
                    if number_value == 3:
                        q_tables_3[percentile_split] = []
                    if number_value == 4:
                        q_tables_4[percentile_split] = []
                    # create the file name
                    name = f"files_for_g/g_iteration_{iteration}_{feature}_{percentile_split}.csv"
                    # create the dict for features and corresponding numbers of values
                    # which now has only the one feature we are currently trying to pick
                    nv = {}
                    nv[feature] = number_value
                    # create the new file
                    create_new_file(name, df, number_values=nv, features=[feature], percentiles=[percentile_split])
                    # get its Q-table
                    q_n, reward_dict, transition_dict, states = value_iteration(name, [feature], not_all_features=True, states_file="state_files/states_for_g.txt")
                    # add the Q-table to the corresponding dict
                    if number_value == 2:
                        for i, row in enumerate(q_n):
                            q_tables_2[percentile_split].append(row)
                    if number_value == 3:
                        for i, row in enumerate(q_n):
                            q_tables_3[percentile_split].append(row)
                    if number_value == 4:
                        for i, row in enumerate(q_n):
                            q_tables_4[percentile_split].append(row)
            # calculate the ANOVAs
            anova_result_2 = stats.f_oneway(q_tables_2[get_values(2)[0]],
                                            q_tables_2[get_values(2)[1]])[1]
            for val in anova_result_2:
                if not math.isnan(val):
                    p_values[(feature, 2)] = anova_result_2
            anova_result_3 = stats.f_oneway(q_tables_3[get_values(3)[0]],
                                            q_tables_3[get_values(3)[1]],
                                            q_tables_3[get_values(3)[2]])[1]
            for val in anova_result_3:
                if not math.isnan(val):
                    p_values[(feature, 3)] = anova_result_3
            anova_result_4 = stats.f_oneway(q_tables_4[get_values(4)[0]],
                                            q_tables_4[get_values(4)[1]],
                                            q_tables_4[get_values(4)[2]],
                                            q_tables_4[get_values(4)[3]])[1]
            for val in anova_result_4:
                if not math.isnan(val):
                    p_values[(feature, 4)] = anova_result_4
    # after the iteration is done, we are left with a dict that has as keys all the possible features to be picked
    # paired with all their possible numbers of values, and as values the results of the ANOVAs as p-values
    # e.g. if it is the first iteration and we can pick any feature, then the dict will have:
    # (confidence, 2) - [some ANOVAS]
    # (confidence, 3) - [some ANOVAS]
    # (confidence, 4) - [some ANOVAS]
    # (perceived usefulness, 2) - [some ANOVAS]
    # (perceived usefulness, 3) - [some ANOVAS]
    # (perceived usefulness, 4) - [some ANOVAS]
    # (attitude, 2) - [some ANOVAS]
    # (attitude, 3) - [some ANOVAS]
    # (attitude, 4) - [some ANOVAS]
    #
    # we then compute the mean of these ANOVAs
    p_values = {k: np.mean(v) for k, v in p_values.items()}
    # print the current iteration and specify which p-values we are comparing
    print(f"Iteration {iteration}. Comparing:")
    # print each key and its corresponding p-value
    for k, v in p_values.items():
        print(k, v)
    # set the smallest p-value found to 1
    smallest_p = 1
    # set the lowest number of values to 5
    number_val = 5
    # set the feature selected to an empty string
    feature_selected = ""
    # loop over all the p-values
    for k, v in p_values.items():
        # if the p-values is significant
        if v < 0.05:
            # check if the number of values for this is smaller than the smallest we have so far
            # (we want to pick features with the smallest number of values first if possible)
            # also check if the p-value is smaller than the smallest one we have so far (arbitrarily)
            if k[1] <= number_val and v < smallest_p:
                # if that is the case, update the feature we select, the number of values it has, and the smallest p-value found
                feature_selected = k[0]
                number_val = k[1]
                smallest_p = v
    # if no feature is selected, indicate that none of the p-values were significant
    if feature_selected == "":
        print("No feature is significant")
        return "None"
    # otherwise, specify which feature was picked and its number of values, and return them
    else:
        print(f"Feature selected for iteration {iteration} is {feature_selected}, with {number_val} values")
        print("---------------------------------")
        return feature_selected, number_val
Define the main G-algorithm function.
def g_algorithm_q(filename):
    """
    Function for executing the G-algorithm for Q-values.
    Args: filename - the name of the file which has the data for the G-algorithm.
    Returns: none.
    """
    # enumerate the possible features
    features = ["confidence", "perceived_usefulness", "attitude"]
    # list the previous features picked (also allows "forcing" a feature to be picked
    # if it is listed here and its corresponding number of values is listed in the next list)
    previous_features = []
    # list the previous corresponding number of values for each of the features
    previous_features_number_values = []
    # read the file into a dataframe
    df = pd.read_csv(filename)
    # initialise the result as an empty string
    result = ""
    # initialise the iteration number to 1
    i = 1
    # loop while there was a result (a feature selected), while there are still features that can be picked,
    # and while we have not selected two features already
    while result != "None" and len(features) != 0 and i <= 2:
        # get the feature that the G-algorithm picks in this iteration
        result = g_algorithm_iteration_q(df, features, previous_features=previous_features,
                                         previous_features_number_values=previous_features_number_values)
        # increment the iteration number
        i += 1
        # if there was a feature selected
        if result != "None":
            # unpack the feature and its number of values only inside this guard:
            # when no feature is significant the iteration returns the string "None",
            # and indexing that string would silently yield the characters "N" and "o"
            # instead of a feature name and a value count
            selected_feature, feature_values = result
            # then remove it from the list of features that can be picked
            features.remove(selected_feature)
            # and add it to the features that were picked previously
            previous_features.append(selected_feature)
            # and add its corresponding number of values to the list of values for the previously picked features
            previous_features_number_values.append(feature_values)
Note that the cell below takes about 2 hours to run, due to the Q-tables being very large.
# run the full G-algorithm on the anonymised data (as noted above, this takes about 2 hours)
g_algorithm_q('../../data/anonymised_data/anonymised_data_final_without_id.csv')
# delete the intermediate files created in the files_for_g folder during the run
remove_new_files()
Iteration 0. Comparing: ('confidence', 2) 0.006551554309550134 ('confidence', 3) 0.005148984724721217 ('confidence', 4) 0.00040700805036571213 ('perceived_usefulness', 2) 0.00797145383771955 ('perceived_usefulness', 3) 0.01597165211688375 ('perceived_usefulness', 4) 0.0648294559680821 ('attitude', 2) 0.05220026776357819 ('attitude', 3) 0.08146539159516121 ('attitude', 4) 0.24668512883459562 Feature selected for iteration 0 is confidence, with 2 values --------------------------------- Iteration 1. Comparing: ('perceived_usefulness', 2) 9.22584917396775e-05 ('perceived_usefulness', 3) 0.0011009230968480823 ('perceived_usefulness', 4) 0.006373340300456921 ('attitude', 2) 0.001717186399168694 ('attitude', 3) 0.012657562590080329 ('attitude', 4) 0.008569952040884716 Feature selected for iteration 1 is perceived_usefulness, with 2 values ---------------------------------