Skip to content

Commit

Permalink
Merge pull request #3 from RUCAIBox/0.2.x
Browse files Browse the repository at this point in the history
get latest code from 0.2.x
  • Loading branch information
linzihan-backforward authored Nov 25, 2020
2 parents 1336392 + 05e78ec commit ee37080
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 125 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,30 @@ If you want to change the models, just run the script by setting additional comm
python run_recbole.py --model=[model_name]
```


## Time and memory cost of models
We test our models on three datasets of different size (small size, medium size and large size) to estimate their time and memory cost. You can
click links to check more information.<br>
(**NOTE:** Our test results only reflect the approximate time and memory cost of models. If you find any error in our result,
please let us know.)<br>

* [General recommendation models](time_test_result/General_recommendation.md)<br>
* [Sequential recommendation models]()<br>
* [Context-aware recommendation models]()<br>
* [Knowledge-based recommendation models]()<br>

Here is our testing device information:<br>
```
GPU: TITAN GTX
Driver Version: 430.64
CUDA Version: 10.1
Memory size: 65412748 KB
CPU: Intel(R) Xeon(R) Silver 4110 CPU @ 2.10GHz
The number of CPU cores: 8
Cache size: 11264KB
```


## RecBole Major Releases
| Releases | Date | Features |
|-----------|--------|-------------------------|
Expand Down
9 changes: 9 additions & 0 deletions recbole/data/dataloader/abstract_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ def set_batch_size(self, batch_size):
self.batch_size = batch_size
self.logger.warning('Batch size is changed to {}'.format(batch_size))

def upgrade_batch_size(self, batch_size):
"""Upgrade the batch_size of the dataloader, if input batch_size is bigger than current batch_size.
Args:
batch_size (int): the new batch_size of dataloader.
"""
if self.batch_size < batch_size:
self.set_batch_size(batch_size)

def get_user_feature(self):
"""It is similar to :meth:`~recbole.data.dataset.dataset.Dataset.get_user_feature`, but it will return an
:class:`~recbole.data.interaction.Interaction` of user feature instead of a :class:`pandas.DataFrame`.
Expand Down
150 changes: 78 additions & 72 deletions recbole/data/dataloader/general_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,28 +72,27 @@ class GeneralNegSampleDataLoader(NegSampleByMixin, AbstractDataLoader):
"""
def __init__(self, config, dataset, sampler, neg_sample_args,
batch_size=1, dl_format=InputType.POINTWISE, shuffle=False):
self.uid2index, self.uid2items_num = None, None
self.uid_list, self.uid2index, self.uid2items_num = None, None, None

super().__init__(config, dataset, sampler, neg_sample_args,
batch_size=batch_size, dl_format=dl_format, shuffle=shuffle)

def setup(self):
if self.user_inter_in_one_batch:
self.uid2index, self.uid2items_num = self.dataset.uid2index
self.uid_list, self.uid2index, self.uid2items_num = self.dataset.uid2index
self._batch_size_adaptation()

def data_preprocess(self):
if self.user_inter_in_one_batch:
new_inter_num = 0
new_inter_feat = []
new_uid2index = []
for uid, index in self.uid2index:
for uid in self.uid_list:
index = self.uid2index[uid]
new_inter_feat.append(self._neg_sampling(self.dataset.inter_feat[index]))
new_num = len(new_inter_feat[-1])
new_uid2index.append((uid, slice(new_inter_num, new_inter_num + new_num)))
new_inter_num += new_num
self.uid2index[uid] = slice(new_inter_num, new_inter_num + new_num)
self.uid2items_num[uid] = new_num
self.dataset.inter_feat = pd.concat(new_inter_feat, ignore_index=True)
self.uid2index = np.array(new_uid2index)
else:
self.dataset.inter_feat = self._neg_sampling(self.dataset.inter_feat)

Expand All @@ -108,36 +107,35 @@ def _batch_size_adaptation(self):
batch_num = i
new_batch_size += inters_num[i]
self.step = batch_num
self.set_batch_size(new_batch_size)
self.upgrade_batch_size(new_batch_size)
else:
batch_num = max(self.batch_size // self.times, 1)
new_batch_size = batch_num * self.times
self.step = batch_num if self.real_time else new_batch_size
self.set_batch_size(new_batch_size)
self.upgrade_batch_size(new_batch_size)

@property
def pr_end(self):
if self.user_inter_in_one_batch:
return len(self.uid2index)
return len(self.uid_list)
else:
return len(self.dataset)

def _shuffle(self):
if self.user_inter_in_one_batch:
new_index = np.random.permutation(len(self.uid2index))
self.uid2index = self.uid2index[new_index]
self.uid2items_num = self.uid2items_num[new_index]
np.random.shuffle(self.uid_list)
else:
self.dataset.shuffle()

def _next_batch_data(self):
if self.user_inter_in_one_batch:
sampling_func = self._neg_sampling if self.real_time else (lambda x: x)
cur_data = []
for uid, index in self.uid2index[self.pr: self.pr + self.step]:
for uid in self.uid_list[self.pr: self.pr + self.step]:
index = self.uid2index[uid]
cur_data.append(sampling_func(self.dataset[index]))
cur_data = pd.concat(cur_data, ignore_index=True)
pos_len_list = self.uid2items_num[self.pr: self.pr + self.step]
pos_len_list = self.uid2items_num[self.uid_list[self.pr: self.pr + self.step]]
user_len_list = pos_len_list * self.times
self.pr += self.step
return self._dataframe_to_interaction(cur_data, list(pos_len_list), list(user_len_list))
Expand Down Expand Up @@ -184,7 +182,14 @@ def get_pos_len_list(self):
Returns:
np.ndarray: Number of positive item for each user in a training/evaluating epoch.
"""
return self.uid2items_num
return self.uid2items_num[self.uid_list]

def get_user_len_list(self):
"""
Returns:
np.ndarray: Number of all item for each user in a training/evaluating epoch.
"""
return self.uid2items_num[self.uid_list] * self.times


class GeneralFullDataLoader(NegSampleMixin, AbstractDataLoader):
Expand All @@ -208,88 +213,89 @@ def __init__(self, config, dataset, sampler, neg_sample_args,
batch_size=1, dl_format=InputType.POINTWISE, shuffle=False):
if neg_sample_args['strategy'] != 'full':
raise ValueError('neg_sample strategy in GeneralFullDataLoader() should be `full`')
self.uid2index, self.uid2items_num = dataset.uid2index

uid_field = dataset.uid_field
iid_field = dataset.iid_field
user_num = dataset.user_num
self.uid_list = []
self.uid2items_num = np.zeros(user_num, dtype=np.int64)
self.uid2swap_idx = np.array([None] * user_num)
self.uid2rev_swap_idx = np.array([None] * user_num)
self.uid2history_item = np.array([None] * user_num)

dataset.sort(by=uid_field, ascending=True)
last_uid = None
positive_item = None
uid2used_item = sampler.used_ids
for uid, iid in dataset.inter_feat[[uid_field, iid_field]].values:
if uid != last_uid:
if last_uid is not None:
self._set_user_property(last_uid, uid2used_item[last_uid], positive_item)
last_uid = uid
self.uid_list.append(uid)
positive_item = set()
positive_item.add(iid)
self._set_user_property(last_uid, uid2used_item[last_uid], positive_item)
self.user_df = dataset.join(pd.DataFrame(self.uid_list, columns=[uid_field]))

super().__init__(config, dataset, sampler, neg_sample_args,
batch_size=batch_size, dl_format=dl_format, shuffle=shuffle)

def _set_user_property(self, uid, used_item, positive_item):
history_item = used_item - positive_item
positive_item_num = len(positive_item)
self.uid2items_num[uid] = positive_item_num
swap_idx = torch.tensor(sorted(set(range(positive_item_num)) ^ positive_item))
self.uid2swap_idx[uid] = swap_idx
self.uid2rev_swap_idx[uid] = swap_idx.flip(0)
self.uid2history_item[uid] = torch.tensor(list(history_item))

def data_preprocess(self):
self.user_tensor, tmp_pos_idx, tmp_used_idx, self.pos_len_list, self.neg_len_list = \
self._neg_sampling(self.uid2index, show_progress=True)
tmp_pos_len_list = [sum(self.pos_len_list[_: _ + self.step]) for _ in range(0, self.pr_end, self.step)]
tot_item_num = self.dataset.item_num
tmp_used_len_list = [sum(
[tot_item_num - x for x in self.neg_len_list[_: _ + self.step]]
) for _ in range(0, self.pr_end, self.step)]
self.pos_idx = list(torch.split(tmp_pos_idx, tmp_pos_len_list))
self.used_idx = list(torch.split(tmp_used_idx, tmp_used_len_list))
for i in range(len(self.pos_idx)):
self.pos_idx[i] -= i * tot_item_num * self.step
for i in range(len(self.used_idx)):
self.used_idx[i] -= i * tot_item_num * self.step
pass

def _batch_size_adaptation(self):
batch_num = max(self.batch_size // self.dataset.item_num, 1)
new_batch_size = batch_num * self.dataset.item_num
self.step = batch_num
self.set_batch_size(new_batch_size)
self.upgrade_batch_size(new_batch_size)

@property
def pr_end(self):
return len(self.uid2index)
return len(self.uid_list)

def _shuffle(self):
self.logger.warnning('GeneralFullDataLoader can\'t shuffle')

def _next_batch_data(self):
if not self.real_time:
slc = slice(self.pr, self.pr + self.step)
idx = self.pr // self.step
cur_data = self.user_tensor[slc], self.pos_idx[idx], self.used_idx[idx], \
self.pos_len_list[slc], self.neg_len_list[slc]
else:
cur_data = self._neg_sampling(self.uid2index[self.pr: self.pr + self.step])
cur_data = self._neg_sampling(self.user_df[self.pr: self.pr + self.step])
self.pr += self.step
return cur_data

def _neg_sampling(self, uid2index, show_progress=False):
uid_field = self.dataset.uid_field
iid_field = self.dataset.iid_field
tot_item_num = self.dataset.item_num

start_idx = 0
pos_len_list = []
neg_len_list = []

pos_idx = []
used_idx = []
def _neg_sampling(self, user_df):
uid_list = user_df[self.dataset.uid_field].values
user_interaction = self._dataframe_to_interaction(user_df)

iter_data = tqdm(uid2index) if show_progress else uid2index
for uid, index in iter_data:
pos_item_id = self.dataset.inter_feat[iid_field][index].values
pos_idx.extend([_ + start_idx for _ in pos_item_id])
pos_num = len(pos_item_id)
pos_len_list.append(pos_num)
history_item = self.uid2history_item[uid_list]
history_row = torch.cat([torch.full_like(hist_iid, i) for i, hist_iid in enumerate(history_item)])
history_col = torch.cat(list(history_item))

used_item_id = self.sampler.used_ids[uid]
used_idx.extend([_ + start_idx for _ in used_item_id])
used_num = len(used_item_id)

neg_num = tot_item_num - used_num
neg_len_list.append(neg_num)

start_idx += tot_item_num

user_df = pd.DataFrame({uid_field: np.array(uid2index[:, 0], dtype=np.int)})
user_interaction = self._dataframe_to_interaction(self.join(user_df))

return user_interaction, \
torch.LongTensor(pos_idx), torch.LongTensor(used_idx), \
pos_len_list, neg_len_list
swap_idx = self.uid2swap_idx[uid_list]
rev_swap_idx = self.uid2rev_swap_idx[uid_list]
swap_row = torch.cat([torch.full_like(swap, i) for i, swap in enumerate(swap_idx)])
swap_col_after = torch.cat(list(swap_idx))
swap_col_before = torch.cat(list(rev_swap_idx))
return user_interaction, (history_row, history_col), swap_row, swap_col_after, swap_col_before

def get_pos_len_list(self):
"""
Returns:
np.ndarray: Number of positive item for each user in a training/evaluating epoch.
"""
return self.uid2items_num
return self.uid2items_num[self.uid_list]

def get_user_len_list(self):
"""
Returns:
np.ndarray: Number of all item for each user in a training/evaluating epoch.
"""
return np.full(self.pr_end, self.item_num)
7 changes: 7 additions & 0 deletions recbole/data/dataloader/neg_sample_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ def get_pos_len_list(self):
"""
raise NotImplementedError('Method [get_pos_len_list] should be implemented.')

def get_user_len_list(self):
"""
Returns:
np.ndarray: Number of all item for each user in a training/evaluating epoch.
"""
raise NotImplementedError('Method [get_user_len_list] should be implemented.')


class NegSampleByMixin(NegSampleMixin):
""":class:`NegSampleByMixin` is an abstract class which can sample negative examples by ratio.
Expand Down
42 changes: 33 additions & 9 deletions recbole/data/dataloader/sequential_dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import torch

from recbole.data.dataloader.abstract_dataloader import AbstractDataLoader
from recbole.data.dataloader.neg_sample_mixin import NegSampleByMixin
from recbole.data.dataloader.neg_sample_mixin import NegSampleByMixin, NegSampleMixin
from recbole.utils import DataLoaderType, FeatureSource, FeatureType, InputType


Expand Down Expand Up @@ -180,7 +180,7 @@ def _batch_size_adaptation(self):
batch_num = max(self.batch_size // self.times, 1)
new_batch_size = batch_num * self.times
self.step = batch_num if self.real_time else new_batch_size
self.set_batch_size(new_batch_size)
self.upgrade_batch_size(new_batch_size)

def _next_batch_data(self):
cur_index = slice(self.pr, self.pr + self.step)
Expand Down Expand Up @@ -245,8 +245,15 @@ def get_pos_len_list(self):
"""
return np.ones(self.pr_end, dtype=np.int64)

def get_user_len_list(self):
"""
Returns:
np.ndarray: Number of all item for each user in a training/evaluating epoch.
"""
return np.full(self.pr_end, self.times)


class SequentialFullDataLoader(SequentialDataLoader):
class SequentialFullDataLoader(NegSampleMixin, SequentialDataLoader):
""":class:`SequentialFullDataLoader` is a sequential-dataloader with full sort. In order to speed up calculation,
this dataloader would only return then user part of interactions, positive items and used items.
It would not return negative items.
Expand All @@ -265,24 +272,41 @@ class SequentialFullDataLoader(SequentialDataLoader):

def __init__(self, config, dataset, sampler, neg_sample_args,
batch_size=1, dl_format=InputType.POINTWISE, shuffle=False):
super().__init__(config, dataset,
super().__init__(config, dataset, sampler, neg_sample_args,
batch_size=batch_size, dl_format=dl_format, shuffle=shuffle)

def data_preprocess(self):
pass

def _batch_size_adaptation(self):
pass

def _neg_sampling(self, inter_feat):
pass

def _shuffle(self):
self.logger.warnning('SequentialFullDataLoader can\'t shuffle')

def _next_batch_data(self):
interaction = super()._next_batch_data()
tot_item_num = self.dataset.item_num
inter_num = len(interaction)
pos_idx = used_idx = interaction[self.target_iid_field] + torch.arange(inter_num) * tot_item_num
pos_len_list = [1] * inter_num
neg_len_list = [tot_item_num - 1] * inter_num
return interaction, pos_idx, used_idx, pos_len_list, neg_len_list
scores_row = torch.arange(inter_num).repeat(2)
padding_idx = torch.zeros(inter_num, dtype=torch.int64)
positive_idx = interaction[self.target_iid_field]
scores_col_after = torch.cat((padding_idx, positive_idx))
scores_col_before = torch.cat((positive_idx, padding_idx))
return interaction, None, scores_row, scores_col_after, scores_col_before

def get_pos_len_list(self):
"""
Returns:
np.ndarray or list: Number of positive item for each user in a training/evaluating epoch.
"""
return np.ones(self.pr_end, dtype=np.int64)

def get_user_len_list(self):
"""
Returns:
np.ndarray: Number of all item for each user in a training/evaluating epoch.
"""
return np.full(len(self.uid_list), self.item_num)
Loading

0 comments on commit ee37080

Please sign in to comment.