Improving Model Training Performance (Part 1)
Reposted from: https://bbs.huaweicloud.com/forum/thread-118716-1-1.html
Author: 李響
Gradient accumulation introduces the notion of a mini-batch: the loss and gradients are computed for each mini-batch, but the model parameters are not updated immediately. Instead, the gradients are accumulated, and only after a specified number (N) of mini-batches is the accumulated gradient used to update the network parameters. Before the next cycle, the accumulated gradients are cleared and accumulation starts over. The goal is to obtain almost the same effect as training directly on N * mini-batch data.
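As a quick sanity check of this equivalence: with a sum-reduced loss, the gradient over one large batch is exactly the sum of the gradients of its mini-batches. A minimal NumPy illustration (not MindSpore code, added only for intuition):

import numpy as np

rng = np.random.default_rng(0)
w = rng.normal(size=3)                     # model parameters
x = rng.normal(size=(128, 3))              # "big batch" = 4 mini-batches of 32
y = rng.normal(size=128)

def grad(w, x, y):
    # d/dw of sum((x @ w - y)^2) = 2 * x^T (x @ w - y)
    return 2.0 * x.T @ (x @ w - y)

big_batch_grad = grad(w, x, y)
accumulated = np.zeros_like(w)
for i in range(4):                         # N = 4 mini-batches
    xb, yb = x[i * 32:(i + 1) * 32], y[i * 32:(i + 1) * 32]
    accumulated += grad(w, xb, yb)         # accumulate, do not update yet

print(np.allclose(big_batch_grad, accumulated))  # True

(With a mean-reduced loss, the accumulated gradient is N times the big-batch mean gradient, which is why the effect is described as almost, rather than exactly, the same.)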
In standalone mode, gradient accumulation is implemented mainly by splitting the training flow into three parts: forward/backward computation, parameter update, and accumulated-gradient clearing.
(The MNIST dataset is used as the example below.)
Import the required libraries:
import argparse
import os
from collections.abc import Iterable
import mindspore.nn as nn
from mindspore import ParameterTuple
from mindspore import context, DatasetHelper, save_checkpoint
from mindspore.nn import Cell
import mindspore.ops as ops
from model_zoo.official.cv.lenet.src.dataset import create_dataset
from model_zoo.official.cv.lenet.src.lenet import LeNet5
Define the training flow:
Split the training flow into three parts: forward/backward computation, parameter update, and accumulated-gradient clearing:
TrainForwardBackward computes the loss and gradients and uses grad_sum to accumulate the gradients.
TrainOptim performs the parameter update.
TrainClear resets the gradient accumulation variable grad_sum to zero.
_sum_op = ops.MultitypeFuncGraph("grad_sum_op")
_clear_op = ops.MultitypeFuncGraph("clear_op")
@_sum_op.register("Tensor", "Tensor")
def _cumulative_grad(grad_sum, grad):
    """Apply grad sum to cumulative gradient."""
    add = ops.AssignAdd()
    return add(grad_sum, grad)
@_clear_op.register("Tensor", "Tensor")
def _clear_grad_sum(grad_sum, zero):
    """Apply zero to clear grad_sum."""
    success = True
    success = ops.depend(success, ops.assign(grad_sum, zero))
    return success
class TrainForwardBackward(Cell):
    def __init__(self, network, optimizer, grad_sum, sens=1.0):
        super(TrainForwardBackward, self).__init__(auto_prefix=False)
        self.network = network
        self.network.set_grad()
        self.network.add_flags(defer_inline=True)
        self.weights = ParameterTuple(network.trainable_params())
        self.optimizer = optimizer
        self.grad_sum = grad_sum
        self.grad = ops.GradOperation(get_by_list=True, sens_param=True)
        self.sens = sens
        self.hyper_map = ops.HyperMap()

    def construct(self, *inputs):
        weights = self.weights
        loss = self.network(*inputs)
        sens = ops.Fill()(ops.DType()(loss), ops.Shape()(loss), self.sens)
        grads = self.grad(self.network, weights)(*inputs, sens)
        return ops.depend(loss, self.hyper_map(ops.partial(_sum_op), self.grad_sum, grads))
class TrainOptim(Cell):
    def __init__(self, optimizer, grad_sum):
        super(TrainOptim, self).__init__(auto_prefix=False)
        self.optimizer = optimizer
        self.grad_sum = grad_sum

    def construct(self):
        return self.optimizer(self.grad_sum)
class TrainClear(Cell):
    def __init__(self, grad_sum, zeros):
        super(TrainClear, self).__init__(auto_prefix=False)
        self.grad_sum = grad_sum
        self.zeros = zeros
        self.hyper_map = ops.HyperMap()

    def construct(self):
        success = self.hyper_map(ops.partial(_clear_op), self.grad_sum, self.zeros)
        return success
Train and save the model
Instantiate the network, optimizer, and loss function, then call the train_process interface of the custom GradientAccumulation class to train the model.
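The GradientAccumulation class itself is not included in this repost. Below is a minimal sketch of how it could wire the three cells together, inferred from the imports and from the call in the main block; the accumulator initialization, helper structure, and checkpoint file name are assumptions rather than the original code:

from mindspore import Parameter
from mindspore.common.initializer import initializer

class GradientAccumulation:
    def __init__(self, network, loss_fn, optimizer):
        self._network = network
        self._loss_fn = loss_fn
        self._optimizer = optimizer

        params = self._network.trainable_params()
        # one accumulator and one all-zeros buffer per trainable parameter
        self._grad_sum = ParameterTuple(
            [Parameter(initializer(0, p.shape, p.dtype), name="grad_sum." + p.name) for p in params])
        self._zeros = ParameterTuple(
            [Parameter(initializer(0, p.shape, p.dtype), name="zeros." + p.name) for p in params])

        network_with_loss = nn.WithLossCell(self._network, self._loss_fn)
        self._train_forward_backward = TrainForwardBackward(
            network_with_loss, self._optimizer, self._grad_sum).set_train()
        self._train_optim = TrainOptim(self._optimizer, self._grad_sum).set_train()
        self._train_clear = TrainClear(self._grad_sum, self._zeros).set_train()

    def train_process(self, epoch, train_dataset, mini_steps=None):
        """Accumulate gradients over mini_steps mini-batches before each parameter update."""
        dataset_helper = DatasetHelper(train_dataset, dataset_sink_mode=False, epoch_num=epoch)
        for i in range(epoch):
            step = 0
            for k, next_element in enumerate(dataset_helper):
                loss = self._train_forward_backward(*next_element)
                if (k + 1) % mini_steps == 0:
                    step += 1
                    print("epoch:", i + 1, "step:", step, "loss is", loss)
                    self._train_optim()   # apply the accumulated gradients
                    self._train_clear()   # reset grad_sum for the next cycle
            train_dataset.reset()
        save_checkpoint(self._train_forward_backward, "gradient_accumulation.ckpt")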
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='MindSpore Grad Cumulative Example')
    parser.add_argument('--device_target', type=str, default="GPU", choices=['GPU'],
                        help='device where the code will be implemented (default: GPU)')
    parser.add_argument('--data_path', type=str, default="./Data",
                        help='path where the dataset is saved')
    args = parser.parse_args()

    context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target)
    ds_train = create_dataset(os.path.join(args.data_path, "train"), 32)

    net = LeNet5(10)
    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
    net_opt = nn.Momentum(net.trainable_params(), 0.01, 0.9)
    model = GradientAccumulation(net, net_loss, net_opt)

    print("============== Starting Training ==============")
    model.train_process(10, ds_train, mini_steps=4)
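With batch_size 32 and mini_steps=4, the optimizer runs once every four mini-batches, so each parameter update uses gradients accumulated over 128 samples.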
The above describes standalone mode; in parallel mode the strategy has to change.
When using gradient accumulation in SEMI_AUTO_PARALLEL and AUTO_PARALLEL modes, the accumulation steps and the update step are dispatched as two separate graphs that are executed alternately. The accumulation graph runs only the forward/backward computation and the gradient accumulation. The update graph runs the forward/backward computation and the parameter update.
Define the parallel training flow
Normally, after defining the forward network, TrainOneStepCell is used to tie the forward/backward passes and the optimizer together. With gradient accumulation, however, there are two cases, accumulation and update, so the existing class needs some adaptation. Sample code:
import numpy as np
import mindspore.common.dtype as mstype
from mindspore import ops, context, Tensor, Parameter
from mindspore.nn import TrainOneStepCell
from mindspore.common.initializer import initializer
zeroslike = ops.ZerosLike()
reset_accu_grads = ops.MultitypeFuncGraph("reset_accu_grads")
@reset_accu_grads.register("Tensor")
def _reset_accu_grads(accu_grad):
    succ = True
    return ops.depend(succ, ops.assign(accu_grad, zeroslike(accu_grad)))
cast = ops.Cast()
update_accu_grads = ops.MultitypeFuncGraph("update_accu_grads")
@update_accu_grads.register("Tensor", "Tensor")
def _update_accu_grads(accu_grad, grad):
    succ = True
    return ops.depend(succ, ops.assign_add(accu_grad, cast(grad, mstype.float32)))
class TrainAccuStepsCell(TrainOneStepCell):
    def __init__(self, network, optimizer, sens=1.0):
        super(TrainAccuStepsCell, self).__init__(network, optimizer, sens)
        self.accumulation = False
        self.accumulation_steps = context.get_auto_parallel_context("grad_accumulation_step")
        self.accu_grads = self.weights.clone(prefix="accu_grads", init='zeros')
        self.hyper_map = ops.HyperMap()

    def construct(self, *inputs):
        """Defines the computation performed."""
        weights = self.weights
        loss = self.network(*inputs)
        sens = ops.Fill()(ops.DType()(loss), ops.Shape()(loss), self.sens)
        grads = self.grad(self.network, weights)(*inputs, sens)
        # accumulation graph: add the gradients into accu_grads
        if self.accumulation and self.accumulation_steps > 1:
            accu_succ = self.hyper_map(update_accu_grads, self.accu_grads, grads)
            loss = ops.depend(loss, accu_succ)
        if self.accumulation:
            succ = False
        else:
            # update graph: reduce the gradients, clear accu_grads, and update the parameters
            grads = self.grad_reducer(grads)
            accu_grads = ops.depend(self.accu_grads, grads)
            accu_succ = self.hyper_map(reset_accu_grads, accu_grads)
            loss = ops.depend(loss, accu_succ)
            succ = self.optimizer(grads)
        return ops.depend(loss, succ)
On top of TrainOneStepCell, an accumulation flag accumulation and an accumulated-gradient parameter accu_grads are added, used respectively to distinguish the two training flows and to store the accumulated gradient values. In the accumulation graph, accumulation is True: only the forward/backward computation runs and the gradients are added into the parameter accu_grads. In the update graph, accumulation is False: the forward/backward computation and the parameter update both run. In the dynamic loss scale scenario, besides the gradients, the overflow flag must also be accumulated for the overflow check. This can be implemented by adapting TrainOneStepWithLossScaleCell, as follows:
import numpy as np
import mindspore.common.dtype as mstype
from mindspore import ops, context, Tensor, Parameter
from mindspore.nn import TrainOneStepWithLossScaleCell
from mindspore.nn.wrap.loss_scale import _grad_scale
from mindspore.common.initializer import initializer
zeroslike = ops.ZerosLike()
reset_accu_grads = ops.MultitypeFuncGraph("reset_accu_grads")
@reset_accu_grads.register("Tensor")
def _reset_accu_grads(accu_grad):
    succ = True
    return ops.depend(succ, ops.assign(accu_grad, zeroslike(accu_grad)))
cast = ops.Cast()
update_accu_grads = ops.MultitypeFuncGraph("update_accu_grads")
@update_accu_grads.register("Tensor", "Tensor")
def _update_accu_grads(accu_grad, grad):
    succ = True
    return ops.depend(succ, ops.assign_add(accu_grad, cast(grad, mstype.float32)))
class TrainAccuStepsWithLossScaleCell(TrainOneStepWithLossScaleCell):
    def __init__(self, network, optimizer, scale_sense):
        super(TrainAccuStepsWithLossScaleCell, self).__init__(network, optimizer, scale_sense)
        self.accumulation = False
        self.accumulation_steps = context.get_auto_parallel_context("grad_accumulation_step")
        self.one = Tensor(np.array([1]).astype(np.int32))
        self.zero = Tensor(np.array([0]).astype(np.int32))
        self.accu_grads = self.weights.clone(prefix="accu_grads", init='zeros')
        self.accu_overflow = Parameter(initializer(0, [1], mstype.int32))
        self.accu_loss = Parameter(initializer(0, [1], mstype.float32))
        self.cast = ops.Cast()
        self.logical_or = ops.LogicalOr()
        self.not_equal = ops.NotEqual()
        self.select = ops.Select()
        self.reshape = ops.Reshape()

    def construct(self, *inputs):
        """Defines the computation performed."""
        weights = self.weights
        loss = self.network(*inputs)
        scaling_sens = self.scale_sense
        status, scaling_sens = self.start_overflow_check(loss, scaling_sens)
        scaling_sens_filled = ops.ones_like(loss) * ops.cast(scaling_sens, ops.dtype(loss))
        grads = self.grad(self.network, weights)(*inputs, scaling_sens_filled)
        # accumulate gradients
        if self.accumulation and self.accumulation_steps > 1:
            accu_succ = self.hyper_map(update_accu_grads, self.accu_grads, grads)
            loss = ops.depend(loss, accu_succ)
        overflow = self.get_overflow_status(status, grads)
        overflow = self.logical_or(self.not_equal(self.accu_overflow, self.zero), overflow)
        accu_overflow = self.select(overflow, self.one, self.zero)
        if self.accumulation:
            succ = False
            self.accu_overflow = accu_overflow
        else:
            self.accu_overflow = self.zero
            # apply grad reducer on grads
            grads = self.grad_reducer(grads)
            grads = self.hyper_map(ops.partial(_grad_scale, scaling_sens), grads)
            accu_overflow = self.allreduce(accu_overflow)
            overflow = self.less_equal(self.base, accu_overflow)
            accu_grads = ops.depend(self.accu_grads, grads)
            accu_succ = self.hyper_map(reset_accu_grads, accu_grads)
            overflow = ops.depend(overflow, accu_succ)
            overflow = self.reshape(overflow, (()))
            overflow = self.process_loss_scale(overflow)
            if overflow:
                succ = False
            else:
                succ = self.optimizer(grads)
        ret = (loss, overflow, scaling_sens)
        return ops.depend(ret, succ)
Define the parallel training model
The network wrapped by the cell wrapper already contains the forward/backward passes and the optimizer, but the dataset still has to be connected to the network, and the two graphs have to be executed alternately. This is implemented here on top of the framework's Model interface.
import math
from mindspore.train.callback import RunContext
from mindspore import context
from mindspore.context import ParallelMode
from mindspore import Model, connect_network_with_dataset
from mindspore.common.dtype import pytype_to_dtype
from mindspore._c_expression import init_exec_dataset
from mindspore.train.train_thor.dataset_helper import DatasetHelper
def _convert_type(types):
    """
    Convert from numpy type to tensor type.

    Args:
        types (list): Numpy type list of element in dataset.

    Returns:
        list, list of element in dataset.
    """
    ms_types = []
    for np_type in types:
        ms_type = pytype_to_dtype(np_type)
        ms_types.append(ms_type)
    return ms_types
def _get_types_and_shapes(dataset):
    """Get dataset types and shapes."""
    dataset_types = _convert_type(dataset.output_types())
    dataset_shapes = dataset.output_shapes()
    return dataset_types, dataset_shapes
def _exec_datagraph(exec_dataset, dataset_size, phase='dataset'):
    """Initialize and execute the dataset graph."""
    batch_size = exec_dataset.get_batch_size()
    input_indexs = exec_dataset.input_indexs

    # transform data format
    dataset_types, dataset_shapes = _get_types_and_shapes(exec_dataset)
    init_exec_dataset(exec_dataset.__transfer_dataset__.queue_name,
                      dataset_size,
                      batch_size,
                      dataset_types,
                      dataset_shapes,
                      input_indexs,
                      phase=phase,
                      need_run=False)
class Model_ACCU(Model):
    def __init__(self, network, loss_fn=None, optimizer=None, metrics=None, eval_network=None,
                 eval_indexes=None, amp_level="O0", **kwargs):
        super(Model_ACCU, self).__init__(network, loss_fn, optimizer, metrics, eval_network,
                                         eval_indexes, amp_level, **kwargs)
        self._frequency = context.get_auto_parallel_context("grad_accumulation_step")
        self._train_network = self._build_train_network()

    def _exec_preprocess(self, network, is_train, phase, dataset, dataset_sink_mode, sink_size=-1,
                         epoch_num=1, iter_first_order=1):
        """Initializes dataset."""
        if dataset_sink_mode and not is_train:
            dataset.__loop_size__ = 1
        dataset_helper = DatasetHelper(dataset, dataset_sink_mode, sink_size, epoch_num, iter_first_order)
        if dataset_sink_mode and context.get_context("device_target") != "GPU":
            network = connect_network_with_dataset(network, dataset_helper)
        network.set_train(is_train)
        network.phase = phase
        if self._parallel_mode in (ParallelMode.SEMI_AUTO_PARALLEL, ParallelMode.AUTO_PARALLEL):
            network.set_auto_parallel()
        return dataset_helper, network

    def _train_dataset_sink_process(self, epoch, train_dataset, list_callback=None, cb_params=None, sink_size=-1):
        """
        Training process. The data would be passed to network through dataset channel.

        Args:
            epoch (int): Total number of iterations on the data.
            train_dataset (Dataset): A training dataset iterator. If there is no
                                     loss_fn, a tuple with multiple data (data1, data2, data3, ...) should be
                                     returned and passed to the network. Otherwise, a tuple (data, label) should
                                     be returned. The data and label would be passed to the network and loss
                                     function respectively.
            list_callback (Callback): Executor of callback list. Default: None.
            cb_params (_InternalCallbackParam): Callback parameters. Default: None.
            sink_size (int): Control the amount of data in each sink. Default: -1.
        """
        if sink_size == -1:
            epoch_num = epoch
        else:
            epoch_num = math.ceil(epoch * sink_size / train_dataset.get_dataset_size())

        iter_first_order = 1
        iter_second_order = self._frequency - 1
        train_dataset.__loop_size__ = iter_second_order
        dataset_helper, train_network = self._exec_preprocess(self._train_network,
                                                              is_train=True,
                                                              phase='train',
                                                              dataset=train_dataset,
                                                              dataset_sink_mode=True,
                                                              sink_size=sink_size,
                                                              epoch_num=epoch_num,
                                                              iter_first_order=iter_first_order)
        self._train_network = train_network
        cb_params.train_network = self._train_network
        cb_params.cur_step_num = 0

        run_context = RunContext(cb_params)
        list_callback.begin(run_context)

        # used to stop training early, e.g. stop at a given time or step
        should_stop = False
        switch_branch_one = True
        index_first_order = 0
        train_network_init_flag = True
        has_do_dataset_init = False

        for i in range(epoch):
            cb_params.cur_epoch_num = i + 1
            list_callback.epoch_begin(run_context)
            # in data sink mode, dataset_helper only iterates once; otherwise it iterates epoch_size times
            for inputs in dataset_helper:
                list_callback.step_begin(run_context)
                if switch_branch_one:
                    cb_params.cur_step_num += iter_second_order
                    if train_network_init_flag:
                        self._train_network.add_flags_recursive(accumulation=True)
                    self._train_network.phase = 'train0'
                else:
                    cb_params.cur_step_num += iter_first_order
                    if train_network_init_flag:
                        self._train_network.add_flags_recursive(accumulation=False)
                        train_network_init_flag = False
                    self._train_network.phase = 'train1'
                    if not has_do_dataset_init:
                        _exec_datagraph(train_dataset, iter_first_order, phase='train1_dataset')
                        has_do_dataset_init = True
                switch_branch_one = not switch_branch_one
                outputs = self._train_network(*inputs)
                cb_params.net_outputs = outputs
                list_callback.step_end(run_context)

            list_callback.epoch_end(run_context)
            should_stop = should_stop or run_context.get_stop_requested()
            if should_stop:
                break

        dataset_helper.stop_send()
        list_callback.end(run_context)
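In this loop the two graphs are alternated by switching the network phase: 'train0' runs with accumulation=True and sinks iter_second_order (grad_accumulation_step - 1) steps of data per call, while 'train1' runs with accumulation=False, sinks a single step, and performs the parameter update, matching the accumulation/update split described earlier.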
Train the model
With the definitions above in place, the training interface can be used to train the model. First, configure the grad_accumulation_step parameter in context.set_auto_parallel_context to enable gradient accumulation. Then wrap the network with the adapted cell wrapper and pass it to Model_ACCU to initialize the model. (The snippet below is a fragment: the resnet50 network, SoftmaxCrossEntropyExpand loss, VirtualDatasetCell, LossMonitor, and epoch_size are assumed to be defined as in MindSpore's distributed training example.)
context.set_auto_parallel_context(parallel_mode=ParallelMode.AUTO_PARALLEL, gradients_mean=True, grad_accumulation_step=6)
loss_cb = LossMonitor()
data_path = os.getenv('DATA_PATH')
batch_size = 32
dataset = create_dataset(data_path, batch_size=batch_size)
num_classes = 10
net = resnet50(batch_size, num_classes)
loss = SoftmaxCrossEntropyExpand(sparse=True)
opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), 0.01, 0.9)
net_with_loss = nn.WithLossCell(net, loss)
net_with_loss = VirtualDatasetCell(net_with_loss)
wrap_net = TrainAccuStepsCell(net_with_loss, opt)
model = Model_ACCU(wrap_net)
model.train(epoch_size, dataset, callbacks=[loss_cb], dataset_sink_mode=True)
Summary
This article showed how to implement gradient accumulation in MindSpore: in standalone mode by splitting training into forward/backward, update, and clear cells, and in SEMI_AUTO_PARALLEL/AUTO_PARALLEL mode by alternating an accumulation graph and an update graph via an adapted TrainOneStepCell (optionally with loss scaling) and a customized Model.