from collections import OrderedDict, defaultdict
from .. import tensor
import numpy as np
from .. import _full_sync, tensor
from ..core._imperative_rt import CompNode
from ..core._imperative_rt.core2 import Tensor as RawTensor
from ..core._imperative_rt.core2 import (
......@@ -15,16 +17,11 @@ from ..device import get_default_device
from ..utils.dlpack import from_dlpack, to_dlpack
from .tracing import trace
from mge_xlalib.xla_extension import ArrayImpl
from ..xla.lib import xla_client as xc
except ImportError as e:
xla_client_compute_stream = None
......@@ -93,6 +90,39 @@ class xla_trace(trace):
def unset_env(self):
def convert_params_to_xla(self):
from ..device import coalesce_free_memory
from ..utils.module_utils import get_expand_structure
from ..tensor import Tensor
backend = self.xla_exec.backend
devices = backend.local_devices()
_, device_id, _ = CompNode(get_default_device()).physical_locator
device_index = (
0 if len(devices) == 0 else [d.id for d in devices].index(device_id)
device = devices[device_index]
for attr, _ in self.attr_to_key.items():
param = get_expand_structure(attr[0], attr[1])
for tensor, _ in self.opt_param_dict.items():
def as_xla_array(tensor, backend, device):
np_array = tensor.numpy()
if np_array.shape == ():
np_array = np_array[np.newaxis]
xla_array = backend.buffer_from_pyval(np_array, device)
for attr, _ in self.attr_to_key.items():
param = get_expand_structure(attr[0], attr[1])
as_xla_array(param, backend, device)
for tensor, _ in self.opt_param_dict.items():
as_xla_array(tensor, backend, device)
def compile(self):
from ..xla import build_xla
from ..traced_module.pytree import SUPPORTED_LEAF_TYPE, register_supported_type
......@@ -102,13 +132,6 @@ class xla_trace(trace):
from ..distributed import get_mm_server_addr, is_distributed
assert self.traced
if self.overall:
for attr, _ in self.attr_to_key.items():
param = get_expand_structure(attr[0], attr[1])
for tensor, _ in self.opt_param_dict.items():
self.xla_exec, self.inp_ids, self.out_ids = build_xla(
......@@ -116,6 +139,8 @@ class xla_trace(trace):
ip=get_mm_server_addr()[0] if is_distributed() else None,
port=get_mm_server_addr()[1] + 1 if is_distributed() else None,
if self.overall:
id2inpidx = defaultdict(list)
id2outidx = defaultdict(list)
for idx, id in enumerate(self.inp_ids):
......@@ -73,16 +73,9 @@ class InputsHandler:
if i._is_external_value():
if "gpu" in i.device.physical_name:
capsule = to_dlpack(i)
xla_array = self.from_dlpack(capsule)
r = self.handler(
self.local_devices, [self.input_indices[idx],], [i,]
capsule = to_dlpack(i)
xla_array = self.from_dlpack(capsule)
return rst
def __str__(self):
......@@ -3,27 +3,48 @@ import platform
import numpy as np
import pytest
import megengine.distributed as dist
import megengine.functional as F
import megengine.jit as jit
import megengine.functional.distributed as fdist
import megengine.tensor as tensor
from megengine import autodiff, is_cuda_available
from megengine.autodiff.grad_manager import GradManager
from meg_xlalib.xla_extension import ArrayImpl
from megengine.core._imperative_rt.core2 import (
from megengine.jit import xla_trace
from megengine.module import Conv2d
def test_external_flag_set():
@pytest.mark.skipif(int(platform.python_version_tuple()[1]) < 8, reason="need py38")
@pytest.mark.skipif(platform.system() != "Linux", reason="only support linux now")
@pytest.mark.skipif(not is_cuda_available(), reason="only support cuda now")
def test_external_tsf_set():
from mge_xlalib.xla_extension import ArrayImpl
def test_fun():
def test_func(inp):
return inp
assert is_external_convert()
inp = tensor(np.random.random((9, 9, 32, 32)))
mge_inp = test_func(inp)
xla_inp = test_func(inp)
assert xla_inp._is_external_value()
assert isinstance(xla_inp._external_obj(), ArrayImpl)
assert mge_inp.shape == xla_inp.shape
assert mge_inp.dtype == xla_inp.dtype
assert not xla_inp._is_external_value()
@pytest.mark.skipif(int(platform.python_version_tuple()[1]) < 8, reason="need py38")
@pytest.mark.skipif(platform.system() != "Linux", reason="only support linux now")
@pytest.mark.skipif(not is_cuda_available(), reason="only support cuda now")
def test_external_value():
m = Conv2d(9,9, 3,groups=9)
m = Conv2d(9, 9, 3, groups=9)
gm = GradManager()
......@@ -39,8 +60,44 @@ def test_external_value():
model.weight.grad = None
return ig, wg
inp = tensor(np.random.random((9,9, 32, 32)))*100
inp = tensor(np.random.random((9, 9, 32, 32))) * 100
mge_ig, mge_wg = conv_grad(inp, m)
xla_ig, xla_wg = conv_grad(inp, m)
np.testing.assert_allclose(mge_ig.numpy(), xla_ig.numpy())
np.testing.assert_allclose(mge_wg.numpy(), xla_wg.numpy(), atol=1e-5)
@pytest.mark.skipif(int(platform.python_version_tuple()[1]) < 8, reason="need py38")
@pytest.mark.skipif(platform.system() != "Linux", reason="only support linux now")
def test_distributed_convert():
from mge_xlalib.xla_extension import ArrayImpl
def tester(ishape, n_gpus, dtype=None):
def worker(data):
rank = dist.get_rank()
inp = tensor(data[rank])
def func1(inp):
return fdist.all_reduce_sum(inp)
mge_rst = func1(inp)
xla_rst = func1(inp)
assert xla_rst._is_external_value()
assert isinstance(xla_rst._external_obj(), ArrayImpl)
np.testing.assert_allclose(mge_rst.numpy(), xla_rst.numpy(), atol=1e-5)
assert mge_rst.shape == xla_rst.shape
assert mge_rst.dtype == xla_rst.dtype
assert not xla_rst._is_external_value()
x = np.random.randn(*ishape).astype(dtype)
y = np.random.randn(*ishape).astype(dtype)
data = (x, y)
a, b = conv_grad(inp, m)
a1, b1 = conv_grad(inp, m)
np.testing.assert_allclose(a.numpy(), a1.numpy())
\ No newline at end of file
tester((16, 1, 64,), 2)
