Author: Martin Dierikx
Date: 25-09-2023
Required files: data/preprocessed_database_data.csv and data/preprocessed_post_questionnaire_data.csv
Output files: none
This file contains the code for the general analysis, producing the results shown in Figures 4.2, 4.3, 4.4, 4.5, and 4.6 and in Appendices D, E, and F.
import math
from matplotlib import pyplot as plt
import numpy as np
import scipy.stats as sc
# The number of values for each feature with format [self-motivation, self-efficacy, context state]
state_sizes = [2, 2, 2]
# Convert the reported mood into a number between 0 and 10 to be used in the context state
possible_moods = ['glad', 'happy', 'pleased', 'delighted', 'serene', 'content', 'satisfied', 'relaxed', 'calm',
'excited', 'astonished', 'aroused', 'sleepy', 'neutral', 'tired', 'tense', 'alarmed', 'afraid',
'droopy', 'bored', 'angry', 'annoyed', 'frustrated', 'distressed', 'depressed', 'sad', 'gloomy',
'miserable']
mood_lookup = [10, 10, 9, 9, 9, 9, 9, 9, 9, 8, 7, 7, 5, 5, 5, 5, 5, 4, 3, 3, 3, 3, 2, 1, 1, 1, 1, 0]
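# For example, 'glad' maps to 10, 'neutral' to 5, and 'miserable' to 0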
# The standard base reward per action, in the same order as possible_actions (not used by calculateReward below)
action_lookup = [0.8, 0.85, 0.9, 0.95, 1]
# The discount factor and epsilon used in the value iteration
discount = 0.85
epsilon = 0.01
# List all the possible states and actions
possible_actions = ['dec', 'sdec', 'nothing', 'sinc', 'inc']
possible_states = []
for sm in range(state_sizes[0]):
for se in range(state_sizes[1]):
for cs in range(state_sizes[2]):
possible_states.append(f"[{sm}, {se}, {cs}]")
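# This yields the 8 binary states '[0, 0, 0]', '[0, 0, 1]', ..., '[1, 1, 1]'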
# The file with the pre-processed post-questionnaire data
data_filename = "../data/preprocessed_post_questionnaire_data.csv"
We calculated the ASA score of our virtual coach using the data from the post-questionnaire.
# Store the scores from the participants
p_ids = []
average_steps_final_day = []
ASAQ = [[] for i in range(24)]
for d in open(data_filename):
data = d.split(",")
if data[0] != 'ID':
# Extract data on the average number of steps taken by people on the final day of the intervention
p_ids.append(data[0].strip("\""))
average_steps_final_day.append(int(data[1]))
# Extract the scores for each of the 24 items of the short version of the ASAQ
        for item in range(24):
            ASAQ[item].append(int(data[item + 4].split(" (")[0]))
# Calculate the mean values for all the ASAQ items
ASAQ_means = [np.mean(ASAQ[i]) for i in range(24)]
# Account for reversed scoring questions
ASAQ_means[11] = ASAQ_means[11] * -1
ASAQ_means[16] = ASAQ_means[16] * -1
ASAQ_means[17] = ASAQ_means[17] * -1
ASAQ_means[21] = ASAQ_means[21] * -1
print(f"The average number of steps taken on the final day by people who completed the full study was {round(np.mean(average_steps_final_day))} with standard deviation of {round(np.std(average_steps_final_day))}")
print(f"A total of {len(ASAQ[0])} people completed the post questionnaire")
print(f"The ASA score = {sum(ASAQ_means)}")
We looked at how difficult people generally found the recommended goals to be.
print("Perceived goal difficulty:")
# Keep track of how many times a certain answer was given
x = [i for i in range(-5, 6)]
y = [0 for i in range(0, 11)]
goal_difficulty_list = []
for d in open(data_filename):
data = d.split(",")
if data[30] != '' and data[30] != 'Goal_difficulty':
goal_difficulty = data[30].split(" (")[0]
goal_difficulty_list.append(int(goal_difficulty))
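        # Shift the -5..5 answer to the 0..10 index range of y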
y[int(goal_difficulty) + 5] += 1
# Plot the occurrences of each of the possible goal difficulties
plt.subplots(figsize=(10, 6))
plt.bar(x, y, color='lightblue')
plt.xticks(x, ['-5 \n (very difficult)', '-4', '-3', '-2', '-1', '0', '1', '2', '3', '4', '5 \n (very easy)'])
plt.ylabel("Number of people")
plt.xlabel("Perceived goal difficulty")
plt.show()
def calculateReward(steps_taken, recommended_goal, action):
"""
Function to calculate the reward for a sample.
Args:
steps_taken: number of steps taken in the sample.
recommended_goal: recommended goal to achieve in the sample.
action: reinforcement learning action taken in the sample.
Returns: calculated reward belonging to the sample.
"""
diff = abs(recommended_goal - steps_taken)
    # If the goal was achieved (more steps taken than recommended), penalize the difference in steps only half as much
if recommended_goal < steps_taken:
diff = 0.5 * diff
# The standard reward function
reward = 1 - diff / recommended_goal
return reward
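As a quick sanity check of the reward function, consider two hypothetical samples (numbers not taken from the study data) with a recommended goal of 6000 steps: taking 4500 steps gives a reward of 1 - 1500/6000 = 0.75, while taking 7500 steps is penalized half as much for the same 1500-step difference and gives 1 - 750/6000 = 0.875.
# Hypothetical sanity checks for the reward function (the action argument does not affect this reward)
assert math.isclose(calculateReward(4500, 6000, 2), 0.75)
assert math.isclose(calculateReward(7500, 6000, 2), 0.875)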
# Switch to the file with the pre-processed database data
data_filename = "../data/preprocessed_database_data.csv"
# Lists for all self-motivation, self-efficacy, and context state values
sm_list = []
se_list = []
cs_list = []
# Lists for the other information on the data samples
full_states = []
transformed_states = []
actions = []
full_next_states = []
transformed_next_states = []
rewards = []
steps_taken = []
recommended_goals = []
session_numbers = []
number_of_rejected_proposals = []
actual_goals = []
average_steps_before_intervention = []
# Read the data from the file
for d in open(data_filename):
data = d.split(",")
# To ignore the header cells
if data[0] != "ID":
# State data
self_motivation = int(data[1])
sm_list.append(self_motivation)
self_efficacy = int(data[2])
se_list.append(self_efficacy)
rest = int(data[3])
available_time = int(data[4])
mood = mood_lookup[possible_moods.index(data[5])]
cs_list.append(rest + available_time + mood)
full_states.append([self_motivation, self_efficacy, rest, available_time, mood])
# Action data
action = possible_actions.index(data[6])
actions.append(action)
# Reward data
prev_activity = data[13].split(";")
prev_steps = []
for steps in prev_activity:
prev_steps.append(int(steps))
prev_steps.sort()
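        # Recommended goal: the 60th percentile of the previous days' step counts, rounded up to the nearest 100 and clipped to the range 2000-10000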
recommended_goal = max(2000, min(10000, int(math.ceil(np.percentile(prev_steps, 60) / 100.0)) * 100))
initial_reward = calculateReward(int(data[14]), recommended_goal, action)
rewards.append(initial_reward)
steps_taken.append(int(data[14]))
recommended_goals.append(recommended_goal)
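        # Record the pre-intervention daily step average once per participant (in their first session), for those who completed the post-questionnaire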
if int(data[20].strip('\n')) == 1 and data[0].strip("\"") in p_ids:
average_steps_before_intervention.append(np.mean(prev_steps))
# Next state data
self_motivation_next = int(data[7])
self_efficacy_next = int(data[8])
rest_next = int(data[9])
available_time_next = int(data[10])
mood_next = mood_lookup[possible_moods.index(data[11])]
full_next_states.append([self_motivation_next, self_efficacy_next, rest_next, available_time_next, mood_next])
number_of_rejected_proposals.append(int(data[18]))
session_numbers.append(int(data[20].strip('\n')))
actual_goals.append(int(data[17]))
# Get the transformations of the self-motivation
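# A raw score maps to 1 when more than half of the observed scores are less than or equal to it (i.e., it lies at or above the sample median), and to 0 otherwise; the same median split is applied to self-efficacy and the context state below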
sm_lookup = []
sm_percentiles = []
for i in range(11):
percentile = sc.percentileofscore(sm_list, i, 'weak')
    sm_percentiles.append(percentile)
# Transform into a 1 if the value is in the upper 50% of the data
if percentile > 50:
sm_lookup.append(1)
else:
sm_lookup.append(0)
# Get the transformations of the self-efficacy
se_lookup = []
se_percentiles = []
for i in range(11):
percentile = sc.percentileofscore(se_list, i, 'weak')
    se_percentiles.append(percentile)
# Transform into a 1 if the value is in the upper 50% of the data
if percentile > 50:
se_lookup.append(1)
else:
se_lookup.append(0)
# Get the transformations of the context state
cs_lookup = []
cs_percentiles = []
for i in range(31):
percentile = sc.percentileofscore(cs_list, i, 'weak')
    cs_percentiles.append(percentile)
# Transform into a 1 if the value is in the upper 50% of the data
if percentile > 50:
cs_lookup.append(1)
else:
cs_lookup.append(0)
# Get the mean rewards for each state-action pair
mean_rewards = [[0 for i in range(len(possible_actions))] for j in range(state_sizes[2] * state_sizes[1] * state_sizes[0])]
state_action_pair_counter = [[0 for i in range(len(possible_actions))] for j in range(state_sizes[2] * state_sizes[1] * state_sizes[0])]
for i in range(len(rewards)):
# Transform the state
transformed_sm = sm_lookup[full_states[i][0]]
transformed_se = se_lookup[full_states[i][1]]
transformed_cs = cs_lookup[full_states[i][2] + full_states[i][3] + full_states[i][4]]
transformed_states.append([transformed_sm, transformed_se, transformed_cs])
# Make a unique number from the state features to use it as index
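    # e.g. state [1, 0, 1] maps to 1*4 + 0*2 + 1 = 5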
state_as_number = transformed_sm * state_sizes[1] * state_sizes[2] + transformed_se * state_sizes[2] + transformed_cs
# Transform the next state
transformed_next_sm = sm_lookup[full_next_states[i][0]]
transformed_next_se = se_lookup[full_next_states[i][1]]
transformed_next_cs = cs_lookup[full_next_states[i][2] + full_next_states[i][3] + full_next_states[i][4]]
transformed_next_states.append([transformed_next_sm, transformed_next_se, transformed_next_cs])
# Keep track of the mean reward R(s,a)
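    # using an incremental mean update: new_mean = (old_mean * n + sample_reward) / (n + 1)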
reward = mean_rewards[state_as_number][actions[i]]
state_action_pair_encounters = state_action_pair_counter[state_as_number][actions[i]]
if state_action_pair_encounters == 0:
# The first encounter for this state-action pair, so just use the sample reward as mean reward
mean_rewards[state_as_number][actions[i]] = rewards[i]
state_action_pair_counter[state_as_number][actions[i]] = 1
else:
# Update the previously saved mean reward
total_reward = reward * state_action_pair_encounters + rewards[i]
state_action_pair_counter[state_as_number][actions[i]] += 1
mean_rewards[state_as_number][actions[i]] = total_reward / (state_action_pair_encounters + 1)
We visualized the distribution of the number of rejected proposals per conversation session.
# Plot the number of rejections per session
print(f"The total number of samples is {len(transformed_states)}")
print(f"In total {sum(number_of_rejected_proposals)} proposed goals were rejected")
print("The number of rejections per session was distributed as follows:")
plt.subplots(figsize=(10, 6))
x = [i for i in range(5)]
plt.bar(x, [number_of_rejected_proposals.count(0), number_of_rejected_proposals.count(1), number_of_rejected_proposals.count(2), number_of_rejected_proposals.count(3), number_of_rejected_proposals.count(4)], color='lightblue')
plt.xticks(x)
plt.subplots_adjust(bottom=0.2)
plt.xlabel("Number of rejected proposals")
plt.ylabel("Number of occurrences")
plt.show()
We visualized, per state, the number of people who took more or fewer steps than their recommended goal, as well as by how much they exceeded or fell short of that goal.
# Keep track of number of people over and underachieving in each state
over_achieving = [0, 0, 0, 0, 0, 0, 0, 0]
exact_achieving = [0, 0, 0, 0, 0, 0, 0, 0]
under_achieving = [0, 0, 0, 0, 0, 0, 0, 0]
goals_achieved = 0
# Keep track of how much is done too much or too little
steps_over = [[] for i in range(8)]
steps_under = [[] for i in range(8)]
for i in range(len(transformed_states)):
    state_idx = transformed_states[i][0] * state_sizes[1] * state_sizes[2] + transformed_states[i][1] * state_sizes[2] + transformed_states[i][2]
    if recommended_goals[i] - steps_taken[i] < 0:
        # Count the overachieving per state
        over_achieving[state_idx] += 1
        steps_over[state_idx].append((steps_taken[i] - recommended_goals[i]) / recommended_goals[i])
    elif recommended_goals[i] - steps_taken[i] > 0:
        # Count the underachieving per state
        under_achieving[state_idx] += 1
        steps_under[state_idx].append((recommended_goals[i] - steps_taken[i]) / recommended_goals[i])
    else:
        # Count the exact achieving per state
        exact_achieving[state_idx] += 1
    if steps_taken[i] >= actual_goals[i]:
        goals_achieved += 1
print(f"The total number of samples with an achieved goal was {goals_achieved} which is {round(goals_achieved / 381 * 100)}%")
print(f"The average steps per day before the intervention of people who completed the full study was {round(np.mean(average_steps_before_intervention))} with standard deviation of {round(np.std(average_steps_before_intervention))}")
print(f"The average increase in steps per day during the study of people who completed the full study was {round(np.mean(average_steps_final_day)) - round(np.mean(average_steps_before_intervention))} which is a {round((round(np.mean(average_steps_final_day)) - round(np.mean(average_steps_before_intervention))) / round(np.mean(average_steps_before_intervention)) * 100)}% increase")
# Plot the over- and underachieving
print("The amount of over- and underachieving was as follows:")
plt.subplots(figsize=(10, 6))
x = [i for i in range(8)]
plt.plot(x, [100 * over_achieving[i] / (over_achieving[i] + under_achieving[i] + exact_achieving[i]) for i in range(8)], label='overachieving', color='lightblue')
plt.plot(x, [100 * under_achieving[i] / (over_achieving[i] + under_achieving[i] + exact_achieving[i]) for i in range(8)], label='underachieving', color='orange')
plt.xticks(x, ['[0,0,0]', '[0,0,1]', '[0,1,0]', '[0,1,1]', '[1,0,0]', '[1,0,1]', '[1,1,0]', '[1,1,1]'])
plt.subplots_adjust(bottom=0.2)
plt.legend(loc='upper left')
plt.xlabel("State [self-motivation, self-efficacy, context state]")
plt.ylabel("Percentage of total samples")
axes2 = plt.twinx()
axes2.plot(x, [100 * np.mean(steps_over[i]) for i in range(8)], label='steps taken too many', color='lightblue', linestyle='dashed')
axes2.plot(x, [100 * np.mean(steps_under[i]) for i in range(8)], label='steps taken too little', color='orange', linestyle='dashed')
axes2.set_ylabel("Percentage of recommended goal")
axes2.set_ylim(10, 60)
plt.legend(loc='center right')
plt.show()
We plotted the occurrences of each rounded reward to show the distribution of rewards.
We also did this per state to see the differences between states.
print("Reward distribution:")
# Keep track of the rounded rewards and how many times they occur in the data
rounded_rewards = []
reward_occurrences = []
cumulative_reward_occurrences = []
for i in range(len(rewards)):
rounded_rewards.append(round(rewards[i], 1))
# Count how many times each rounded reward (-1.0, -0.9, ..., 0.9, 1.0) occurs in the data
for i in range(21):
reward_to_count = round((0.1 * i) - 1, 1)
reward_occurrences.append(rounded_rewards.count(reward_to_count) / len(rewards) * 100)
if i == 0:
cumulative_reward_occurrences.append(rounded_rewards.count(reward_to_count) / len(rewards) * 100)
else:
cumulative_reward_occurrences.append(cumulative_reward_occurrences[i-1] + rounded_rewards.count(reward_to_count) / len(rewards) * 100)
# Plot the reward distribution
x = [i / 10 for i in range(-10, 11)]
plt.subplots(figsize=(10, 6))
plt.plot(x, reward_occurrences, color='lightblue', label='individual')
plt.plot(x, cumulative_reward_occurrences, color='orange', label='cumulative')
plt.ylabel("Percentage of occurrences")
plt.xlabel("Reward")
plt.ylim(0, 100)
plt.xticks(x)
plt.subplots_adjust(bottom=0.2)
plt.legend()
plt.show()
print("Reward distribution per state:")
for state in possible_states:
# Keep track of the rounded rewards and how many times they occur in the data
rounded_rewards = []
reward_occurrences = []
for i in range(len(rewards)):
if str(transformed_states[i]) == state:
rounded_rewards.append(round(rewards[i], 1))
# Count how many times each rounded reward (-1.0, -0.9, ..., 0.9, 1.0) occurs in each state
for i in range(21):
reward_to_count = round((0.1 * i) - 1, 1)
reward_occurrences.append(rounded_rewards.count(reward_to_count))
# Plot the reward distribution
x = [i / 10 for i in range(-10, 11)]
plt.subplots(figsize=(5, 3))
plt.ylim(0, 30)
plt.plot(x, reward_occurrences, color='lightblue', label=str(state))
plt.ylabel("Number of occurrences")
plt.xlabel("Reward")
plt.subplots_adjust(bottom=0.2)
plt.legend(loc='upper left')
plt.show()
We counted how many samples we have per state-action pair.
print("Number of samples:")
sample_count = [[0 for i in range(len(possible_actions))] for j in range(len(possible_states))]
state_to_number_lookup = [state_sizes[1] * state_sizes[2], state_sizes[2], 1]
# Count how many times each state-action pair occurs
for i in range(len(transformed_states)):
state_as_number = state_to_number_lookup[0] * transformed_states[i][0] + state_to_number_lookup[1] * transformed_states[i][1] + transformed_states[i][2]
sample_count[state_as_number][actions[i]] += 1
# Count the total samples per action
column_total = [0 for i in range(len(possible_actions))]
print(f"state | decrease | slightly decrease | do nothing | slightly increase | increase | total")
for i in range(len(possible_states)):
print(f"s{possible_states[i]} | {sample_count[i][0]} | {sample_count[i][1]} | {sample_count[i][2]} | {sample_count[i][3]} | {sample_count[i][4]} | {sum(sample_count[i])}")
for j in range(len(possible_actions)):
column_total[j] += sample_count[i][j]
print(f"total | {column_total[0]} | {column_total[1]} | {column_total[2]} | {column_total[3]} | {column_total[4]} | {sum(column_total)}")
We calculated the q-values to determine the optimal and worst policy.
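The q-values follow the standard value-iteration update, where R(s, a) is the mean observed reward, T(s' | s, a) the estimated transition probability, and the discount factor is 0.85: Q(s, a) = R(s, a) + discount * sum over s' of T(s' | s, a) * max over a' of Q(s', a'). The iteration stops once the largest change in any q-value is smaller than epsilon.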
# Calculate the transition function
transitions = [[[] for i in range(len(possible_actions))] for j in range(len(possible_states))]
# Get the next state for each datapoint and sort them by state-action pair
for i in range(len(transformed_states)):
state_as_number = state_to_number_lookup[0] * transformed_states[i][0] + state_to_number_lookup[1] * transformed_states[i][1] + state_to_number_lookup[2] * transformed_states[i][2]
action_as_number = actions[i]
transitions[state_as_number][action_as_number].append(transformed_next_states[i])
# Calculate the probability of each next state happening for each state-action pair
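# e.g. if 3 of the 12 observed transitions from a state-action pair go to next state s', then T(s' | s, a) = 3/12 = 0.25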
for i in range(len(possible_states)):
for j in range(len(possible_actions)):
next_states_probs = [0 for l in range(len(possible_states))]
number_of_next_states = len(transitions[i][j])
for k in range(number_of_next_states):
next_state = transitions[i][j][k]
next_state_as_number = state_to_number_lookup[0] * next_state[0] + state_to_number_lookup[1] * next_state[1] + state_to_number_lookup[2] * next_state[2]
if next_states_probs[next_state_as_number] == 0:
# Count how many of the transitions of a state-action pair are towards a specific next state
                number_of_occurrences_of_next_state = transitions[i][j].count(next_state)
                next_states_probs[next_state_as_number] = number_of_occurrences_of_next_state / number_of_next_states
transitions[i][j] = next_states_probs
# Balance the transition probabilities for the state-action pairs with too little data
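# by blending with a uniform distribution over the 8 states: with n samples (n < 10), T' = (n/10) * T_empirical + ((10 - n)/10) * (1/8), e.g. n = 4 gives 0.4 * T_empirical + 0.6 * 0.125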
for i in range(len(possible_states)):
for j in range(len(possible_actions)):
if sample_count[i][j] < 10:
for k in range(len(possible_states)):
transitions[i][j][k] = (sample_count[i][j] / 10) * transitions[i][j][k] + ((10 - sample_count[i][j]) / 10) * (1 / len(possible_states))
# Balance the mean rewards if there was too little data in a state-action pair
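# analogously, with n samples (n < 10): R'(s, a) = (n/10) * R(s, a) + ((10 - n)/10) * mean(all rewards)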
for i in range(len(possible_states)):
for j in range(len(possible_actions)):
if sample_count[i][j] < 10:
mean_rewards[i][j] = (sample_count[i][j] / 10) * mean_rewards[i][j] + ((10 - sample_count[i][j]) / 10) * np.mean(rewards)
# Calculate the q-values
delta = 10
q_values = [[0 for i in range(len(possible_actions))] for j in range(len(possible_states))]
# Continue for as long as there is a significant change in the q-table after an iteration
while delta > epsilon:
delta = 0
next_qvalues = [[0 for i in range(len(possible_actions))] for j in range(len(possible_states))]
for i in range(len(possible_states)):
for j in range(len(possible_actions)):
future_reward = 0
# Calculate the future rewards for each next state
for k in range(len(possible_states)):
if transitions[i][j][k] != 0:
                    # Use the future action that gives the highest q-value in the next state
                    best_future_reward = max(q_values[k])
                    future_reward += transitions[i][j][k] * best_future_reward
# Calculate the new q-values and calculate the change in values
next_qvalues[i][j] = mean_rewards[i][j] + discount * future_reward
delta = max(delta, abs(next_qvalues[i][j] - q_values[i][j]))
# Update the q-values
q_values = next_qvalues.copy()
# Print the q-table
print(f"state | decrease | slightly decrease | do nothing | slightly increase | increase")
for i in range(len(possible_states)):
print(
f"s{possible_states[i]} & {round(q_values[i][0], 2)} & {round(q_values[i][1], 2)} & {round(q_values[i][2], 2)} & {round(q_values[i][3], 2)} & {round(q_values[i][4], 2)}")
print("\\hline")
# Print the optimal and worst policy
print(f"The optimal policy is {[possible_actions[q_values[i].index(max(q_values[i]))] for i in range(len(possible_states))]}")
print(f"The worst policy is {[possible_actions[q_values[i].index(min(q_values[i]))] for i in range(len(possible_states))]}")