import numpy as np
import matplotlib.pyplot as plt
from stable_baselines import results_plotter
import os
class Reward:
    """
    Reward base class for reward signal calculation and visualization

    Parameters:
        :param env: (object) Environment where the training takes place
        :param task: (object) Task that is being trained, an instance of the TaskModule class
    """
def __init__(self, env, task=None):
self.env = env
self.task = task
self.rewards_history = []
def compute(self, observation=None):
raise NotImplementedError
def reset(self):
raise NotImplementedError
    def visualize_reward_over_steps(self):
        """
        Plot and save a graph of reward values assigned to individual steps during an episode. Call this method after the end of the episode.
        """
        save_dir = os.path.join(self.env.logdir, "rewards")
        os.makedirs(save_dir, exist_ok=True)
        if self.env.episode_steps > 0:
            results_plotter.EPISODES_WINDOW = 50
            results_plotter.plot_curves(
                [(np.arange(self.env.episode_steps),
                  np.asarray(self.rewards_history[-self.env.episode_steps:]))],
                'step', 'Step rewards')
            plt.ylabel("reward")
            plt.gcf().set_size_inches(8, 6)
            plt.savefig(os.path.join(save_dir, "reward_over_steps_episode{}.png".format(self.env.episode_number)))
            plt.close()
    def visualize_reward_over_episodes(self):
        """
        Plot and save a graph of cumulative reward values assigned to individual episodes. Call this method to plot data from the current and all previous episodes.
        """
        save_dir = os.path.join(self.env.logdir, "rewards")
        os.makedirs(save_dir, exist_ok=True)
        if self.env.episode_number > 0:
            results_plotter.EPISODES_WINDOW = 10
            results_plotter.plot_curves(
                [(np.arange(self.env.episode_number),
                  np.asarray(self.env.episode_final_reward[-self.env.episode_number:]))],
                'episode', 'Episode rewards')
            plt.ylabel("reward")
            plt.gcf().set_size_inches(8, 6)
            plt.savefig(os.path.join(save_dir, "reward_over_episodes_episode{}.png".format(self.env.episode_number)))
            plt.close()
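
# Illustrative sketch (not part of the original module): how the plotting helpers
# above are expected to be driven from a training loop. `_StubEnv` and
# `_StubReward` are hypothetical stand-ins exposing only the attributes Reward
# relies on (`logdir`, `episode_steps`, `episode_number`, `episode_final_reward`).
def _example_visualization_usage():
    class _StubEnv:
        logdir = "/tmp/reward_demo"        # hypothetical log directory
        episode_steps = 5
        episode_number = 1
        episode_final_reward = [0.7]

    class _StubReward(Reward):
        def compute(self, observation=None):
            self.rewards_history.append(0.1)
            return 0.1

        def reset(self):
            self.rewards_history = []

    reward = _StubReward(_StubEnv())
    for _ in range(_StubEnv.episode_steps):
        reward.compute()
    reward.visualize_reward_over_steps()     # saves rewards/reward_over_steps_episode1.png
    reward.visualize_reward_over_episodes()  # saves rewards/reward_over_episodes_episode1.png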
class DistanceReward(Reward):
    """
    Reward class for reward signal calculation based on the distance difference between 2 objects

    Parameters:
        :param env: (object) Environment where the training takes place
        :param task: (object) Task that is being trained, an instance of the TaskModule class
    """
    def __init__(self, env, task):
        super(DistanceReward, self).__init__(env, task)
        self.prev_obj1_position = None
        self.prev_obj2_position = None
    def compute(self, observation):
        """
        Compute reward signal based on the distance between 2 objects. The positions of the objects must be present in the observation.

        Params:
            :param observation: (list) Observation of the environment
        Returns:
            :return reward: (float) Reward signal for the environment
        """
        observation = observation["observation"] if isinstance(observation, dict) else observation
        # For the "2dvu" reward type, the two object representations occupy the
        # first and second halves of the observation excluding its last three
        # elements; otherwise the first six elements are the two xyz positions.
        o1 = observation[0:3] if self.env.reward_type != "2dvu" else observation[0:int(len(observation[:-3])/2)]
        o2 = observation[3:6] if self.env.reward_type != "2dvu" else observation[int(len(observation[:-3])/2):-3]
        reward = self.calc_dist_diff(o1, o2)
        self.task.check_distance_threshold(observation=observation)
        self.rewards_history.append(reward)
        return reward
    def reset(self):
        """
        Reset the stored positions of the 2 objects. Call this after the end of an episode.
        """
        self.prev_obj1_position = None
        self.prev_obj2_position = None
    def calc_dist_diff(self, obj1_position, obj2_position):
        """
        Calculate the change in distance between 2 objects between the previous and the current step. Normalize the change by the distance in the previous step.

        Params:
            :param obj1_position: (list) Position of the first object
            :param obj2_position: (list) Position of the second object
        Returns:
            :return norm_diff: (float) Normalized difference of distances between 2 objects in the previous and the current step
        """
        if self.prev_obj1_position is None and self.prev_obj2_position is None:
            # First step of the episode: no previous positions yet, so the
            # current positions are used and the first reward is 0.
            self.prev_obj1_position = obj1_position
            self.prev_obj2_position = obj2_position
        self.prev_diff = self.task.calc_distance(self.prev_obj1_position, self.prev_obj2_position)
        current_diff = self.task.calc_distance(obj1_position, obj2_position)
        norm_diff = (self.prev_diff - current_diff) / self.prev_diff
        self.prev_obj1_position = obj1_position
        self.prev_obj2_position = obj2_position
        return norm_diff
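
# Worked example (illustrative): if the two objects were 0.5 apart on the previous
# step and are 0.4 apart now, the reward is (0.5 - 0.4) / 0.5 = 0.2; moving apart
# by the same amount would give -0.2. `_StubTask` is a hypothetical stand-in that
# measures plain Euclidean distance.
def _example_distance_reward_step():
    class _StubTask:
        def calc_distance(self, a, b):
            return float(np.linalg.norm(np.asarray(a) - np.asarray(b)))

    r = DistanceReward(env=None, task=_StubTask())
    first = r.calc_dist_diff([0.0, 0.0, 0.0], [0.5, 0.0, 0.0])  # first step: prev == current -> 0.0
    step = r.calc_dist_diff([0.0, 0.0, 0.0], [0.4, 0.0, 0.0])   # gap shrank from 0.5 to 0.4
    assert abs(first) < 1e-9 and abs(step - 0.2) < 1e-9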
class ComplexDistanceReward(DistanceReward):
    """
    Reward class for reward signal calculation based on distance differences between 3 objects, e.g. 2 objects and a gripper for complex tasks

    Parameters:
        :param env: (object) Environment where the training takes place
        :param task: (object) Task that is being trained, an instance of the TaskModule class
    """
    def __init__(self, env, task):
        super(ComplexDistanceReward, self).__init__(env, task)
        self.prev_obj3_position = None
    def compute(self, observation):
        """
        Compute reward signal based on distances between 3 objects. The positions of the objects must be present in the observation.

        Params:
            :param observation: (list) Observation of the environment
        Returns:
            :return reward: (float) Reward signal for the environment
        """
        reward = self.calc_dist_diff(observation[0:3], observation[3:6], observation[6:9])
        self.task.check_distance_threshold(observation=observation)
        self.rewards_history.append(reward)
        return reward
    def reset(self):
        """
        Reset the stored positions of the 3 objects. Call this after the end of an episode.
        """
        super().reset()
        self.prev_obj3_position = None
    def calc_dist_diff(self, obj1_position, obj2_position, obj3_position):
        """
        Calculate the change in the distances between 3 objects between the previous and the current step. Normalize each change by the distance in the previous step.

        Params:
            :param obj1_position: (list) Position of the first object
            :param obj2_position: (list) Position of the second object
            :param obj3_position: (list) Position of the third object
        Returns:
            :return norm_diff: (float) Sum of normalized differences of distances between 3 objects in the previous and the current step
        """
        if self.prev_obj1_position is None and self.prev_obj2_position is None and self.prev_obj3_position is None:
            # First step of the episode: no previous positions yet, so the first reward is 0.
            self.prev_obj1_position = obj1_position
            self.prev_obj2_position = obj2_position
            self.prev_obj3_position = obj3_position
        prev_diff_12 = self.task.calc_distance(self.prev_obj1_position, self.prev_obj2_position)
        current_diff_12 = self.task.calc_distance(obj1_position, obj2_position)
        prev_diff_13 = self.task.calc_distance(self.prev_obj1_position, self.prev_obj3_position)
        current_diff_13 = self.task.calc_distance(obj1_position, obj3_position)
        prev_diff_23 = self.task.calc_distance(self.prev_obj2_position, self.prev_obj3_position)
        current_diff_23 = self.task.calc_distance(obj2_position, obj3_position)
        norm_diff = ((prev_diff_12 - current_diff_12) / prev_diff_12
                     + (prev_diff_13 - current_diff_13) / prev_diff_13
                     + (prev_diff_23 - current_diff_23) / prev_diff_23)
        self.prev_obj1_position = obj1_position
        self.prev_obj2_position = obj2_position
        self.prev_obj3_position = obj3_position
        return norm_diff
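
# Illustrative check of the summed signal: if all three pairwise distances shrink
# by 10% in one step, each normalized term is 0.1 and the reward is 0.3.
# `_StubTask` is the same hypothetical Euclidean stand-in as above.
def _example_complex_distance_reward_step():
    class _StubTask:
        def calc_distance(self, a, b):
            return float(np.linalg.norm(np.asarray(a) - np.asarray(b)))

    r = ComplexDistanceReward(env=None, task=_StubTask())
    r.calc_dist_diff([0.0] * 3, [1.0, 0.0, 0.0], [2.0, 0.0, 0.0])        # first step -> 0.0
    out = r.calc_dist_diff([0.0] * 3, [0.9, 0.0, 0.0], [1.8, 0.0, 0.0])  # every pair 10% closer
    assert abs(out - 0.3) < 1e-9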
class PnPDistanceReward(DistanceReward):
    """
    Reward class for reward signal calculation based on distance differences between 4 objects, e.g. 1 object and the gripper, finger1 and finger2 of the Panda robot

    Parameters:
        :param env: (object) Environment where the training takes place
        :param task: (object) Task that is being trained, an instance of the TaskModule class
    """
    def __init__(self, env, task):
        super(PnPDistanceReward, self).__init__(env, task)
        self.prev_obj3_position = None
        self.prev_obj4_position = None
        self.prev_gripper_orn = None  # gripper orientation from the previous step
    def compute(self, observation):
        """
        Compute reward signal based on distances between 4 objects and on the change in gripper orientation. The positions of the objects must be present in the observation.

        Params:
            :param observation: (list) Observation of the environment
        Returns:
            :return reward: (float) Reward signal for the environment
        """
        # First 12 elements: xyz positions of the 4 tracked objects; the next
        # 4 elements are the gripper orientation.
        reward = self.calc_dist_diff(observation[0:3], observation[3:6], observation[6:9], observation[9:12])
        reward += self.calc_orn_diff(observation[12:16])
        self.task.check_distance_threshold(observation=observation)
        self.rewards_history.append(reward)
        return reward
    def reset(self):
        """
        Reset the stored object positions and gripper orientation. Call this after the end of an episode.
        """
        super().reset()
        self.prev_obj3_position = None
        self.prev_obj4_position = None
        self.prev_gripper_orn = None
    def calc_dist_diff(self, obj1_position, obj2_position, obj3_position, obj4_position):
        """
        Calculate the change in the distances between 4 objects between the previous and the current step. Normalize each change by the distance in the previous step.

        Params:
            :param obj1_position: (list) Position of the first object
            :param obj2_position: (list) Position of the second object
            :param obj3_position: (list) Position of the third object
            :param obj4_position: (list) Position of the fourth object
        Returns:
            :return norm_diff: (float) Sum of normalized differences of distances between 4 objects in the previous and the current step
        """
        if self.prev_obj1_position is None and self.prev_obj2_position is None and self.prev_obj3_position is None and self.prev_obj4_position is None:
            # First step of the episode: no previous positions yet, so the first reward is 0.
            self.prev_obj1_position = obj1_position
            self.prev_obj2_position = obj2_position
            self.prev_obj3_position = obj3_position
            self.prev_obj4_position = obj4_position
        prev_diff_12 = self.task.calc_distance(self.prev_obj1_position, self.prev_obj2_position)
        current_diff_12 = self.task.calc_distance(obj1_position, obj2_position)
        prev_diff_13 = self.task.calc_distance(self.prev_obj1_position, self.prev_obj3_position)
        current_diff_13 = self.task.calc_distance(obj1_position, obj3_position)
        prev_diff_14 = self.task.calc_distance(self.prev_obj1_position, self.prev_obj4_position)
        current_diff_14 = self.task.calc_distance(obj1_position, obj4_position)
        # The 1-4 term is intentionally left out of the sum; the distances are
        # still computed above so it can be re-enabled by adding
        # (prev_diff_14 - current_diff_14) / prev_diff_14.
        norm_diff = ((prev_diff_12 - current_diff_12) / prev_diff_12
                     + (prev_diff_13 - current_diff_13) / prev_diff_13)
        self.prev_obj1_position = obj1_position
        self.prev_obj2_position = obj2_position
        self.prev_obj3_position = obj3_position
        self.prev_obj4_position = obj4_position
        return norm_diff
    def calc_orn_diff(self, gripper_orn):
        """
        Calculate the change in the rotational difference between the gripper orientation and the desired orientation between the previous and the current step. Normalize the change by the difference in the previous step.

        Params:
            :param gripper_orn: (list) Orientation of the gripper
        Returns:
            :return norm_diff: (float) Normalized difference between the desired orientation and the orientation of the gripper
        """
        if self.prev_gripper_orn is None:
            # First step of the episode: use the current orientation, so the first reward is 0.
            self.prev_gripper_orn = gripper_orn
        # None is passed as the reference orientation, presumably resolved
        # inside task.calc_rotation_diff.
        prev_diff_orn = self.task.calc_rotation_diff(self.prev_gripper_orn, None)
        current_diff_orn = self.task.calc_rotation_diff(gripper_orn, None)
        norm_diff = (prev_diff_orn - current_diff_orn) / prev_diff_orn
        self.prev_gripper_orn = gripper_orn
        return norm_diff
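
# Illustrative check: with the 1-4 term disabled in calc_dist_diff above, only the
# 1-2 and 1-3 pairs contribute, so two 10% approaches yield 0.2 regardless of how
# the fourth object moves. `_StubTask` is the hypothetical Euclidean stand-in.
def _example_pnp_distance_reward_step():
    class _StubTask:
        def calc_distance(self, a, b):
            return float(np.linalg.norm(np.asarray(a) - np.asarray(b)))

    r = PnPDistanceReward(env=None, task=_StubTask())
    r.calc_dist_diff([0.0] * 3, [1.0, 0, 0], [0, 1.0, 0], [0, 0, 1.0])        # first step -> 0.0
    out = r.calc_dist_diff([0.0] * 3, [0.9, 0, 0], [0, 0.9, 0], [0, 0, 1.0])  # pairs 1-2 and 1-3 shrink 10%
    assert abs(out - 0.2) < 1e-9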
class SparseReward(Reward):
    """
    Reward class for sparse reward signal

    Parameters:
        :param env: (object) Environment where the training takes place
        :param task: (object) Task that is being trained, an instance of the TaskModule class
    """
    def __init__(self, env, task):
        super(SparseReward, self).__init__(env, task)

    def reset(self):
        """
        Nothing to reset: the sparse reward keeps no state between episodes.
        """
        pass
    def compute(self, observation=None):
        """
        Compute the sparse reward signal. The reward is 0 when the goal is reached and -1 on every other step.

        Params:
            :param observation: (object) Observation passed through to the task's distance-threshold check
        Returns:
            :return reward: (float) Reward signal for the environment
        """
        reward = -1.0
        if self.task.check_distance_threshold(observation):
            reward += 1.0
        self.rewards_history.append(reward)
        return reward
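
# Minimal sketch of the sparse signal: -1.0 on every step until the task reports
# that the distance threshold has been reached, then 0.0. `_StubTask` is a
# hypothetical stand-in for the TaskModule interface used here.
def _example_sparse_reward():
    class _StubTask:
        def __init__(self):
            self.reached = False

        def check_distance_threshold(self, observation=None):
            return self.reached

    r = SparseReward(env=None, task=_StubTask())
    assert r.compute() == -1.0  # goal not reached yet
    r.task.reached = True
    assert r.compute() == 0.0   # goal reached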