-
Notifications
You must be signed in to change notification settings - Fork 0
/
soloGaitPeriodEnv.py
148 lines (118 loc) · 5.13 KB
/
soloGaitPeriodEnv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import os
import gym
import random
import numpy as np
import pybullet as p
from baseControlEnv1 import BaseControlEnv
from collections import deque
# Candidate gait periods selectable by the discrete action, one table per
# supported action-space size. The smaller tables are contiguous sub-ranges
# of the 7-entry table (each slice yields an independent list).
periods7 = [
    0.16,
    0.24,
    0.32,
    0.4,
    0.48,
    0.56,
    0.64,
]
periods5 = periods7[1:6]
periods4 = periods7[1:5]
class SoloGaitPeriodEnv(BaseControlEnv):
    """Control environment whose discrete action selects the gait period.

    Each action is an index into one of the module-level period tables
    (``periods4``, ``periods5`` or ``periods7``); there is no no-op action.

    Recognised ``config`` keys (besides whatever ``BaseControlEnv`` reads):

    * ``num_actions`` (default 4): size of the action space; must be 4, 5
      or 7.
    * ``semi_mdp`` (default False): when set, ``k_rl`` is re-derived from
      the selected period on the reactive-update path.
    * ``reactive_update`` (default False): when set, a period change
      rebuilds the gait matrices immediately instead of only requesting a
      new future gait from the planner.

    ``config['rl_dt']`` is overwritten with 0.32 before the base class is
    initialised.

    Attributes such as ``T_gait``, ``dt``, ``rl_dt``, ``k_mpc``, ``robot``,
    ``controller``, ``past_commands`` and ``state_history`` are not defined
    here; presumably they are provided by ``BaseControlEnv``.
    """

    # Number of most recent gait periods exposed in the observation.
    _PERIOD_HISTORY_LEN = 4

    # Slack added before truncating a float step count, so that a quotient
    # that is mathematically an integer but lands a hair below it (for
    # example ``0.3 / 0.1 == 2.9999999999999996``) is not truncated one
    # step short.
    _STEP_EPS = 1e-9

    def __init__(self, config):
        """Build the action/observation spaces and the action->period map.

        Args:
            config: Mutable mapping of environment settings. It is modified
                in place (``rl_dt`` is forced to 0.32) and then handed to
                ``BaseControlEnv.__init__``.

        Raises:
            NotImplementedError: If ``num_actions`` is not 4, 5 or 7.
        """
        config['rl_dt'] = 0.32
        super().__init__(config)

        self.num_actions = config.get('num_actions', 4)
        self.action_space = gym.spaces.Discrete(self.num_actions)  # No noop action
        self.semi_mdp = config.get('semi_mdp', False)
        self.reactive_update = config.get('reactive_update', False)
        self.num_history_stack = 8

        # Original sizing note: 1 base pose z, 3 orn, 6 body vel,
        # 12 joint angles, 12 joint vel, 12 rel foot pose, 6 vel_ref,
        # 4 past gait seq = 62.
        # NOTE(review): the items listed above add up to 56, not 62 --
        # confirm the per-step size against get_internal_state() and the
        # command history before relying on this comment.
        high = np.inf * np.ones([62 + self.num_history_stack * 46])
        self.observation_space = gym.spaces.Box(-high, high)

        self._reset_period_history()

        period_tables = {4: periods4, 5: periods5, 7: periods7}
        ps = period_tables.get(self.num_actions)
        if ps is None:
            raise NotImplementedError('Invalid number of actions')
        # Maps action index -> gait period.
        self.period_dict = dict(enumerate(ps))

    @classmethod
    def _whole_steps(cls, ratio):
        """Truncate ``ratio`` to an int, tolerating float round-off."""
        return int(ratio + cls._STEP_EPS)

    def _reset_period_history(self):
        """Set the pending period to ``T_gait`` and refill the history with it."""
        self.next_period = self.T_gait
        self.past_actions = deque(
            np.ones(self._PERIOD_HISTORY_LEN) * self.next_period,
            maxlen=self._PERIOD_HISTORY_LEN,
        )

    def reset(self):
        """Restore the default period and ``k_rl``, then reset the base env.

        Returns:
            Whatever ``BaseControlEnv.reset()`` returns.
        """
        self._reset_period_history()
        self.k_rl = self._whole_steps(self.rl_dt / self.dt)
        return super().reset()

    def set_new_gait(self, action):
        """Apply the gait period selected by ``action``.

        Args:
            action: Key into ``self.period_dict`` (an action index).

        Raises:
            KeyError: If ``action`` is not a key of ``self.period_dict``.
        """
        period = self.period_dict[action]  # No noop actions
        if period != self.next_period:
            self.next_period = period
            if self.reactive_update:
                # Rebuild the current and desired gait matrices right away.
                g, gf = self._update_gait_matrices()
                self.controller.planner.Cplanner.set_gaits(g, gf)
                # NOTE(review): k_rl is only re-derived on this reactive
                # path -- confirm semi_mdp is not meant to apply to the
                # non-reactive path as well.
                if self.semi_mdp:
                    self.k_rl = self._whole_steps(period / self.dt)
            else:
                # Only update the planner's future desired gait.
                self.controller.planner.Cplanner.create_modtrot(period)
        self._last_action = self.next_period
        self.past_actions.append(self.next_period)

    def get_observation(self):
        """Return the flat observation vector.

        The vector is the concatenation, in order, of the internal state,
        the recent gait periods, the flattened command history and the
        flattened state history.
        """
        self.robot.UpdateMeasurment()
        internal_state = self.get_internal_state()
        history_periods = np.array(self.past_actions)
        command_history = np.stack(self.past_commands).flatten()
        state_history = np.stack(self.state_history).flatten()
        return np.concatenate(
            [internal_state, history_periods, command_history, state_history]
        )

    def _update_gait_matrices(self):
        """Rebuild the gait matrices for ``self.next_period``.

        Column 0 of each row holds a step count and the remaining columns
        hold the sequence for that row (presumably per-foot contact flags,
        since the alternate sequence is computed as ``1 - sequence``).

        Rows with the new half-period length are written until one nominal
        gait cycle (twice the half-period derived from ``T_gait``) is
        filled. Whatever part of the last phase does not fit is carried
        over into the first row of the desired gait matrix.

        Returns:
            tuple: ``(new_gait_f, new_gait_f_des)``, both zero-initialised
            arrays with the shape of the current gait matrix.

        Raises:
            IndexError: If more rows are needed than the gait matrix has.
        """
        period = self.next_period
        gait_f = self.get_current_gait()
        gait_p = self.get_past_gait()

        # Half-period lengths expressed in MPC steps.
        period_steps = self._whole_steps(0.5 * (period / self.dt) / self.k_mpc)
        default_steps = self._whole_steps(
            0.5 * (self.T_gait / self.dt) / self.k_mpc
        )
        gait_steps = 2 * default_steps

        new_gait_f = np.zeros(gait_f.shape)
        new_gait_f_des = np.zeros(gait_f.shape)

        if np.array_equal(gait_f[0, 1:], gait_p[0, 1:]):
            # The current sequence is still in progress: keep it untouched
            # and start re-timing from the next row.
            i_row = 1
            remaining_steps = gait_steps - gait_f[0, 0]
            new_gait_f[0, :] = gait_f[0, :]
        else:
            # Otherwise re-time starting with the current row.
            i_row = 0
            remaining_steps = gait_steps

        # The two sequences that alternate from here on.
        s1 = gait_f[i_row, 1:]
        seqs = np.vstack((s1, 1. - s1))
        i_seq = 0

        while True:
            new_gait_f[i_row, 1:] = seqs[i_seq]
            if period_steps < remaining_steps:
                # A full phase fits; continue with the other sequence.
                new_gait_f[i_row, 0] = period_steps
                remaining_steps -= period_steps
            else:
                # Last row: it gets what is left (equal to period_steps on
                # an exact fit) and any overflow is carried to gait_f_des.
                new_gait_f[i_row, 0] = remaining_steps
                remaining_f_steps = period_steps - remaining_steps
                break
            i_row += 1
            i_seq = (i_seq + 1) % len(seqs)

        # Fill gait_f_des
        if remaining_f_steps != 0:
            # Finish the truncated phase, add one full opposite phase, then
            # the complementary part of the truncated phase.
            other_seq = (i_seq + 1) % len(seqs)
            new_gait_f_des[0, 0] = remaining_f_steps
            new_gait_f_des[0, 1:] = seqs[i_seq]
            new_gait_f_des[1, 0] = period_steps
            new_gait_f_des[1, 1:] = seqs[other_seq]
            new_gait_f_des[2, 0] = period_steps - remaining_f_steps
            new_gait_f_des[2, 1:] = seqs[i_seq]
        else:
            # Exact fit: two full phases, starting opposite to the last row
            # written into new_gait_f.
            last_gait_f_row = new_gait_f[i_row, 1:]
            new_gait_f_des[0:2, 0] = period_steps
            new_gait_f_des[0, 1:] = 1. - last_gait_f_row
            new_gait_f_des[1, 1:] = last_gait_f_row

        return new_gait_f, new_gait_f_des