diff --git a/docs/source/models/tdm.md b/docs/source/models/tdm.md
index f1b3023..864f972 100644
--- a/docs/source/models/tdm.md
+++ b/docs/source/models/tdm.md
@@ -113,6 +113,11 @@ model_config {
   - layer_num_sample: number of negative samples drawn at each tree layer for every sample
   - item_id_field: name of the item_id column
   - attr_delimiter: the table is currently written with "," as the delimiter, so this must be set to ","
+  - remain_ratio: (optional, default 1.0) fraction of the non-leaf layers of the tree that is randomly selected for training within each batch; can help guard against overfitting
+  - probability_type: (optional, default "UNIFORM") the probability with which each non-leaf layer is selected when sampling layers; currently one of "UNIFORM", "ARITHMETIC", "RECIPROCAL"
+    - "UNIFORM": every layer is selected with the same probability
+    - "ARITHMETIC": the selection probability increases arithmetically with layer depth
+    - "RECIPROCAL": the selection probability increases as a reciprocal function of the remaining depth, i.e. p is proportional to 1/(TREE_LEVEL - CUR_LEVEL)
 
 ### Example Config
 
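To make the three `probability_type` options above concrete, here is a small standalone NumPy sketch (illustrative only, not part of the patch; `tree_level = 6` is made up to mirror the `layer_num_sample=[1, 1, 1, 1, 1, 5]` used in the test below) that computes the per-layer selection weights:

```python
import numpy as np

tree_level = 6      # total number of tree layers, leaf layer included
n = tree_level - 1  # number of selectable non-leaf layers

# UNIFORM: every non-leaf layer is equally likely to be kept.
p_uniform = np.full(n, 1.0 / n)

# ARITHMETIC: weights 1, 2, ..., n, i.e. probability grows linearly with depth.
p_arithmetic = np.arange(1, tree_level) / np.arange(1, tree_level).sum()

# RECIPROCAL: p proportional to 1 / (TREE_LEVEL - CUR_LEVEL).
p_reciprocal = 1.0 / np.arange(n, 0, -1)
p_reciprocal /= p_reciprocal.sum()

print(p_uniform)     # [0.2 0.2 0.2 0.2 0.2]
print(p_arithmetic)  # [0.067 0.133 0.2   0.267 0.333]
print(p_reciprocal)  # [0.088 0.109 0.146 0.219 0.438]
```

Both non-uniform options put more weight on the deeper layers, matching the intuition that layers closer to the leaves carry more training signal.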
diff --git a/tzrec/datasets/dataset.py b/tzrec/datasets/dataset.py
index 0f01ce7..11d1f8e 100644
--- a/tzrec/datasets/dataset.py
+++ b/tzrec/datasets/dataset.py
@@ -99,26 +99,73 @@ def _expand_tdm_sample(
     sampler_config = getattr(data_config, sampler_type)
     tree_level = len(sampler_config.layer_num_sample)
     num_all_layer_neg = sum(sampler_config.layer_num_sample)
+    layer_num_sample = sampler_config.layer_num_sample
     item_fea_names = pos_sampled.keys()
     all_fea_names = input_data.keys()
     label_fields = set(data_config.label_fields)
     user_fea_names = all_fea_names - item_fea_names - label_fields
 
+    remain_ratio = sampler_config.remain_ratio
+    if remain_ratio < 1.0:
+        probability_type = sampler_config.probability_type
+        if probability_type == "UNIFORM":
+            p = np.array([1 / (tree_level - 1)] * (tree_level - 1))
+        elif probability_type == "ARITHMETIC":
+            p = np.arange(1, tree_level) / sum(np.arange(1, tree_level))
+        elif probability_type == "RECIPROCAL":
+            p = 1 / np.arange(tree_level - 1, 0, -1)
+            p = p / sum(p)
+        else:
+            raise ValueError(
+                f"probability_type: [{probability_type}] is not supported now."
+            )
+        remain_layer = np.random.choice(
+            range(tree_level - 1),
+            int(remain_ratio * (tree_level - 1)),
+            replace=False,
+            p=p,
+        )
+        remain_layer.sort()
+    else:
+        remain_layer = np.arange(tree_level - 1)
+
+    num_remain_layer_neg = (
+        sum([layer_num_sample[i] for i in remain_layer]) + layer_num_sample[-1]
+    )
     for item_fea_name in item_fea_names:
+        batch_size = len(input_data[item_fea_name])
+        pos_index = (
+            (remain_layer[None, :] + (tree_level - 1) * np.arange(batch_size)[:, None])
+            .flatten()
+            .astype(np.int64)
+        )
+        neg_index = (
+            (
+                np.concatenate(
+                    [
+                        range(sum(layer_num_sample[:i]), sum(layer_num_sample[: i + 1]))
+                        for i in np.append(remain_layer, tree_level - 1)
+                    ]
+                )[None, :]
+                + num_all_layer_neg * np.arange(batch_size)[:, None]
+            )
+            .flatten()
+            .astype(np.int64)
+        )
         input_data[item_fea_name] = pa.concat_arrays(
             [
                 input_data[item_fea_name],
-                pos_sampled[item_fea_name],
-                neg_sampled[item_fea_name],
+                pos_sampled[item_fea_name].take(pos_index),
+                neg_sampled[item_fea_name].take(neg_index),
             ]
         )
 
     # In the sampling results, the sampled outcomes for each item are contiguous.
     for user_fea_name in user_fea_names:
         user_fea = input_data[user_fea_name]
-        pos_index = np.repeat(np.arange(len(user_fea)), tree_level - 1)
-        neg_index = np.repeat(np.arange(len(user_fea)), num_all_layer_neg)
+        pos_index = np.repeat(np.arange(len(user_fea)), len(remain_layer))
+        neg_index = np.repeat(np.arange(len(user_fea)), num_remain_layer_neg)
         pos_expand_user_fea = user_fea.take(pos_index)
         neg_expand_user_fea = user_fea.take(neg_index)
         input_data[user_fea_name] = pa.concat_arrays(
@@ -134,11 +181,11 @@ def _expand_tdm_sample(
             [
                 input_data[label_field].cast(pa.int64()),
                 pa.array(
-                    [1] * (len(input_data[label_field]) * (tree_level - 1)),
+                    [1] * (len(input_data[label_field]) * len(remain_layer)),
                     type=pa.int64(),
                 ),
                 pa.array(
-                    [0] * (len(input_data[label_field]) * num_all_layer_neg),
+                    [0] * (len(input_data[label_field]) * num_remain_layer_neg),
                     type=pa.int64(),
                 ),
             ]
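For readers tracing the indexing in `_expand_tdm_sample`: positives are stored contiguously as `tree_level - 1` rows per input sample and negatives as `num_all_layer_neg` rows per input sample, so the retained rows are gathered with per-sample strides while the leaf layer is always kept. A standalone sketch with invented sizes (not part of the patch):

```python
import numpy as np

layer_num_sample = [1, 1, 1, 1, 1, 5]      # negatives drawn per layer
tree_level = len(layer_num_sample)         # 6
num_all_layer_neg = sum(layer_num_sample)  # 10
batch_size = 2
remain_layer = np.array([1, 3])            # suppose layers 1 and 3 were kept

# Positives: stride of tree_level - 1 per sample; keep only retained layers.
pos_index = (
    remain_layer[None, :] + (tree_level - 1) * np.arange(batch_size)[:, None]
).flatten()
print(pos_index)  # [1 3 6 8]

# Negatives: layer i occupies slots [sum(l[:i]), sum(l[:i+1])) inside each
# sample's block of num_all_layer_neg rows; the leaf layer is always appended.
neg_index = (
    np.concatenate(
        [
            range(sum(layer_num_sample[:i]), sum(layer_num_sample[: i + 1]))
            for i in np.append(remain_layer, tree_level - 1)
        ]
    )[None, :]
    + num_all_layer_neg * np.arange(batch_size)[:, None]
).flatten()
print(neg_index)  # [ 1  3  5  6  7  8  9 11 13 15 16 17 18 19]
```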
diff --git a/tzrec/datasets/dataset_test.py b/tzrec/datasets/dataset_test.py
index 6af2c1d..e7eb611 100644
--- a/tzrec/datasets/dataset_test.py
+++ b/tzrec/datasets/dataset_test.py
@@ -10,6 +10,7 @@
 #    limitations under the License.
 
+import math
 import tempfile
 import unittest
 from typing import Any, Dict, Iterator, List, Optional
@@ -426,6 +427,119 @@ def test_dataset_predict_mode(self, debug_level):
         else:
             self.assertEqual(list(batch.reserves.get().column_names), ["label"])
 
+    def test_dataset_with_tdm_sampler_and_remain_ratio(self):
+        node = tempfile.NamedTemporaryFile("w")
+        self._temp_files.append(node)
+        node.write("id:int64\tweight:float\tattrs:string\n")
+        for i in range(63):
+            node.write(f"{i}\t{1}\t{int(math.log(i+1,2))}:{i}:{i+1000}:我们{i}\n")
+        node.flush()
+
+        def _ancestor(code):
+            ancs = []
+            while code > 0:
+                code = int((code - 1) / 2)
+                ancs.append(code)
+            return ancs
+
+        edge = tempfile.NamedTemporaryFile("w")
+        self._temp_files.append(edge)
+        edge.write("src_id:int64\tdst_id:int\tweight:float\n")
+        for i in range(31, 63):
+            for anc in _ancestor(i):
+                edge.write(f"{i}\t{anc}\t{1.0}\n")
+        edge.flush()
+
+        def _children(code):
+            return [2 * code + 1, 2 * code + 2]
+
+        predict_edge = tempfile.NamedTemporaryFile("w")
+        self._temp_files.append(predict_edge)
+        predict_edge.write("src_id:int64\tdst_id:int\tweight:float\n")
+        for i in range(7, 15):
+            predict_edge.write(f"0\t{i}\t{1}\n")
+        for i in range(7, 31):
+            for child in _children(i):
+                predict_edge.write(f"{i}\t{child}\t{1}\n")
+        predict_edge.flush()
+
+        input_fields = [
+            pa.field(name="int_a", type=pa.int64()),
+            pa.field(name="float_b", type=pa.float64()),
+            pa.field(name="str_c", type=pa.string()),
+            pa.field(name="int_d", type=pa.int64()),
+            pa.field(name="float_d", type=pa.float64()),
+            pa.field(name="label", type=pa.int32()),
+        ]
+        feature_cfgs = [
+            feature_pb2.FeatureConfig(
+                id_feature=feature_pb2.IdFeature(feature_name="int_a")
+            ),
+            feature_pb2.FeatureConfig(
+                id_feature=feature_pb2.IdFeature(feature_name="str_c")
+            ),
+            feature_pb2.FeatureConfig(
+                raw_feature=feature_pb2.RawFeature(feature_name="float_b")
+            ),
+            feature_pb2.FeatureConfig(
+                id_feature=feature_pb2.IdFeature(feature_name="int_d")
+            ),
+            feature_pb2.FeatureConfig(
+                raw_feature=feature_pb2.RawFeature(feature_name="float_d")
+            ),
+        ]
+        features = create_features(
+            feature_cfgs, neg_fields=["int_a", "float_b", "str_c"]
+        )
+
+        dataset = _TestDataset(
+            data_config=data_pb2.DataConfig(
+                batch_size=4,
+                dataset_type=data_pb2.DatasetType.OdpsDataset,
+                fg_encoded=True,
+                label_fields=["label"],
+                tdm_sampler=sampler_pb2.TDMSampler(
+                    item_input_path=node.name,
+                    edge_input_path=edge.name,
+                    predict_edge_input_path=predict_edge.name,
+                    attr_fields=["tree_level", "int_a", "float_b", "str_c"],
+                    item_id_field="int_a",
+                    layer_num_sample=[1, 1, 1, 1, 1, 5],
+                    field_delimiter=",",
+                    remain_ratio=0.4,
+                    probability_type="UNIFORM",
+                ),
+            ),
+            features=features,
+            input_path="",
+            input_fields=input_fields,
+        )
+
+        dataset.launch_sampler_cluster(2)
+        dataloader = DataLoader(
+            dataset=dataset,
+            batch_size=None,
+            num_workers=2,
+            pin_memory=True,
+            collate_fn=lambda x: x,
+        )
+
+        iterator = iter(dataloader)
+        batch = next(iterator)
+        self.assertEqual(
+            batch.dense_features[BASE_DATA_GROUP].keys(), ["float_b", "float_d"]
+        )
+        self.assertEqual(
+            batch.dense_features[BASE_DATA_GROUP].values().size(), (40, 2)
+        )
+        self.assertEqual(
+            batch.sparse_features[BASE_DATA_GROUP].keys(),
+            ["int_a", "str_c", "int_d"],
+        )
+        self.assertEqual(
+            batch.sparse_features[BASE_DATA_GROUP].values().size(), (120,)
+        )
+        self.assertEqual(
+            batch.sparse_features[BASE_DATA_GROUP].lengths().size(), (120,)
+        )
+        self.assertEqual(batch.labels["label"].size(), (40,))
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/tzrec/protos/sampler.proto b/tzrec/protos/sampler.proto
index c4c0755..2350a7c 100644
--- a/tzrec/protos/sampler.proto
+++ b/tzrec/protos/sampler.proto
@@ -130,4 +130,10 @@ message TDMSampler {
     optional uint32 num_eval_sample = 8 [default=0];
     // field delimiter of input data
     optional string field_delimiter = 9;
+    // proportion of nodes in the intermediate (non-leaf) layers of the tree
+    // that is randomly selected and trained on in each batch
+    optional float remain_ratio = 10 [default=1.0];
+    // probability distribution used to select which intermediate
+    // layers of the tree are retained for training
+    optional string probability_type = 11 [default="UNIFORM"];
 }
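Putting the new fields to use: a hypothetical configuration sketch (paths, feature names, and values are placeholders; the `tdm_sampler` field and the `TDMSampler` field names follow the test above):

```python
from tzrec.protos import data_pb2, sampler_pb2

# All paths and feature names below are placeholders; remain_ratio and
# probability_type are the two knobs introduced by this patch.
data_config = data_pb2.DataConfig(
    batch_size=4,
    dataset_type=data_pb2.DatasetType.OdpsDataset,
    label_fields=["label"],
    tdm_sampler=sampler_pb2.TDMSampler(
        item_input_path="/path/to/node_table",
        edge_input_path="/path/to/edge_table",
        predict_edge_input_path="/path/to/predict_edge_table",
        attr_fields=["tree_level", "item_id"],
        item_id_field="item_id",
        layer_num_sample=[1, 1, 1, 1, 1, 5],
        field_delimiter=",",
        remain_ratio=0.4,               # keep 40% of the non-leaf layers per batch
        probability_type="RECIPROCAL",  # bias retention toward layers near the leaves
    ),
)
```

With `remain_ratio=0.4` and six layers, `int(0.4 * 5) = 2` of the five non-leaf layers are retained per batch, plus the leaf layer, which is always trained; this is exactly why the test above expects `1 + 2 + 7 = 10` expanded rows per input sample and a batch of 4 yields label size `(40,)`.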