import pybullet_data
import glob
import pybullet
import pybullet_utils.bullet_client as bc
import time
import numpy as np
from gym.utils import seeding
import gym
import os
import inspect
from myGym.envs.camera import Camera
import pkg_resources
currentdir = pkg_resources.resource_filename("myGym", "envs")
repodir = pkg_resources.resource_filename("myGym", "./")
[docs]class BaseEnv(gym.Env):
"""
The base class for environments without rendering
Parameters:
:param gui_on: (bool) Whether or not to use PyBullet built-in GUI
:param objects_dir_path: (str) Path to directory with URDF files for objects
:param max_steps: (int) The maximum number of actions per episode
:param show_bounding_boxes_gui: (bool) Whether or not to show bounding boxes in GUI
:param changing_light_gui: (bool) Whether or not to change light in GUI
:param shadows_on_gui: (bool) Whether or not to show shadows in GUI
"""
metadata = {'render.modes': [
'human', 'rgb_array'], 'video.frames_per_second': 50}
def __init__(self,
gui_on=True,
objects_dir_path=pkg_resources.resource_filename("myGym", "envs/"),
max_steps=1024,
show_bounding_boxes_gui=False,
changing_light_gui=False,
shadows_on_gui=True
):
self.gui_on = gui_on
self.max_steps = max_steps
self.show_bounding_boxes_gui = show_bounding_boxes_gui
self.changing_light_gui = changing_light_gui
self.shadows_on_gui = shadows_on_gui
# Set episode information
self.episode_start_time = None
self.episode_over = False
self.episode_failed = False
self.episode_reward = 0.0
self.episode_final_reward = []
self.episode_final_distance = []
self.episode_number = 0
self.episode_steps = 0
self.episode_max_time = 300
self.episode_info = ""
# Set general params
self.time_step = 1. / 240.
self.urdf_root = pybullet_data.getDataPath()
self.observation = {}
# Set objects information
self.objects_dir_path = objects_dir_path
self.env_objects = []
self.scene_objects_uids = {}
self.all_objects_filenames = self._get_all_urdf_filenames(self.objects_dir_path)
# Set GUI
self._connect_to_physics_server()
# Set env params and load models
self._set_physics()
self._setup_scene()
self._set_observation_space()
self._set_action_space()
def _connect_to_physics_server(self):
"""
Connect to the PyBullet physics server in SHARED_MEMORY, GUI or DIRECT mode
"""
if self.gui_on:
self.p = bc.BulletClient(connection_mode=pybullet.GUI)
# if (self.p < 0):
# self.p = bc.BulletClient(connection_mode=p.GUI)
self._set_gui_mode()
else:
self.p = bc.BulletClient(connection_mode=pybullet.DIRECT)
self.p.setPhysicsEngineParameter(enableFileCaching=0)
def _set_gui_mode(self):
"""
Set GUI parameters: camera, shadows, extra elements
"""
self.p.resetDebugVisualizerCamera(3.3, 0, -41, [0.0, 0.0, 0.33])
self.p.configureDebugVisualizer(self.p.COV_ENABLE_SHADOWS, self.shadows_on_gui)
self.p.configureDebugVisualizer(self.p.COV_ENABLE_GUI, 0)
def _set_physics(self):
"""
Set physics engine parameters
"""
self.p.setGravity(0, 0, -9.81)
self.p.setPhysicsEngineParameter(solverResidualThreshold=0.00001, numSolverIterations=150, numSubSteps=20, useSplitImpulse=1, collisionFilterMode=1, constraintSolverType=self.p.CONSTRAINT_SOLVER_LCP_DANTZIG, globalCFM=0.000001, contactBreakingThreshold=0.001)
#self.p.setPhysicsEngineParameter(solverResidualThreshold=0.00001, numSolverIterations=150, numSubSteps=20, useSplitImpulse=1, collisionFilterMode=1, constraintSolverType=self.p.CONSTRAINT_SOLVER_LCP_DANTZIG, globalCFM=0.000001)
self.p.setTimeStep(self.time_step)
self.p.setRealTimeSimulation(0)
self.p.setPhysicsEngineParameter(enableConeFriction=1)
print(self.p.getPhysicsEngineParameters())
def _setup_scene(self):
"""
Set up scene elements (furniture, objects, robots)
"""
raise NotImplementedError
def _set_observation_space(self):
"""
Set limits of observations
"""
raise NotImplementedError
def _set_action_space(self):
"""
Set limits of actions
"""
raise NotImplementedError
def _get_observation(self):
"""
Get info about the state of the environment
Returns:
:return observation: (object) Observation of the environment
"""
raise NotImplementedError
[docs] def step(self, action):
"""
Apply action on the environment
Parameters:
:param action: (object) An action provided by the agent
Returns:
:return observation: (object)
:return reward: (float)
:return done: (bool):
:return info: (dict):
"""
raise NotImplementedError
def _add_scene_object_uid(self, scene_object_uid, name):
"""
Call this method in order to enable texturization of object
Parameters:
:param scene_object: (int)
"""
self.scene_objects_uids[scene_object_uid] = name
def get_scene_object_uid_by_name(self, name):
for uid, object_name in self.scene_objects_uids.items():
if name == object_name:
return uid
return None
[docs] def seed(self, seed=None):
"""
Set the seed for this env's random number generator(s)
"""
self.np_random, seed = seeding.np_random(seed)
return [seed]
[docs] def hard_reset(self):
"""
Full reset of the simulation. Delete and load again all objects and reset physics.
"""
self.p.resetSimulation()
self.p.disconnect()
self._connect_to_physics_server()
self.scene_objects_uids = {}
#self.episode_number = 0
self._set_physics()
self._setup_scene()
def _restart_episode(self):
"""
Reset episode information and delete all objects
"""
self.p.removeAllUserDebugItems()
self.episode_start_time = time.time()
self.episode_over = False
self.episode_failed = False
self.episode_reward = 0.0
self.episode_steps = 0
[docs] def reset(self, hard=False):
"""
Reset the state of the environment
"""
if hard:
self.hard_reset()
else:
self._remove_all_objects()
self._restart_episode()
def _draw_bounding_boxes(self):
"""
Show bounding boxes in tne PyBullet GUI
"""
for object in self.env_objects:
object.draw_bounding_box()
def _compute_reward(self):
"""
Compute reward for the agent
"""
return NotImplementedError
def _print_episode_summary(self, info_dict={}):
"""
Show an extra information about the episode
Parameters:
:param info_dict: (dict) Extra info
"""
if self.episode_failed:
episode_status = "FAILURE"
else:
episode_status = "SUCCESS"
print("#---------Episode-Summary---------#")
print("Episode number: " + str(self.episode_number))
print("Episode's number of steps: " + str(self.episode_steps))
print("Episode status: " + episode_status)
print("Episode info: " + self.episode_info)
print("Episode reward: " + str(self.episode_reward))
print("Last step reward: " + str(self.reward.rewards_history[-1]))
print("#---------------------------------#")
for key, value in info_dict.items():
print(key + ": " + str(value))
def _get_random_urdf_filenames(self, n, used_objects=None):
"""
Sample random URDF files from directory with objects URDFs
Parameters:
:param n: (int) Number of URDF's
:param used_objects: (list) Specified subset of objects
Returns:
:return selected_objects_filenames: (list)
"""
if used_objects or (self.all_objects_filenames is None):
all_objects_filenames = []
for object_name in used_objects:
if "virtual" in object_name:
all_objects_filenames.append(object_name)
for file in self.all_objects_filenames:
if '/'+object_name+'.' in file:
all_objects_filenames.append(file)
else:
# uses self.all_objects_filenames
pass
assert all_objects_filenames is not None
selected_objects_filenames = []
total_num_objects = len(all_objects_filenames)
if (n <= total_num_objects):
selected_objects = np.random.choice(
np.arange(total_num_objects), n, replace=True)
else:
selected_objects = list(np.arange(total_num_objects))
remain = n - total_num_objects
selected_objects += list(np.random.choice(
np.arange(total_num_objects), remain))
for object_id in selected_objects:
selected_objects_filenames.append(all_objects_filenames[object_id])
return selected_objects_filenames
def _get_all_urdf_filenames(self, dir):
"""
Get all URDF filenames from directory
Parameters:
:param dir: (int) Number of URDFs
Returns:
:return filenames: (list)
"""
list_all = []
for (dirpath, dirnames, filenames) in os.walk(self.objects_dir_path):
if '_old' not in dirpath and 'urdf' in dirpath:
list_all += [os.path.join(dirpath, file) for file in filenames]
return list_all
def _remove_object(self, object):
"""
Totally remove object from the simulation
Parameters:
:param object: (EnvObject) Object to remove
"""
self.env_objects.remove(object)
self.p.removeBody(object.uid)
def _remove_all_objects(self):
"""
Remove all objects from simulation (not scene objects or robots)
"""
env_objects_copy = self.env_objects[:]
for env_object in env_objects_copy:
self._remove_object(env_object)
[docs] def get_texturizable_objects_uids(self):
"""
Get all objects in the environment, on which textures can be applied
Returns:
:return texturizable_objects_uids: (list)
"""
return [object.get_uid() for object in self.env_objects] + list(self.scene_objects_uids.keys())
[docs] def get_colorizable_objects_uids(self):
"""
Get all objects in the environment, which color can be changed
Returns:
:return colorizable_objects_uids: (list)
"""
return [object.get_uid() for object in self.env_objects] + list(self.scene_objects_uids.keys())
def __del__(self):
"""
Disconnect from the physics server
"""
self.p.disconnect()
[docs]class CameraEnv(BaseEnv):
"""
The class for environments with rendering
Parameters:
:param camera_resolution: (list) The number of pixels in image (WxH)
:param shadows_on: (bool) Whether or not to use shadows while rendering, only applies to ER_TINY_RENDERER
:param render_on: (bool) Turn on rendering
:param renderer: (int) self.p.ER_TINY_RENDERER (CPU) or self.p.ER_BULLET_HARDWARE_OPENGL (GPU)
:param active_cameras: (list) Set 1 at a position(=camera number) to save images from this camera
"""
def __init__(self, camera_resolution=[640, 480], shadows_on=True,
render_on=True, renderer=pybullet.ER_BULLET_HARDWARE_OPENGL,
active_cameras=None, **kwargs):
super(CameraEnv, self).__init__(**kwargs)
self.camera_resolution = camera_resolution
self.shadows_on = shadows_on
self.render_on = render_on
self.renderer = renderer
self.active_cameras = active_cameras
self.cameras = []
self.set_light()
self._set_cameras()
[docs] def set_light(self, light_direction=[1, 1, 1], light_color=[0.1, 0.1, 0.1],
light_distance=1., light_ambient=1., light_diffuse=1.,
light_specular=1.):
"""
Set light parameters for rendering, doesn't affect PyBullet GUI. Appart from light_direction, all parameters only apply to ER_TINY_RENDERER.
Parameters:
:param light_direction: (list) Specifies the world position of the light source
:param light_color: (list) Directional light color in RGB in range 0..1
:param light_distance: (float) Distance of the light along the normalized light_direction
:param light_ambient: (float) Light ambient coefficient in range 0..1
:param light_diffuse: (float) Light diffuse coefficient in range 0..1
:param light_specular: (float) Light specular coefficient in range 0..1
"""
self.light_direction = light_direction
self.light_color = light_color
self.light_distance = light_distance
self.light_ambient = light_ambient
self.light_diffuse = light_diffuse
self.light_specular = light_specular
[docs] def get_render_parameters(self):
"""
Return environment parameters for rendering, initially is intended to
use by cameras
Returns:
:return render_parameters: (dict) Render parameters
"""
return {
"width": self.camera_resolution[0],
"height": self.camera_resolution[1],
"lightDirection": self.light_direction,
"lightColor": self.light_color,
"lightDistance": self.light_distance,
"shadow": 1 if self.shadows_on else 0,
"lightAmbientCoeff": self.light_ambient,
"lightDiffuseCoeff": self.light_diffuse,
"lightSpecularCoeff": self.light_specular,
"renderer": self.renderer
}
def _set_cameras(self):
"""
Set cameras available to use for rendering
"""
raise NotImplementedError
def get_cameras(self):
return self.cameras
[docs] def add_camera(self, **kwargs):
"""
Add new camera to the environment
Parameters:
:param position: (list) Eye position in Cartesian world coordinates
:prarm target_position: (list) Position of the target point
:param up_vector: (list) Up vector of the camera
:param up_axis_index: (int) Either 1 for Y or 2 for Z axis up
:param yaw: (float) Yaw angle in degrees left/right around up-axis
:param pitch: (float) Pitch in degrees up/down
:param roll: (float) Roll in degrees around forward vector
:param distance: (float) Distance from eye to focus point
:param field_of_view: (float) Field of view
:param near_plane_distance: (float) Near plane distance
:param far_plane_distance: (float) Far plane distance
"""
self.cameras.append(Camera(env=self, **kwargs))
def set_active_cameras(self, active_cameras):
if (len(active_cameras) == len(self.cameras)):
self.active_cameras = active_cameras
def change_current_camera(self, camera_num):
print("Change camera to " + str(self.current_camera))
self.current_camera = camera_num
[docs] def render(self, mode="rgb_array", camera_id=None):
"""
Get image (image, depth, segmentation_mask) from camera or active cameras
Parameters:
:param mode: (str) rgb_array to return RGB image
:param camera_id: (int) Get image from specified camera
Returns:
:return camera_data: (dict) Key: camera_id, Value: info from camera
"""
if mode != "rgb_array":
return np.array([])
camera_data = {}
if self.render_on:
if camera_id is not None:
camera_data[camera_id] = self.cameras[camera_id].render()
else:
for camera_num in range(len(self.active_cameras)):
if self.active_cameras[camera_num]:
camera_data[camera_num] = self.cameras[camera_num].render()
return camera_data
[docs] def project_point_to_camera_image(self, point, camera_id):
"""
Project 3D point in Cartesian world coordinates to 2D point in pixel space
Parameters:
:param point: (list) 3D point in Cartesian world coordinates
:param camera_id: (int) Index of camera to project on
Returns:
:return 2d_point: (list) 2D coordinates of point on imageg
"""
return self.cameras[camera_id].project_point_to_image(point)
[docs] def get_camera_opencv_matrix_values(self, camera_id):
"""
Compute values of OpenCV matrix
Parameters:
:param camera_id: (int) Index of camera to get matrix from
Returns:
:return values: (dict) fx, fy, cx, cy values
"""
return self.cameras[camera_id].get_opencv_camera_matrix_values()