Introduction
Many reinforcement learning (RL) problems, such as most robotics tasks, are naturally defined in a continuous-time setting. This does not
fit the Markov Decision Process (MDP) framework that underlies most RL algorithms, so continuous time is
discretized with a discretization time scale $\delta$. The performance of RL algorithms has been shown to be highly
sensitive to the choice of this parameter. This work presents an algorithm that performs well
regardless of the choice of $\delta$.
Approach
Previous works have used the framework of durative actions, which in a discretized setting is equivalent to
action repetition: the policy produces an action along with a number of repetitions.
This does not allow the algorithm to stop repeating an action early, which means the agent cannot react to
unexpected events, an ability crucial for robotics. To address this, this work presents the approach of Safe Action
Repetition (SAR), in which the policy produces an action and a safe region. The agent repeats the action until the state
leaves the safe region. SAR allows policy gradient (PG) methods both to be invariant to the choice of $\delta$ and to perform well
in stochastic environments.
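Conceptually, one SAR macro-step can be sketched as follows. This is a minimal illustration of the idea, not the actual implementation: policy, env, and max_d are placeholders, and the distance measure used in practice (which operates on normalized observations) is described in the implementation section below.

import numpy as np

def sar_macro_step(policy, env, obs, max_d):
    # One SAR macro-step: repeat an action until the state leaves the safe region.
    action, u = policy(obs)  # primitive action and safe-region size u in [0, 1]
    first_obs = obs
    total_reward = 0.0
    while True:
        obs, reward, done, info = env.step(action)
        total_reward += reward
        # Stop once the state has drifted farther than u * max_d (per-dimension
        # average L1 distance) from the state in which the action was chosen,
        # or when the episode ends.
        d = np.linalg.norm(obs - first_obs, ord=1) / len(obs)
        if done or d >= u * max_d:
            break
    return obs, total_reward, done, info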
Implementation
The algorithm is implemented on top of the Stable Baselines 3 (SB3) library and tested on OpenAI Gym environments.
SB3 provides an implementation of Proximal Policy Optimization (PPO). This section covers the main ways SB3's implementation
was modified to implement SAR.
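For context, a minimal unmodified SB3 PPO run on a Gym environment looks roughly like the following; the SAR-specific changes described below are layered on top of this standard setup.

import gym
from stable_baselines3 import PPO

# Plain SB3 PPO on a standard Gym continuous-control task (no SAR modifications).
env = gym.make("Pendulum-v1")
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10_000)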
Policy
In SAR, the policy (an Actor-Critic Policy) produces both an action and a safe region. To achieve this, a third neural network (NN) is introduced alongside
the two classic NNs of PG (the action network and the log-standard-deviation network) to produce the safe region. The NNs are created in
the policy class's _build method:
def _build(self, lr_schedule: Schedule):
    ...
    if isinstance(self.action_dist, DiagGaussianDistribution):
        self.action_net, self.log_std_net = self.action_dist.proba_distribution_net(...)
        ...
    elif isinstance(self.action_dist, MixedDistribution):
        # The mixed distribution additionally returns the network that produces the safe region.
        self.action_net, self.log_std_net, self.dist_action_net = self.action_dist.proba_distribution_net(
            latent_dim=latent_dim_pi, log_std_init=self.log_std_init
        )
    ...
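The augmented action space is presumably a tuple pairing the original continuous action space (spaces[0]) with a discrete safe-region component (spaces[1]), matching the indexing used by MixedDistribution below. A rough, illustrative sketch, in which the dimensions and the number of discrete safe-region values are made-up placeholders:

import gym
import numpy as np

# Illustrative augmented action space: original continuous action (spaces[0])
# plus a discrete safe-region component (spaces[1]); sizes are placeholders.
ori_action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(6,), dtype=np.float32)
safe_region_space = gym.spaces.Discrete(11)
aug_action_space = gym.spaces.Tuple((ori_action_space, safe_region_space))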
MixedDistribution is a class whose constructor takes any SB3 distribution class and pairs it with
a Categorical distribution that produces the safe region:
class MixedDistribution(Distribution):
    def __init__(self, cont_cls, cont_kwargs, action_space):
        super().__init__()
        # Continuous distribution over the primitive action.
        self.cont_dist = cont_cls(get_action_dim(action_space.spaces[0]), **cont_kwargs)
        # Categorical distribution over the safe region.
        self.dis_dist = CategoricalDistribution(action_space.spaces[1].n)

    def proba_distribution_net(self, latent_dim: int, log_std_init):
        action_net, log_std_net = self.cont_dist.proba_distribution_net(latent_dim, log_std_init)
        dist_action_net = self.dis_dist.proba_distribution_net(latent_dim)
        return action_net, log_std_net, dist_action_net
    ...
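The remaining Distribution methods are omitted above. Presumably they delegate to the two sub-distributions; the following is a hedged sketch of what sampling and log-probability computation could look like, an assumption about the omitted code rather than the actual implementation.

    # Assumed continuation of MixedDistribution (not from the source):
    def proba_distribution(self, mean_actions, log_std, dist_action_logits):
        # Update both sub-distributions with their respective parameters.
        self.cont_dist.proba_distribution(mean_actions, log_std)
        self.dis_dist.proba_distribution(dist_action_logits)
        return self

    def log_prob(self, actions):
        cont_actions, dis_actions = actions
        # The joint log-probability factorizes over the two independent heads.
        return self.cont_dist.log_prob(cont_actions) + self.dis_dist.log_prob(dis_actions)

    def sample(self):
        return self.cont_dist.sample(), self.dis_dist.sample()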
Action repetition
The actual repetition of actions, as well as the application of stochasticity, is implemented in the step function of a Gym environment wrapper class:
def step(self, aug_action):
    cur_idx = self.ori_action_dim
    # The first ori_action_dim entries of the augmented action are the primitive action.
    action = aug_action[:self.ori_action_dim]

    # Application of stochasticity
    if self.anoise_type == 'action':  # "action noise"
        if np.random.rand() < self.anoise_prob:
            action = action + np.random.randn(*action.shape) * self.anoise_std
            action = np.clip(action, self.action_space.low[:len(action)], self.action_space.high[:len(action)])
    elif self.anoise_type is not None and 'ext' in self.anoise_type:  # "external force"
        # Clear any previously applied external force on the target body.
        self.env.sim.data.xfrc_applied[self.env.sim.model._body_name2id[self.body_key], :] = [0] * 6
        if np.random.rand() < self.anoise_prob:
            force = np.random.randn(3) * self.anoise_std
            torque = np.zeros(3)
            cur_info = force
            # Apply the sampled force (and zero torque) to the body.
            self.env.sim.data.xfrc_applied[self.env.sim.model._body_name2id[self.body_key], :] = np.concatenate([force, torque])

    # The next entry is the raw safe-region size, mapped from [-1, 1] to [0, 1].
    u = aug_action[cur_idx]
    cur_idx += 1
    u = (u + 1) / 2

    total_reward = 0.0
    done = None
    cur_gamma = 1.0
    first_obs = self.cur_obs  # observation at which the action was chosen
    for i in range(100000000):  # repeat the action until a stopping condition triggers
        obs, reward, done, info = self.env.step(action)
        if self.anoise_type in ['ext_fpc']:
            # For this noise variant, the current external force is appended to the observation.
            obs = np.concatenate([obs, self.cur_force])
        if not done:
            self._update_obs_estimate(obs[np.newaxis, ...])
            self.reset_update_obs_estimate = True
        # Accumulate the discounted reward over the repeated steps.
        total_reward += reward * cur_gamma
        cur_gamma *= self.gamma
        if done:
            break
        if self.max_d is None:
            break
        # Normalize observations with running statistics before measuring the distance.
        norm_obs = (obs - self.obs_rms.mean) / (np.sqrt(self.obs_rms.var) + self.epsilon_std)
        norm_first_obs = (first_obs - self.obs_rms.mean) / (np.sqrt(self.obs_rms.var) + self.epsilon_std)
        # Distance from the state in which the action was chosen.
        d_delta = np.linalg.norm(norm_obs - norm_first_obs, ord=1) / len(obs)
        # Stop repeating once the state leaves the safe region.
        if d_delta >= u * self.max_d:
            break
    ...
    self.num_steps += 1
    return obs, total_reward, done, info
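To make the stopping rule concrete, here is a small numerical illustration of the distance check; the numbers are made up for illustration.

import numpy as np

# Suppose max_d = 1.0 and the policy's raw safe-region output is 0.0,
# so the normalized threshold is u = (0.0 + 1) / 2 = 0.5.
max_d, u = 1.0, 0.5
norm_first_obs = np.zeros(4)               # normalized state where the action was chosen
norm_obs = np.array([0.8, 0.6, 0.4, 0.2])  # normalized state after some repeated steps
d_delta = np.linalg.norm(norm_obs - norm_first_obs, ord=1) / len(norm_obs)  # = 0.5
print(d_delta >= u * max_d)  # True -> the state has left the safe region; stop repeating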