Commit 7a7583ab authored by Hongsheng Zeng, committed by Bo Zhou

Refine documents of PARL (#43)

* remove not used files, add benchmark for DQN and DDPG, add Parameters management Readme

* Update README.md

* Update README.md

* add parl dependence in examples, use np shuffle instead of sklearn

* fix codestyle

* refine readme of nips example

* fix bug

* fix code style

* Update README.md

* Update README.md

* Update README.md

* refine document and remove outdated design doc

* Update README.md

* Update README.md

* refine comment

* release version 1.0

* gif of examples

* Update README.md

* update Readme
Parent 4163d732
......@@ -67,7 +67,7 @@ agent = AtariAgent(algorithm)
# Install:
### Dependencies
- Python 2.7 or 3.5+.
- PaddlePaddle >=1.2.1 (We try to make our repository always compatible with newest version PaddlePaddle)
- PaddlePaddle >=1.2.1 (We try to make our repository always compatible with the latest version of PaddlePaddle)
```
......@@ -80,3 +80,7 @@ pip install --upgrade git+https://github.com/PaddlePaddle/PARL.git
- [DDPG](examples/DDPG/)
- [PPO](examples/PPO/)
- [Winning Solution for NIPS2018: AI for Prosthetics Challenge](examples/NeurIPS2018-AI-for-Prosthetics-Challenge/)
<img src=".github/NeurlIPS2018.gif" width = "300" height ="200" alt="NeurlIPS2018"/> <img src=".github/Half-Cheetah.gif" width = "300" height ="200" alt="Half-Cheetah"/> <img src=".github/Breakout.gif" width = "200" height ="200" alt="Breakout"/>
<br>
<img src=".github/Aircraft.gif" width = "808" height ="300" alt="NeurlIPS2018"/>
# PaddlePaddle Reinforcement Learning Framework (PARL)
This is the design doc for [PARL](https://github.com/PaddlePaddle/PARL): a general-purpose RL platform based on PaddlePaddle Fluid.
## Problem description
> A robot is an intelligent entity that situates in an environment. At every time step, it receives multimodal sensory inputs and generates (possibly multimodal) action outputs according to a certain set of rewards or goals, given the current (partial) environment observation.
Almost any RL problem can be seen from the above perspective. We can always convert a problem description to one similar to the above (namely, one where a robot takes actions in an environment at every time step and receives rewards).
For example, in the scenario of learning to put only one advertisement on each webpage, you can think of our decision-making algorithm as a robot, the user's online profile, browsing history, and some other contextual information as the environment observation, which advertisement to put on the webpage as the robot's action, and whether the user clicks on the advertisement as the reward. The time horizon in this case would then be a sequence of webpages.
So it would be helpful to discuss any RL problem in this unified description.
#### Action step vs. time step
We need to differentiate between an *action step* and a *time step*.
- A time step is a time interval defined by the environment simulator, which is the minimal temporal unit for simulation. Each time step yields a new environment observation.
- An action step is always a multiple of a time step. If an actor has an action repetition of $K$, then an action step spans over $K$ time steps.
When $K>1$, between two adjacent action steps there are several environment observations. Usually the agent only gets the last environment observation while discarding the first $K-1$ ones. Below is a diagram for $K=5$.
<p align="center"><img src="step.png" width="300"/></p>
In the following discussion, for simplicity we will assume $K=1$ and use the term "action step" and "time step" interchangeably. However, all the conclusions and designs easily apply to the cases of $K>1$.
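To make the action-repetition convention concrete, here is a minimal sketch (not part of PARL; the wrapper name and the 3-tuple returned by `env.step` are assumptions) of an environment wrapper in which one action step spans $K$ simulator time steps and only the last observation is kept:
```python
class ActionRepeat(object):
    """Hypothetical wrapper: one action step spans K simulator time steps."""

    def __init__(self, env, k=5):
        self.env = env
        self.k = k

    def step(self, action):
        total_reward = 0.0
        obs, game_over = None, False
        for _ in range(self.k):
            # intermediate observations are discarded; only the last one
            # produced within this action step is returned to the agent
            obs, reward, game_over = self.env.step(action)
            total_reward += reward
            if game_over:
                break
        return obs, total_reward, game_over
```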
## Representation of experience/sample
An experience or a sample received by the robot is a struct that has five fields:
- `inputs`: all the observed data at the current time step
- `states`: memory data storing the previous history; useful for problems having temporal dependencies
- `actions`: the actions taken by the robot at the time step
- `rewards`: a list of scalar rewards for taking the actions
- `game_over`: an integer value indicating the game-over status for the current time step
Note that here the `game_over` field describes the current time step instead of the next time step. This is different from most standard open-source simulator packages (see OpenAI Gym for an example).
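As an illustration only (the real container is the `Experience` class in `parl.common.replay_buffer`, shown later in this commit), an experience record and its `game_over` convention can be sketched as:
```python
import collections

# Illustrative stand-in for parl.common.replay_buffer.Experience.
Experience = collections.namedtuple(
    "Experience", ["inputs", "states", "actions", "rewards", "game_over"])

# `game_over` describes the CURRENT time step, unlike gym's `done`, which
# refers to the step after the returned observation.
exp = Experience(inputs=[0.1, 0.2], states=[], actions=[1],
                 rewards=[0.5], game_over=0)
```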
## Robot Components
<p align="center"><img src="framework.png" width=800></p>
A **Robot** consists of some **computation tasks**, i.e., applying some **algorithms** to use or learn **models**. For example, a robot may have one reinforcement learning task that uses an Actor-Critic algorithm to learn the policy model. To provide additional training for the policy model, the robot can also have auxiliary tasks, usually handled by unsupervised learning algorithms. Another scenario is where a robot needs different policies at different levels of temporal abstraction. In this case, we can assign one policy learning task to each policy.
In the following, we describe the design details of the major components of a robot mentioned above. The design principle is to provide users with a framework in which they can easily implement their own robots by overriding or extending some of these components. The components of a robot can be divided into two groups. One group handles the actual models, e.g., model computation, parameter sharing, and synchronization; we call it the **computation group**. The other group interacts with the environment and produces/consumes data to/from the *computation group*; we call it the **agent group** (we will explain this name later).
Let's first talk about the components in **computation group**: `ComputationTask`, `Algorithm` and `Model`.
### Computation Group
A robot consists of multiple `ComputationTask`s (CTs) executed in some order. Each CT can have its own data I/O interaction with outside. One CT may have I/O interaction with the other CTs. Intuitively, a ComputationTask should perform a relatively independent computation routine. For example, in hierarchical RL, one CT predicts the goal and the next CT predicts the actions according to the goal. We require that
- The code between two CTs is non-differentiable computation or control.
- There is no gradient flowing between two adjacent CTs.
If you have two modules that must have gradients flowing between them, then consider putting the two modules in one CT.
Finally, different CTs might have different time resolutions. For example, one CT outputs actions at every time step, while another CT outputs actions every 100 time steps.
<p align="center"><img src="ct.png" height=300><img src="model.png" height=300></p>
***In the above figure, blue boxes represent pure Python logic control and computation, and purple boxes represent logic control and computation in Fluid programs.***
`ComputationTask` uses `Algorithm` to do the (back-end) network computation. `Algorithm` implements general prediction and learning functions, independent of the actual tasks. An algorithm further contains a model which is problem-dependent and allows the user to write the front-end (data perception) of the network.
Currently, a reference algorithm is used for the behavior policy or for delayed model updates.
A `Model` specifies the network input/output specs, the perception function of the input data, and the post-processing function of the actions. It usually varies across problems.
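For concreteness, here is a stripped-down `Model` that closely mirrors the `SimpleModelDeterministic` class included later in this commit; it only declares the input/action specs and the perception part of the network, while the learning logic lives in the `Algorithm` that wraps it (a sketch, not the canonical way to start a project):
```python
import parl.layers as layers
import parl.framework.policy_distribution as pd
from parl.framework.base import Model
from parl.layers import common_functions as comf


class TinyModel(Model):
    """A minimal problem-dependent Model: specs + perception, no learning."""

    def __init__(self, dims):
        super(TinyModel, self).__init__()
        self.dims = dims
        self.mlp = comf.MLP([dict(size=dims)])

    def get_input_specs(self):
        return [("sensor", dict(shape=[self.dims]))]

    def get_action_specs(self):
        return [("continuous_action", dict(shape=[self.dims]))]

    def policy(self, inputs, states):
        hidden = self.mlp(inputs.values()[0])
        return dict(continuous_action=pd.Deterministic(hidden)), states
```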
Next, let's talk about the components in **agent group**: `Agent`, `AgentHelper` and `ComputationWrapper`.
### Agent Group
<p align="center"><img src="relation.png" width=600></p>
<center>P_Q: prediction_q, PR_Q: prediction_return_q, T_Q: training_q, TR_Q: training_return_q</center>
`Agent` implements the control flow of the robot's activities. In its simplest form, `Agent` provides an interface `run_one_episode()` where users can determine how `Agent` interacts with the environment `Simulator` and carries out `ComputationTask`s with data from the `Simulator` or from the outcomes of other `ComputationTask`s.
`AgentHelper` abstracts the data preparation, data sampling and post-computation data processing of `Agent`. For example, `AgentHelper` maintains a replay buffer and handles the experience sampling for algorithms like DQN. Another purpose of `AgentHelper` is to hide the details of data communication with `ComputationTask` from `Agent`. For each `ComputationTask`, `Agent` binds it with one `AgentHelper`.
PARL supports parallel simulation, which means the robot can deploy multiple `Agent`s that run independently. `ComputationTask` batches the data from the `Agent`s together and does the computation in one pass. In practice, we use `ComputationWrapper` to abstract this I/O part. The communication between `AgentHelper` and `ComputationWrapper` is handled by a simple class `Communicator`:
```python
class Communicator(object):
def __init__(self, agent_id, training_q, prediction_q):
self.agent_id = agent_id # the id of agent that uses it
# reference to {training,prediction}_q of the ComputationWrapper that uses it
self.{training,prediction}_q = {training,prediction}_q
# used to accept data returned from ComputationTask's {training,prediction}
self.{training,prediction}_return_q = Queue()
def put_{training,prediction}_data(self, data):
self.{training,prediction}_q.put((self.agent_id, data))
def put_{training,prediction}_return(self, data):
self.{training,prediction}_return_q.put(data)
def get_{training,prediction}_return(self):
        return self.{training,prediction}_return_q.get()
```
The definition of `AgentHelper` can then be:
```python
class AgentHelper(object):
def __init__(self):
self.name # the name of the task this helper is bound to
self.comm # Communicator
self.exp_q # a container to store the past experience data
def predict(self, inputs):
"""
send data to ComputationTask for prediction; blocked until return
"""
inputs = ... # some data processing of inputs
self.comm.put_prediction_data(inputs)
return self.comm.get_prediction_return()
def store_data(self, data):
"""
store the past experience data, and call learn() if necessary
"""
self.exp_q.add(data)
def learn(self):
"""
send data to ComputationTask for learning; blocked until return
"""
# some data preparation, for example:
data = self.exp_q.sample()
self.comm.put_training_data(data)
return self.comm.get_training_return()
```
Note: depending on the situation, `AgentHelper.learn()` can be called by `Agent` or within `AgentHelper.store_data()`, or be invoked as a separate thread.
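As a sketch of the second option (the class name and interval constant are illustrative, building on the `AgentHelper` pseudocode above), an off-policy helper could trigger learning from inside `store_data()` once enough new experiences have arrived:
```python
class OffPolicyHelper(AgentHelper):
    """Illustrative helper that calls learn() from within store_data()."""

    TRAINING_INTERVAL = 100  # assumed constant, not defined by PARL

    def __init__(self):
        super(OffPolicyHelper, self).__init__()
        self.num_stored = 0

    def store_data(self, data):
        self.exp_q.add(data)
        self.num_stored += 1
        # the Agent's control flow never calls learn() explicitly;
        # training is driven by the rate at which data is stored
        if self.num_stored % self.TRAINING_INTERVAL == 0:
            self.learn()
```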
As an example, we demonstrate how to define an `Agent` whose robot uses hierarchical policies. The bottom goal-following "RL" policy is updated every `TRAINING_INTERVAL` steps and the top goal generation "goal" policy is updated every time the goal is achieved or aborted.
```python
class Agent(object):
self.env # environment
self.helpers = {'RL': OnPolicyHelper, 'goal': OffPolicyHelper} # a dictionary of AgentHelper
    def run_one_episode(self):
obs = self.env.get_obs()
prev_goal = []
while not self.env.game_over():
goal = self.helpers['goal'].predict([obs, prev_goal])
total_reward = 0
step_counts = 0
while not goal_achieved(goal) and not self.env.game_over():
actions = self.helpers['RL'].predict([obs, goal])
next_obs, reward = self.env.step(actions)
                total_reward += reward
                step_counts += 1
self.helpers['RL'].store_data([obs, goal, reward, actions, self.env.game_over()])
if step_counts % TRAINING_INTERVAL == 0:
self.helpers['RL'].learn()
obs = next_obs
self.helpers['goal'].store_data([obs, goal, total_reward, self.env.game_over()])
self.helpers['goal'].learn()
```
`ComputationWrapper` is essentially two threads that keep consuming data from the `Agent` side:
```python
class ComputationWrapper(object):
def __init__(self):
self.ct # ComputationTask
self.training_q = Queue()
self.prediction_q = Queue()
self.comms = {} # dictionary of Communicators, indexed by Agent's id
self.prediction_thread = Thread(target=self._prediction_loop)
self.training_thread = Thread(target=self._training_loop)
self.model_input_specs
def _pack_data(self, data):
"""
Pack a list of data into one dict according to model's inputs specs.
"""
pass
def _unpack_data(self, batch_data):
"""
Unpack the dict into a list of dict, by slicing each value in the dict.
"""
pass
def create_communicator(self, agent_id):
comm = Communicator(agent_id, self.training_q, self.prediction_q)
self.comms[agent_id] = comm
return comm
def _prediction_loop(self):
while not stop:
agent_ids = []
data = []
while not agent_ids or not self.prediction_q.empty():
agent_id, d = self.prediction_q.get()
agent_ids.append(agent_id)
data.append(d)
data = self._pack_data(data)
ret = self.ct.predict(data)
ret = self._unpack_data(ret)
for i in range(len(agent_ids)):
self.comms[agent_ids[i]].put_prediction_return(ret[i])
def _training_loop(self):
while not stop:
agent_ids = []
data = []
while len(agent_ids) < min_batchsize or not self.training_q.empty():
agent_id, d = self.training_q.get()
agent_ids.append(agent_id)
data.append(d)
data = self._pack_data(data)
ret = self.ct.learn(data)
ret = self._unpack_data(ret)
assert len(ret) == len(agent_ids)
for i in range(len(agent_ids)):
self.comms[agent_ids[i]].put_training_return(ret[i])
def run(self):
self.prediction_thread.start()
self.training_thread.start()
```
## Parameter management in PARL
In RL we usually need to reuse or (periodically) synchronize parameters.
#### Reuse parameters
Fluid allows users to reuse parameters by specifying the same custom name to parameter attributes. For example:
```python
import paddle.fluid.layers as layers
x = layers.data(name='x', shape=[100], dtype="float32")
y1 = layers.fc(input=x, param_attr=ParamAttr(name="fc.w"), bias_attr=False)
y2 = layers.fc(input=x, param_attr=ParamAttr(name="fc.w"), bias_attr=False)
```
In this case, after forwarding, y1 and y2 should have the same value.
The advantage of this sharing method is its flexibility. Whenever users want to reuse parameters, they only need to specify a common name for the layers that use those parameters. However, this process is tedious. If you want to share the parameters among 10 places in the code, then you have to set the name 10 times, which is not good coding style. Also, if you want to reuse a module, then all the parameters inside the module must be named manually.
#### Sync parameters
Even worse, if we want to sync parameters between two networks, the only way to establish the parameter mapping is to look at the parameter names (if two layers use the same parameter name, we copy from one to the other). This means that we basically have to name the parameters of every layer in the network.
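To illustrate the pain point, copying a single parameter by hand in raw Fluid looks roughly like the sketch below (the parameter names "fc.w" and "target_fc.w" are assumptions, not names produced by PARL); repeating this for every layer of a network is exactly the bookkeeping we want to avoid:
```python
import numpy as np
import paddle.fluid as fluid


def copy_param_by_name(src_name, dst_name, place):
    # look both parameters up by name in the global scope and copy the value
    src_tensor = fluid.global_scope().find_var(src_name).get_tensor()
    dst_tensor = fluid.global_scope().find_var(dst_name).get_tensor()
    dst_tensor.set(np.array(src_tensor), place)


# hypothetical names; both networks must have been built with these exact names
copy_param_by_name("fc.w", "target_fc.w", fluid.CPUPlace())
```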
#### LayerFunc
To solve the above two issues, we use a `LayerFunc` object to wrap every layer that has parameters. The idea is that every `LayerFunc` object wraps a certain layer and manages a set of parameters automatically. The `LayerFunc` object is callable, and every time you want to reuse the parameters, you only need to call the corresponding `LayerFunc` object but without specifying the parameter name. In other words, a `LayerFunc` object automatically assumes a reusable set of parameters. This method of parameter naming and sharing is what PyTorch adopts.
An example of using `LayerFunc` to wrap the `paddle.fluid.layers.fc` layer is as follows:
```python
class LayerFunc(object):
    def __init__(self, param_attr=False, bias_attr=False):
self.param_attr = param_attr
self.bias_attr = bias_attr
...
def fc(size, act=None, param_attr=False, bias_attr=False):
param_attr = update_attr_name("fc", param_attr, is_bias=False)
bias_attr = update_attr_name("fc", bias_attr, is_bias=True)
class FC_(LayerFunc):
def __init__(self):
            super(FC_, self).__init__(param_attr, bias_attr)
def __call__(self, input):
return layers.fc(input=input,
size=size,
param_attr=param_attr,
                             bias_attr=bias_attr,
act=act)
return FC_()
```
So here we redefine each layer to return a callable `LayerFunc` object that will always use the same set of parameters whenever it is called later in the code (through a [closure](https://www.learnpython.org/en/Closures)). The parameter naming (inside ```update_attr_name```) is guaranteed to be globally unique.
`LayerFunc` is responsible for maintaining the parameter names and implementing the parameters copy function (see code for the details of handling sync).
Now for the same parameter sharing example, our code becomes:
```python
import parl.layers as layers
x = layers.data(name='x', shape=[100], dtype="float32") ## we've wrapped every layer, even the data layer
fc = layers.fc(size=64, act="relu") ## automatically create parameters named "fc_0.w" and "fc_0.b"
y1 = fc(x)
y2 = fc(x)
```
One disadvantage of this parameter-sharing method is that we have to define all the layers that need to be reused at the very beginning and then write additional lines of code to actually use them. This results in a few extra lines of code, which is a small price compared to the original parameter-sharing troubles.
#### Creating Model, Algorithm, and ComputationTask
When creating a `ComputationTask`, we build from the bottom of the robot hierarchy to the top:
- Create a `Model`
- Create an `Algorithm` with the created `Model` as the input
- Create a `ComputationTask` with the created `Algorithm` as the input
If we want two algorithms (computation tasks) to share a model (algorithm), then we can just pass the same model (algorithm) object to the algorithms (computation tasks) as the argument.
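A concrete bottom-up construction, adapted from the unit tests included later in this commit (the layer sizes are arbitrary):
```python
from parl.framework.computation_task import ComputationTask
from parl.algorithm_zoo.simple_algorithms import SimpleQ
from parl.model_zoo.simple_models import SimpleModelQ

# bottom-up: Model -> Algorithm -> ComputationTask
model = SimpleModelQ(
    dims=100,
    num_actions=4,
    mlp_layer_confs=[dict(size=32, act="relu", bias_attr=False),
                     dict(size=4, bias_attr=False)])
alg = SimpleQ(model=model)
ct = ComputationTask(algorithm=alg)

# passing the same `alg` object to another task shares the model and algorithm
ct_shared = ComputationTask(algorithm=alg)
```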
#### Lambda function for cloning
An algorithm (computation task) can clone a model (algorithm) if necessary. To facilitate this cloning, we require that when passing a model (algorithm) to create an algorithm (computation task), we pass a lambda function that creates a new model (algorithm) with new parameters whenever it is called with no arguments. We provide a helper to define such lambda functions:
```python
def create_algorithm_func(model_class, model_args, algorithm_class, algorithm_args):
model_func = lambda: model_class(**model_args)
algorithm_func = lambda: algorithm_class(model_func=model_func, **algorithm_args)
return algorithm_func
```
......@@ -7,11 +7,15 @@ Based on PARL, the DDPG model of deep reinforcement learning is reproduced, and
### Mujoco games introduction
Please see [here](https://github.com/openai/mujoco-py) to know more about Mujoco game.
### Benchmark result
- HalfCheetah-v2
<img src=".benchmark/DDPG_HalfCheetah-v2.png"/>
## How to use
### Dependencies:
+ python2.7 or python3.5+
+ [paddlepaddle>=1.0.0](https://github.com/PaddlePaddle/Paddle)
+ [parl](https://github.com/PaddlePaddle/PARL)
+ gym
+ tqdm
+ mujoco-py>=1.50.1.0
......
......@@ -7,15 +7,20 @@ Based on PARL, the DQN model of deep reinforcement learning is reproduced, and t
### Atari games introduction
Please see [here](https://gym.openai.com/envs/#atari) to know more about Atari game.
### Benchmark result
- Pong
<img src=".benchmark/DQN_Pong.png"/>
## How to use
### Dependencies:
+ python2.7 or python3.5+
+ [paddlepaddle>=1.0.0](https://github.com/PaddlePaddle/Paddle)
+ [parl](https://github.com/PaddlePaddle/PARL)
+ gym
+ tqdm
+ opencv-python
+ ale_python_interface
+ atari_py
+ [ale_python_interface](https://github.com/mgbellemare/Arcade-Learning-Environment)
### Start Training:
......
......@@ -11,7 +11,7 @@ For more technical details about our solution, we provide:
3. [[Link]](https://drive.google.com/file/d/1W-FmbJu4_8KmwMIzH0GwaFKZ0z1jg_u0/view?usp=sharing) A poster briefly introducing our solution in NeurIPS2018 competition workshop.
4. (coming soon) A full academic paper detailing our solution, including the entire training pipeline, related work, and experiments that analyze the importance of each key ingredient.
**Note**: Reproducibility is a long-standing issue in reinforcement learning field. We have tried to guarantee that our code is reproducible, testing each training sub-task three times. However, there are still some factors that prevent us from achieving the same performance. One problem is the choice time of a convergence model during curriculum learning. Choosing a sensible and natural gait visually is crucial for subsequent training, but the definition of what is a good gait varies from different people.
**Note**: Reproducibility is a long-standing issue in the reinforcement learning field. We have tried to guarantee that our code is reproducible, testing each training sub-task three times. However, there are still some factors that prevent us from achieving the same performance. One problem is when to pick a converged model during curriculum learning. Choosing a visually sensible and natural gait is crucial for subsequent training, but the definition of a good gait varies from person to person.
<p align="center">
<img src="image/demo.gif" alt="PARL" width="500"/>
......@@ -60,7 +60,7 @@ For final submission, we test our model in 500 CPUs, running 10 episodes per CPU
python simulator_server.py --port [PORT] --ensemble_num 1
# client (Suggest: 200+ clients)
python simulator_client.py --port [PORT] --ip [IP] --reward_type RunFastest
python simulator_client.py --port [PORT] --ip [SERVER_IP] --reward_type RunFastest
```
#### 2. Target: run at 3.0 m/s
......@@ -71,7 +71,7 @@ python simulator_server.py --port [PORT] --ensemble_num 1 --warm_start_batchs 10
--restore_model_path [RunFastest model]
# client (Suggest: 200+ clients)
python simulator_client.py --port [PORT] --ip [IP] --reward_type FixedTargetSpeed --target_v 3.0 \
python simulator_client.py --port [PORT] --ip [SERVER_IP] --reward_type FixedTargetSpeed --target_v 3.0 \
--act_penalty_lowerbound 1.5
```
......@@ -83,7 +83,7 @@ python simulator_server.py --port [PORT] --ensemble_num 1 --warm_start_batchs 10
--restore_model_path [FixedTargetSpeed 3.0m/s model]
# client (Suggest: 200+ clients)
python simulator_client.py --port [PORT] --ip [IP] --reward_type FixedTargetSpeed --target_v 2.0 \
python simulator_client.py --port [PORT] --ip [SERVER_IP] --reward_type FixedTargetSpeed --target_v 2.0 \
--act_penalty_lowerbound 0.75
```
......@@ -99,7 +99,7 @@ python simulator_server.py --port [PORT] --ensemble_num 1 --warm_start_batchs 10
--restore_model_path [FixedTargetSpeed 2.0m/s model]
# client (Suggest: 200+ clients)
python simulator_client.py --port [PORT] --ip [IP] --reward_type FixedTargetSpeed --target_v 1.25 \
python simulator_client.py --port [PORT] --ip [SERVER_IP] --reward_type FixedTargetSpeed --target_v 1.25 \
--act_penalty_lowerbound 0.6
```
......@@ -109,10 +109,10 @@ As mentioned before, the selection of model that used to fine-tune influence lat
```bash
# server
python simulator_server.py --port [PORT] --ensemble_num 12 --warm_start_batchs 1000 \
--restore_model_path [FixedTargetSpeed 1.25m/s] --restore_from_one_head
--restore_model_path [FixedTargetSpeed 1.25m/s model] --restore_from_one_head
# client (Suggest: 100+ clients)
python simulator_client.py --port [PORT] --ip [IP] --reward_type Round2 --act_penalty_lowerbound 0.75 \
python simulator_client.py --port [PORT] --ip [SERVER_IP] --reward_type Round2 --act_penalty_lowerbound 0.75 \
--act_penalty_coeff 7.0 --vel_penalty_coeff 20.0 --discrete_data --stage 3
```
......
......@@ -16,8 +16,9 @@ Please see [here](https://github.com/openai/mujoco-py) to know more about Mujoco
## How to use
### Dependencies:
+ python2.7 or python3.5+
+ python3.5+
+ [paddlepaddle>=1.0.0](https://github.com/PaddlePaddle/Paddle)
+ [parl](https://github.com/PaddlePaddle/PARL)
+ gym
+ tqdm
+ mujoco-py>=1.50.1.0
......
......@@ -15,7 +15,6 @@
import numpy as np
import parl.layers as layers
from paddle import fluid
from sklearn.utils import shuffle
from parl.framework.agent_base import Agent
from parl.utils import logger
......@@ -183,12 +182,16 @@ class MujocoAgent(Agent):
all_loss = []
for _ in range(self.value_learn_times):
obs_train, value_train = shuffle(obs_train, value_train)
random_ids = np.arange(obs_train.shape[0])
np.random.shuffle(random_ids)
shuffle_obs_train = obs_train[random_ids]
shuffle_value_train = value_train[random_ids]
start = 0
while start < data_size:
end = start + self.value_batch_size
value_loss = self._batch_value_learn(obs_train[start:end, :],
value_train[start:end])
value_loss = self._batch_value_learn(
shuffle_obs_train[start:end, :],
shuffle_value_train[start:end])
all_loss.append(value_loss)
start += self.value_batch_size
return np.mean(all_loss)
......@@ -6,6 +6,7 @@ Based on PARL, train a agent to play CartPole game with policy gradient algorith
+ python2.7 or python3.5+
+ [paddlepaddle>=1.0.0](https://github.com/PaddlePaddle/Paddle)
+ [parl](https://github.com/PaddlePaddle/PARL)
+ gym
### Start Training:
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from parl.framework.algorithm import Algorithm
from paddle.fluid.initializer import ConstantInitializer
import parl.layers as layers
import parl.framework.policy_distribution as pd
from parl.layers import common_functions as comf
import paddle.fluid as fluid
from copy import deepcopy
class SimpleAC(Algorithm):
"""
A simple Actor-Critic that has a feedforward policy network and
a single discrete action.
learn() requires keywords: "action", "reward", "v_value"
"""
def __init__(self,
model,
hyperparas=dict(lr=1e-4),
gpu_id=-1,
discount_factor=0.99):
super(SimpleAC, self).__init__(model, hyperparas, gpu_id)
self.discount_factor = discount_factor
def learn(self, inputs, next_inputs, states, next_states, next_episode_end,
actions, rewards):
action = actions["action"]
reward = rewards["reward"]
values = self.model.value(inputs, states)
next_values = self.model.value(next_inputs, next_states)
value = values["v_value"]
next_value = next_values["v_value"] * next_episode_end[
"next_episode_end"]
next_value.stop_gradient = True
assert value.shape[1] == next_value.shape[1]
critic_value = reward + self.discount_factor * next_value
td_error = critic_value - value
value_cost = layers.square(td_error)
dist, _ = self.model.policy(inputs, states)
dist = dist["action"]
assert isinstance(dist, pd.CategoricalDistribution)
pg_cost = 0 - dist.loglikelihood(action)
avg_cost = layers.mean(x=value_cost + pg_cost * td_error)
optimizer = fluid.optimizer.DecayedAdagradOptimizer(
learning_rate=self.hp["lr"])
optimizer.minimize(avg_cost)
return dict(cost=avg_cost)
def predict(self, inputs, states):
return self._rl_predict(self.model, inputs, states)
class SimpleQ(Algorithm):
"""
A simple Q-learning that has a feedforward policy network and a single discrete action.
learn() requires keywords: "action", "reward", "q_value"
"""
def __init__(self,
model,
hyperparas=dict(lr=1e-4),
gpu_id=-1,
discount_factor=0.99,
exploration_end_batches=0,
exploration_end_rate=0.1,
update_ref_interval=100):
super(SimpleQ, self).__init__(model, hyperparas, gpu_id)
self.discount_factor = discount_factor
self.gpu_id = gpu_id
assert update_ref_interval > 0
self.update_ref_interval = update_ref_interval
self.total_batches = 0
## create a reference model
self.ref_model = deepcopy(model)
## setup exploration
self.explore = (exploration_end_batches > 0)
if self.explore:
self.exploration_counter = layers.create_persistable_variable(
dtype="float32",
shape=[1],
is_bias=True,
default_initializer=ConstantInitializer(0.))
### in the second half of training time, the rate is fixed to a number
self.total_exploration_batches = exploration_end_batches
self.exploration_rate_delta \
= (1 - exploration_end_rate) / self.total_exploration_batches
def before_every_batch(self):
if self.total_batches % self.update_ref_interval == 0:
self.model.sync_paras_to(self.ref_model, self.gpu_id)
self.total_batches += 1
def predict(self, inputs, states):
"""
Override the base predict() function to put the exploration rate in inputs
"""
rate = 0
if self.explore:
counter = self.exploration_counter()
## first compute the current exploration rate
rate = 1 - counter * self.exploration_rate_delta
distributions, states = self.model.policy(inputs, states)
for dist in distributions.values():
assert dist.__class__.__name__ == "CategoricalDistribution"
dist.add_uniform_exploration(rate)
actions = {}
for key, dist in distributions.iteritems():
actions[key] = dist()
return actions, states
def learn(self, inputs, next_inputs, states, next_states, next_episode_end,
actions, rewards):
action = actions["action"]
reward = rewards["reward"]
values = self.model.value(inputs, states)
next_values = self.ref_model.value(next_inputs, next_states)
q_value = values["q_value"]
next_q_value = next_values["q_value"] * next_episode_end[
"next_episode_end"]
next_q_value.stop_gradient = True
next_value = layers.reduce_max(next_q_value, dim=-1)
assert q_value.shape[1] == next_q_value.shape[1]
num_actions = q_value.shape[1]
value = comf.idx_select(input=q_value, idx=action)
critic_value = reward + self.discount_factor * next_value
td_error = critic_value - value
avg_cost = layers.mean(x=layers.square(td_error))
optimizer = fluid.optimizer.DecayedAdagradOptimizer(
learning_rate=self.hp["lr"])
optimizer.minimize(avg_cost)
self._increment_exploration_counter()
return dict(cost=avg_cost)
def _increment_exploration_counter(self):
if self.explore:
counter = self.exploration_counter()
exploration_counter_ = counter + 1
switch = layers.cast(
x=(exploration_counter_ > self.total_exploration_batches),
dtype="float32")
## if the counter already hits the limit, we do not change the counter
layers.assign(
switch * counter + (1 - switch) * exploration_counter_,
counter)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class LastExpError(Exception):
"""
Raised when the last element or an element with non-zero game status is
sampled.
Attributes:
message(string): error message
"""
def __init__(self, idx, status):
self.message = 'The element at {}'.format(idx)
if status:
self.message += ' has game status: {}'.format(status)
else:
self.message += ' is the last experience of a game.'
def check_last_exp_error(is_last_exp, idx, game_status):
if is_last_exp:
raise LastExpError(idx, game_status)
def check_type_error(type1, type2):
if type1.__name__ != type2.__name__:
raise TypeError('{} expected, but {} given.'.format(
type1.__name__, type2.__name__))
def check_eq(v1, v2):
if v1 != v2:
raise ValueError('{} == {} does not hold'.format(v1, v2))
def check_neq(v1, v2):
if v1 == v2:
raise ValueError('{} != {} does not hold'.format(v1, v2))
def check_gt(v1, v2):
if v1 <= v2:
raise ValueError('{} > {} does not hold'.format(v1, v2))
def check_geq(v1, v2):
if v1 < v2:
raise ValueError('{} >= {} does not hold'.format(v1, v2))
def check_lt(v1, v2):
if v1 >= v2:
raise ValueError('{} < {} does not hold'.format(v1, v2))
def check_leq(v1, v2):
if v1 > v2:
raise ValueError('{} <= {} does not hold'.format(v1, v2))
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import random
from parl.common.error_handling import *
class Experience(object):
def __init__(self, sensor_inputs, states, actions, game_status):
check_type_error(list, type(sensor_inputs))
self.sensor_inputs = sensor_inputs # (observation, reward)
self.states = states # other states
self.actions = actions # actions taken
self.game_status = game_status # game status, e.g., max_steps or
# episode end reached
self.next_exp = None # copy of the next Experience
def set_next_exp(self, next_exp):
self.next_exp = copy.deepcopy(next_exp)
#TODO: write copy function
class Sample(object):
"""
A Sample represents one or a sequence of Experiences
"""
def __init__(self, i, n):
self.i = i # starting index of the first experience in the sample
self.n = n # length of the sequence
def __repr__(self):
return str(self.__class__) + ": " + str(self.__dict__)
class ReplayBuffer(object):
def __init__(self, capacity, exp_type=Experience):
"""
Create Replay buffer.
Args:
exp_type(object): Experience class used in the buffer.
capacity(int): Max number of experience to store in the buffer. When
the buffer overflows the old memories are dropped.
"""
check_gt(capacity, 1)
self.buffer = [] # a circular queue to store experiences
self.capacity = capacity # capacity of the buffer
self.last = -1 # the index of the last element in the buffer
self.exp_type = exp_type # Experience class used in the buffer
def __len__(self):
return len(self.buffer)
def buffer_end(self, i):
return i == self.last
def next_idx(self, i):
if self.buffer_end(i):
return -1
else:
return (i + 1) % self.capacity
def add(self, exp):
"""
Store one experience into the buffer.
Args:
exp(self.exp_type): the experience to store in the buffer.
"""
check_type_error(self.exp_type, type(exp))
# the next_exp field should be None at this point
check_eq(exp.next_exp, None)
if len(self.buffer) < self.capacity:
self.buffer.append(None)
self.last = (self.last + 1) % self.capacity
self.buffer[self.last] = copy.deepcopy(exp)
def sample(self, num_samples):
"""
Generate a batch of Samples. Each Sample represents a sequence of
Experiences (length>=1). And a sequence must not cross the boundary
between two games.
Args:
num_samples(int): Number of samples to generate.
Returns: A generator of Samples
"""
if len(self.buffer) <= 1:
yield []
for _ in xrange(num_samples):
while True:
idx = random.randint(0, len(self.buffer) - 1)
if not self.buffer_end(
idx) and not self.buffer[idx].game_status:
break
yield Sample(idx, 1)
def get_experiences(self, sample):
"""
Get Experiences from a Sample
Args:
sample(Sample): a Sample representing a sequence of Experiences
Return(list): a list of Experiences
"""
exps = []
p = sample.i
for _ in xrange(sample.n):
check_last_exp_error(
self.buffer_end(p) or self.buffer[p].game_status, p,
self.buffer[p].game_status)
# make a copy of the buffer element as e may be modified somewhere
e = copy.deepcopy(self.buffer[p])
p = self.next_idx(p)
e.set_next_exp(self.buffer[p])
exps.append(e)
return exps
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import unittest
from parl.common.error_handling import LastExpError
from parl.common.replay_buffer import Experience, Sample, ReplayBuffer
class ExperienceForTest(Experience):
def __init__(self, obs, reward, actions, new_field, status):
super(ExperienceForTest, self).__init__([obs, reward], [], actions,
status)
self.new_field = new_field
class TestReplayBuffer(unittest.TestCase):
def test_single_instance_replay_buffer(self):
capacity = 30
episode_len = 4
buf = ReplayBuffer(capacity, ExperienceForTest)
total = 0
expect_total = 0
for i in xrange(10 * capacity):
e = ExperienceForTest(
obs=np.zeros(10),
reward=i * 0.5,
actions=i,
new_field=np.ones(20),
status=(i + 1) % episode_len == 0)
buf.add(e)
# check the circular queue in the buffer
self.assertTrue(len(buf) == min(i + 1, capacity))
if (len(buf) < 2): # need at least two elements
continue
# should raise error when trying to pick up the last element
with self.assertRaises(LastExpError):
t = Sample(i % capacity, 1)
buf.get_experiences(t)
expect_total += len(buf)
# neither last element nor episode end should be picked up
for s in buf.sample(len(buf)):
try:
exps = buf.get_experiences(s)
total += 1
except LastExpError as err:
self.fail('test_single_instance_replay_buffer raised '
'LastExpError: ' + err.message)
# check the total number of elements added into the buffer
self.assertTrue(total == expect_total)
# detect incompatible Experience type
with self.assertRaises(TypeError):
e = Experience([np.zeros(10), i * 0.5], [], i, 0)
buf.add(e)
def test_deep_copy(self):
capacity = 5
buf = ReplayBuffer(capacity, Experience)
e0 = Experience(
sensor_inputs=[np.zeros(10), 0],
states=[],
actions=0,
game_status=0)
e1 = Experience([np.ones(10) * 2, 1], [], 0, 1)
buf.add(e0)
e0.sensor_inputs[0] += 1
buf.add(e0)
buf.add(e1)
s = Sample(0, 2)
exps = buf.get_experiences(s)
self.assertEqual(np.sum(exps[0].sensor_inputs[0] == 0), 10)
self.assertEqual(np.sum(exps[1].sensor_inputs[0] == 1), 10)
self.assertEqual(np.sum(exps[1].next_exp.sensor_inputs[0] == 2), 10)
exps[0].next_exp.sensor_inputs[0] += 3
self.assertEqual(np.sum(exps[1].sensor_inputs[0] == 1), 10)
exps[1].sensor_inputs[0] += 4
exps = buf.get_experiences(s)
self.assertEqual(np.sum(exps[0].next_exp.sensor_inputs[0] == 1), 10)
if __name__ == '__main__':
unittest.main()
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from parl.framework.model_base import *
from parl.framework.algorithm_base import *
from parl.framework.agent_base import *
......@@ -124,23 +124,39 @@ class Model(Network):
In conclusion, Model is responsible for forward and
Algorithm is responsible for backward.
Model can also be used to construct target model, which has the same structure as initial model.
    A Model can also be deep-copied to construct a target model, which has the same structure as the initial model.
    Note that only the model definition is copied; to copy the parameters from the current model
    to the target model, you must explicitly call the sync_params_to function after the program is initialized.
Here is an example:
```python
class Actor(Model):
__init__(self, obs_dim, act_dim):
self.obs_dim = obs_dim
self.act_dim = act_dim
self.fc1 = layers.fc(size=128, act='relu')
self.fc2 = layers.fc(size=64, act='relu')
actor = Actor(obs_dim=12, act_dim=2)
target_actor = copy.deepcopy(actor)
    import parl.layers as layers
    from parl.framework.model_base import Model
    from copy import deepcopy
class MLPModel(Model):
def __init__(self):
self.fc = layers.fc(size=64)
def policy(self, obs):
out = self.fc(obs)
return out
model = MLPModel()
    target_model = deepcopy(model)  # automatically creates new unique parameter names for target_model.fc
# build program
x = layers.data(name='x', shape=[100], dtype="float32")
y1 = model.policy(x)
y2 = target_model.policy(x)
...
    # Need to initialize the program before calling sync_params_to
fluid_executor.run(fluid.default_startup_program())
...
# synchronize parameters
model.sync_params_to(target_model, gpu_id=gpu_id)
```
Note that it's the model structure that is copied from initial actor,
parameters in initial model havn't been copied to target model.
To copy parameters, you must explicitly use sync_params_to function after the program is initialized.
"""
__metaclass__ = ABCMeta
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import parl.layers as layers
from abc import ABCMeta, abstractmethod
from paddle.fluid.framework import Variable
from parl.layers import common_functions as comf
from paddle.fluid.framework import convert_np_dtype_to_dtype_
class PolicyDistribution(object):
__metaclass__ = ABCMeta
def __init__(self, dist):
"""
self.dist represents the quantities that characterize the distribution.
For example, for a Normal distribution, this can be a tuple of (mean, std).
The actual form of self.dist is defined by the user.
"""
self.dist = dist
@abstractmethod
def __call__(self):
"""
Implement __call__ to sample an instance.
"""
pass
@property
@abstractmethod
def dim(self):
"""
For discrete policies, this function returns the number of actions.
For continuous policies, this function returns the action vector length.
For sequential policies (e.g., sentences), this function returns the number
of choices at each step.
"""
pass
def add_uniform_exploration(self, rate):
"""
Given a uniform exploration rate, this function modifies the distribution.
        The rate could be a floating number or a Variable.
"""
        raise NotImplementedError()
def loglikelihood(self, action):
"""
Given an action, this function returns the log likelihood of this action under
the current distribution.
"""
raise NotImplementedError()
class CategoricalDistribution(PolicyDistribution):
def __init__(self, dist):
super(CategoricalDistribution, self).__init__(dist)
assert isinstance(dist, Variable)
def __call__(self):
return layers.sampling_id(self.dist)
@property
def dim(self):
assert len(self.dist.shape) == 2
return self.dist.shape[1]
def add_uniform_exploration(self, rate):
if not (isinstance(rate, float) and rate == 0):
self.dist = self.dist * (1 - rate) + \
1 / float(self.dim) * rate
def loglikelihood(self, action):
assert isinstance(action, Variable)
assert action.dtype == convert_np_dtype_to_dtype_("int") \
or action.dtype == convert_np_dtype_to_dtype_("int64")
return 0 - layers.cross_entropy(input=self.dist, label=action)
class Deterministic(PolicyDistribution):
def __init__(self, dist):
super(Deterministic, self).__init__(dist)
## For deterministic action, we only support continuous ones
assert isinstance(dist, Variable)
assert dist.dtype == convert_np_dtype_to_dtype_("float32") \
or dist.dtype == convert_np_dtype_to_dtype_("float64")
@property
def dim(self):
assert len(self.dist.shape) == 2
return self.dist.shape[1]
def __call__(self):
return self.dist
def q_categorical_distribution(q_value):
"""
Generate a PolicyDistribution object given a Q value.
We construct a one-hot distribution according to the Q value.
"""
assert len(q_value.shape) == 2, "[batch_size, num_actions]"
max_id = comf.argmax_layer(q_value)
prob = layers.cast(
x=layers.one_hot(input=max_id, depth=q_value.shape[-1]),
dtype="float32")
return CategoricalDistribution(prob)
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import parl.layers as layers
from parl.framework.algorithm_base import Algorithm
from parl.framework.base import Model
from parl.layers import common_functions as comf
from parl.model_zoo.simple_models import SimpleModelDeterministic
import numpy as np
from copy import deepcopy
import unittest
class TestAlgorithm(Algorithm):
def __init__(self, model):
super(TestAlgorithm, self).__init__(
model, hyperparas=dict(), gpu_id=-1)
def predict(self, inputs, states):
return self._rl_predict(self.model, inputs, states)
class TestAlgorithmParas(unittest.TestCase):
def test_sync_paras_in_one_program(self):
"""
Test case for copying parameters
"""
alg1 = TestAlgorithm(
model=SimpleModelDeterministic(
dims=10, mlp_layer_confs=[dict(size=10)]))
alg2 = deepcopy(alg1)
batch_size = 10
sensor = np.random.uniform(
0, 1, [batch_size, alg1.model.dims]).astype("float32")
program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(program, startup_program):
x = layers.data(name='x', shape=[alg1.model.dims], dtype="float32")
try:
                # too early to sync before the layers are created
alg1.model.sync_paras_to(alg2.model, alg2.gpu_id)
self.assertTrue(False) # you shouldn't be here
except:
pass
## first let the program generates the actual variables by using the
## layer functions (before this step the layers haven't been instantiated yet!)
## the call of predict() function already covers all the layers
y0, _ = alg1.predict(inputs=dict(sensor=x), states=dict())
y1, _ = alg2.predict(inputs=dict(sensor=x), states=dict())
######################
exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_program)
outputs = exe.run(
program,
feed={'x': sensor},
            ## y0 and y1 are two dictionaries
fetch_list=y0.values() + y1.values())
self.assertNotEqual(
np.sum(outputs[0].flatten()), np.sum(outputs[1].flatten()))
## do the copying
alg1.model.sync_paras_to(alg2.model, alg2.gpu_id)
outputs = exe.run(
program,
feed={'x': sensor},
            ## y0 and y1 are two dictionaries
fetch_list=y0.values() + y1.values())
self.assertEqual(
np.sum(outputs[0].flatten()), np.sum(outputs[1].flatten()))
def test_sync_paras_between_programs(self):
"""
Test case for copying parameters between two different programs
"""
alg1 = TestAlgorithm(
model=SimpleModelDeterministic(
dims=10, mlp_layer_confs=[dict(size=10)]))
alg2 = deepcopy(alg1)
batch_size = 10
sensor = np.random.uniform(
0, 1, [batch_size, alg1.model.dims]).astype("float32")
startup_program = fluid.Program()
program1 = fluid.Program()
program2 = fluid.Program()
with fluid.program_guard(program1, startup_program):
x1 = layers.data(
name='x', shape=[alg1.model.dims], dtype="float32")
y1, _ = alg1.predict(inputs=dict(sensor=x1), states=dict())
with fluid.program_guard(program2, startup_program):
x2 = layers.data(
name='x', shape=[alg1.model.dims], dtype="float32")
y2, _ = alg2.predict(inputs=dict(sensor=x2), states=dict())
exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_program)
alg1.model.sync_paras_to(alg2.model, alg2.gpu_id)
outputs1 = exe.run(
program1, feed={'x': sensor}, fetch_list=y1.values())
outputs2 = exe.run(
program2, feed={'x': sensor}, fetch_list=y2.values())
self.assertEqual(
np.sum(outputs1[0].flatten()), np.sum(outputs2[0].flatten()))
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import parl.layers as layers
from parl.framework.base import Model
from parl.framework.computation_task import ComputationTask
import parl.framework.policy_distribution as pd
from parl.layers import common_functions as comf
from parl.algorithm_zoo.simple_algorithms import SimpleAC, SimpleQ
from parl.model_zoo.simple_models import SimpleModelDeterministic, SimpleModelAC, SimpleModelQ
from test_algorithm import TestAlgorithm
import numpy as np
from copy import deepcopy
import unittest
import math
class TestModelCNN(Model):
def __init__(self, width, height, num_actions):
super(TestModelCNN, self).__init__()
self.conv = layers.conv2d(
num_filters=1, filter_size=3, bias_attr=False)
self.mlp = comf.MLP([
dict(size=32, act="relu", bias_attr=False),
dict(size=16, act="relu", bias_attr=False),
dict(size=num_actions, act="softmax", bias_attr=False)
])
self.height = height
self.width = width
def get_input_specs(self):
## image format CHW
return [("image", dict(shape=[1, self.height, self.width]))]
def get_action_specs(self):
return [("action", dict(shape=[1], dtype="int64"))]
def policy(self, inputs, states):
conv = self.conv(input=inputs.values()[0])
dist = pd.CategoricalDistribution(self.mlp(conv))
return dict(action=dist), states
def value(self, inputs, states):
v_value = layers.fill_constant(
shape=[inputs.values()[0].shape[0], 1], dtype="float32", value=0)
return dict(v_value=v_value)
class TestComputationTask(unittest.TestCase):
def test_predict(self):
"""
Test case for AC-learning and Q-learning predictions
"""
num_actions = 4
def test(input, ct, max):
action_counter = [0] * num_actions
total = 2000
for i in range(total):
actions, states = ct.predict(inputs=input)
assert not states, "states should be empty"
## actions["action"] is a batch of actions
for a in actions["action"]:
action_counter[a] += 1
if max:
### if max, the first action will always be chosen
for i in range(num_actions):
prob = action_counter[i] / float(sum(action_counter))
self.assertAlmostEqual(
prob, 1.0 if i == 0 else 0.0, places=1)
else:
### the actions should be uniform
for i in range(num_actions):
prob = action_counter[i] / float(sum(action_counter))
self.assertAlmostEqual(prob, 1.0 / num_actions, places=1)
dims = 100
ac = SimpleAC(
model=SimpleModelAC(
dims=dims,
num_actions=num_actions,
mlp_layer_confs=[
dict(size=32, act="relu", bias_attr=False),
dict(size=16, act="relu", bias_attr=False),
dict(size=num_actions, act="softmax", bias_attr=False)
]))
ac_cnn = SimpleAC(
model=TestModelCNN(width=84, height=84, num_actions=num_actions))
q = SimpleQ(
model=SimpleModelQ(
dims=dims,
num_actions=num_actions,
mlp_layer_confs=[
dict(size=32, act="relu", bias_attr=False),
dict(size=16, act="relu", bias_attr=False),
dict(size=num_actions, bias_attr=False)
]))
batch_size = 10
height, width = 84, 84
sensor = np.zeros([batch_size, dims]).astype("float32")
image = np.zeros([batch_size, 1, height, width]).astype("float32")
ct0 = ComputationTask(algorithm=ac)
ct1 = ComputationTask(algorithm=q)
ct2 = ComputationTask(algorithm=ac_cnn)
test(dict(sensor=sensor), ct0, max=False)
test(dict(sensor=sensor), ct1, max=True)
test(dict(image=image), ct2, max=False)
def test_ct_para_sharing(self):
"""
Test case for two CTs sharing parameters
"""
alg = TestAlgorithm(
model=SimpleModelDeterministic(
dims=10, mlp_layer_confs=[dict(size=10)]))
ct0 = ComputationTask(algorithm=alg)
ct1 = ComputationTask(algorithm=alg)
batch_size = 10
sensor = np.random.uniform(
0, 1, [batch_size, alg.model.dims]).astype("float32")
outputs0, _ = ct0.predict(inputs=dict(sensor=sensor))
outputs1, _ = ct1.predict(inputs=dict(sensor=sensor))
self.assertEqual(
np.sum(outputs0["continuous_action"].flatten()),
np.sum(outputs1["continuous_action"].flatten()))
def test_ct_para_sync(self):
"""
Test case for two CTs copying parameters
"""
alg = TestAlgorithm(
model=SimpleModelDeterministic(
dims=10, mlp_layer_confs=[dict(size=10)]))
ct0 = ComputationTask(algorithm=alg)
ct1 = ComputationTask(algorithm=deepcopy(alg))
batch_size = 10
sensor = np.random.uniform(
0, 1, [batch_size, ct0.alg.model.dims]).astype("float32")
outputs0, _ = ct0.predict(inputs=dict(sensor=sensor))
outputs1, _ = ct1.predict(inputs=dict(sensor=sensor))
self.assertNotEqual(
np.sum(outputs0["continuous_action"].flatten()),
np.sum(outputs1["continuous_action"].flatten()))
ct0.alg.model.sync_paras_to(ct1.alg.model, ct1.alg.gpu_id)
outputs0, _ = ct0.predict(inputs=dict(sensor=sensor))
outputs1, _ = ct1.predict(inputs=dict(sensor=sensor))
self.assertEqual(
np.sum(outputs0["continuous_action"].flatten()),
np.sum(outputs1["continuous_action"].flatten()))
def test_ct_learning(self):
"""
Test training
"""
num_actions = 2
dims = 100
batch_size = 8
sensor = np.ones([batch_size, dims
]).astype("float32") / dims # normalize
next_sensor = np.zeros([batch_size, dims]).astype("float32")
for on_policy in [True, False]:
if on_policy:
alg = SimpleAC(
model=SimpleModelAC(
dims=dims,
num_actions=num_actions,
mlp_layer_confs=[
dict(size=64, act="relu", bias_attr=False),
dict(size=32, act="relu", bias_attr=False),
dict(size=num_actions, act="softmax")
]),
hyperparas=dict(lr=1e-1))
ct = ComputationTask(algorithm=alg)
else:
alg = SimpleQ(
model=SimpleModelQ(
dims=dims,
num_actions=num_actions,
mlp_layer_confs=[
dict(size=64, act="relu", bias_attr=False),
dict(size=32, act="relu", bias_attr=False),
dict(size=num_actions)
]),
update_ref_interval=100,
hyperparas=dict(lr=1e-1))
ct = ComputationTask(algorithm=alg)
for i in range(1000):
if on_policy:
outputs, _ = ct.predict(inputs=dict(sensor=sensor))
actions = outputs["action"]
actions = np.expand_dims(actions, 1)
else:
## randomly assemble a batch
actions = np.random.choice([0, 1],
size=(batch_size, 1),
p=[0.5, 0.5]).astype("int")
rewards = (1 - actions).astype("float32")
cost = ct.learn(
inputs=dict(sensor=sensor),
next_inputs=dict(next_sensor=next_sensor),
next_episode_end=dict(
next_episode_end=np.ones((batch_size,
1)).astype("float32")),
actions=dict(action=actions),
rewards=dict(reward=rewards))
print("final cost: %f" % cost["cost"])
### the policy should bias towards the first action
outputs, _ = ct.predict(inputs=dict(sensor=sensor))
for a in outputs["action"]:
self.assertEqual(a, 0)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import parl.layers as layers
from paddle.fluid.framework import Variable
class Feedforward(layers.Network):
"""
A feedforward network can contain a sequence of components,
where each component can be either a LayerFunc or a Feedforward.
The purpose of this class is to create a collection of LayerFuncs that can
be easily copied from one Network to another.
Examples of feedforward networks can be MLP and CNN.
"""
def __init__(self, components):
for i in range(len(components)):
setattr(self, "ff%06d" % i, components[i])
def __call__(self, input):
attrs = {
attr: getattr(self, attr)
for attr in dir(self) if "ff" in attr
}
for k in sorted(attrs.keys()):
input = attrs[k](input)
return input
class MLP(Feedforward):
def __init__(self, multi_fc_layers):
super(MLP, self).__init__([layers.fc(**c) for c in multi_fc_layers])
class CNN(Feedforward):
"""
Image CNN
"""
def __init__(self, multi_conv_layers):
super(CNN,
self).__init__([layers.conv2d(**c) for c in multi_conv_layers])
def argmax_layer(input):
"""
Get the id of the max val of an input vector
"""
_, index = layers.topk(input, 1)
return index
def inner_prod(x, y):
"""
Get the inner product of two vectors
"""
return layers.reduce_sum(layers.elementwise_mul(x, y), dim=-1)
def sum_to_one_norm_layer(input):
eps = 1e-9 # avoid dividing 0
sum = layers.reduce_sum(input + eps, dim=-1)
return layers.elementwise_div(x=input, y=sum, axis=0)
def idx_select(input, idx):
"""
Given an input vector (Variable) and an idx (int or Variable),
select the entry of the vector according to the idx.
"""
assert isinstance(input, Variable)
assert len(input.shape) == 2
batch_size, num_entries = input.shape
if isinstance(idx, int):
## if idx is a constant int, then we create a variable
idx = layers.fill_constant(
shape=[batch_size, 1], dtype="int64", value=idx)
else:
assert isinstance(idx, Variable)
assert input.shape
select = layers.cast(
x=layers.one_hot(input=idx, depth=num_entries), dtype="float32")
return inner_prod(select, input)
......@@ -12,7 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Wrappers for fluid.layers so that the layers can share parameters conveniently.
Wrappers for fluid.layers that make it easy to share parameters between layers.
Here is an example:
```python
import parl.layers as layers
class MLPModel(Model):
def __init__(self):
        self.fc = layers.fc(size=64)  # automatically generates the parameter names "fc_0.w" and "fc_0.b"
def policy1(self, obs):
        out = self.fc(obs)  # actually creates the parameters named "fc_0.w" and "fc_0.b"
def policy2(self, obs):
out = self.fc(obs) # Reusing parameters
```
"""
import inspect
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import parl.layers as layers
from parl.framework.base import Model
import parl.framework.policy_distribution as pd
from parl.layers import common_functions as comf
class SimpleModelDeterministic(Model):
def __init__(self, dims, mlp_layer_confs):
super(SimpleModelDeterministic, self).__init__()
self.dims = dims
self.mlp = comf.MLP(mlp_layer_confs)
def get_input_specs(self):
return [("sensor", dict(shape=[self.dims]))]
def get_action_specs(self):
return [("continuous_action", dict(shape=[self.dims]))]
def policy(self, inputs, states):
hidden = self.mlp(inputs.values()[0])
return dict(continuous_action=pd.Deterministic(hidden)), states
class SimpleModelAC(Model):
def __init__(self, dims, num_actions, mlp_layer_confs):
super(SimpleModelAC, self).__init__()
self.dims = dims
assert mlp_layer_confs[-1]["act"] == "softmax"
self.mlp = comf.MLP(mlp_layer_confs[:-1])
self.policy_mlp = comf.MLP(mlp_layer_confs[-1:])
self.value_layer = layers.fc(size=1)
def get_input_specs(self):
return [("sensor", dict(shape=[self.dims]))]
def get_action_specs(self):
return [("action", dict(shape=[1], dtype="int64"))]
def _perceive(self, inputs, states):
return self.mlp(inputs.values()[0])
def policy(self, inputs, states):
dist = pd.CategoricalDistribution(
self.policy_mlp(self._perceive(inputs, states)))
return dict(action=dist), states
def value(self, inputs, states):
return dict(v_value=self.value_layer(self._perceive(inputs, states)))
class SimpleModelQ(Model):
def __init__(self, dims, num_actions, mlp_layer_confs):
super(SimpleModelQ, self).__init__()
self.dims = dims
self.num_actions = num_actions
assert "act" not in mlp_layer_confs[-1], "should be linear act"
self.mlp = comf.MLP(mlp_layer_confs)
def get_input_specs(self):
return [("sensor", dict(shape=[self.dims]))]
def get_action_specs(self):
return [("action", dict(shape=[1], dtype="int64"))]
def policy(self, inputs, states):
values = self.value(inputs, states)
q_value = values["q_value"]
return dict(action=pd.q_categorical_distribution(q_value)), states
def value(self, inputs, states):
return dict(q_value=self.mlp(inputs.values()[0]))
......@@ -30,7 +30,7 @@ def _find_packages(prefix=''):
setup(
name='parl',
version=0.1,
version=1.0,
packages=_find_packages(),
package_data={'': ['*.so']},
install_requires=[
......