Author: Martin Dierikx
Date: 26-09-2023
Required files: data/preprocessed_database_data.csv
Output files: none
This file contains the code for the adapted G Algorithm, which selects significant features for the state space together with their significant numbers of possible values. It produces the results for Table 4.2.
import math
import numpy as np
import scipy.stats as sc
import sys
# The maximum number of values to try for each feature with format [self-motivation, self-efficacy, context state]
possible_state_sizes = [4, 4, 4]
# Convert mood into a number between 0 and 10 to be used in the context state
possible_moods = ['glad', 'happy', 'pleased', 'delighted', 'serene', 'content', 'satisfied', 'relaxed', 'calm',
'excited', 'astonished', 'aroused', 'sleepy', 'neutral', 'tired', 'tense', 'alarmed', 'afraid',
'droopy', 'bored', 'angry', 'annoyed', 'frustrated', 'distressed', 'depressed', 'sad', 'gloomy',
'miserable']
mood_lookup = [10, 10, 9, 9, 9, 9, 9, 9, 9, 8, 7, 7, 5, 5, 5, 5, 5, 4, 3, 3, 3, 3, 2, 1, 1, 1, 1, 0]
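# For illustration: a reported mood of 'tired' maps to context value 5,
# e.g. mood_lookup[possible_moods.index('tired')] == 5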
# The discount factor and epsilon used in the value iteration
discount = 0.85
epsilon = 0.01
# List all the possible actions
possible_actions = ['dec', 'sdec', 'nothing', 'sinc', 'inc']
# The file with the pre-processed database data
data_filename = "../data/preprocessed_database_data.csv"
def calculateReward(steps_taken, recommended_goal, action):
"""
Function to calculate the reward for a sample.
Args:
steps_taken: number of steps taken in the sample.
recommended_goal: recommended goal to achieve in the sample.
action: reinforcement learning action taken in the sample (not used in the reward calculation).
Returns: calculated reward belonging to the sample.
"""
diff = abs(recommended_goal - steps_taken)
# If the goal was achieved (more steps taken than recommended), the difference in steps is penalized only half as much
if recommended_goal < steps_taken:
diff = 0.5 * diff
# The standard reward function
reward = 1 - diff / recommended_goal
return reward
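# A quick sanity check with made-up numbers: hitting the goal exactly gives the
# maximum reward of 1, and overshooting is penalized half as much as undershooting:
# calculateReward(6000, 6000, 0)  # -> 1.0
# calculateReward(7000, 6000, 0)  # -> 1 - 0.5 * 1000 / 6000 ≈ 0.92
# calculateReward(5000, 6000, 0)  # -> 1 - 1000 / 6000 ≈ 0.83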
def getLookup(percentile, state_size):
"""
Method to get the mapping for a certain state feature's value depending on the percentile of that value in the data.
Args:
percentile: percentile of the value to map.
state_size: number of possible values for the state feature.
Returns: the number which the value should be mapped to.
"""
lookup = 0
# For binary features
if state_size == 2:
if percentile > 50:
lookup = 1
else:
lookup = 0
# For tertiary features
if state_size == 3:
if percentile > 100 / 3:
if percentile > 200 / 3:
lookup = 2
else:
lookup = 1
else:
lookup = 0
# For quaternary features
if state_size == 4:
if percentile > 25:
if percentile > 50:
if percentile > 75:
lookup = 3
else:
lookup = 2
else:
lookup = 1
else:
lookup = 0
return lookup
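# For illustration: with a quaternary feature, a value at the 60th percentile of
# the data falls into the third bucket, while boundary percentiles stay in the
# lower bucket because of the strict '>' comparisons:
# getLookup(60, 4)  # -> 2
# getLookup(25, 4)  # -> 0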
def read_data(return_full_state, state_sizes):
"""
Method to read the sample data from the csv file and put it into variables.
Args:
return_full_state: whether to return the states with plain (0-10) state features;
- otherwise the transformed (0 to state size - 1) state features are returned.
state_sizes: list containing the number of values per state feature.
Returns:
- a list of states for every sample (plain full states if return_full_state is set, transformed states otherwise),
- a list of actions for every sample,
- a list of transformed next states for every sample,
- a list of rewards for every sample,
- a table with the mean rewards for each state-action pair,
- a lookup list with the transformed value for every plain self-motivation value (0-10),
- a lookup list with the transformed value for every plain self-efficacy value (0-10),
- a lookup list with the transformed value for every plain context state value (0-30),
- a list with session numbers for every sample
"""
# Lists for all self-motivation, self-efficacy, and context state values
sm_list = []
se_list = []
cs_list = []
# Lists for the other information on the data samples
full_states = []
transformed_states = []
actions = []
full_next_states = []
transformed_next_states = []
rewards = []
steps_taken = []
recommended_goals = []
session_numbers = []
# Read the data from the file
for d in open(data_filename):
data = d.split(",")
# To ignore the header cells
if data[0] != "ID":
# State data
self_motivation = int(data[1])
sm_list.append(self_motivation)
self_efficacy = int(data[2])
se_list.append(self_efficacy)
rest = int(data[3])
available_time = int(data[4])
mood = mood_lookup[possible_moods.index(data[5])]
cs_list.append(rest + available_time + mood)
full_states.append([self_motivation, self_efficacy, rest, available_time, mood])
# Action data
action = possible_actions.index(data[6])
actions.append(action)
# Reward data
prev_activity = data[13].split(";")
prev_steps = []
for steps in prev_activity:
prev_steps.append(int(steps))
prev_steps.sort()
recommended_goal = max(2000, min(10000, int(math.ceil(np.percentile(prev_steps, 60) / 100.0)) * 100))
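# In other words, the recommended goal is the 60th percentile of the previous
# step counts, rounded up to the nearest 100 and clipped to [2000, 10000]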
initial_reward = calculateReward(int(data[14]), recommended_goal, action)
rewards.append(initial_reward)
steps_taken.append(int(data[14]))
recommended_goals.append(recommended_goal)
# Next state data
self_motivation_next = int(data[7])
self_efficacy_next = int(data[8])
rest_next = int(data[9])
available_time_next = int(data[10])
mood_next = mood_lookup[possible_moods.index(data[11])]
full_next_states.append([self_motivation_next, self_efficacy_next, rest_next, available_time_next, mood_next])
session_numbers.append(int(data[20].strip('\n')))
# Get the transformations of the self-motivation
sm_lookup = []
sm_percentiles = []
for i in range(11):
percentile = sc.percentileofscore(sm_list, i, 'weak')
sm_percentiles.append(percentile)
sm_lookup.append(getLookup(percentile, state_sizes[0]))
# Get the transformations of the self-efficacy
se_lookup = []
se_percentiles = []
for i in range(11):
percentile = sc.percentileofscore(se_list, i, 'weak')
se_percentiles.append(percentile)
se_lookup.append(getLookup(percentile, state_sizes[1]))
# Get the transformations of the context state
cs_lookup = []
cs_percentiles = []
for i in range(31):
percentile = sc.percentileofscore(cs_list, i, 'weak')
cs_percentiles.append(percentile)
cs_lookup.append(getLookup(percentile, state_sizes[2]))
# Get the mean rewards for each state-action pair
mean_rewards = [[0 for i in range(len(possible_actions))] for j in range(state_sizes[2] * state_sizes[1] * state_sizes[0])]
state_action_pair_counter = [[0 for i in range(len(possible_actions))] for j in range(state_sizes[2] * state_sizes[1] * state_sizes[0])]
for i in range(len(rewards)):
# Transform the state
transformed_sm = sm_lookup[full_states[i][0]]
transformed_se = se_lookup[full_states[i][1]]
transformed_cs = cs_lookup[full_states[i][2] + full_states[i][3] + full_states[i][4]]
transformed_states.append([transformed_sm, transformed_se, transformed_cs])
# Make a unique number from the state features to use it as index
state_as_number = transformed_sm * state_sizes[1] * state_sizes[2] + transformed_se * state_sizes[2] + transformed_cs
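# For example, with state_sizes = [4, 4, 4] the state [1, 2, 3] becomes
# 1 * 16 + 2 * 4 + 3 = 27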
# Transform the next state
transformed_next_sm = sm_lookup[full_next_states[i][0]]
transformed_next_se = se_lookup[full_next_states[i][1]]
transformed_next_cs = cs_lookup[full_next_states[i][2] + full_next_states[i][3] + full_next_states[i][4]]
transformed_next_states.append([transformed_next_sm, transformed_next_se, transformed_next_cs])
# Keep track of the mean reward R(s,a)
reward = mean_rewards[state_as_number][actions[i]]
state_action_pair_encounters = state_action_pair_counter[state_as_number][actions[i]]
if state_action_pair_encounters == 0:
# The first encounter for this state-action pair, so just use the sample reward as mean reward
mean_rewards[state_as_number][actions[i]] = rewards[i]
state_action_pair_counter[state_as_number][actions[i]] = 1
else:
# Update the previously saved mean reward
total_reward = reward * state_action_pair_encounters + rewards[i]
state_action_pair_counter[state_as_number][actions[i]] += 1
mean_rewards[state_as_number][actions[i]] = total_reward / (state_action_pair_encounters + 1)
# Return the data with either the plain full states or the transformed states
if return_full_state:
return full_states, actions, transformed_next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers
else:
return transformed_states, actions, transformed_next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers
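# A minimal usage sketch (assuming the pre-processed csv file is present):
# states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, [4, 4, 4])
# 'states' then holds one transformed [self-motivation, self-efficacy, context state] triple per sample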
def calculateTransitions(number_of_possible_states, states, actions, next_states, state_to_number_lookup):
"""
Method to calculate the transition function from the data.
Args:
number_of_possible_states: number of possible states.
states: list with the states from all the samples in the data.
actions: list with the actions from all the samples in the data.
next_states: list with the next states from all the samples in the data.
state_to_number_lookup: list to convert a state into a number to use as index.
Returns: three-dimensional list containing a list of probabilities per next state for each state-action pair.
"""
# Every state-action pair has a list with probabilities for each next state
transitions = [[[] for i in range(len(possible_actions))] for j in range(number_of_possible_states)]
# Get the next state for each datapoint and sort them by state-action pair
for i in range(len(states)):
state_as_number = state_to_number_lookup[0] * states[i][0] + state_to_number_lookup[1] * states[i][1] + state_to_number_lookup[2] * states[i][2]
action_as_number = actions[i]
transitions[state_as_number][action_as_number].append(next_states[i])
# Calculate the probability of each next state happening for each state-action pair
for i in range(number_of_possible_states):
for j in range(len(possible_actions)):
next_states_probs = [0 for l in range(number_of_possible_states)]
number_of_next_states = len(transitions[i][j])
for k in range(number_of_next_states):
next_state = transitions[i][j][k]
next_state_as_number = state_to_number_lookup[0] * next_state[0] + state_to_number_lookup[1] * next_state[1] + state_to_number_lookup[2] * next_state[2]
if next_states_probs[next_state_as_number] == 0:
# Count how many of the transitions of a state-action pair are towards a specific next state
number_of_occurrences_of_next_state = transitions[i][j].count(next_state)
next_states_probs[next_state_as_number] = number_of_occurrences_of_next_state / number_of_next_states
transitions[i][j] = next_states_probs
return transitions
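# A toy check with hypothetical data: two samples leave state [0, 0, 0] under
# action 0, one back to itself and one to [0, 0, 1], so with two possible states
# the estimated transition probabilities for that state-action pair are [0.5, 0.5]:
# calculateTransitions(2, [[0, 0, 0], [0, 0, 0]], [0, 0], [[0, 0, 0], [0, 0, 1]], [0, 0, 1])[0][0]  # -> [0.5, 0.5]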
def calculateQValues(number_of_possible_states, mean_rewards, transitions):
"""
Method to calculate the q-values using value iteration.
Args:
number_of_possible_states: number of possible states.
mean_rewards: list with mean rewards for each state-action pair.
transitions: list containing the probabilities for each transition for each state-action pair.
Returns: list containing the q-values for each state-action pair.
"""
delta = 10
q_values = [[0 for i in range(len(possible_actions))] for j in range(number_of_possible_states)]
# Continue for as long as there is a significant change in the q-table after an iteration
while delta > epsilon:
delta = 0
next_qvalues = [[0 for i in range(len(possible_actions))] for j in range(number_of_possible_states)]
for i in range(number_of_possible_states):
for j in range(len(possible_actions)):
future_reward = 0
# Calculate the future rewards for each next state
for k in range(number_of_possible_states):
if transitions[i][j][k] != 0:
best_future_reward = 0
# Use the future action that gives the highest q-value
for l in range(len(possible_actions)):
best_future_reward = max(best_future_reward, q_values[k][l])
future_reward += transitions[i][j][k] * best_future_reward
# Calculate the new q-values and calculate the change in values
next_qvalues[i][j] = mean_rewards[i][j] + discount * future_reward
delta = max(delta, abs(next_qvalues[i][j] - q_values[i][j]))
# Update the q-values
q_values = next_qvalues.copy()
return q_values
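# The loop implements the Bellman optimality backup
# Q(s, a) = R(s, a) + discount * sum over s' of T(s, a, s') * max over a' of Q(s', a')
# A toy check with hypothetical numbers: a single state that always transitions to
# itself with mean reward 1 for every action converges to roughly 1 / (1 - discount):
# calculateQValues(1, [[1, 1, 1, 1, 1]], [[[1], [1], [1], [1], [1]]])  # -> q-values near 1 / 0.15 ≈ 6.7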
This shows the results for picking one feature with the lowest significant size and checking combinations with the remaining features. It does not, however, check combinations such as the same feature with different sizes, which might also be significant. To display those results, manually set the variables best_feature, best_size, best_feature_2, and best_size_2 at the places indicated in the code. Feature 0 is self-motivation, feature 1 is self-efficacy, and feature 2 is context state.
Running this may raise a SystemExit, which is used to end the run as soon as no significant features are found. Because of this, the file cannot be run in one go: it will stop after this part of the code.
Also, after the data is separated over multiple buckets for comparison, a bucket may contain too few samples, which produces a warning when running the ANOVA. This warning does not affect the results.
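# As a minimal illustration (hypothetical reward values) of the test used below:
# the rewards are grouped into one bucket per value of the candidate feature and
# the buckets are compared with a one-way ANOVA; the feature qualifies as a split if p < 0.05.
# F, p_value = sc.f_oneway([0.2, 0.3, 0.4], [0.7, 0.8, 0.9])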
print("Features for reward prediction:")
list_of_features = [0, 1, 2]
# Initialize variables to keep track of the best feature and its corresponding size and p-value
best_feature = -1
best_p_value = 1
best_size = 4
# Select the first best feature to split on by trying all features with all possible sizes
for feature_to_test in list_of_features:
for size in range(possible_state_sizes[feature_to_test], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[feature_to_test] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
buckets_for_rewards = [[] for i in range(size)]
# Separate the data over buckets with the same state
for i in range(len(states)):
state = states[i]
reward = rewards[i]
buckets_for_rewards[state[feature_to_test]].append(reward)
# Calculate the p-value of the buckets
p_value = 1
if size == 3:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1], buckets_for_rewards[2])
elif size == 4:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1], buckets_for_rewards[2], buckets_for_rewards[3])
elif size == 2:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1])
print(f"The p-value for feature {feature_to_test} with size {size} is {p_value}")
# Update best feature by using the feature with the least number of values that is still significant
if p_value < 0.05:
if size == best_size and p_value < best_p_value:
best_feature = feature_to_test
best_p_value = p_value
best_size = size
elif size < best_size:
best_feature = feature_to_test
best_p_value = p_value
best_size = size
# Check if there was at least one significant feature
if best_feature == -1:
raise SystemExit("No significant feature left")
## Adjust the best feature and its size here if configurations different from the G Algorithm's need to be run
## best_feature can be 0, 1, or 2
## best_size can be 2, 3, or 4
# best_feature = 0
# best_size = 2
# Initialize variables to keep track of the second best feature and its corresponding size and p-value
best_p_value_2 = 1
best_feature_2 = -1
best_size_2 = 4
# Skip the first used feature
list_of_features.remove(best_feature)
# Select the second best feature by looking for other significant features in combination with one of the values of the first selected feature
for i in range(best_size):
print(f"Checking for feature {best_feature} with size {best_size} being {i}")
for feature_to_test in list_of_features:
for size in range(possible_state_sizes[feature_to_test], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[best_feature] = best_size
new_state_sizes[feature_to_test] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
buckets_for_rewards = [[] for j in range(size)]
# Separate the data depending on the state
for j in range(len(states)):
if states[j][best_feature] == i:
state = states[j]
reward = rewards[j]
buckets_for_rewards[state[feature_to_test]].append(reward)
# Calculate the p-value of the buckets
p_value = 1
if size == 3:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1], buckets_for_rewards[2])
elif size == 4:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1], buckets_for_rewards[2], buckets_for_rewards[3])
elif size == 2:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1])
print(f"The p-value for feature {feature_to_test} with size {size} is {p_value}")
# Update second best feature by using the feature with the least number of values that is still significant
if p_value < 0.05:
if size == best_size_2 and p_value < best_p_value_2:
best_feature_2 = feature_to_test
best_p_value_2 = p_value
best_size_2 = size
elif size < best_size_2:
best_feature_2 = feature_to_test
best_p_value_2 = p_value
best_size_2 = size
# Check if there was at least one significant feature
if best_feature_2 == -1:
raise SystemExit("No significant feature left")
## Adjust the second best feature and its size here if configurations different from the G Algorithm's need to be run
## best_feature_2 can be 0, 1, or 2
## best_size_2 can be 2, 3, or 4
# best_feature_2 = 0
# best_size_2 = 2
# Check the third feature for significance
list_of_features.remove(best_feature_2)
best_p_value_3 = 1
best_size_3 = 4
for i in range(best_size):
for j in range(best_size_2):
print(f"Checking for feature {best_feature} with size {best_size} being {i} and feature {best_feature_2} with size {best_size_2} being {j}")
for size in range(possible_state_sizes[list_of_features[0]], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[best_feature] = best_size
new_state_sizes[best_feature_2] = best_size_2
new_state_sizes[list_of_features[0]] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
buckets_for_rewards = [[] for k in range(size)]
# Separate the data depending on the state
for k in range(len(states)):
if states[k][best_feature] == i and states[k][best_feature_2] == j:
state = states[k]
reward = rewards[k]
buckets_for_rewards[state[list_of_features[0]]].append(reward)
# Calculate the p-value of the buckets
p_value = 1
if size == 3:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1], buckets_for_rewards[2])
elif size == 4:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1], buckets_for_rewards[2], buckets_for_rewards[3])
elif size == 2:
F, p_value = sc.f_oneway(buckets_for_rewards[0], buckets_for_rewards[1])
print(f"The p-value for feature {list_of_features[0]} with size {size} is {p_value}")
# Update the possible values of the third feature
if p_value < 0.05:
if size == best_size_3 and p_value < best_p_value_3:
best_p_value_3 = p_value
best_size_3 = size
elif size < best_size_3:
best_p_value_3 = p_value
best_size_3 = size
print(f"The best p-value is {best_p_value_3} belonging to size {best_size_3}")
This shows the results for picking one feature with the lowest significant size and checking combinations with the remaining features. It does not, however, check combinations such as the same feature with different sizes, which might also be significant. To display those results, manually set the variables best_feature, best_size, best_feature_2, and best_size_2 at the places indicated in the code. Feature 0 is self-motivation, feature 1 is self-efficacy, and feature 2 is context state.
Running this may raise a SystemExit, which is used to end the run as soon as no significant features are found. Because of this, the file cannot be run in one go: it will stop after this part of the code.
Also, after the data is separated over multiple buckets for comparison, a bucket may contain too few samples, which produces a warning when running the ANOVA. This warning does not affect the results.
print("Features for transition prediction:")
list_of_features = [0, 1, 2]
# Initialize variables to keep track of the best feature and its corresponding size and p-value
best_feature = -1
best_p_value = 1
best_size = 4
# Select the first best feature to split on by trying all features with all possible sizes
for feature_to_test in list_of_features:
for size in range(possible_state_sizes[feature_to_test], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[feature_to_test] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
new_states = []
new_next_states = []
# Separate the data based on the state
for state in states:
new_state = [0, 0, 0]
new_state[2] = state[feature_to_test]
new_states.append(new_state)
for next_state in next_states:
new_next_state = [0, 0, 0]
new_next_state[2] = next_state[feature_to_test]
new_next_states.append(new_next_state)
# Calculate the transitions
transitions = calculateTransitions(size, new_states, actions, new_next_states, [0, 0, 1])
# Calculate the p-value of the transitions
p_value = 1
if size == 3:
F, p = sc.f_oneway(transitions[0], transitions[1], transitions[2])
p_value = np.mean(p)
elif size == 4:
F, p = sc.f_oneway(transitions[0], transitions[1], transitions[2], transitions[3])
p_value = np.mean(p)
elif size == 2:
F, p = sc.f_oneway(transitions[0], transitions[1])
p_value = np.mean(p)
print(f"The p-value for feature {feature_to_test} with size {size} is {p_value}")
# Update best feature by using the feature with the least number of values that is still significant
if p_value < 0.05:
if size == best_size and p_value < best_p_value:
best_feature = feature_to_test
best_p_value = p_value
best_size = size
elif size < best_size:
best_feature = feature_to_test
best_p_value = p_value
best_size = size
# Check if there was at least one significant feature
if best_feature == -1:
raise SystemExit("No significant feature left")
## Adjust the best feature and its size here if configurations different from the G Algorithm's need to be run
## best_feature can be 0, 1, or 2
## best_size can be 2, 3, or 4
# best_feature = 0
# best_size = 2
# Initialize variables to keep track of the second best feature and its corresponding size and p-value
best_feature_2 = -1
best_p_value_2 = 1
best_size_2 = 4
# Skip the first used feature
list_of_features.remove(best_feature)
# Select the second best feature by looking for other significant features in combination with one of the values of the first selected feature
for i in range(best_size):
print(f"Checking for feature {best_feature} with size {best_size} being {i}")
for feature_to_test in list_of_features:
for size in range(possible_state_sizes[feature_to_test], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[best_feature] = best_size
new_state_sizes[feature_to_test] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
new_states = []
new_next_states = []
state_indices = []
# Separate the data based on the state
for j in range(len(states)):
if states[j][best_feature] == i:
state_indices.append(j)
new_state = [0, 0, 0]
new_state[2] = i
new_state[1] = states[j][feature_to_test]
new_states.append(new_state)
for j in range(len(next_states)):
if j in state_indices:
new_next_state = [0, 0, 0]
new_next_state[2] = next_states[j][best_feature]
new_next_state[1] = next_states[j][feature_to_test]
new_next_states.append(new_next_state)
# Calculate the transitions
state_to_number_lookup_2 = [0, 0, 1]
state_to_number_lookup_2[1] = best_size
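# With this lookup, a state [0, v, i] (tested-feature value v, first-feature value i)
# is encoded as the index v * best_size + i, matching the indexing of the transitions below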
transitions = calculateTransitions(best_size * size, new_states, actions, new_next_states, state_to_number_lookup_2)
# Calculate the p-value of the transitions
p_value = 0
if size == 3:
F, p = sc.f_oneway(transitions[0 + i], transitions[1 * best_size + i], transitions[2 * best_size + i])
# Average only the non-zero p-values over the possible next states
valid_p_values = [float(l) for l in p if l > 0]
p_value = np.mean(valid_p_values)
elif size == 4:
F, p = sc.f_oneway(transitions[0 + i], transitions[1 * best_size + i], transitions[2 * best_size + i],
transitions[3 * best_size + i])
valid_p_values = [float(l) for l in p if l > 0]
p_value = np.mean(valid_p_values)
elif size == 2:
F, p = sc.f_oneway(transitions[0 + i], transitions[1 * best_size + i])
valid_p_values = [float(l) for l in p if l > 0]
p_value = np.mean(valid_p_values)
print(f"The p-value for feature {feature_to_test} with size {size} is {p_value}")
# Update second best feature by using the feature with the least number of values that is still significant
if p_value < 0.05:
if size == best_size_2 and p_value < best_p_value_2:
best_feature_2 = feature_to_test
best_p_value_2 = p_value
best_size_2 = size
elif size < best_size_2:
best_feature_2 = feature_to_test
best_p_value_2 = p_value
best_size_2 = size
# Check if there was at least one significant feature
if best_feature_2 == -1:
raise SystemExit("No significant feature left")
## Adjust the second best feature and its size here if configurations different from the G Algorithm's need to be run
## best_feature_2 can be 0, 1, or 2
## best_size_2 can be 2, 3, or 4
# best_feature_2 = 0
# best_size_2 = 2
list_of_features.remove(best_feature_2)
best_p_value_3 = 1
best_size_3 = 4
# Check significance of the third feature
for i in range(best_size):
for j in range(best_size_2):
print(f"Checking for feature {best_feature} with size {best_size} being {i} and feature {best_feature_2} with size {best_size_2} being {j}")
for size in range(possible_state_sizes[list_of_features[0]], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[best_feature] = best_size
new_state_sizes[best_feature_2] = best_size_2
new_state_sizes[list_of_features[0]] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
new_states = []
new_next_states = []
state_indices = []
# Separate the data based on the state
for k in range(len(states)):
if states[k][best_feature] == i and states[k][best_feature_2] == j:
state_indices.append(k)
new_state = [0, 0, 0]
new_state[2] = i
new_state[1] = j
new_state[0] = states[k][list_of_features[0]]
new_states.append(new_state)
for k in range(len(next_states)):
if k in state_indices:
new_next_state = [0, 0, 0]
new_next_state[2] = next_states[k][best_feature]
new_next_state[1] = next_states[k][best_feature_2]
new_next_state[0] = next_states[k][list_of_features[0]]
new_next_states.append(new_next_state)
# Calculate the transitions
state_to_number_lookup_2 = [0, 0, 1]
state_to_number_lookup_2[1] = best_size
state_to_number_lookup_2[0] = best_size_2 * best_size
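# Here a state [u, j, i] (tested value u, second-feature value j, first-feature value i)
# is encoded as u * best_size * best_size_2 + j * best_size + i, matching the
# indexing of the transitions below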
transitions = calculateTransitions(best_size * best_size_2 * size, new_states, actions, new_next_states, state_to_number_lookup_2)
# Calculate the p-value of the transitions
p_value = 1
if size == 3:
F, p = sc.f_oneway(transitions[j * best_size + i], transitions[1 * best_size * best_size_2 + j * best_size + i], transitions[2 * best_size * best_size_2 + j * best_size + i])
# Average only the non-zero p-values over the possible next states
valid_p_values = [float(l) for l in p if l > 0]
p_value = np.mean(valid_p_values)
elif size == 4:
F, p = sc.f_oneway(transitions[j * best_size + i], transitions[1 * best_size * best_size_2 + j * best_size + i], transitions[2 * best_size * best_size_2 + j * best_size + i], transitions[3 * best_size * best_size_2 + j * best_size + i])
valid_p_values = [float(l) for l in p if l > 0]
p_value = np.mean(valid_p_values)
elif size == 2:
F, p = sc.f_oneway(transitions[j * best_size + i], transitions[1 * best_size * best_size_2 + j * best_size + i])
valid_p_values = [float(l) for l in p if l > 0]
p_value = np.mean(valid_p_values)
print(f"The p-value for feature {list_of_features[0]} with size {size} is {p_value}")
# Update the possible values of the third feature
if p_value < 0.05:
if size == best_size_3 and p_value < best_p_value_3:
best_p_value_3 = p_value
best_size_3 = size
elif size < best_size_3:
best_p_value_3 = p_value
best_size_3 = size
print(f"The best p-value is {best_p_value_3} belonging to size {best_size_3}")
This shows the results for picking one feature with the lowest significant size and checking combinations with the remaining features. It does not, however, check combinations such as the same feature with different sizes, which might also be significant. To display those results, manually set the variables best_feature, best_size, best_feature_2, and best_size_2 at the places indicated in the code. Feature 0 is self-motivation, feature 1 is self-efficacy, and feature 2 is context state.
Running this may raise a SystemExit, which is used to end the run as soon as no significant features are found. Because of this, the file cannot be run in one go: it will stop after this part of the code.
Also, after the data is separated over multiple buckets for comparison, a bucket may contain too few samples, which produces a warning when running the ANOVA. This warning does not affect the results.
print("Features for q-value prediction:")
list_of_features = [0, 1, 2]
# Initialize variables to keep track of the best feature and its corresponding size and p-value
best_feature = -1
best_p_value = 1
best_size = 4
# Select the first best feature to split on by trying all features with all possible sizes
for feature_to_test in list_of_features:
for size in range(possible_state_sizes[feature_to_test], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[feature_to_test] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
# Create a list of all possible states
possible_states = []
for sm in range(new_state_sizes[0]):
for se in range(new_state_sizes[1]):
for cs in range(new_state_sizes[2]):
possible_states.append([sm, se, cs])
new_states = []
new_next_states = []
new_mean_rewards = [[0 for i in range(len(possible_actions))] for j in range(size)]
# Separate the data
for state in states:
new_state = [0, 0, 0]
new_state[2] = state[feature_to_test]
new_states.append(new_state)
for next_state in next_states:
new_next_state = [0, 0, 0]
new_next_state[2] = next_state[feature_to_test]
new_next_states.append(new_next_state)
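# Merge the mean rewards of all full states that share the same value of the
# tested feature, pairwise-averaging the non-zero entries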
for i in range(len(possible_states)):
bucket = possible_states[i][feature_to_test]
for j in range(len(possible_actions)):
if new_mean_rewards[bucket][j] == 0:
new_mean_rewards[bucket][j] = mean_rewards[i][j]
elif mean_rewards[i][j] != 0:
new_mean_rewards[bucket][j] = (new_mean_rewards[bucket][j] + mean_rewards[i][j]) / 2
# Calculate the transitions
transitions = calculateTransitions(size, new_states, actions, new_next_states, [0, 0, 1])
# Calculate the q-values
q_values = calculateQValues(size, new_mean_rewards, transitions)
# Calculate the p-value of the q-values
p_value = 1
if size == 3:
F, p_value = sc.f_oneway(q_values[0], q_values[1], q_values[2])
elif size == 4:
F, p_value = sc.f_oneway(q_values[0], q_values[1], q_values[2], q_values[3])
elif size == 2:
F, p_value = sc.f_oneway(q_values[0], q_values[1])
print(f"The p-value for feature {feature_to_test} with size {size} is {p_value}")
# Update best feature by using the feature with the least number of values that is still significant
if p_value < 0.05:
if size == best_size and p_value < best_p_value:
best_feature = feature_to_test
best_p_value = p_value
best_size = size
elif size < best_size:
best_feature = feature_to_test
best_p_value = p_value
best_size = size
# Check if there was at least one significant feature
if best_feature == -1:
raise SystemExit("No significant feature left")
## Adjust the best feature and its size here if configurations different from the G Algorithm's need to be run
## best_feature can be 0, 1, or 2
## best_size can be 2, 3, or 4
# best_feature = 0
# best_size = 2
# Initialize variables to keep track of the second best feature and its corresponding size and p-value
best_feature_2 = -1
best_p_value_2 = 1
best_size_2 = 4
# Skip the first used feature
list_of_features.remove(best_feature)
# Select the second best feature by looking for other significant features in combination with one of the values of the first selected feature
for i in range(best_size):
print(f"Checking for feature {best_feature} with size {best_size} being {i}")
for feature_to_test in list_of_features:
for size in range(possible_state_sizes[feature_to_test], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[best_feature] = best_size
new_state_sizes[feature_to_test] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
# Create a list of all possible states
possible_states = []
for sm in range(new_state_sizes[0]):
for se in range(new_state_sizes[1]):
for cs in range(new_state_sizes[2]):
possible_states.append([sm, se, cs])
new_states = []
new_next_states = []
new_mean_rewards = [[0 for j in range(len(possible_actions))] for k in range(size * best_size)]
state_indices = []
# Separate the data based on the state
for j in range(len(states)):
if states[j][best_feature] == i:
state_indices.append(j)
new_state = [0, 0, 0]
new_state[2] = i
new_state[1] = states[j][feature_to_test]
new_states.append(new_state)
for j in range(len(next_states)):
if j in state_indices:
new_next_state = [0, 0, 0]
new_next_state[2] = next_states[j][best_feature]
new_next_state[1] = next_states[j][feature_to_test]
new_next_states.append(new_next_state)
for j in range(len(possible_states)):
if possible_states[j][best_feature] == i:
bucket = possible_states[j][feature_to_test] * best_size + i
for k in range(len(possible_actions)):
if new_mean_rewards[bucket][k] == 0:
new_mean_rewards[bucket][k] = mean_rewards[j][k]
elif mean_rewards[j][k] != 0:
new_mean_rewards[bucket][k] = (new_mean_rewards[bucket][k] + mean_rewards[j][k]) / 2
# Calculate the transitions
state_to_number_lookup = [0, 0, 1]
state_to_number_lookup[1] = best_size
transitions = calculateTransitions(size * best_size, new_states, actions, new_next_states, state_to_number_lookup)
# Calculate the q-values
q_values = calculateQValues(size * best_size, new_mean_rewards, transitions)
# Calculate the p-value of the q-values
p_value = 1
if size == 3:
F, p_value = sc.f_oneway(q_values[0 + i], q_values[1 * best_size + i], q_values[2 * best_size + i])
elif size == 4:
F, p_value = sc.f_oneway(q_values[0 + i], q_values[1 * best_size + i], q_values[2 * best_size + i], q_values[3 * best_size + i])
elif size == 2:
F, p_value = sc.f_oneway(q_values[0 + i], q_values[1 * best_size + i])
print(f"The p-value for feature {feature_to_test} with size {size} is {p_value}")
# Update second best feature by using the feature with the least number of values that is still significant
if p_value < 0.05:
if size == best_size_2 and p_value < best_p_value_2:
best_feature_2 = feature_to_test
best_p_value_2 = p_value
best_size_2 = size
elif size < best_size_2:
best_feature_2 = feature_to_test
best_p_value_2 = p_value
best_size_2 = size
# Check if there was at least one significant feature
if best_feature_2 == -1:
raise SystemExit("No significant feature left")
## Adjust the second best feature and its size here if configurations different from the G Algorithm's need to be run
## best_feature_2 can be 0, 1, or 2
## best_size_2 can be 2, 3, or 4
# best_feature_2 = 0
# best_size_2 = 2
list_of_features.remove(best_feature_2)
best_p_value_3 = 1
best_size_3 = 4
# Check significance of the third feature
for i in range(best_size):
for j in range(best_size_2):
print(f"Checking for feature {best_feature} with size {best_size} being {i} and feature {best_feature_2} with size {best_size_2} being {j}")
for size in range(possible_state_sizes[list_of_features[0]], 1, -1):
# Extract the data
new_state_sizes = possible_state_sizes.copy()
new_state_sizes[best_feature] = best_size
new_state_sizes[best_feature_2] = best_size_2
new_state_sizes[list_of_features[0]] = size
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(False, new_state_sizes)
# Create a list of all possible states
possible_states = []
for sm in range(new_state_sizes[0]):
for se in range(new_state_sizes[1]):
for cs in range(new_state_sizes[2]):
possible_states.append([sm, se, cs])
new_states = []
new_next_states = []
new_mean_rewards = [[0 for k in range(len(possible_actions))] for l in range(len(possible_states))]
state_indices = []
# Separate the data based on the state
for k in range(len(states)):
if states[k][best_feature] == i and states[k][best_feature_2] == j:
state_indices.append(k)
new_state = [0, 0, 0]
new_state[2] = i
new_state[1] = j
new_state[0] = states[k][list_of_features[0]]
new_states.append(new_state)
for k in range(len(next_states)):
if k in state_indices:
new_next_state = [0, 0, 0]
new_next_state[2] = next_states[k][best_feature]
new_next_state[1] = next_states[k][best_feature_2]
new_next_state[0] = next_states[k][list_of_features[0]]
new_next_states.append(new_next_state)
for k in range(len(possible_states)):
if possible_states[k][best_feature] == i and possible_states[k][best_feature_2] == j:
bucket = possible_states[k][list_of_features[0]] * best_size * best_size_2 + j * best_size + i
for l in range(len(possible_actions)):
if new_mean_rewards[bucket][l] == 0:
new_mean_rewards[bucket][l] = mean_rewards[k][l]
elif mean_rewards[k][l] != 0:
new_mean_rewards[bucket][l] = (new_mean_rewards[bucket][l] + mean_rewards[k][l]) / 2
# Calculate the transitions
state_to_number_lookup = [0, 0, 1]
state_to_number_lookup[1] = best_size
state_to_number_lookup[0] = best_size_2 * best_size
transitions = calculateTransitions(len(possible_states), new_states, actions, new_next_states, state_to_number_lookup)
# Calculate the q-values
q_values = calculateQValues(len(possible_states), new_mean_rewards, transitions)
# Calculate the p-value of the q-values
p_value = 1
if size == 3:
F, p_value = sc.f_oneway(q_values[j * best_size + i], q_values[1 * best_size * best_size_2 + j * best_size + i], q_values[2 * best_size * best_size_2 + j * best_size + i])
elif size == 4:
F, p_value = sc.f_oneway(q_values[j * best_size + i], q_values[1 * best_size * best_size_2 + j * best_size + i], q_values[2 * best_size * best_size_2 + j * best_size + i], q_values[3 * best_size * best_size_2 + j * best_size + i])
elif size == 2:
F, p_value = sc.f_oneway(q_values[j * best_size + i], q_values[1 * best_size * best_size_2 + j * best_size + i])
print(f"The p-value for feature {list_of_features[0]} with size {size} is {p_value}")
# Update the possible values of the third feature
if p_value < 0.05:
if size == best_size_3 and p_value < best_p_value_3:
best_p_value_3 = p_value
best_size_3 = size
elif size < best_size_3:
best_p_value_3 = p_value
best_size_3 = size
print(f"The best p-value is {best_p_value_3} belonging to size {best_size_3}")
This shows the results for picking the context state feature with the lowest p-value and checking combinations with the remaining features. It does not, however, check combinations such as the same features picked in a different order, which might also be significant. To display those results, manually set the variables best_feature and best_feature_2 at the places indicated in the code. Feature 2 is rest, feature 3 is available time, and feature 4 is mood (valence).
Because of too little data, splitting the data on two of the three context state features leaves too few samples per bucket when checking the third feature. Hence, the results will say that the third feature is not significant. All combinations of the three features have this same problem, while each feature is significant when two of them are tested at the same time, as can be seen by manually setting best_feature and best_feature_2. Also, the splitting of the data may produce a warning when running the ANOVA. This warning does not affect the results.
print("Features for context state:")
list_of_features = [2, 3, 4]
# Extract the data with the plain full states (note: new_state_sizes still holds the last configuration from the section above)
states, actions, next_states, rewards, mean_rewards, sm_lookup, se_lookup, cs_lookup, session_numbers = read_data(True, new_state_sizes)
# Initialize variables to keep track of the best feature and its corresponding p-value
best_feature = -1
best_buckets_for_states = []
best_buckets_for_context_state = []
best_p_value = 1
# Select the first best feature to split on by trying all features
for feature_to_test in list_of_features:
buckets_for_states = [[] for i in range(11)]
buckets_for_context_state = [[] for i in range(11)]
# Separate the data based on the context state
for i in range(len(states)):
state = states[i]
buckets_for_states[state[feature_to_test]].append(state)
context_state = cs_lookup[state[2] + state[3] + state[4]]
buckets_for_context_state[state[feature_to_test]].append(context_state)
# Calculate the p-value of the buckets
p_value = 1
if feature_to_test == 4:
# The mood feature has no possible value 6 because of the mood mappings, so bucket 6 is always empty and we can skip it
# Also nobody answered 0, meaning that bucket 0 is also always empty and can be skipped
F, p_value = sc.f_oneway(buckets_for_context_state[1], buckets_for_context_state[2], buckets_for_context_state[3], buckets_for_context_state[4], buckets_for_context_state[5], buckets_for_context_state[7], buckets_for_context_state[8], buckets_for_context_state[9], buckets_for_context_state[10])
else:
F, p_value = sc.f_oneway(buckets_for_context_state[0], buckets_for_context_state[1], buckets_for_context_state[2], buckets_for_context_state[3], buckets_for_context_state[4], buckets_for_context_state[5], buckets_for_context_state[6], buckets_for_context_state[7], buckets_for_context_state[8], buckets_for_context_state[9], buckets_for_context_state[10])
print(f"The p-value for feature {feature_to_test} is {p_value}")
# Update the best feature and its p-value
if p_value < best_p_value:
best_feature = feature_to_test
best_p_value = p_value
best_buckets_for_states = buckets_for_states.copy()
best_buckets_for_context_state = buckets_for_context_state.copy()
# Check if there was at least one significant feature
if best_feature == -1:
raise SystemExit("No significant feature left")
## Adjust the best feature here if configurations different from the G Algorithm's need to be run
## best_feature can be 2, 3, or 4
# best_feature = 2
# Initialize variables to keep track of the second best feature and its corresponding p-value
best_p_value_2 = 1
best_feature_2 = -1
list_of_features.remove(best_feature)
# Select the second best feature by looking for other significant features in combination with the first selected feature
for i in range(11):
print(f"Checking for feature {best_feature} being {i}")
for feature_to_test in list_of_features:
buckets_for_states = [[] for i in range(11)]
buckets_for_context_state = [[] for i in range(11)]
# Separate the data based on the context state
for j in range(len(best_buckets_for_states[i])):
state = best_buckets_for_states[i][j]
buckets_for_states[state[feature_to_test]].append(state)
context_state = best_buckets_for_context_state[i][j]
buckets_for_context_state[state[feature_to_test]].append(context_state)
# Calculate the p-value of the buckets
p_value = 1
if feature_to_test == 4:
# The mood feature has no possible value 6 because of the mood mappings, so bucket 6 is always empty and we can skip it
# Also nobody answered 0, meaning that bucket 0 is also always empty and can be skipped
F, p_value = sc.f_oneway(buckets_for_context_state[1], buckets_for_context_state[2], buckets_for_context_state[3], buckets_for_context_state[4], buckets_for_context_state[5], buckets_for_context_state[7], buckets_for_context_state[8], buckets_for_context_state[9], buckets_for_context_state[10])
else:
F, p_value = sc.f_oneway(buckets_for_context_state[0], buckets_for_context_state[1], buckets_for_context_state[2], buckets_for_context_state[3], buckets_for_context_state[4], buckets_for_context_state[5], buckets_for_context_state[6], buckets_for_context_state[7], buckets_for_context_state[8], buckets_for_context_state[9], buckets_for_context_state[10])
print(f"The p-value for feature {feature_to_test} is {p_value}")
# Update second best feature and its p-value
if p_value < best_p_value_2:
best_feature_2 = feature_to_test
best_p_value_2 = p_value
# Check if there was at least one significant feature
if best_feature_2 == -1:
raise SystemExit("No significant feature left")
## Adjust the second best feature here if configurations different from the G Algorithm's need to be run
## best_feature_2 can be 2, 3, or 4
# best_feature_2 = 2
# Check the third feature
list_of_features.remove(best_feature_2)
best_p_value_3 = 1
for i in range(11):
buckets_for_states = [[] for i in range(11)]
buckets_for_context_state = [[] for i in range(11)]
for j in range(len(best_buckets_for_states[i])):
state = best_buckets_for_states[i][j]
buckets_for_states[state[best_feature_2]].append(state)
context_state = best_buckets_for_context_state[i][j]
buckets_for_context_state[state[best_feature_2]].append(context_state)
for j in range(11):
print(f"Checking for feature {best_feature} being {i} and feature {best_feature_2} being {j}")
buckets_for_states_2 = [[] for i in range(11)]
buckets_for_context_state_2 = [[] for i in range(11)]
# Separate the data based on the context state
for k in range(len(buckets_for_states[j])):
state = buckets_for_states[j][k]
buckets_for_states_2[state[list_of_features[0]]].append(state)
context_state = buckets_for_context_state[j][k]
buckets_for_context_state_2[state[list_of_features[0]]].append(context_state)
# Calculate the p-value of the buckets
p_value = 1
if list_of_features[0] == 4:
# The mood feature has no possible value 6 because of the mood mappings, so bucket 6 is always empty and we can skip it
# Also nobody answered 0, meaning that bucket 0 is also always empty and can be skipped
F, p_value = sc.f_oneway(buckets_for_context_state_2[1], buckets_for_context_state_2[2], buckets_for_context_state_2[3], buckets_for_context_state_2[4], buckets_for_context_state_2[5], buckets_for_context_state_2[7], buckets_for_context_state_2[8], buckets_for_context_state_2[9], buckets_for_context_state_2[10])
else:
F, p_value = sc.f_oneway(buckets_for_context_state_2[0], buckets_for_context_state_2[1], buckets_for_context_state_2[2], buckets_for_context_state_2[3], buckets_for_context_state_2[4], buckets_for_context_state_2[5], buckets_for_context_state_2[6], buckets_for_context_state_2[7], buckets_for_context_state_2[8], buckets_for_context_state_2[9], buckets_for_context_state_2[10])
print(f"The p-value for feature {list_of_features[0]} is {p_value}")
# Update the p-value of the third feature
if p_value < best_p_value_3:
best_p_value_3 = p_value
if best_p_value_3 == 1:
print("The third feature is not significant")
else:
print(f"The p-value of the third feature is {best_p_value_3}")