Safe action repetition for Policy Gradient methods


Many reinforcement learning (RL) problems, such as any robotics task, are defined in a continuous setting. This does not fit the Markov Decision Process (MDP) framework, which is the basis of most RL algorithms. To solve this, continuous time is discretized by a discretization time scale $\delta$. RL algorithms' performance has been shown to be highly sensitive to the choice of this parameter. This work presents an algorithm that performs well regardless of the choice of $\delta$.


Previous works have used the framework of durative actions, which in a discretized setting are equivalent to action repetition. To achieve this, a policy is chosen to produce an action along with a number of repetitions . This does not allow the algoritm to stop the repetition of an action, which means the agent cannot accommodate for unexpected events, an ability crucial for robotics. To solve this, this work presents the apporach of Safe Action Repetition (SAR) , in which the policy produces an action and a safe region. The agent will repeat the action until it leaves the safe region. SAR allows policy gradient (PG) to both be invariant to the choice of $\delta$ and perform well in stochastic environments.


The algorithm is implemented on top of the Stable Baselines 3 (SB3) library, and tested on OpenAI's Gym environments. SB3 provides an implementation of Proximal Policy Optimization (PPO). This section will explore the main ways SB3's implementation was modified to implement SAR.


In SAR, the policy (an Actor-Critic Policy) produces both an action and a safe region. To achieve this, a third neural network (NN) is introduced alongside the classic NNs of PG (action network, log-standard-deviation network) to produce the safe region. The NNs are created in the policy class' _build method

def _build(self, lr_schedule: Schedule): ... if isinstance(self.action_dist, DiagGaussianDistribution): self.action_net, self.log_std_net = self.action_dist.proba_distribution_net(...) ... elif isinstance(self.action_dist, MixedDistribution): self.action_net, self.log_std_net, self.dist_action_net = self.action_dist.proba_distribution_net( latent_dim=latent_dim_pi, log_std_init=self.log_std_init ) ...

MixedDistribution is a class that takes as an argument to its constructor any SB3 distribution class and adds to it a Categorical distribution that produces the safe region:

class MixedDistribution(Distribution): def __init__(self, cont_cls, cont_kwargs, action_space): super().__init__() self.cont_dist = cont_cls(get_action_dim(action_space.spaces[0]), **cont_kwargs) self.dis_dist = CategoricalDistribution(action_space.spaces[1].n) def proba_distribution_net(self, latent_dim: int, log_std_init): action_net, log_std_net = self.cont_dist.proba_distribution_net(latent_dim, log_std_init) dist_action_net = self.dis_dist.proba_distribution_net(latent_dim) return action_net, log_std_net, dist_action_net ...
Action repetition

The actual repetition of actions, as well as the application of stochasticity, is done as part of the step function in a gym environment wrapper class.

def step(self, aug_action): cur_idx = self.ori_action_dim action = aug_action[:self.ori_action_dim] # application of stochasticity if self.anoise_type == 'action': # "action noise" if np.random.rand() < self.anoise_prob: action = action + np.random.randn(*action.shape) * self.anoise_std action = np.clip(action, self.action_space.low[:len(action)], self.action_space.high[:len(action)]) elif self.anoise_type is not None and 'ext' in self.anoise_type: # "external force" if np.random.rand() < self.anoise_prob: force = np.random.randn(3) * self.anoise_std torque = np.zeros(3) cur_info = force # apply force[self.env.sim.model._body_name2id[self.body_key], :] = [0] * 6 u = aug_action[cur_idx] cur_idx += 1 norm_u = (u + 1) / 2 u = norm_u total_reward = 0.0 done = None cur_gamma = 1.0 first_obs = self.cur_obs for i in range(100000000): # repeat action forever obs, reward, done, info = self.env.step(action) if self.anoise_type in ['ext_fpc']: obs = np.concatenate([obs, self.cur_force]) if not done: self._update_obs_estimate(obs[np.newaxis, ...]) self.reset_update_obs_estimate = True total_reward += reward * cur_gamma cur_gamma *= self.gamma if done: break if self.max_d is None: break norm_obs = (obs - self.obs_rms.mean) / (np.sqrt(self.obs_rms.var) + self.epsilon_std) norm_first_obs = (first_obs - self.obs_rms.mean) / (np.sqrt(self.obs_rms.var) + self.epsilon_std) # Distance computation d_delta = np.linalg.norm(norm_obs - norm_first_obs, ord=1) / len(obs) # stop repetition check if d_delta >= u * self.max_d: break ... self.num_steps += 1 return obs, total_reward, done, info