Source code for myGym.generate_dataset

## Script to generate train/test datasets from the simulator in COCO, DOPE, or VAE format. Used for vision training.

import gym
from myGym import envs
from matplotlib.pyplot import imshow, show
import cv2
import numpy as np
import os
import glob
import json
import commentjson
import sys
import random
from pycocotools.cocostuffhelper import segmentationToCocoMask, segmentationToCocoResult
from pycocotools import mask
import pybullet as p
from bbox import BBox3D
from myGym.envs.wrappers import RandomizedEnvWrapper
import pkg_resources

# config, specify here or pass as an input argument
CONFIG_DEFAULT = pkg_resources.resource_filename("myGym", 'configs/dataset_coco.json')
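# Example invocation (the config path is resolved relative to the installed myGym package):
#   python generate_dataset.py configs/dataset_coco.json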

# helper functions:
def color_names_to_rgb():
    """
    Assign RGB colors to objects by name as specified in the training config file
    """
    with open(pkg_resources.resource_filename("myGym", 'configs/rgbcolors.json'), "r") as read_file:
        clr = json.load(read_file)  # json file with suggested colors
    new_dict = {}
    for key, value in config['object_colors'].items():
        new_value = []
        for item in value:
            new_value.append(clr[item])
        new_dict[key] = new_value
    config['color_dict'] = new_dict
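# Illustrative sketch (assumed config shape): an entry such as
# config['object_colors'] = {"<object_name>": ["red", "green"]} becomes
# config['color_dict'] = {"<object_name>": [<rgb of red>, <rgb of green>]},
# with the RGB triplets taken from configs/rgbcolors.json.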
def _category_coco_format():  #COCO
    """
    Create list of dictionaries with category id-name pairs in MSCOCO format

    Returns:
        :return categories: (list) Categories in MSCOCO format
    """
    categories = []
    for name, class_id in config['used_class_names'].items():
        categories.append({"id": int(class_id), "name": str(name)})
    return categories

def _segmentationToPoly(mask):
    """
    Convert segmentation from RLE to polygons ([[x1 y1 x2 y2 ...]]).
    Code from https://github.com/facebookresearch/Detectron/issues/100#issuecomment-362882830.

    Parameters:
        :param mask: (array) Bitmap mask
    Returns:
        :return segmentationPoly: (list) Segmentation converted to polygons
    """
    # OpenCV >= 4 signature: returns (contours, hierarchy)
    contours, _ = cv2.findContours((mask).astype(np.uint8), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    segmentationPoly = []
    for contour in contours:
        contour = contour.flatten().tolist()
        if len(contour) > 4:  # a valid polygon needs at least 3 points (6 coordinates)
            segmentationPoly.append(contour)
    return segmentationPoly
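# A minimal sketch (not used by the generator) of the RLE-to-polygon helper on a synthetic mask:
def _example_segmentation_to_poly():
    bitmap = np.zeros((8, 8), dtype=np.uint8)
    bitmap[2:6, 2:6] = 1  # a 4x4 square of foreground pixels
    return _segmentationToPoly(bitmap)  # -> one polygon tracing the square, e.g. [[2, 2, 2, 5, 5, 5, 5, 2]]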
def create_coco_json():  #COCO
    """
    Create COCO json data structure

    Returns:
        :return data_train: (dict) Data structure for training data
        :return data_test: (dict) Data structure for testing data
    """
    data_train = dict(
        images=[],  # file_name, height, width, id
        type='instances',
        annotations=[],  # segmentation, area, iscrowd, image_id, bbox, category_id, id
        categories=_category_coco_format(),
    )
    data_test = dict(
        images=[],  # file_name, height, width, id
        type='instances',
        annotations=[],  # segmentation, area, iscrowd, image_id, bbox, category_id, id
        categories=_category_coco_format(),
    )
    return data_train, data_test
def create_3D_box(env_object, objdim):  #DOPE
    objpos = env_object.get_position()
    objorient = env_object.get_orientation()
    # BBox3D expects the quaternion in (w, x, y, z) order, while pybullet returns (x, y, z, w)
    box = BBox3D(objpos[0], objpos[1], objpos[2],
                 objdim[0], objdim[1], objdim[2],
                 objorient[3], objorient[0], objorient[1], objorient[2])
    return box.p, box.center
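# A minimal sketch of the bbox package's BBox3D as used above (assumed signature: position,
# dimensions, then the quaternion in w-first order); not called anywhere in this script:
def _example_3d_box():
    box = BBox3D(0, 0, 0, 1, 1, 1, 1, 0, 0, 0)  # unit cube at the origin, identity quaternion (w=1)
    return box.p, box.center  # (8, 3) array of corner points and the centroid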
class GeneratorCoco:  #COCO
    """
    Generator class for COCO image dataset for YOLACT vision model training
    """
    def __init__(self):
        pass
    def get_env(self):  #COCO
        """
        Create environment for COCO dataset generation according to dataset config file

        Returns:
            :return env: (object) Environment for dataset generation
        """
        env = RandomizedEnvWrapper(env=gym.make(config['env_name'],
            robot = config['robot'],
            render_on = True,
            gui_on = config['gui_on'],
            show_bounding_boxes_gui = config['show_bounding_boxes_gui'],
            changing_light_gui = config['changing_light_gui'],
            shadows_on = config['shadows_on'],
            color_dict = config['color_dict'],
            object_sampling_area = config['object_sampling_area'],
            num_objects_range = config['num_objects_range'],
            used_objects = used_objects,
            active_cameras = config['active_cameras'],
            camera_resolution = config['camera_resolution'],
            renderer=p.ER_BULLET_HARDWARE_OPENGL,
            dataset = True,
            ), config_path = config['output_folder']+'/config_dataset.json')
        p.setGravity(0, 0, -9.81)
        return env
    def episode_zero(self):
        """
        Initial episode set-up
        """
        self.id_unique = 0  # image_id*x for parallel dataset generation, otherwise 0
    def init_data(self):  #COCO
        """
        Initialize data structures for COCO dataset annotations

        Returns:
            :return data_train: (dict) Data structure for training data
            :return data_test: (dict) Data structure for testing data
        """
        data_train, data_test = create_coco_json()
        return data_train, data_test
    def resume(self):  #COCO
        """
        Resume COCO dataset generation

        Returns:
            :return data_train: (dict) Training data from preceding dataset generation in COCO data structure
            :return data_test: (dict) Testing data from preceding dataset generation in COCO data structure
            :return image_id: (int) ID of last generated image in preceding dataset generation
        """
        data_train, data_test = create_coco_json()  # fall back to empty structures if no previous annotations exist
        try:
            with open(os.path.join(config['output_test_folder'], 'annotations.json'), 'r') as f:
                data_test = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            pass  # happens when the test JSON is missing or empty, which can occur for small datasets
        try:
            with open(os.path.join(config['output_train_folder'], 'annotations.json'), 'r') as f:
                data_train = json.load(f)
            # get the ID of the last image
            img_ids = [img["id"] for img in data_train['images']]
            image_id = max(img_ids) + 1  # +1 in case the last sample was a test sample (thus not listed here);
            # holes in the ids are safe, they only need to increase monotonically
            self.id_unique = len(data_test['annotations']) + len(data_train['annotations'])
            print("Resuming from image_id {} for episodes: {}".format(image_id, config['num_episodes']))
        except FileNotFoundError:
            image_id = 0
        return data_test, data_train, image_id
    def data_struct_image(self):  #COCO
        """
        Assign name to COCO dataset image and its train or test status

        Returns:
            :return data: (dict) Corresponding data dictionary
            :return name: (string) Name of image file for saving
        """
        data = data_test if isTestSample == True else data_train
        name = 'img_{}_cam{}.jpg'.format(image_id, camera_id)
        return data, name
    def store_image_info(self):  #COCO
        """
        Append COCO dataset image info to corresponding data dict
        """
        data['images'].append(dict(
            file_name=name,
            height=im.shape[0],
            width=im.shape[1],
            id=image_id,))
    def get_append_annotations(self):  #COCO
        """
        Make and append COCO annotations for each object in the scene
        """
        seg = segmentationToCocoMask(img_mask, object_uid)
        area = float(mask.area(seg))
        bbox = mask.toBbox(seg).flatten().tolist()
        #1 run length encoding (RLE) segmentation format
        seg['counts'] = str(seg['counts'], "utf-8")  # utf-8 format in str
        #2 or poly segmentation format
        bitmap = mask.decode(seg)
        seg = _segmentationToPoly(bitmap)
        self.too_small_obj = False
        try:  # notify and skip objects with a too small visible representation
            assert(area > config['min_obj_area'])
            assert(len(seg) > 0 and len(seg[0]) > 0)
        except AssertionError:
            # make inverse map id->name (just to pretty print)
            inv_map = dict(zip(config['used_class_names'].values(), config['used_class_names'].keys()))
            self.too_small_obj = inv_map[class_id]
        self.data_dict = dict(
            id=self.id_unique,
            image_id=image_id,
            category_id=class_id,
            segmentation=seg,
            area=area,
            bbox=bbox,
            iscrowd=0,
        )
        if self.too_small_obj == False:  # area ok
            data['annotations'].append(self.data_dict)  # append annotations
            self.id_unique += 1
        else:  # area too small to be realistically seen
            print('Too small object of class {} with area={} in img {}'.format(
                self.too_small_obj, self.data_dict['area'], name))
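    # Shape of one appended annotation entry (illustrative values; segmentation uses the polygon format):
    #   {"id": 0, "image_id": 5, "category_id": 2, "segmentation": [[x1, y1, x2, y2, ...]],
    #    "area": 1530.0, "bbox": [x, y, width, height], "iscrowd": 0}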
    def visualize(self):  #COCO
        """
        Visualize mask and bounding box coordinates for COCO annotated object
        """
        obj_mask = img_mask == object_uid  # local name so it does not shadow the pycocotools mask module
        obj_mask = np.expand_dims(obj_mask, axis=2)
        obj_mask = 255 * obj_mask.astype('uint8')
        cv2.imshow('image', im)
        cv2.waitKey(1)
        print(class_name)
        if self.too_small_obj:
            cv2.imshow('Too small object', obj_mask)
        else:
            cv2.imshow('Labeled object', obj_mask)
        cv2.waitKey(1000)
        print(self.data_dict['bbox'])
    def write_json_end(self):  #COCO
        """
        Write json file with COCO annotations to output directory
        """
        if config['make_dataset'] in ["new", "resume"]:
            print("Storing annotations.json at episode {} of {}.".format(episode, config['num_episodes']))
            for flag in ['test', 'train']:
                if flag == 'train':
                    folder = config['output_train_folder']
                    data = data_train
                else:
                    folder = config['output_test_folder']
                    data = data_test
                json_name = 'img_{}_cam{}.json'.format(image_id, camera_id)
                json_dict = {"images": data["images"],
                             "type": 'instances',
                             "annotations": data["annotations"],
                             "categories": _category_coco_format()}
                if len(data["images"]) > 0:
                    with open(os.path.join(folder, json_name), 'w') as f:
                        json.dump(json_dict, f, indent=4)
                # clear data and continue
                data["images"].clear()
                data["annotations"].clear()
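# The json files written above follow the MSCOCO layout, so they can be inspected directly with
# the pycocotools API; a minimal sketch (the path argument is hypothetical):
def _example_load_annotations(json_path):
    from pycocotools.coco import COCO
    coco = COCO(json_path)  # e.g. '<output_train_folder>/img_100_cam1.json'
    ann_ids = coco.getAnnIds(imgIds=coco.getImgIds())
    return coco.loadAnns(ann_ids)  # list of annotation dicts as appended in get_append_annotations()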
class GeneratorDope:  #DOPE
    def __init__(self):
        self.object_settings = {"exported_object_classes": [], "exported_objects": []}

    def get_env(self):  #DOPE
        env = RandomizedEnvWrapper(env=gym.make(config['env_name'],
            robot = config['robot'],
            render_on = True,
            gui_on = config['gui_on'],
            show_bounding_boxes_gui = config['show_bounding_boxes_gui'],
            changing_light_gui = config['changing_light_gui'],
            shadows_on = config['shadows_on'],
            color_dict = config['color_dict'],
            object_sampling_area = config['object_sampling_area'],
            num_objects_range = config['num_objects_range'],
            used_objects = used_objects,
            active_cameras = config['active_cameras'],
            camera_resolution = config['camera_resolution'],
            dataset = True,
            ), config_path = config['output_folder']+'/config_dataset.json')
        p.setGravity(0, 0, -9.81)
        return env

    def episode_zero(self):
        # collect the cuboid dimensions of every used class before generation starts
        self.objdim = {}
        while len(self.objdim.keys()) < len(config['used_class_names'].keys()):
            env.reset(random_robot=config['random_arm_movement'], random_pos=False)
            observation = env.get_observation()
            env_objects = observation["objects"]
            for obj in env_objects:
                if obj.name not in self.objdim.keys():
                    self.objdim[obj.name] = obj.get_cuboid_dimensions()

    def init_data(self):  #DOPE
        data_train = {"objects": []}
        data_test = {"objects": []}
        return data_train, data_test

    def resume(self):  #DOPE
        try:
            files_test = [int(x.replace('.jpg', '')) for x in os.listdir(config['output_test_folder']) if x.endswith(".jpg")]
            files_train = [int(x.replace('.jpg', '')) for x in os.listdir(config['output_train_folder']) if x.endswith(".jpg")]
            image_id = max(files_test + files_train)
            print("Resuming from image_id {} for episodes: {}".format(image_id, config['num_episodes']))
            self.episode_zero()
        except (FileNotFoundError, ValueError):  # missing folder or no images generated yet
            image_id = 0
        data_train, data_test = self.init_data()
        return data_test, data_train, image_id

    def data_struct_image(self):  #DOPE
        data_train, data_test = self.init_data()
        data = data_test if isTestSample == True else data_train
        name = '{}.jpg'.format(image_id)
        return data, name

    def store_image_info(self):  #DOPE
        # write dataset image info
        filename = str(image_id) + '.json'
        if config['make_dataset'] in ["new", "resume"]:
            print("Storing {} and {} at episode {} of {}.".format(filename, name, episode, config['num_episodes']))
            with open(os.path.join(path, filename), 'w') as f:
                json.dump(data, f, indent=4)

    def get_append_annotations(self):  #DOPE
        cuboid_with_centroid = env_object.get_bounding_box()
        cuboid_centroid = cuboid_with_centroid[8]
        cuboid = cuboid_with_centroid[:8]
        seg = segmentationToCocoMask(img_mask, object_uid)
        seg['counts'] = str(seg['counts'], "utf-8")  # utf-8 format in str
        bbox = mask.toBbox(seg).flatten().tolist()
        bounding_box = {'top_left': bbox[:2], 'bottom_right': [bbox[0]+bbox[2], bbox[1]+bbox[3]]}
        boxp, boxc = create_3D_box(env_object, self.objdim[class_name])
        box3D = []
        for x in range(boxp.shape[0]):
            box3D.append(tuple(boxp[x]))
        boxc = list(boxc)
        projected_cuboid_centroid = list(env.project_point_to_camera_image(cuboid_centroid, camera_id))
        projected_cuboid = [list(env.project_point_to_camera_image(point, camera_id)) for point in cuboid]
        projected_3DBB_centroid = list(env.project_point_to_camera_image(boxc, camera_id))
        projected_3DBB = [list(env.project_point_to_camera_image(point, camera_id)) for point in box3D]
        if class_name not in self.object_settings["exported_object_classes"]:
            self.object_settings["exported_object_classes"].append(class_name)
            self.object_settings['exported_objects'].append({
                "class": class_name,
                "segmentation_class_id": class_id,
                "cuboid_dimensions": self.objdim[class_name]
            })
        self.data_dict = {
            "class": class_name,
            "class_id": class_id,
            "location": env_object.get_position(),
            "quaternion_xyzw": env_object.get_orientation(),
            "cuboid_centroid": cuboid_centroid,
            "projected_cuboid_centroid": projected_cuboid_centroid,
            "bounding_box": bounding_box,
            "cuboid": cuboid,
            "projected_cuboid": projected_cuboid,
            "box3D": box3D,
            "projected_3DBB": projected_3DBB,
            "projected_3DBB_centroid": projected_3DBB_centroid,
        }
        data["objects"].append(self.data_dict)

    def visualize(self):  #DOPE
        image = im
        for projected_cuboid_point in data["objects"][-1]["projected_cuboid"]:
            image = cv2.circle(cv2.UMat(image), tuple(map(int, projected_cuboid_point)), 4, [0, 255, 0], -1)
        for projected_cuboid_point in data["objects"][-1]["projected_3DBB"]:
            image = cv2.circle(cv2.UMat(image), tuple(map(int, projected_cuboid_point)), 4, [255, 0, 0], -1)
        image = cv2.circle(cv2.UMat(image), tuple(map(int, data["objects"][-1]["projected_cuboid_centroid"][:2])), 4, [0, 255, 0], -1)
        image = cv2.circle(cv2.UMat(image), tuple(map(int, data["objects"][-1]["projected_3DBB_centroid"][:2])), 4, [255, 0, 0], -1)
        image = cv2.circle(cv2.UMat(image), tuple(map(int, data["objects"][-1]["bounding_box"]["top_left"][:2])), 4, [255, 255, 0], -1)
        image = cv2.circle(cv2.UMat(image), tuple(map(int, data["objects"][-1]["bounding_box"]["bottom_right"][:2])), 4, [255, 255, 0], -1)
        print(class_name)
        cv2.imshow('image', image)
        cv2.waitKey(1000)
        self.draw_bounding_box_3D()

    def draw_bounding_box_3D(self):  #DOPE
        # connect consecutive corners of the 3D box in the pybullet GUI (7 of the 12 edges)
        for i in range(7):
            p.addUserDebugLine(data["objects"][-1]["box3D"][i], data["objects"][-1]["box3D"][i+1],
                               lineColorRGB=(0.0, 0.0, 0.99), lineWidth=4, lifeTime=1)

    def write_json_end(self):  #DOPE
        self.camera = {"camera_settings": []}
        for c in range(np.count_nonzero(config['active_cameras'])):  # loop through active cameras
            camera_id = np.nonzero(config['active_cameras'])[0][c]
            intrinsic_settings = env.get_camera_opencv_matrix_values(camera_id)
            captured_image_size = {"width": config['camera_resolution'][0], "height": config['camera_resolution'][1]}
            self.camera["camera_settings"].append(dict(
                name="camera" + str(camera_id),
                intrinsic_settings=intrinsic_settings,
                captured_image_size=captured_image_size,
            ))
        if config['make_dataset'] in ["new", "resume"]:
            filename = "_camera_settings.json"
            print("Storing {}.".format(filename))
            with open(os.path.join(config['output_test_folder'], filename), 'w') as f:
                json.dump(self.camera, f, indent=4)
            with open(os.path.join(config['output_train_folder'], filename), 'w') as f:
                json.dump(self.camera, f, indent=4)
            filename = "_object_settings.json"
            with open(os.path.join(config['output_test_folder'], filename), 'w') as f:
                json.dump(self.object_settings, f, indent=4)
            with open(os.path.join(config['output_train_folder'], filename), 'w') as f:
                json.dump(self.object_settings, f, indent=4)
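# Per output folder, the DOPE generator thus writes one <image_id>.jpg / <image_id>.json pair per
# frame, plus the shared _camera_settings.json and _object_settings.json from write_json_end().
# Sketch of a per-frame json (placeholder values):
#   {"objects": [{"class": "<object_name>", "class_id": 2, "location": [x, y, z],
#                 "quaternion_xyzw": [qx, qy, qz, qw], "projected_cuboid": [[u1, v1], ...], ...}]}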
class GeneratorVae:
    """
    Generator class for image dataset for VAE vision model training
    """
    def __init__(self):
        self.object_settings = {"exported_object_classes": [], "exported_objects": []}
        self.env = None
        self.imsize = config["imsize"]  # only supported format at the moment
    def get_env(self):
        """
        Create environment for VAE dataset generation according to dataset config file
        """
        self.env = RandomizedEnvWrapper(env=gym.make(config['env_name'],
            robot = config['robot'],
            render_on = True,
            gui_on = config['gui_on'],
            show_bounding_boxes_gui = config['show_bounding_boxes_gui'],
            changing_light_gui = config['changing_light_gui'],
            shadows_on = config['shadows_on'],
            color_dict = config['color_dict'],
            object_sampling_area = config['object_sampling_area'],
            num_objects_range = config['num_objects_range'],
            used_objects = used_objects,
            active_cameras = config['active_cameras'],
            camera_resolution = config['camera_resolution'],
            dataset = True,
            ), config_path = config['output_folder']+'/config_dataset.json')
        p.setGravity(0, 0, -9.81)
    def collect_data(self, steps):
        """
        Collect data for VAE dataset

        Parameters:
            :param steps: (int) Number of episodes initiated during dataset generation
        """
        data = np.zeros((steps, self.imsize, self.imsize, 3), dtype='f')
        for t in range(steps):
            self.env.reset(random_pos=True)
            self.env.render()
            action = [random.uniform(1, 2) for x in range(6)]
            self.env.robot.reset_random(action)  # send the Kuka arms up
            observation, reward, done, info = self.env.step(action)
            img = observation['camera_data'][6]['image']
            imgs = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            img = cv2.resize(imgs[0:450, 100:500], (self.imsize, self.imsize))  # crop and downscale to imsize
            cv2.imshow("image", img)
            cv2.waitKey(1)
            name = str(t + 7999).zfill(6)  # zero-padded index; hardcoded 7999 offset for appending to an existing set
            cv2.imwrite(os.path.join(dataset_pth, "img_{}.png".format(name)), img)
            data[t] = img
            print("Image {}/{}".format(t, steps))
        self.env.close()
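# A minimal sketch (hypothetical helper, not used by the script) for loading the collected
# images back into a single numpy array, e.g. to feed a VAE training loop:
def _example_load_vae_images(folder):
    paths = sorted(glob.glob(os.path.join(folder, "img_*.png")))  # pngs written by collect_data
    return np.stack([cv2.imread(pth) for pth in paths])  # (N, imsize, imsize, 3) BGR array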
# main
if __name__ == "__main__":
    if len(sys.argv) <= 1:
        config_path = CONFIG_DEFAULT
        print('No config.json passed in argument. Loading default config: {}'.format(CONFIG_DEFAULT))
    else:
        config_path = pkg_resources.resource_filename("myGym", sys.argv[1])
    with open(config_path) as file:
        config = commentjson.load(file)

    # initialize dataset generator
    if config['dataset_type'] == 'coco':
        generator = GeneratorCoco()
    elif config['dataset_type'] == 'dope':
        generator = GeneratorDope()
    elif config['dataset_type'] == 'vae':
        generator = GeneratorVae()
    else:
        raise Exception("dataset_type in config: use one of 'coco', 'dope', 'vae'!")

    # prepare directories
    config['output_test_folder'] = config['output_folder'] + '/test'
    config['output_train_folder'] = config['output_folder'] + '/train'
    os.makedirs(config["output_test_folder"], exist_ok=True)
    os.makedirs(config["output_train_folder"], exist_ok=True)

    # define objects to appear in the env, add colors
    config['used_class_names'] = dict([x[1:3] for x in config['used_class_names_quantity']])
    used_objects = []
    for x in config['used_class_names_quantity']:
        for _ in range(x[0]):
            used_objects.append(x[1])
    if config['color_dict'] == 'object_colors':
        color_names_to_rgb()
        config['texture_randomizer']['exclude'].append("objects")
        config['color_randomizer']['exclude'].append("objects")

    # write config.json to output_folder
    with open(config['output_folder']+'/config_dataset.json', 'w') as f:
        commentjson.dump(config, f)

    if config['dataset_type'] == 'vae':
        generator.get_env()
        dataset_pth = config['output_folder'] + '/train'
        generator.collect_data(config['num_episodes'])
        print("Dataset finished. Ready to train!")
        raise SystemExit(0)

    # initialize pybullet env
    env = generator.get_env()
    # pybullet encodes link indices into the segmentation mask as uid + ((link_index + 1) << 24),
    # so precompute the per-link uids of the robot body and gripper to merge them later
    first_link_uid = env.robot.robot_uid
    robot_uids = np.array([((x + 1) << 24) + first_link_uid for x in range(-1, env.robot.gripper_index)], dtype=np.int32)
    gripper_uids = np.array([((x + 1) << 24) + first_link_uid for x in range(env.robot.gripper_index, env.robot.num_joints + 1)])

    # check mode of writing files
    image_id = 0  # >0 for parallel dataset generation, otherwise 0
    if config['make_dataset'] == "new":  # cleanup files
        files = glob.glob(os.path.join(config['output_test_folder'], './*'))
        for f in files:
            os.remove(f)
        files = glob.glob(os.path.join(config['output_train_folder'], './*'))
        for f in files:
            os.remove(f)
        data_train, data_test = generator.init_data()
        generator.episode_zero()
    elif config['make_dataset'] == 'resume':  # resume
        print("Restoring dataset generation")
        data_test, data_train, image_id = generator.resume()
    elif config['make_dataset'] == "display":
        data_train, data_test = generator.init_data()
        generator.episode_zero()

    # the main loop; when resuming, skip the episodes that already produced images
    for episode in range(int(image_id / (config['num_steps'] * np.count_nonzero(config['active_cameras']))), config['num_episodes']):
        print("episode: {}/{}".format(episode, config['num_episodes']))
        # env reset:
        # random_robot randomizes the init position of robots
        # random_pos randomizes the init positions of objects
        if episode % config['num_episodes_hard_reset'] == 0:  # to prevent objects vanishing when GUI is on
            print("Hard reset!!!")
            env.reset(hard=True)
        env.reset(random_robot=config['random_arm_movement'], random_pos=True, hard=False)
        observation = env.get_observation()
        env_objects = observation["objects"]
        for t in range(config['num_steps']):  # loop through steps
            # randomize the movements of robots (using joint control)
            action = env.action_space.sample()
            observation, reward, done, info = env.step(action)
            if t == config['num_steps']-1 or t % config['make_shot_every_frame'] == 0:  # we only use frames from some steps
                env.render()  # only render at the steps/frames we use for the dataset
                for c in range(np.count_nonzero(config['active_cameras'])):  # loop through active cameras
                    camera_id = np.nonzero(config['active_cameras'])[0][c]
                    image_id = image_id + 1  # unique
                    isTestSample = np.random.random_sample() < config['train_test_split_pct']  # bool, test/train data?
                    path = config['output_test_folder'] if isTestSample == True else config['output_train_folder']

                    # get dataset image and its mask
                    im = observation["camera_data"][camera_id]["image"]
                    im = cv2.cvtColor(im, cv2.COLOR_RGB2BGR)  # fix colors
                    img_mask = observation["camera_data"][camera_id]["segmentation_mask"]
                    img_mask = np.where(np.isin(img_mask, gripper_uids), gripper_uids[0], img_mask)  # merge gripper links
                    img_mask = np.where(np.isin(img_mask, robot_uids), robot_uids[0], img_mask)  # merge robot links
                    obj_ids = [x for x in np.unique(img_mask)]  # identify merged objects (links) in the camera view

                    # prepare data structure
                    data, name = generator.data_struct_image()
                    for object_uid in obj_ids:  # loop through the robot and used objects in the camera view
                        if object_uid == robot_uids[0]:
                            class_name = config['robot']
                        elif object_uid == gripper_uids[0]:
                            class_name = env.robot.get_name()
                        else:
                            env_object_list = list(filter(lambda object: object.uid == object_uid, env_objects))
                            if len(env_object_list) > 0:
                                env_object = env_object_list[0]
                                class_name = env_object.get_name()
                            else:
                                continue
                        if class_name in config['used_class_names']:
                            class_id = config['used_class_names'][class_name]
                            generator.get_append_annotations()  # annotate and append annotations
                            if config['visualize']:  # visualize
                                generator.visualize()

                    # store dataset image and info
                    generator.store_image_info()
                    if config['make_dataset'] in ["new", "resume"]:
                        cv2.imwrite(os.path.join(path, name), im, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
            if done:
                print("Episode finished after {} timesteps".format(t+1))
                break

        # write JSON annotations every n episodes or at the end
        if episode % config['autosafe_episode'] == 0 or episode == config['num_episodes']-1:
            generator.write_json_end()
            data_train, data_test = generator.init_data()

    # end
    print('DATASET FINISHED')