From ad3831ae838f49e85f7805c3536787d70e49b102 Mon Sep 17 00:00:00 2001 From: jjbbong Date: Wed, 2 Oct 2024 13:27:15 +0800 Subject: [PATCH 1/4] During training, a certain proportion of middle-layer nodes from the tree are randomly selected within each batch for training. --- docs/source/models/tdm.md | 5 ++++ tzrec/datasets/dataset.py | 51 +++++++++++++++++++++++++++++++++----- tzrec/protos/sampler.proto | 6 +++++ 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/docs/source/models/tdm.md b/docs/source/models/tdm.md index f1b3023..2b6dc26 100644 --- a/docs/source/models/tdm.md +++ b/docs/source/models/tdm.md @@ -113,6 +113,11 @@ model_config { - layer_num_sample: 每个样本在树每层的负采样个数 - item_id_field: item_id列名 - attr_delimiter: 目前是以","写表的, 因此需设置为"," + - remain_ratio: (可选, 默认为1.0)训练时每个batch内在树中随机选取一定比例的非叶节点层的节点训练, 可以一定程度防止过拟合 + - probability_type: (可选, 默认为"UNIFORM")随机选取树中非叶节点层时, 每层被选中的概率, 目前可选择"UNIFORM", "ARITHMETIC", "RECIPROCAL" + - "UNIFORM": 每层被选中的概率相同 + - "ARTITHMETIC": 每层被选中概率等差递增 + - "RECIPROCAL": 每层被选中的概率呈反比例函数递增, 即p正比于1/(TREE_LEVEL - CUR_LEVEL) ### 示例Config diff --git a/tzrec/datasets/dataset.py b/tzrec/datasets/dataset.py index 0f01ce7..f737e4b 100644 --- a/tzrec/datasets/dataset.py +++ b/tzrec/datasets/dataset.py @@ -99,26 +99,65 @@ def _expand_tdm_sample( sampler_config = getattr(data_config, sampler_type) tree_level = len(sampler_config.layer_num_sample) num_all_layer_neg = sum(sampler_config.layer_num_sample) + layer_num_sample = sampler_config.layer_num_sample item_fea_names = pos_sampled.keys() all_fea_names = input_data.keys() label_fields = set(data_config.label_fields) user_fea_names = all_fea_names - item_fea_names - label_fields + remain_ratio = sampler_config.remain_ratio + probability_type = sampler_config.probabilty_type + if probability_type == "UNIFORM": + p = np.array([1/(tree_level-1)]*(tree_level-1)) + elif probability_type == "ARITHMETIC": + p = np.arange(1, tree_level) / sum(np.arange(1, tree_level)) + elif probability_type == 
"RECIPROCAL": + p = 1/np.arange(tree_level-1, 0, -1) + p = p/sum(p) + else: + raise ValueError( + f"probability_type: [{probability_type}]" + "is not supported now." + ) + + remain_layer = np.random.choice(range(tree_level-1), int(remain_ratio*(tree_level-1)), replace=False, p=p) + remain_layer.sort() + num_remain_layer_neg = sum([layer_num_sample[i] for i in remain_layer]) + layer_num_sample[-1] + for item_fea_name in item_fea_names: + batch_size = len(input_data[item_fea_name]) + pos_index = ( + (remain_layer[None, :] + (tree_level-1) * np.arange(batch_size)[:, None]) + .flatten() + .astype(np.int64) + ) + neg_index = ( + ( + np.concatenate( + [ + range(sum(layer_num_sample[:i]), sum(layer_num_sample[:i+1])) + for i in np.append(remain_layer, tree_level-1) + ] + )[None, :] + + num_all_layer_neg * np.arange(batch_size)[:, None] + ) + .flatten() + .astype(np.int64) + ) input_data[item_fea_name] = pa.concat_arrays( [ input_data[item_fea_name], - pos_sampled[item_fea_name], - neg_sampled[item_fea_name], + pos_sampled[item_fea_name].take(pos_index), + neg_sampled[item_fea_name].take(neg_index), ] ) # In the sampling results, the sampled outcomes for each item are contiguous. 
for user_fea_name in user_fea_names: user_fea = input_data[user_fea_name] - pos_index = np.repeat(np.arange(len(user_fea)), tree_level - 1) - neg_index = np.repeat(np.arange(len(user_fea)), num_all_layer_neg) + pos_index = np.repeat(np.arange(len(user_fea)), len(remain_layer)) + neg_index = np.repeat(np.arange(len(user_fea)), num_remain_layer_neg) pos_expand_user_fea = user_fea.take(pos_index) neg_expand_user_fea = user_fea.take(neg_index) input_data[user_fea_name] = pa.concat_arrays( @@ -134,11 +173,11 @@ def _expand_tdm_sample( [ input_data[label_field].cast(pa.int64()), pa.array( - [1] * (len(input_data[label_field]) * (tree_level - 1)), + [1] * (len(input_data[label_field]) * len(remain_layer)), type=pa.int64(), ), pa.array( - [0] * (len(input_data[label_field]) * num_all_layer_neg), + [0] * (len(input_data[label_field]) * num_remain_layer_neg), type=pa.int64(), ), ] diff --git a/tzrec/protos/sampler.proto b/tzrec/protos/sampler.proto index c4c0755..cf530eb 100644 --- a/tzrec/protos/sampler.proto +++ b/tzrec/protos/sampler.proto @@ -130,4 +130,10 @@ message TDMSampler { optional uint32 num_eval_sample = 8 [default=0]; // field delimiter of input data optional string field_delimiter = 9; + // The training process only trains a randomly selected + // proportion of nodes in the middle layers of the tree + optional float remain_ratio = 10 [default=1.0]; + // The type of probability for selecting and retaining + // each layer in the middle layers of the tree. 
+ optional string probabilty_type = 11 [default="UNIFORM"]; } From a957c85db234ccb995786f0c946080b070f78c2e Mon Sep 17 00:00:00 2001 From: jjbbong Date: Sun, 6 Oct 2024 12:19:44 +0800 Subject: [PATCH 2/4] add remain_ratio test --- tzrec/datasets/dataset.py | 37 ++++++----- tzrec/datasets/dataset_test.py | 117 +++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 15 deletions(-) diff --git a/tzrec/datasets/dataset.py b/tzrec/datasets/dataset.py index f737e4b..e8e26ea 100644 --- a/tzrec/datasets/dataset.py +++ b/tzrec/datasets/dataset.py @@ -107,23 +107,30 @@ def _expand_tdm_sample( user_fea_names = all_fea_names - item_fea_names - label_fields remain_ratio = sampler_config.remain_ratio - probability_type = sampler_config.probabilty_type - if probability_type == "UNIFORM": - p = np.array([1/(tree_level-1)]*(tree_level-1)) - elif probability_type == "ARITHMETIC": - p = np.arange(1, tree_level) / sum(np.arange(1, tree_level)) - elif probability_type == "RECIPROCAL": - p = 1/np.arange(tree_level-1, 0, -1) - p = p/sum(p) - else: - raise ValueError( - f"probability_type: [{probability_type}]" - "is not supported now." + if remain_ratio < 1.0: + probability_type = sampler_config.probabilty_type + if probability_type == "UNIFORM": + p = np.array([1/(tree_level-1)]*(tree_level-1)) + elif probability_type == "ARITHMETIC": + p = np.arange(1, tree_level) / sum(np.arange(1, tree_level)) + elif probability_type == "RECIPROCAL": + p = 1/np.arange(tree_level-1, 0, -1) + p = p/sum(p) + else: + raise ValueError( + f"probability_type: [{probability_type}]" + "is not supported now." 
+ ) + remain_layer = np.random.choice( + range(tree_level-1), int(remain_ratio*(tree_level-1)), replace=False, p=p ) + remain_layer.sort() + else: + remain_layer = list(range(tree_level-1)) - remain_layer = np.random.choice(range(tree_level-1), int(remain_ratio*(tree_level-1)), replace=False, p=p) - remain_layer.sort() - num_remain_layer_neg = sum([layer_num_sample[i] for i in remain_layer]) + layer_num_sample[-1] + num_remain_layer_neg = sum( + [layer_num_sample[i] for i in remain_layer] + ) + layer_num_sample[-1] for item_fea_name in item_fea_names: batch_size = len(input_data[item_fea_name]) diff --git a/tzrec/datasets/dataset_test.py b/tzrec/datasets/dataset_test.py index 6af2c1d..3048922 100644 --- a/tzrec/datasets/dataset_test.py +++ b/tzrec/datasets/dataset_test.py @@ -16,6 +16,7 @@ import numpy as np import pyarrow as pa +import math from graphlearn.python.nn.pytorch.data import utils from parameterized import parameterized from torch.utils.data import DataLoader @@ -425,6 +426,122 @@ def test_dataset_predict_mode(self, debug_level): ) else: self.assertEqual(list(batch.reserves.get().column_names), ["label"]) + + def test_dataset_with_tdm_sampler_and_remain_ratio(self): + node = tempfile.NamedTemporaryFile("w") + self._temp_files.append(node) + node.write("id:int64\tweight:float\tattrs:string\n") + for i in range(63): + node.write(f"{i}\t{1}\t{int(math.log(i+1,2))}:{i}:{i+1000}:我们{i}\n") + node.flush() + + def _ancesstor(code): + ancs = [] + while code > 0: + code = int((code - 1) / 2) + ancs.append(code) + return ancs + + edge = tempfile.NamedTemporaryFile("w") + self._temp_files.append(edge) + edge.write("src_id:int64\tdst_id:int\tweight:float\n") + for i in range(31, 63): + for anc in _ancesstor(i): + edge.write(f"{i}\t{anc}\t{1.0}\n") + edge.flush() + + def _childern(code): + return [2 * code + 1, 2 * code + 2] + + predict_edge = tempfile.NamedTemporaryFile("w") + self._temp_files.append(predict_edge) + 
predict_edge.write("src_id:int64\tdst_id:int\tweight:float\n") + for i in range(7, 15): + predict_edge.write(f"0\t{i}\t{1}\n") + for i in range(7, 31): + for child in _childern(i): + predict_edge.write(f"{i}\t{child}\t{1}\n") + predict_edge.flush() + + input_fields = [ + pa.field(name="int_a", type=pa.int64()), + pa.field(name="float_b", type=pa.float64()), + pa.field(name="str_c", type=pa.string()), + pa.field(name="int_d", type=pa.int64()), + pa.field(name="float_d", type=pa.float64()), + pa.field(name="label", type=pa.int32()), + ] + feature_cfgs = [ + feature_pb2.FeatureConfig( + id_feature=feature_pb2.IdFeature(feature_name="int_a") + ), + feature_pb2.FeatureConfig( + id_feature=feature_pb2.IdFeature(feature_name="str_c") + ), + feature_pb2.FeatureConfig( + raw_feature=feature_pb2.RawFeature(feature_name="float_b") + ), + feature_pb2.FeatureConfig( + id_feature=feature_pb2.IdFeature(feature_name="int_d") + ), + feature_pb2.FeatureConfig( + raw_feature=feature_pb2.RawFeature(feature_name="float_d") + ), + ] + features = create_features( + feature_cfgs, + neg_fields=["int_a", "float_b", "str_c"] + ) + + dataset = _TestDataset( + data_config=data_pb2.DataConfig( + batch_size=4, + dataset_type=data_pb2.DatasetType.OdpsDataset, + fg_encoded=True, + label_fields=["label"], + negative_sampler=sampler_pb2.TDMSampler( + input_input_path=node.name, + edge_input_path=edge.name, + predict_edge_input_path=predict_edge.name, + attr_fields=["tree_level", "int_a", "float_b", "str_c"], + item_id_field="int_a", + layer_num_sample=[1,1,1,1,1,5], + field_delimiter=",", + remain_ratio=0.4, + probability_type="UNIFORM" + ), + ), + features=features, + input_path="", + input_fields=input_fields, + ) + dataset.launch_sampler_cluster(2) + dataloader = DataLoader( + dataset=dataset, + batch_size=None, + num_workers=2, + pin_memory=True, + collate_fn=lambda x: x, + ) + iterator = iter(dataloader) + batch = next(iterator) + self.assertEqual( + 
batch.dense_features[BASE_DATA_GROUP].keys(), ["float_b", "float_d"] + ) + self.assertEqual( + batch.dense_features[BASE_DATA_GROUP].values().size(), (40, 2) + ) + self.assertEqual( + batch.sparse_features[BASE_DATA_GROUP].keys(), + ["int_a", "str_c", "int_d"], + ) + self.assertEqual( + batch.sparse_features[BASE_DATA_GROUP].values().size(), (120,) + ) + self.assertEqual( + batch.sparse_features[BASE_DATA_GROUP].lengths().size(), (120,) + ) + self.assertEqual(batch.labels["label"].size(), (40,)) if __name__ == "__main__": From 517bb7d7f3dd6811ad30e9325ac36a1dfcc96e5e Mon Sep 17 00:00:00 2001 From: jjbbong Date: Tue, 8 Oct 2024 11:55:12 +0800 Subject: [PATCH 3/4] code style --- tzrec/datasets/dataset_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tzrec/datasets/dataset_test.py b/tzrec/datasets/dataset_test.py index 3048922..3f88d22 100644 --- a/tzrec/datasets/dataset_test.py +++ b/tzrec/datasets/dataset_test.py @@ -515,6 +515,7 @@ def _childern(code): input_path="", input_fields=input_fields, ) + dataset.launch_sampler_cluster(2) dataloader = DataLoader( dataset=dataset, From dfbaf255b63fb801d114611736b3a2820645ce74 Mon Sep 17 00:00:00 2001 From: jjbbong Date: Tue, 8 Oct 2024 12:00:49 +0800 Subject: [PATCH 4/4] fix code style --- docs/source/models/tdm.md | 6 +++--- tzrec/datasets/dataset.py | 33 +++++++++++++++++---------------- tzrec/datasets/dataset_test.py | 22 +++++++++------------- tzrec/protos/sampler.proto | 6 +++--- 4 files changed, 32 insertions(+), 35 deletions(-) diff --git a/docs/source/models/tdm.md b/docs/source/models/tdm.md index 2b6dc26..864f972 100644 --- a/docs/source/models/tdm.md +++ b/docs/source/models/tdm.md @@ -115,9 +115,9 @@ model_config { - attr_delimiter: 目前是以","写表的, 因此需设置为"," - remain_ratio: (可选, 默认为1.0)训练时每个batch内在树中随机选取一定比例的非叶节点层的节点训练, 可以一定程度防止过拟合 - probability_type: (可选, 默认为"UNIFORM")随机选取树中非叶节点层时, 每层被选中的概率, 目前可选择"UNIFORM", "ARITHMETIC", "RECIPROCAL" - - "UNIFORM": 每层被选中的概率相同 - - "ARTITHMETIC": 每层被选中概率等差递增 - - 
"RECIPROCAL": 每层被选中的概率呈反比例函数递增, 即p正比于1/(TREE_LEVEL - CUR_LEVEL) + - "UNIFORM": 每层被选中的概率相同 + - "ARTITHMETIC": 每层被选中概率等差递增 + - "RECIPROCAL": 每层被选中的概率呈反比例函数递增, 即p正比于1/(TREE_LEVEL - CUR_LEVEL) ### 示例Config diff --git a/tzrec/datasets/dataset.py b/tzrec/datasets/dataset.py index e8e26ea..11d1f8e 100644 --- a/tzrec/datasets/dataset.py +++ b/tzrec/datasets/dataset.py @@ -110,32 +110,33 @@ def _expand_tdm_sample( if remain_ratio < 1.0: probability_type = sampler_config.probabilty_type if probability_type == "UNIFORM": - p = np.array([1/(tree_level-1)]*(tree_level-1)) + p = np.array([1 / (tree_level - 1)] * (tree_level - 1)) elif probability_type == "ARITHMETIC": p = np.arange(1, tree_level) / sum(np.arange(1, tree_level)) elif probability_type == "RECIPROCAL": - p = 1/np.arange(tree_level-1, 0, -1) - p = p/sum(p) + p = 1 / np.arange(tree_level - 1, 0, -1) + p = p / sum(p) else: raise ValueError( - f"probability_type: [{probability_type}]" - "is not supported now." - ) - remain_layer = np.random.choice( - range(tree_level-1), int(remain_ratio*(tree_level-1)), replace=False, p=p + f"probability_type: [{probability_type}]" "is not supported now." 
) + remain_layer = np.random.choice( + range(tree_level - 1), + int(remain_ratio * (tree_level - 1)), + replace=False, + p=p, + ) remain_layer.sort() else: - remain_layer = list(range(tree_level-1)) - - num_remain_layer_neg = sum( - [layer_num_sample[i] for i in remain_layer] - ) + layer_num_sample[-1] + remain_layer = np.arange(tree_level - 1) + num_remain_layer_neg = ( + sum([layer_num_sample[i] for i in remain_layer]) + layer_num_sample[-1] + ) for item_fea_name in item_fea_names: batch_size = len(input_data[item_fea_name]) pos_index = ( - (remain_layer[None, :] + (tree_level-1) * np.arange(batch_size)[:, None]) + (remain_layer[None, :] + (tree_level - 1) * np.arange(batch_size)[:, None]) .flatten() .astype(np.int64) ) @@ -143,8 +144,8 @@ def _expand_tdm_sample( ( np.concatenate( [ - range(sum(layer_num_sample[:i]), sum(layer_num_sample[:i+1])) - for i in np.append(remain_layer, tree_level-1) + range(sum(layer_num_sample[:i]), sum(layer_num_sample[: i + 1])) + for i in np.append(remain_layer, tree_level - 1) ] )[None, :] + num_all_layer_neg * np.arange(batch_size)[:, None] diff --git a/tzrec/datasets/dataset_test.py b/tzrec/datasets/dataset_test.py index 3f88d22..e7eb611 100644 --- a/tzrec/datasets/dataset_test.py +++ b/tzrec/datasets/dataset_test.py @@ -10,13 +10,13 @@ # limitations under the License. 
+import math import tempfile import unittest from typing import Any, Dict, Iterator, List, Optional import numpy as np import pyarrow as pa -import math from graphlearn.python.nn.pytorch.data import utils from parameterized import parameterized from torch.utils.data import DataLoader @@ -426,7 +426,7 @@ def test_dataset_predict_mode(self, debug_level): ) else: self.assertEqual(list(batch.reserves.get().column_names), ["label"]) - + def test_dataset_with_tdm_sampler_and_remain_ratio(self): node = tempfile.NamedTemporaryFile("w") self._temp_files.append(node) @@ -489,8 +489,7 @@ def _childern(code): ), ] features = create_features( - feature_cfgs, - neg_fields=["int_a", "float_b", "str_c"] + feature_cfgs, neg_fields=["int_a", "float_b", "str_c"] ) dataset = _TestDataset( @@ -505,17 +504,17 @@ def _childern(code): predict_edge_input_path=predict_edge.name, attr_fields=["tree_level", "int_a", "float_b", "str_c"], item_id_field="int_a", - layer_num_sample=[1,1,1,1,1,5], + layer_num_sample=[1, 1, 1, 1, 1, 5], field_delimiter=",", remain_ratio=0.4, - probability_type="UNIFORM" + probabilty_type="UNIFORM", ), ), features=features, input_path="", input_fields=input_fields, ) - + dataset.launch_sampler_cluster(2) dataloader = DataLoader( dataset=dataset, @@ -524,21 +523,18 @@ def _childern(code): pin_memory=True, collate_fn=lambda x: x, ) + iterator = iter(dataloader) batch = next(iterator) self.assertEqual( batch.dense_features[BASE_DATA_GROUP].keys(), ["float_b", "float_d"] ) - self.assertEqual( - batch.dense_features[BASE_DATA_GROUP].values().size(), (40, 2) - ) + self.assertEqual(batch.dense_features[BASE_DATA_GROUP].values().size(), (40, 2)) self.assertEqual( batch.sparse_features[BASE_DATA_GROUP].keys(), ["int_a", "str_c", "int_d"], ) - self.assertEqual( - batch.sparse_features[BASE_DATA_GROUP].values().size(), (120,) - ) + self.assertEqual(batch.sparse_features[BASE_DATA_GROUP].values().size(), (120,)) self.assertEqual( 
batch.sparse_features[BASE_DATA_GROUP].lengths().size(), (120,) ) diff --git a/tzrec/protos/sampler.proto b/tzrec/protos/sampler.proto index cf530eb..2350a7c 100644 --- a/tzrec/protos/sampler.proto +++ b/tzrec/protos/sampler.proto @@ -130,10 +130,10 @@ message TDMSampler { optional uint32 num_eval_sample = 8 [default=0]; // field delimiter of input data optional string field_delimiter = 9; - // The training process only trains a randomly selected + // The training process only trains a randomly selected // proportion of nodes in the middle layers of the tree optional float remain_ratio = 10 [default=1.0]; - // The type of probability for selecting and retaining - // each layer in the middle layers of the tree. + // The type of probability for selecting and retaining + // each layer in the middle layers of the tree optional string probabilty_type = 11 [default="UNIFORM"]; }