import platform

import numpy as np
import pytest

import megengine as mge
import megengine.distributed as dist
from megengine.distributed.helper import get_device_count_by_fork
from megengine.quantization.observer import (
    ExponentialMovingAverageObserver,
    HistogramObserver,
    MinMaxObserver,
    Observer,
    PassiveObserver,
    SyncExponentialMovingAverageObserver,
    SyncMinMaxObserver,
)


def test_observer():
    """The abstract ``Observer`` base class must not be instantiable."""
    with pytest.raises(TypeError):
        Observer("qint8")


def test_min_max_observer():
    """MinMaxObserver should record the exact min/max of the observed tensor."""
    data = np.random.rand(3, 3, 3, 3).astype("float32")
    expected_min, expected_max = data.min(), data.max()
    observer = MinMaxObserver()
    observer(mge.tensor(data))
    np.testing.assert_allclose(observer.min_val.numpy(), expected_min)
    np.testing.assert_allclose(observer.max_val.numpy(), expected_max)


def test_exponential_moving_average_observer():
    """EMA observer should blend successive min/max stats with its momentum."""
    momentum = np.random.rand()
    batches = [np.random.rand(3, 3, 3, 3).astype("float32") for _ in range(2)]
    # After two updates: stat = momentum * first + (1 - momentum) * second.
    expected_min = batches[0].min() * momentum + batches[1].min() * (1 - momentum)
    expected_max = batches[0].max() * momentum + batches[1].max() * (1 - momentum)
    observer = ExponentialMovingAverageObserver(momentum=momentum)
    for batch in batches:
        observer(mge.tensor(batch, dtype=np.float32))
    np.testing.assert_allclose(observer.min_val.numpy(), expected_min, atol=1e-5)
    np.testing.assert_allclose(observer.max_val.numpy(), expected_max, atol=1e-5)


def test_histogram_observer():
    """HistogramObserver should still expose the true min/max of the data."""
    data = np.random.rand(3, 3, 3, 3).astype("float32")
    expected_min, expected_max = data.min(), data.max()
    observer = HistogramObserver()
    observer(mge.tensor(data))
    np.testing.assert_allclose(observer.min_val.numpy(), expected_min)
    np.testing.assert_allclose(observer.max_val.numpy(), expected_max)


def test_passive_observer():
    """PassiveObserver should take qparams verbatim and allow scale overrides."""
    observer = PassiveObserver("qint8")
    observer.set_qparams({"scale": mge.tensor(1.0)})
    assert observer.orig_scale == 1.0
    assert observer.scale == 1.0
    # A manual scale override must be visible via the property and get_qparams.
    observer.scale = 2.0
    assert observer.scale == 2.0
    assert observer.get_qparams() == {"scale": mge.tensor(2.0)}


@pytest.mark.require_ngpu(2)
@pytest.mark.isolated_distributed
def test_sync_min_max_observer():
    """SyncMinMaxObserver must report the global min/max across all ranks.

    Each worker observes only its own slice of ``x``; after the synchronized
    update, every rank should hold the min/max of the full array.
    """
    # Fixed typo: local was misspelled ``word_size``.
    world_size = get_device_count_by_fork("gpu")
    x = np.random.rand(3 * world_size, 3, 3, 3).astype("float32")
    np_min, np_max = x.min(), x.max()

    @dist.launcher
    def worker():
        rank = dist.get_rank()
        m = SyncMinMaxObserver()
        # Feed only this rank's shard; sync should recover the global stats.
        y = mge.tensor(x[rank * 3 : (rank + 1) * 3])
        m(y)
        assert m.min_val == np_min and m.max_val == np_max

    worker()


@pytest.mark.require_ngpu(2)
@pytest.mark.isolated_distributed
def test_sync_exponential_moving_average_observer():
    """Sync EMA observer must blend the global min/max across ranks.

    Each worker sees a disjoint shard of two batches; the synchronized EMA
    stats should equal ``t * batch1_stat + (1 - t) * batch2_stat`` computed
    over the FULL batches.
    """
    # Fixed typo: local was misspelled ``word_size``.
    world_size = get_device_count_by_fork("gpu")
    t = np.random.rand()
    x1 = np.random.rand(3 * world_size, 3, 3, 3).astype("float32")
    x2 = np.random.rand(3 * world_size, 3, 3, 3).astype("float32")
    expected_min = x1.min() * t + x2.min() * (1 - t)
    expected_max = x1.max() * t + x2.max() * (1 - t)

    @dist.launcher
    def worker():
        rank = dist.get_rank()
        m = SyncExponentialMovingAverageObserver(momentum=t)
        # Each rank observes only its own shard of both batches.
        y1 = mge.tensor(x1[rank * 3 : (rank + 1) * 3])
        y2 = mge.tensor(x2[rank * 3 : (rank + 1) * 3])
        m(y1)
        m(y2)
        np.testing.assert_allclose(m.min_val.numpy(), expected_min, atol=1e-6)
        np.testing.assert_allclose(m.max_val.numpy(), expected_max, atol=1e-6)

    worker()