A simple example of model parallelism in MXNet
mxnet
python

The simple examples in the Gluon tutorials for MXNet are very useful to those of us who are just starting out with MXNet. So far, though, there is no simple example of model parallelism. I did see the model-parallelism example code for LSTMs, but since I am new to MXNet it would help me (and perhaps others) to have a more streamlined example. So I created a model-parallelism example by reworking the regression example from the Gluon tutorials and mixing in some code from mxnet.gluon.Trainer.

However, I am clearly getting something wrong: the gradients do not seem to be updated. Can anyone help by identifying the problem? The goal here is to create a linear regression model with three layers, each placed on a different GPU. The model itself is not useful, except as an example of how initialization and training work for a model-parallel network built with custom blocks and imperative programming.

As I understand it, Trainer() is written for data parallelism. It will not work for model parallelism, because it requires all parameters to be initialized on all GPUs.
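For contrast, here is a minimal sketch (an illustration, not my code) of the data-parallel pattern that Trainer expects: the parameters are replicated on every device and each batch is sharded across them. data_iter is a placeholder for any iterator over (x, y) batches:

import mxnet as mx
from mxnet import autograd, gluon

batch_size = 100
ctxList = [mx.gpu(i) for i in range(2)]
net = gluon.nn.Dense(1)
net.collect_params().initialize(ctx=ctxList)   # same parameters on every GPU
trainer = gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': 0.001})
loss_fn = gluon.loss.L2Loss()

for xtrain, ytrain in data_iter:
    # split_and_load shards the batch, one slice per device
    xs = gluon.utils.split_and_load(xtrain, ctxList)
    ys = gluon.utils.split_and_load(ytrain, ctxList)
    with autograd.record():
        losses = [loss_fn(net(x), y) for x, y in zip(xs, ys)]
    for l in losses:
        l.backward()
    trainer.step(batch_size)   # aggregates gradients across devices

My full model-parallelism example: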

import os
import numpy as np
import mxnet as mx
from mxnet import nd, autograd, gluon
from mxnet.gluon import Block

# make some data
num_inputs = 2
num_outputs = 1
num_examples = 10000

def real_fn(X):
    return 2 * X[:, 0] - 3.4 * X[:, 1] + 4.2

X = np.random.normal(0,1, (num_examples, num_inputs))
noise = 0.001 * np.random.normal(0,1, (num_examples))
y = real_fn(X) + noise
y = y.reshape(-1,1)

# configuration
hidden_layers = 2
num_gpus = hidden_layers + 1
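# one context per block: each layer will live on its own GPU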
ctxList = [mx.gpu(i) for i in range(num_gpus)]
#ctxList = [mx.gpu() for i in range(num_gpus)]

#os.environ["MXNET_ENGINE_TYPE"] = "NaiveEngine"
print("\n")

# ======================================================================
class myDenseBlock(Block):
    """
    A custom layer
    """
    def __init__(self, layer_number, size_input, size_output, **kwargs):
        super(myDenseBlock, self).__init__(**kwargs)

        self.layer_number = layer_number
        self.size_input = size_input
        self.size_output = size_output

        with self.name_scope():
            # add parameters to the Block's ParameterDict.
            self.w = self.params.get(
                'weight',
                init= mx.init.Xavier(magnitude=2.24),
                shape=(size_input, size_output),
                grad_req = 'write')

            self.b = self.params.get(
                'bias',
                init= mx.init.Constant(0.5),
                shape=(size_output,),
                grad_req = 'write')

    def forward(self, x):
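        # move the incoming activations onto this layer's GPU; autograd must
        # track this cross-device copy for the gradient to flow back to
        # earlier layers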
        x = x.as_in_context(ctxList[self.layer_number])
        with x.context:
            linear = nd.dot(x, self.w.data()) + self.b.data()
            return linear

# ======================================================================

# create net
net = gluon.nn.Sequential()
with net.name_scope():
    # initial layer, with X as input
    net.add(myDenseBlock(0,
        size_input = 2,
        size_output = 2))

    for ii in range(hidden_layers-1):
        net.add(myDenseBlock(ii+1,
            size_input = 2,
            size_output = 2))

    # final block, Y is nx1
    net.add(myDenseBlock(hidden_layers,
        size_input = 2,
        size_output = 1))


# initialize parameters for different blocks (layers) on different GPUs.
params = net.collect_params()

"""
The parameters are:
sequential0_mydenseblock0_weight
sequential0_mydenseblock0_bias
sequential0_mydenseblock1_weight
sequential0_mydenseblock1_bias
sequential0_mydenseblock2_weight
sequential0_mydenseblock2_bias
"""

print("\ninitializing:")
for i, param in enumerate(params):
    if 'mydenseblock0' in param:
        params[param].initialize(ctx=ctxList[0])
    elif 'mydenseblock1' in param:
        params[param].initialize(ctx=ctxList[1])
    elif 'mydenseblock2' in param:
        params[param].initialize(ctx=ctxList[2])
    print("  ", i, param, "  ", params[param].list_data()[0].context)
print("\n")

def square_loss(yhat, y):
    return nd.mean((yhat - y) ** 2)

def mytrainer(updaters, params, ignore_stale_grad=False):
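    # update loop adapted from gluon.Trainer's internals: step each parameter
    # with its optimizer updater, so that parameters living on different GPUs
    # can be updated without Trainer's all-device initialization requirement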
    #print("\n")
    for i, param in enumerate(params):
        #print(i, param, "  ", len(params[param].list_data()), params[param].list_data()[0].context)
        if params[param].grad_req == 'null':
            continue
        if not ignore_stale_grad:
            for data in params[param].list_data():
                if not data._fresh_grad:
                    print(
                        "`%s` on context %s has not been updated"%(params[param].name, str(data.context)))
                    assert False

        for upd, arr, grad in zip(updaters, params[param].list_data(), params[param].list_grad()):

            if not ignore_stale_grad or arr._fresh_grad:
                upd(i, grad, arr)
                arr._fresh_grad = False
                #print ("grad= ", grad)


batch_size = 100
epochs = 100000
iteration = -1

opt = mx.optimizer.create('adam', learning_rate=0.001, rescale_grad = 1 / batch_size)
updaters = [mx.optimizer.get_updater(opt)]

# the following definition for updaters does not work either
#updaters = [mx.optimizer.get_updater(opt) for _ in ctxList]

results = []
for e in range(epochs):
    train_groups = np.array_split(np.arange(X.shape[0]), X.shape[0] // batch_size)
    for ii, idx in enumerate(train_groups):
        iteration += 1
        xtrain, ytrain = X[idx,:], y[idx]

        xtrain = nd.array(xtrain)
        xtrain = xtrain.as_in_context(ctxList[0])

        ytrain = nd.array(ytrain).reshape((-1, 1))
        ytrain = ytrain.as_in_context(ctxList[0])

        with autograd.record():
            yhat = net(xtrain)
            error = square_loss(yhat, ytrain.as_in_context(ctxList[-1]))


            # Question: does the call to error.backward() belong inside the
            # autograd.record() scope or outside it? The gluon examples show
            # it both ways.

        error.backward()

        mytrainer(updaters, net.collect_params())

        if iteration%10 == 0:

            results.append([iteration, error.asnumpy().item()])
            print(("epoch= {:5,d}, iter= {:6,d},  error= {:6.3E}").format(
                e, iteration, error.asnumpy().item()))

The code fails at the `if not data._fresh_grad` test in mytrainer(). The output is:

initializing:
   0 sequential0_mydenseblock0_weight    gpu(0)
   1 sequential0_mydenseblock0_bias    gpu(0)
   2 sequential0_mydenseblock1_weight    gpu(1)
   3 sequential0_mydenseblock1_bias    gpu(1)
   4 sequential0_mydenseblock2_weight    gpu(2)
   5 sequential0_mydenseblock2_bias    gpu(2)

`sequential0_mydenseblock0_weight` on context gpu(0) has not been updated

Using mx.autograd.get_symbol(error).tojson(), I can verify that the computational graph only extends to the parameters on gpu(2), and does not reach the other GPUs.
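For anyone wanting to reproduce the check, a sketch using the same variables as above (list_arguments() gives a quicker view of which parameters made it into the recorded graph):

# run inside the training loop, after the forward pass has been recorded
sym = mx.autograd.get_symbol(error)
print(sym.list_arguments())   # only the mydenseblock2 weight/bias show up
print(sym.tojson())           # the full graph as JSON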

1 Answer

Yes, as per @sergei's comment, moving to v1.0.0 solves this issue.
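A quick way to confirm which version you are running (mxnet-cu80 below is just one example of a GPU build; pick the package matching your CUDA setup):

import mxnet as mx
print(mx.__version__)   # per the answer above, needs to be >= 1.0.0
# upgrade with e.g.: pip install --upgrade mxnet-cu80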
