未验证 提交 6ab0a6a8 编写于 作者: W WangXi 提交者: GitHub

[hybrid parallel] pipeline support adamw and LRScheduler (#34402)

上级 ede001f9
...@@ -1664,6 +1664,16 @@ class Executor(object): ...@@ -1664,6 +1664,16 @@ class Executor(object):
print_period, fetch_handler, print_period, fetch_handler,
use_program_cache) use_program_cache)
from paddle.optimizer.lr import LRScheduler
if hasattr(program, 'lr_sheduler'):
lr_sheduler = program.lr_sheduler
assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
lr_value = lr_sheduler()
lr_var = program.global_block().vars[lr_sheduler._var_name]
data = np.array([lr_value]).astype(convert_dtype(lr_var.dtype))
tensor = core.get_variable_tensor(scope, lr_sheduler._var_name)
tensor.set(data, self.place)
self._default_executor.run_from_dataset(trainer_instance) self._default_executor.run_from_dataset(trainer_instance)
if not use_program_cache: if not use_program_cache:
......
...@@ -4634,6 +4634,9 @@ class PipelineOptimizer(object): ...@@ -4634,6 +4634,9 @@ class PipelineOptimizer(object):
op.type == 'elementwise_div'): op.type == 'elementwise_div'):
device = f"{self._device}:all" device = f"{self._device}:all"
op._set_attr(self._op_device_key, device) op._set_attr(self._op_device_key, device)
elif self._is_weight_decay_op(op) and op.type == 'scale':
# set AdamW decay_coeff to device:all
op._set_attr(self._op_device_key, f"{self._device}:all")
elif op.type == "alloc_float_status": elif op.type == "alloc_float_status":
op._set_attr(self._op_device_key, f"{self._device}:all") op._set_attr(self._op_device_key, f"{self._device}:all")
else: else:
...@@ -5267,6 +5270,11 @@ class PipelineOptimizer(object): ...@@ -5267,6 +5270,11 @@ class PipelineOptimizer(object):
return op.desc.has_attr("op_namescope") \ return op.desc.has_attr("op_namescope") \
and op.desc.attr("op_namescope").startswith("/regularization") and op.desc.attr("op_namescope").startswith("/regularization")
def _is_weight_decay_op(self, op):
# in AdamW namescope is /optimizer_*/weight decay/
return op.desc.has_attr("op_namescope") \
and 'weight decay' in op.desc.attr("op_namescope")
def _get_input_output_info(self, block): def _get_input_output_info(self, block):
''' '''
Get info of op input and output. Get info of op input and output.
......
...@@ -116,10 +116,10 @@ class TestDistMnist2x2(TestDistRunnerBase): ...@@ -116,10 +116,10 @@ class TestDistMnist2x2(TestDistRunnerBase):
steps_per_pass = 10 steps_per_pass = 10
bd = [steps_per_pass * p for p in passes] bd = [steps_per_pass * p for p in passes]
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr) lr_val = paddle.optimizer.lr.PiecewiseDecay(boundaries=bd, values=lr)
opt = fluid.optimizer.Momentum(
opt = paddle.optimizer.AdamW(
learning_rate=lr_val, learning_rate=lr_val,
momentum=0.9,
grad_clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=1.0)) grad_clip=fluid.clip.GradientClipByGlobalNorm(clip_norm=1.0))
acc_steps = 2 # accumulated steps for pipeline acc_steps = 2 # accumulated steps for pipeline
......
...@@ -96,6 +96,15 @@ class TestDistRunnerBase(object): ...@@ -96,6 +96,15 @@ class TestDistRunnerBase(object):
current_endpoint=current_endpoint) current_endpoint=current_endpoint)
return t return t
@staticmethod
def get_lr_scheduler(program):
lr_sheduler = None
if hasattr(program, 'lr_sheduler'):
from paddle.optimizer.lr import LRScheduler
lr_sheduler = program.lr_sheduler
assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
return lr_sheduler
def run_pserver(self, args): def run_pserver(self, args):
self.lr = args.lr self.lr = args.lr
self.get_model(batch_size=args.batch_size) self.get_model(batch_size=args.batch_size)
...@@ -139,11 +148,17 @@ class TestDistRunnerBase(object): ...@@ -139,11 +148,17 @@ class TestDistRunnerBase(object):
data_loader.start() data_loader.start()
print_to_err(type(self).__name__, "begin to train on trainer") print_to_err(type(self).__name__, "begin to train on trainer")
out_losses = [] out_losses = []
main_program = fluid.default_main_program()
lr_sheduler = self.get_lr_scheduler(main_program)
for i in six.moves.xrange(RUN_STEP): for i in six.moves.xrange(RUN_STEP):
loss = exe.run(fluid.default_main_program(), fetch_list=[avg_cost]) loss = exe.run(main_program, fetch_list=[avg_cost])
loss = loss[0] if loss else None loss = loss[0] if loss else None
out_losses.append(loss) out_losses.append(loss)
print_to_err(type(self).__name__, "run step %d finished" % i) print_to_err(type(self).__name__, "run step %d finished" % i)
if lr_sheduler is not None:
lr_sheduler.step()
data_loader.reset() data_loader.reset()
print_to_err(type(self).__name__, "trainer run finished") print_to_err(type(self).__name__, "trainer run finished")
...@@ -494,6 +509,7 @@ class TestDistRunnerBase(object): ...@@ -494,6 +509,7 @@ class TestDistRunnerBase(object):
else: else:
return origin_batch return origin_batch
lr_scheduler = self.get_lr_scheduler(trainer_prog)
print_to_err(type(self).__name__, "begin to train on trainer") print_to_err(type(self).__name__, "begin to train on trainer")
out_losses = [] out_losses = []
for i in six.moves.xrange(RUN_STEP): for i in six.moves.xrange(RUN_STEP):
...@@ -502,6 +518,9 @@ class TestDistRunnerBase(object): ...@@ -502,6 +518,9 @@ class TestDistRunnerBase(object):
feed=feeder.feed(get_data())) feed=feeder.feed(get_data()))
out_losses.append(loss[0]) out_losses.append(loss[0])
print_to_err(type(self).__name__, "run step %d finished" % i) print_to_err(type(self).__name__, "run step %d finished" % i)
if lr_scheduler is not None:
lr_scheduler.step()
print_to_err(type(self).__name__, "trainer run finished") print_to_err(type(self).__name__, "trainer run finished")
print_to_out(out_losses) print_to_out(out_losses)
......
...@@ -160,6 +160,7 @@ class AdamW(Adam): ...@@ -160,6 +160,7 @@ class AdamW(Adam):
self._apply_decay_param_fun = apply_decay_param_fun self._apply_decay_param_fun = apply_decay_param_fun
self._coeff = coeff self._coeff = coeff
self._lr_to_coeff = dict() self._lr_to_coeff = dict()
super(AdamW, self).__init__( super(AdamW, self).__init__(
learning_rate=learning_rate, learning_rate=learning_rate,
parameters=parameters, parameters=parameters,
...@@ -211,6 +212,8 @@ class AdamW(Adam): ...@@ -211,6 +212,8 @@ class AdamW(Adam):
# we do this in _create_optimization_pass # we do this in _create_optimization_pass
decay_coeff = self._lr_to_coeff.get(learning_rate, None) decay_coeff = self._lr_to_coeff.get(learning_rate, None)
if decay_coeff is None: if decay_coeff is None:
# NOTE(wangxi): for pipeline to set device:all
with paddle.static.device_guard(None):
decay_coeff = 1.0 - learning_rate * self._coeff decay_coeff = 1.0 - learning_rate * self._coeff
self._lr_to_coeff[learning_rate] = decay_coeff self._lr_to_coeff[learning_rate] = decay_coeff
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册