diff --git a/imperative/python/megengine/distributed/helper.py b/imperative/python/megengine/distributed/helper.py index 3f1637ed4a27768e3fa6d4f4f7b00f5c0d580aa1..8d84c5c116e0d1ae2701fa53279e4c6d429b1678 100644 --- a/imperative/python/megengine/distributed/helper.py +++ b/imperative/python/megengine/distributed/helper.py @@ -17,13 +17,114 @@ import numpy as np from megengine.autodiff.grad_manager import GradManager, get_backwarding_grad_manager from megengine.device import get_default_device, get_device_count -from ..functional.param_pack import get_offsets, pack_allreduce_split +from ..core.ops.builtin import ParamPackConcat, ParamPackSplit +from ..core.tensor.core import apply from ..functional.utils import copy +from ..tensor import Tensor from ..utils.future import Future from .functional import all_reduce_sum, broadcast from .group import WORLD, Group, group_barrier, is_distributed +def param_pack_split(inp: Tensor, offsets: list, shapes: list): + r""" + Returns split tensor to tensor list as offsets and shapes described, + only used for ``parampack``. + + :param inp: input tensor. + :param offsets: offsets of outputs, length of `2 * n`, + while n is tensor nums you want to split, + format `[begin0, end0, begin1, end1]`. + :param shapes: tensor shapes of outputs. + :return: splitted tensors. + + Examples: + + .. testcode:: + + import numpy as np + from megengine import tensor + from megengine.distributed.helper import param_pack_split + + a = tensor(np.ones((10,), np.int32)) + b, c = param_pack_split(a, [0, 1, 1, 10], [(1,), (3, 3)]) + print(b.numpy()) + print(c.numpy()) + + Outputs: + + .. testoutput:: + + [1] + [[1 1 1] + [1 1 1] + [1 1 1]] + + """ + op = ParamPackSplit() + op.offsets = offsets + op.shapes = shapes + return apply(op, inp) + + +def param_pack_concat(inps: list, offsets: Tensor, offsets_val: list): + r""" + Returns concated tensor, only used for ``parampack``. + + :param inps: input tensors. + :param offsets: device value of offsets. + :param offsets_val: offsets of inputs, length of `2 * n`, + format `[begin0, end0, begin1, end1]`. + :return: concated tensor. + + Examples: + + .. testcode:: + + import numpy as np + from megengine import tensor + from megengine.distributed.helper import param_pack_concat + + a = tensor(np.ones((1,), np.int32)) + b = tensor(np.ones((3, 3), np.int32)) + offsets_val = [0, 1, 1, 10] + offsets = tensor(offsets_val, np.int32) + c = param_pack_concat([a, b], offsets, offsets_val) + print(c.numpy()) + + Outputs: + + .. testoutput:: + + [1 1 1 1 1 1 1 1 1 1] + + """ + op = ParamPackConcat() + op.offsets = offsets_val + return apply(op, *inps, offsets)[0] + + +def get_offsets(shapes): + offsets = [] + offset = 0 + for shape in shapes: + offsets.append(offset) + offset += int(np.prod(shape)) + offsets.append(offset) + return offsets + + +def pack_allreduce_split(pack_list, shapes, group, reduce_method): + offsets_val = get_offsets(shapes) + offsets = Tensor(offsets_val) + packed_grads = param_pack_concat(pack_list, offsets, offsets_val) + packed_grads = all_reduce_sum(packed_grads, group, group.comp_node) + if reduce_method == "mean": + packed_grads /= group.size + grads = param_pack_split(packed_grads, offsets_val, shapes) + return grads + + class TensorFuture(Future): def device(self): raise "Sorry, this tensor is not ready" diff --git a/imperative/python/megengine/functional/param_pack.py b/imperative/python/megengine/functional/param_pack.py deleted file mode 100644 index 0ad3a11bf4cf36261c70eec93d2990bb6bc4a78a..0000000000000000000000000000000000000000 --- a/imperative/python/megengine/functional/param_pack.py +++ /dev/null @@ -1,34 +0,0 @@ -# -*- coding: utf-8 -*- -# MegEngine is Licensed under the Apache License, Version 2.0 (the "License") -# -# Copyright (c) 2014-2020 Megvii Inc. All rights reserved. -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -import numpy as np - -from ..tensor import Tensor -from .distributed import all_reduce_sum -from .tensor import param_pack_concat, param_pack_split - - -def get_offsets(shapes): - offsets = [] - offset = 0 - for shape in shapes: - offsets.append(offset) - offset += int(np.prod(shape)) - offsets.append(offset) - return offsets - - -def pack_allreduce_split(pack_list, shapes, group, reduce_method): - offsets_val = get_offsets(shapes) - offsets = Tensor(offsets_val) - packed_grads = param_pack_concat(pack_list, offsets, offsets_val) - packed_grads = all_reduce_sum(packed_grads, group, group.comp_node) - if reduce_method == "mean": - packed_grads /= group.size - grads = param_pack_split(packed_grads, offsets_val, shapes) - return grads diff --git a/imperative/python/megengine/functional/tensor.py b/imperative/python/megengine/functional/tensor.py index b2c8d1701ed245aeea8bd6c388dca27188c33f30..e7bd2c6ee4f9ee583464e82f9b47c1766d3815e2 100644 --- a/imperative/python/megengine/functional/tensor.py +++ b/imperative/python/megengine/functional/tensor.py @@ -46,8 +46,6 @@ __all__ = [ "linspace", "ones", "ones_like", - "param_pack_concat", - "param_pack_split", "reshape", "split", "squeeze", @@ -975,81 +973,3 @@ def arange( if np.dtype(dtype) == np.int32: return result.astype(dtype) return result - - -def param_pack_split(inp: Tensor, offsets: List, shapes: List) -> Tensor: - r""" - Returns split tensor to tensor list as offsets and shapes described, - only used for ``parampack``. - - :param inp: input tensor. - :param offsets: offsets of outputs, length of `2 * n`, - while n is tensor nums you want to split, - format `[begin0, end0, begin1, end1]`. - :param shapes: tensor shapes of outputs. - :return: splitted tensors. - - Examples: - - .. testcode:: - - import numpy as np - import megengine.functional as F - from megengine import tensor - - a = tensor(np.ones((10,), np.int32)) - b, c = F.param_pack_split(a, [0, 1, 1, 10], [(1,), (3, 3)]) - print(b.numpy()) - print(c.numpy()) - - Outputs: - - .. testoutput:: - - [1] - [[1 1 1] - [1 1 1] - [1 1 1]] - - """ - op = builtin.ParamPackSplit() - op.offsets = offsets - op.shapes = shapes - return apply(op, inp) - - -def param_pack_concat(inps: List, offsets: Tensor, offsets_val: List) -> Tensor: - r""" - Returns concated tensor, only used for ``parampack``. - - :param inps: input tensors. - :param offsets: device value of offsets. - :param offsets_val: offsets of inputs, length of `2 * n`, - format `[begin0, end0, begin1, end1]`. - :return: concated tensor. - - Examples: - - .. testcode:: - - import numpy as np - import megengine.functional as F - from megengine import tensor - - a = tensor(np.ones((1,), np.int32)) - b = tensor(np.ones((3, 3), np.int32)) - offsets_val = [0, 1, 1, 10] - offsets = tensor(offsets_val, np.int32) - c = F.param_pack_concat([a, b], offsets, offsets_val) - print(c.numpy()) - - Outputs: - - .. testoutput:: - - [1 1 1 1 1 1 1 1 1 1] - - """ - op = builtin.ParamPackConcat() - op.offsets = offsets_val - return apply(op, *inps, offsets)[0] diff --git a/imperative/python/test/unit/distributed/test_distributed.py b/imperative/python/test/unit/distributed/test_distributed.py index f81b9f42203c00f8932eaab14feba8098a935e9b..29eed7ef8ff87e9bd7242f683882c9762042b266 100644 --- a/imperative/python/test/unit/distributed/test_distributed.py +++ b/imperative/python/test/unit/distributed/test_distributed.py @@ -10,12 +10,17 @@ import multiprocessing as mp import platform import queue +import numpy as np import pytest import megengine as mge import megengine.distributed as dist from megengine.core.ops.builtin import CollectiveComm, ParamPackConcat, ParamPackSplit -from megengine.distributed.helper import get_device_count_by_fork +from megengine.distributed.helper import ( + get_device_count_by_fork, + param_pack_concat, + param_pack_split, +) def _assert_q_empty(q): @@ -195,3 +200,19 @@ def test_oprmm_hashable(): rhs = (CollectiveComm(), ParamPackConcat(), ParamPackSplit()) assert lhs == rhs assert hash(lhs) == hash(rhs) + + +def test_param_pack_split(): + a = mge.Tensor(np.ones((10,), np.int32)) + b, c = param_pack_split(a, [0, 1, 1, 10], [(1,), (3, 3)]) + assert np.allclose(b.numpy(), a.numpy()[1]) + assert np.allclose(c.numpy(), a.numpy()[1:].reshape(3, 3)) + + +def test_param_pack_concat(): + a = mge.Tensor(np.ones((1,), np.int32)) + b = mge.Tensor(np.ones((3, 3), np.int32)) + offsets_val = [0, 1, 1, 10] + offsets = mge.Tensor(offsets_val, np.int32) + c = param_pack_concat([a, b], offsets, offsets_val) + assert np.allclose(np.concatenate([a.numpy(), b.numpy().flatten()]), c.numpy()) diff --git a/imperative/python/test/unit/functional/test_tensor.py b/imperative/python/test/unit/functional/test_tensor.py index 0b02b78f1a59b2badcf9d749fe386610488e8f51..732eea4801dd501cc8c167b0509304cf82398f02 100644 --- a/imperative/python/test/unit/functional/test_tensor.py +++ b/imperative/python/test/unit/functional/test_tensor.py @@ -359,19 +359,3 @@ def test_copy_d2h(): def test_copy_d2d(): copy_test("gpu0", "gpu1") copy_test("gpu0:0", "gpu0:1") - - -def test_param_pack_split(): - a = tensor(np.ones((10,), np.int32)) - b, c = F.param_pack_split(a, [0, 1, 1, 10], [(1,), (3, 3)]) - assert np.allclose(b.numpy(), a.numpy()[1]) - assert np.allclose(c.numpy(), a.numpy()[1:].reshape(3, 3)) - - -def test_param_pack_concat(): - a = tensor(np.ones((1,), np.int32)) - b = tensor(np.ones((3, 3), np.int32)) - offsets_val = [0, 1, 1, 10] - offsets = tensor(offsets_val, np.int32) - c = F.param_pack_concat([a, b], offsets, offsets_val) - assert np.allclose(np.concatenate([a.numpy(), b.numpy().flatten()]), c.numpy())