From 3b924e54f0db38d0fd9cd3a13d48abdc23a8d1d2 Mon Sep 17 00:00:00 2001
From: HarrisonLee-zh
Date: Tue, 30 Aug 2022 21:27:22 +0800
Subject: [PATCH 1/4] this is a mp_layer api inference clarification

---
 .../api/paddle/distributed/fleet/Mp_layer.rst | 171 ++++++++++++++++++
 1 file changed, 171 insertions(+)
 create mode 100644 docs/api/paddle/distributed/fleet/Mp_layer.rst

diff --git a/docs/api/paddle/distributed/fleet/Mp_layer.rst b/docs/api/paddle/distributed/fleet/Mp_layer.rst
new file mode 100644
index 00000000000..cc151a34104
--- /dev/null
+++ b/docs/api/paddle/distributed/fleet/Mp_layer.rst
@@ -0,0 +1,171 @@
Mp_layers
-----------------


.. Fleet: mp_layers

Mp_layers is a component of ``fleet``, PaddlePaddle's unified distributed-training API. It consists of the following parts: the embedding representation and the matrix multiplication.

Embedding representation:

Methods
........
init(num_embeddings, embedding_dim, weight_attr=None, mp_group=None, name=None)
''''''''

Initializes the parallel embedding class.

**Parameters**

    - **num_embeddings** (int) Size of the vocabulary.
    - **embedding_dim** (int) Dimension of the embedding vector for each word in the vocabulary.
    - **weight_attr** (ParamAttr, optional) Attribute of the weight parameter.
    - **mp_group** (Group, optional) Communication group used for model parallelism.
    - **name** (string) Prefix used to identify the outputs of this layer.

**Returns**
None


create_parameter(weight_attr, shape, dtype, is_bias)
''''''''

Creates the weight parameter of the layer.

**Parameters**

    - **weight_attr** (ParamAttr) Attribute of the weight parameter.
    - **shape** (list|tuple) Shape of the parameter. Note that the shape is the vocabulary size divided by the number of GPUs, concatenated with ``embedding_dim``.
    - **dtype** Data type of the parameter.
    - **is_bias** (bool) Whether the parameter is a bias term. Taking the simple form W = AX + B as an example, B is the bias term.

**Returns**
Tensor

**Code Example**

.. code-block:: python

    self.weight = self.create_parameter(attr=self._weight_attr,
                                        shape=self._size,
                                        dtype=self._dtype,
                                        is_bias=False)


c_lookup_table(table, index, start_index, name)
''''''''

Looks up rows of the table by index.

**Parameters**

    - **table** (Tensor) Input table tensor.
    - **index** (Tensor) Indices used for the lookup.
    - **start_index** (int) Starting position of this rank's shard of the lookup table.
    - **name** (string) Name of the called API.

**Returns**
Tensor

**Code Example**

.. code-block:: python

    output_parallel = paddle.distributed.collective._c_lookup_table(
        self.weight,
        x,
        start_index=self.vocab_start_index,
        name=self._name)


mp_allreduce(output, group, use_calc_stream, use_model_parallel)
''''''''

Aggregates the tensor with an allreduce across the group.

**Parameters**

    - **output** (Tensor) Input tensor.
    - **group** (Group) Communication group used for the distributed parallelism.
    - **use_calc_stream** (bool) Whether to run the communication on the calculation stream.
    - **use_model_parallel** (bool) Whether model parallelism is used.

**Returns**
Tensor

**Code Example**

.. code-block:: python

    output = paddle.distributed.collective._mp_allreduce(
        output_parallel,
        group=self.model_parallel_group,
        use_calc_stream=True,
        use_model_parallel=True)


embedding(input, weight, padding_idx, sparse, name)
''''''''''''

Performs the embedding lookup on the tensor.

**Parameters**

    - **input** (Tensor) Input tensor.
    - **weight** (Tensor) Weight parameter of the embedding.
    - **padding_idx** (int) Position whose vector is filled with zeros.
    - **sparse** (bool) Whether to use sparse gradient updates for the embedding weight in the forward and backward passes.
    - **name** (string) Name of the tensor, etc.

**Returns**
Tensor
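
The methods above are internals of the vocabulary-parallel embedding. A minimal usage sketch follows; it is not part of the original patch and assumes these methods correspond to ``paddle.distributed.fleet.meta_parallel.VocabParallelEmbedding`` and that the script is launched on two GPUs with a model-parallel degree of 2.

.. code-block:: python

    # A minimal sketch (not from the original document): constructing and calling the
    # vocabulary-parallel embedding. Assumes a 2-GPU launch, e.g.
    #   python -m paddle.distributed.launch --gpus "0,1" demo.py
    import paddle
    from paddle.distributed import fleet
    from paddle.distributed.fleet.meta_parallel import VocabParallelEmbedding

    strategy = fleet.DistributedStrategy()
    strategy.hybrid_configs = {"dp_degree": 1, "mp_degree": 2, "pp_degree": 1}
    fleet.init(is_collective=True, strategy=strategy)

    hcg = fleet.get_hybrid_communicate_group()
    embedding = VocabParallelEmbedding(
        num_embeddings=1000,    # vocabulary size, sharded across the 2 model-parallel ranks
        embedding_dim=64,
        mp_group=hcg.get_model_parallel_group())

    token_ids = paddle.randint(low=0, high=1000, shape=[2, 8])
    out = embedding(token_ids)  # after mp_allreduce every rank holds the full [2, 8, 64] result
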
Matrix multiplication:

Methods
........
init(in_feature, out_feature, weight_attr=None, has_bias, gather_output, fuse_matmul_bias, mp_group, name)
''''''''

Initializes the sharded matrix-multiplication layer.

**Parameters**

    - **in_feature** (int) Size of the input feature dimension.
    - **out_feature** (int) Size of the output feature dimension.
    - **weight_attr** (ParamAttr) Attribute of the weight parameter.
    - **has_bias** (bool) Whether the layer has a bias term.
    - **gather_output** (bool) Whether to gather the local results into the full output.
    - **fuse_matmul_bias** (bool) Whether to fuse the bias addition into the matrix multiplication.
    - **mp_group** (Group) Communication group used for model parallelism.
    - **name** (string) Name of the tensor, etc.

**Returns**
None


c_concat(x, group)
'''''''

Concatenates the sharded tensors into one tensor.

**Parameters**

    - **x** (Tensor) Input tensor.
    - **group** (Group) Communication group used for the distributed parallelism.

**Returns**
Tensor

**Code Example**

.. code-block:: python

    output = paddle.distributed.collective._c_concat(
        output_parallel, group=self.model_parallel_group)


c_split(x, group)
'''''''

Splits the tensor into shards across the group.

**Parameters**

    - **x** (Tensor) Input tensor.
    - **group** (Group) Communication group used for the distributed parallelism.

**Returns**
Tensor

**Code Example**

.. code-block:: python

    input_parallel = paddle.distributed.collective._c_split(
        x, group=self.model_parallel_group)
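
A minimal usage sketch for the sharded matrix multiplication follows; it is not part of the original patch and assumes these methods correspond to ``paddle.distributed.fleet.meta_parallel.ColumnParallelLinear``, with the same 2-GPU model-parallel setup as in the embedding example above.

.. code-block:: python

    # A minimal sketch (not from the original document): a column-sharded linear layer.
    # Assumes fleet has already been initialized with mp_degree=2 on a 2-GPU launch.
    import paddle
    from paddle.distributed import fleet
    from paddle.distributed.fleet.meta_parallel import ColumnParallelLinear

    hcg = fleet.get_hybrid_communicate_group()
    linear = ColumnParallelLinear(
        64, 128,                 # each of the 2 ranks holds a [64, 64] shard of the weight
        has_bias=True,
        gather_output=True,      # c_concat gathers the shards into the full output
        mp_group=hcg.get_model_parallel_group())

    x = paddle.rand([4, 64])
    y = linear(x)                # shape [4, 128] on every rank because gather_output=True
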
From b85db5210aed28e2f36b4d291bd1e0dec9c046bc Mon Sep 17 00:00:00 2001
From: HarrisonLee-zh
Date: Mon, 5 Sep 2022 19:37:16 +0800
Subject: [PATCH 2/4] this is a api inferences descriptions of pipeline
 parallel

---
 .../paddle/distributed/fleet/pp_layers.rst | 242 ++++++++++++++++++
 1 file changed, 242 insertions(+)
 create mode 100644 docs/api/paddle/distributed/fleet/pp_layers.rst

diff --git a/docs/api/paddle/distributed/fleet/pp_layers.rst b/docs/api/paddle/distributed/fleet/pp_layers.rst
new file mode 100644
index 00000000000..e3b72c81f2e
--- /dev/null
+++ b/docs/api/paddle/distributed/fleet/pp_layers.rst
@@ -0,0 +1,242 @@
pp_layers
---------------------------------

.. py:class:: paddle.distributed.fleet.meta_parallel.parallel_layers.pp_layers


LayerDesc class:

Methods
..........
issubclass(layer_func, Layer)
'''''''''

Checks whether ``layer_func`` is a subclass of ``Layer``.

**Parameters**

    - **layer_func** (object) The wrapper built on top of the Layer class.
    - **Layer** (object) The class for network layers, containing a series of specifications.

**Returns**
bool


layer_to_str(name, input, kwargs)
'''''''''

Converts the information of a network layer into a string.

**Parameters**

    - **name** (string) Name of ``layer_func``.
    - **input** (Tensor) Input tensor.
    - **kwargs** (dict) Parsed keyword arguments.

**Returns**
string

**Code Example**

.. code-block:: python

    layer_to_str(self.layer_func.__name__, self.inputs, self.kwargs)


SharedLayerDesc class:

Methods
......
init(key, layer_func, forward_func, shared_weight_attr, input, kwargs)
''''''''

Encapsulates, as a class, the attributes of a network layer shared between pipeline stages.

**Parameters**

    - **key** (int) Key that marks the position of the shared network layer.
    - **layer_func** (object) Wrapper object of the network layer.
    - **forward_func** (object) Forward function applied to the shared parameters of the shared layer.
    - **shared_weight_attr** (string) Name of the shared weight attribute.
    - **input** (Tensor) Input tensor.
    - **kwargs** (dict) Parsed keyword arguments.

**Returns**
None

SegmentLayers class:

Methods
.......
init(layer_desc, num_parts, method, num_virtual_pipeline_stage)
'''''''

The main purpose of this method is to split a complete stack of network layers into parts.

**Parameters**

    - **layer_desc** (object) The LayerDesc objects describing the layers.
    - **num_parts** (int) Number of parts the network layers are split into.
    - **method** (string) Segmentation method.
    - **num_virtual_pipeline_stages** (int) Number of virtual pipeline stages.

**Returns**
None

**Code Example**

.. code-block:: python

    def __init__(self,
                 layers_desc,
                 num_parts,
                 method="uniform",
                 num_virtual_pipeline_stage=None):
        self._layers_desc = layers_desc
        self.method = method
        self.num_parts = num_parts
        self.num_items = len(layers_desc)
        self.num_virtual_pipeline_stage = num_virtual_pipeline_stage

uniform(num_items, num_parts)
''''''''''

This method segments the network layers uniformly.

**Parameters**

    - **num_items** (int) Total number of network layers.
    - **num_parts** (int) Number of parts the layers are split into.

**Returns**
List


PipelineLayerChunk class:

Methods
...........
append(sublayer)
'''''''''''

Appends a sub-layer to the chunk.

**Parameters**

    - **sublayer** (object) A sub-layer of the network.

**Returns**
None

PipelineLayer class:

Methods
............
init(layers, num_stages, topology, loss_fn, seg_method, recompute_interval, recompute_ctx, num_virtual_pipeline_stages)
''''''''''''

Initializes the PipelineLayer class.

**Parameters**

    - **layers** (object) Description of the network layers used for pipeline parallelism.
    - **num_stages** (int) Degree of pipeline parallelism (number of pipeline stages).
    - **topology** (object) GPU topology of the hybrid parallelism (tensor parallelism, pipeline parallelism, etc.).
    - **loss_fn** (object) Loss function used for training.
    - **seg_method** (string) Name of the method used to segment the network layers across pipeline stages.
    - **recompute_interval** (int) Interval, in network layers, at which recomputation is performed.
    - **num_virtual_pipeline_stages** (int) Number of virtual pipeline stages applied to the segmented network layers.

**Returns**
None

**Code Example**

.. code-block:: python

    def __init__(self,
                 layers,
                 num_stages=None,
                 topology=None,
                 loss_fn=None,
                 seg_method="uniform",
                 recompute_interval=0,
                 recompute_ctx=None,
                 num_virtual_pipeline_stages=None):
        ...

get_stage_from_index(layer_idx)
''''''''''''''

Maps a layer index in the virtually segmented network back to its real pipeline stage.

**Parameters**

    - **layer_idx** (int) Index of the layer in the virtually segmented network.

**Returns**
Int

**Code Example**

.. code-block:: python

    def get_stage_from_index(self, layer_idx):
        assert 0 <= layer_idx < self._num_layers, "layer_idx is out of bound"
        for virtual_pp_rank in range(self._num_virtual_pipeline_stages):
            start_idx = virtual_pp_rank * self._num_virtual_pipeline_stages
            for stage in range(self._num_stages):
                if self.segment_parts[start_idx +
                                      stage] <= layer_idx < self.segment_parts[
                                          start_idx + stage + 1]:
                    return stage

_segment_network_for_interleave(seg_method)
'''''''''''''''

This method segments the model's network layers and records which layer ranges belong to each virtual pipeline stage of this rank.

**Parameters**

    - **seg_method** (string) Name of the method used to segment the network layers across pipeline stages.

**Returns**
None

**Code Example**

.. code-block:: python

    def _segment_network_for_interleave(self, seg_method):
        logger.info("start segment network for interleave scheduler")
        seg = SegmentLayers(
            self._layers_desc,
            num_parts=self._num_stages,
            method=seg_method,
            num_virtual_pipeline_stage=self._num_virtual_pipeline_stages)
        self.segment_parts = seg.do_segment()

        logger.info("segment result:" +
                    ", ".join(str(arg) for arg in self.segment_parts))

        for i in range(self._stage_id, self._total_stages_with_virtual_stages,
                       self._num_virtual_pipeline_stages):
            # record the start and end index of layers on this stage
            assert self.segment_parts[i] <= self.segment_parts[i + 1]
            self._start_poss.append(self.segment_parts[i])
            self._end_poss.append(self.segment_parts[i + 1])

        assert len(self._start_poss) == len(self._end_poss)

        self._print_segmentation_for_debug()

_build_layer_impl(start, end)
'''''''''''''

Adds the layer wrappers and their indices to the list of run functions.

**Parameters**

    - **start** (int) Starting index of the network layers.
    - **end** (int) Ending index of the network layers.

**Returns**
List

**Code Example**

.. code-block:: python

    def _build_layer_impl(self, start, end):
        if self._num_virtual_pipeline_stages > 1:
            run_function = PipelineLayerChunk()
        else:
            run_function = self.run_function

        for index, layer in enumerate(self._layers_desc[start:end]):
            layer_index = start + index
            if isinstance(layer, Layer):
                run_function.append(layer)
                if self._num_virtual_pipeline_stages == 1:
                    self.add_sublayer(str(layer_index), layer)
            elif isinstance(layer, SharedLayerDesc):
                if layer.layer_name not in self.shared_layers:
                    self.shared_layers[layer.layer_name] = layer.build_layer()
                    self.shared_weight_attrs[
                        layer.layer_name] = layer.shared_weight_attr
                    for param in self.shared_layers[
                            layer.layer_name].parameters():
                        setattr(param, "is_firstly_shared", True)

                if layer.forward_func is None:
                    run_function.append(self.shared_layers[layer.layer_name])
                else:
                    run_function.append(
                        partial(layer.forward_func,
                                self.shared_layers[layer.layer_name]))

            elif isinstance(layer, LayerDesc):
                model = layer.build_layer()
                run_function.append(model)
                if self._num_virtual_pipeline_stages == 1:
                    self.add_sublayer(str(layer_index), model)
            else:
                run_function.append(layer)

        return run_function
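
To tie the classes above together, here is a minimal usage sketch; it is not part of the original patch and assumes the usual Fleet pipeline-parallel setup: a 2-GPU launch (``python -m paddle.distributed.launch --gpus "0,1"``) with ``pp_degree=2``.

.. code-block:: python

    # A minimal sketch (not from the original document): describing a model with LayerDesc
    # and building a 2-stage PipelineLayer. Assumes a 2-GPU distributed launch.
    import paddle.nn as nn
    from paddle.distributed import fleet
    from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer

    strategy = fleet.DistributedStrategy()
    strategy.hybrid_configs = {"dp_degree": 1, "mp_degree": 1, "pp_degree": 2}
    strategy.pipeline_configs = {"accumulate_steps": 2, "micro_batch_size": 4}
    fleet.init(is_collective=True, strategy=strategy)

    class MLPPipe(PipelineLayer):
        def __init__(self, **kwargs):
            descs = [
                LayerDesc(nn.Linear, 32, 32),
                LayerDesc(nn.ReLU),
                LayerDesc(nn.Linear, 32, 10),
            ]
            super().__init__(layers=descs,
                             loss_fn=nn.CrossEntropyLoss(),
                             seg_method="uniform",  # SegmentLayers splits the descs uniformly
                             **kwargs)

    hcg = fleet.get_hybrid_communicate_group()
    # Each rank only builds the layers that belong to its own pipeline stage.
    model = MLPPipe(topology=hcg.topology())
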
From 4e78a56886908b89be6f96076e8786aafe7aedfc Mon Sep 17 00:00:00 2001
From: HarrisonLee-zh
Date: Mon, 5 Sep 2022 20:48:33 +0800
Subject: [PATCH 3/4] this is an api inference descrptions file for pipeline
 parallel

---
 docs/api/paddle/distributed/fleet/pp_layers.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/api/paddle/distributed/fleet/pp_layers.rst b/docs/api/paddle/distributed/fleet/pp_layers.rst
index e3b72c81f2e..6de622366b7 100644
--- a/docs/api/paddle/distributed/fleet/pp_layers.rst
+++ b/docs/api/paddle/distributed/fleet/pp_layers.rst
@@ -1,5 +1,5 @@
 pp_layers
---------------------------------
+--------------------
 
 .. py:class:: paddle.distributed.fleet.meta_parallel.parallel_layers.pp_layers

From d768131aa2632cc47468e7e129300b08238e5bfd Mon Sep 17 00:00:00 2001
From: HarrisonLee-zh
Date: Fri, 9 Sep 2022 16:47:38 +0800
Subject: [PATCH 4/4] this is an api inference clarification, test=develop

---
 docs/api/paddle/distributed/fleet/random.rst | 192 +++++++++++++++++++
 1 file changed, 192 insertions(+)
 create mode 100644 docs/api/paddle/distributed/fleet/random.rst

diff --git a/docs/api/paddle/distributed/fleet/random.rst b/docs/api/paddle/distributed/fleet/random.rst
new file mode 100644
index 00000000000..6ff5b8e53b0
--- /dev/null
+++ b/docs/api/paddle/distributed/fleet/random.rst
@@ -0,0 +1,192 @@
random
------------------

.. py:class:: paddle.distributed.fleet.meta_parallel.parallel_layers.random

The random module generates random numbers that correspond to the model-parallel configuration, so that the dropout strategy uses different random numbers under different parallel configurations when comparing against the drop probability, which decides whether each element is kept or dropped.


RNGStatesTracker class:

Methods
.........
add(name, seed)
'''''''''

Registers a mapping from a model-parallel RNG name to a random-number state created from the given seed.

**Parameters**

    - **name** (string) Name of the model-parallel RNG state.
    - **seed** (int) Random seed.

**Returns**
None

**Code Example**

.. code-block:: python

    def add(self, name, seed):
        if seed in self.seeds_:
            raise ValueError('seed {} already exists'.format(seed))
        self.seeds_.add(seed)
        if name in self.states_:
            raise ValueError('state {} already exists'.format(name))
        orig_rng_state = paddle.get_cuda_rng_state()
        paddle.seed(seed)
        self.states_[name] = paddle.get_cuda_rng_state()
        paddle.set_cuda_rng_state(orig_rng_state)


rng_state(name)
'''''''''''

Context manager that switches to the saved random-number state with the given name and restores the original state afterwards.

**Parameters**

    - **name** (string) Name of the registered RNG state; defaults to the model-parallel RNG.

**Returns**
None

**Code Example**

.. code-block:: python

    def rng_state(self, name=MODEL_PARALLEL_RNG):
        if name not in self.states_:
            raise ValueError('state {} does not exist'.format(name))
        orig_cuda_rng_state = paddle.get_cuda_rng_state()
        paddle.set_cuda_rng_state(self.states_[name])
        try:
            yield
        finally:
            self.states_[name] = paddle.get_cuda_rng_state()
            paddle.set_cuda_rng_state(orig_cuda_rng_state)


model_parallel_random_seed(seed)
'''''''''''

Sets the random seeds for model parallelism and registers the model-parallel RNG with the corresponding seed.

**Parameters**

    - **seed** (int) Random seed; defaults to None.

**Returns**
None

**Code Example**

.. code-block:: python

    def model_parallel_random_seed(seed=None):
        import paddle.distributed.fleet as fleet
        hcg = fleet.get_hybrid_communicate_group()
        rank = hcg.get_model_parallel_rank()
        if seed:
            global_seed = seed
            local_seed = seed * 1024 + rank * 100
        else:
            global_seed = np.random.randint(0, 655350)
            local_seed = np.random.randint(rank * 10000, (rank + 1) * 10000 - 1)

        RNG_STATE_TRACKER.reset()
        RNG_STATE_TRACKER.add(MODEL_PARALLEL_RNG, local_seed)
        paddle.seed(global_seed)


determinate_seed(rng_name)
''''''''''''

Obtains the final random seed used by dropout according to the model-parallel RNG name.

**Parameters**

    - **rng_name** (string) Name of the model-parallel RNG.

**Returns**
Int

**Code Example**

.. code-block:: python

    def determinate_seed(rng_name):
        assert rng_name is not None and rng_name != ""
        helper = LayerHelper('seed', **locals())
        out = helper.create_variable_for_type_inference(dtype=paddle.int32)
        helper.append_op(type='seed',
                         outputs={'Out': out},
                         attrs={
                             'deterministic': True,
                             'rng_name': rng_name,
                             'force_cpu': True
                         })
        return out

dropout(x, p, axis, rng_name, training, mode, name)
''''''''''''

Generates different random seeds according to the model-parallel configuration; when these seeds are used by the dropout method, the generated random numbers are compared against the given probability to decide which elements are kept and which are dropped.

**Parameters**

    - **x** (Tensor) Input tensor.
    - **p** (float) Probability of setting an element to zero, i.e. the drop probability.
    - **axis** (int|list|tuple) Axis along which the dropout operation is performed.
    - **rng_name** (string) Name of the random seed generator, used to obtain the random seed.
    - **training** (bool) Whether the current call happens during training.
    - **mode** (string) How the kept elements are rescaled numerically.
    - **name** (string) Name of the operation.

**Returns**
The processed tensor.

**Code Example**

.. code-block:: python

    def dropout(x,
                p=0.5,
                axis=None,
                rng_name=None,
                training=True,
                mode="upscale_in_train",
                name=None):
        if rng_name is None:
            return paddle.nn.functional.dropout(x, p, axis, training, mode, name)

        if not isinstance(p, (float, int, Variable)):
            raise TypeError("p argument should be a number(int|float) or Variable")

        # fast return for p == 0
        if isinstance(p, (int, float)) and p == 0:
            return x

        assert 0 <= p <= 1, ValueError("p argument should between 0 and 1")
        assert mode in ('downscale_in_infer', 'upscale_in_train'), \
            ValueError("mode argument should be 'downscale_in_infer' or 'upscale_in_train'")

        assert axis is None, \
            TypeError("unsupport axis when using random seed generator")

        mode = 'downgrade_in_infer' if mode == 'downscale_in_infer' else mode  # semantic transfer

        if _non_static_mode():
            out, mask = _legacy_C_ops.dropout(x, 'dropout_prob', p, 'is_test',
                                              not training, 'fix_seed', False,
                                              'seed', 0, 'dropout_implementation',
                                              mode)
            return out

        seed = determinate_seed(rng_name)

        if isinstance(p, Variable) and not p.shape != [1]:
            raise TypeError(
                "Required p.shape == [1] if type(p) is Variable, but received p.shape = {}"
                .format(p.shape))

        helper = LayerHelper('dropout', **locals())
        check_variable_and_dtype(x, 'x', ['float16', 'float32', 'float64'],
                                 'dropout')

        out = helper.create_variable_for_type_inference(dtype=x.dtype)
        mask = helper.create_variable_for_type_inference(
            dtype=core.VarDesc.VarType.UINT8, stop_gradient=True)

        helper.append_op(type='dropout',
                         inputs={
                             'X': [x],
                             'Seed': seed
                         },
                         outputs={
                             'Out': [out],
                             'Mask': [mask]
                         },
                         attrs={
                             'dropout_prob': p,
                             'is_test': not training,
                             'dropout_implementation': mode,
                         })
        return out
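
A small usage sketch of the RNG state tracker follows; it is not part of the original patch, assumes a CUDA device is available, and uses ``get_rng_state_tracker`` from the same module to access the global ``RNGStatesTracker`` instance.

.. code-block:: python

    # A minimal sketch (not from the original document) of the RNG state tracker.
    # Assumes a CUDA device; in real model-parallel training the tracker is seeded
    # by model_parallel_random_seed() after fleet.init().
    import paddle
    from paddle.distributed.fleet.meta_parallel.parallel_layers.random import (
        get_rng_state_tracker)

    tracker = get_rng_state_tracker()
    tracker.add("local_seed", 2023)            # register a named CUDA RNG state

    x = paddle.rand([4, 8])
    with tracker.rng_state("local_seed"):      # temporarily switch to that RNG state
        y = paddle.nn.functional.dropout(x, p=0.5)
    # the global CUDA RNG state is restored when the context manager exits
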