Skip to content

Commit

Permalink
feat(tianmu): fix invalid write/read issue. (#817)
Browse files Browse the repository at this point in the history
  • Loading branch information
lujiashun committed Nov 9, 2022
1 parent eeca82b commit ce92a35
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 117 deletions.
12 changes: 6 additions & 6 deletions storage/tianmu/core/engine_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
namespace Tianmu {
namespace core {

// stonedb8 List -> mem_root_deque
void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int, Item *> &items_backup) {
Item *item;
Field *f;
Expand All @@ -36,7 +35,8 @@ void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int,
Item::Type item_type;
Item_sum::Sumfunctype sum_type;

buf_lens = new uint[CountVisibleFields(fields)]; // stonedb8
buf_lens = new uint[fields.size()];
memset(buf_lens, 0, sizeof(uint) * fields.size());
uint item_id = 0;
uint field_length = 0;
uint total_length = 0;
Expand Down Expand Up @@ -81,7 +81,7 @@ void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int,
tmp->collation.set(item->collation);
tmp->value_.set_charset(item->collation.collation);
tmp->set_data_type(item->data_type());
fields[item_id] = tmp; // stonedb8
fields[item_id] = tmp;
break;
}
case Item::SUM_FUNC_ITEM: {
Expand All @@ -100,15 +100,15 @@ void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int,
isum_hybrid_rcbase->hybrid_field_type_ = is->data_type();
isum_hybrid_rcbase->collation.set(is->collation);
isum_hybrid_rcbase->value_.set_charset(is->collation.collation);
fields[item_id] = isum_hybrid_rcbase; // stonedb8
fields[item_id] = isum_hybrid_rcbase;
buf_lens[item_id] = 0;
break;
} else if (sum_type == Item_sum::COUNT_FUNC || sum_type == Item_sum::COUNT_DISTINCT_FUNC ||
sum_type == Item_sum::SUM_BIT_FUNC) {
isum_int_rcbase = new types::Item_sum_int_rcbase();
isum_int_rcbase->unsigned_flag = is->unsigned_flag;
items_backup[item_id] = item;
fields[item_id] = isum_int_rcbase; // stonedb8
fields[item_id] = isum_int_rcbase;
buf_lens[item_id] = 0;
break;
// we can use the same type for SUM,AVG and SUM DIST, AVG DIST
Expand Down Expand Up @@ -138,7 +138,7 @@ void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int,

item_id = 0;

for (auto it = fields.begin(); it != fields.end(); ++it) { // stonedb8
for (auto it = fields.begin(); it != fields.end(); ++it) {
item = *it;
item_type = item->type();
switch (item_type) {
Expand Down
211 changes: 115 additions & 96 deletions storage/tianmu/system/cacheable_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,153 +23,172 @@

namespace Tianmu {
namespace system {
CacheableItem::CacheableItem(char const *owner_name, char const *object_id, int _default_block_size) {
default_block_size = _default_block_size;
CacheableItem::CacheableItem(char const *owner_name, char const *object_id, int default_block_size) {
default_block_size_ = default_block_size;
max_file_id_ = 0;
max_file_pos_ = 0;
no_block_ = 0;
cur_file_number_ = -1;
file_name_ = nullptr;
DEBUG_ASSERT(owner_name != nullptr);
DEBUG_ASSERT(object_id != nullptr);
// copy the temporary folder first
filename = nullptr;

{
// read the configuration parameter
std::string temp_filename = tianmu_sysvar_cachefolder;
filename_n_position = temp_filename.length();
filename = new char[filename_n_position + 37]; // "...path.../XXXXXXnnnnnnAAAAAAAABBBBBBBB.tianmu_tmp"
std::strcpy(filename, temp_filename.c_str());
if (filename[filename_n_position - 1] != '/' && filename[filename_n_position - 1] != '\\') {
filename[filename_n_position] = '/';
filename_n_position++;
}
filename_n_position += 6;
}

if (filename == nullptr) {
// if the temporary path is not set, use the current folder
filename = new char[36]; // "XXXXXXnnnnnnAAAAAAAABBBBBBBB.tianmu_tmp"
filename_n_position = 6;
constexpr size_t MIN_OWN_NAME_LEN = 3;
constexpr size_t OWNER_AND_OJBECT_LEN = 6;
constexpr size_t NUMBER_FILL_LEN = 6;
constexpr size_t RANDOM_LEN = 8;
constexpr size_t OBJECT_PTR_LEN = 8;
constexpr size_t SUFFIX_LEN = 11;
size_t filename_offset = 0;

// read the configuration parameter
std::string temp_filename = tianmu_sysvar_cachefolder;
char last_char = temp_filename[temp_filename.size() - 1];
if (last_char != '/' && last_char != '\\') {
temp_filename += "/";
}
max_file_id = 0;
max_file_pos = 0;
no_block = 0;
cur_file_number = -1;

// copy the temporary folder first
// "...path.../XXXXXXnnnnnnAAAAAAAABBBBBBBB.tianmu_tmp"
size_t total_length =
temp_filename.length() + OWNER_AND_OJBECT_LEN + NUMBER_FILL_LEN + RANDOM_LEN + OBJECT_PTR_LEN + SUFFIX_LEN + 1;
file_name_ = new char[total_length];
file_name_[total_length - 1] = 0;
std::strcpy(file_name_, temp_filename.c_str());
filename_offset = temp_filename.length();

// fill the file name
int i = 0, j = 0;
while (owner_name[j] != 0 && i < 6) filename[filename_n_position - 6 + (i++)] = owner_name[j++];
while (i < 3) filename[filename_n_position - 6 + (i++)] = '_';

while (owner_name[j] != 0 && i < OWNER_AND_OJBECT_LEN) file_name_[filename_offset + (i++)] = owner_name[j++];
while (i < MIN_OWN_NAME_LEN) file_name_[filename_offset + (i++)] = '_';
j = 0;
while (object_id[j] != 0 && i < 6) filename[filename_n_position - 6 + (i++)] = object_id[j++];
while (i < 6) filename[filename_n_position - 6 + (i++)] = '_';
std::strcpy(filename + filename_n_position, "000000");
char buf[30];
while (object_id[j] != 0 && i < OWNER_AND_OJBECT_LEN) file_name_[filename_offset + (i++)] = object_id[j++];
while (i < OWNER_AND_OJBECT_LEN) file_name_[filename_offset + (i++)] = '_';
filename_offset += OWNER_AND_OJBECT_LEN;

filename_n_position_ = filename_offset;

snprintf(file_name_ + filename_offset, NUMBER_FILL_LEN + 1, "%s", "000000");
filename_offset += NUMBER_FILL_LEN;

char buf[30] = {};
unsigned int random_number = 0;
random_number |= ((rand() % 1024) << 21);
random_number |= ((rand() % 1024) << 11);
random_number |= (rand() % 2048);
std::sprintf(buf, "%X", random_number);
std::strcpy(filename + filename_n_position + 6 + (8 - std::strlen(buf)), buf);
if (std::strlen(buf) < 8)
std::memset(filename + filename_n_position + 6, '0', 8 - std::strlen(buf));
std::sprintf(buf, "%p", this);
std::strcpy(filename + filename_n_position + 14 + (8 - std::strlen(buf)), buf);
if (std::strlen(buf) < 8)
std::memset(filename + filename_n_position + 14, '0', 8 - std::strlen(buf));
std::strcpy(filename + filename_n_position + 22, ".tianmu_tmp");
std::snprintf(file_name_ + filename_offset, RANDOM_LEN + 1, "%08X", random_number);
filename_offset += RANDOM_LEN;

std::snprintf(buf, sizeof(buf), "%p", this);
if (std::strlen(buf) >= OBJECT_PTR_LEN)
std::memcpy(file_name_ + filename_offset, buf, OBJECT_PTR_LEN);
else {
std::strcpy(file_name_ + filename_offset + (OBJECT_PTR_LEN - std::strlen(buf)), buf);
std::memset(file_name_ + filename_offset, '0', OBJECT_PTR_LEN - std::strlen(buf));
}
filename_offset += OBJECT_PTR_LEN;

std::snprintf(file_name_ + filename_offset, SUFFIX_LEN + 1, "%s", ".tianmu_tmp");
filename_offset += SUFFIX_LEN;

DEBUG_ASSERT(file_name_[filename_offset] == 0);
}

CacheableItem::~CacheableItem() {
cur_file_handle.Close();
for (int i = 0; i <= max_file_id; i++) {
cur_file_handle_.Close();
for (int i = 0; i <= max_file_id_; i++) {
SetFilename(i); // delete all files
RemoveFile(filename);
RemoveFile(file_name_);
}
delete[] filename;
delete[] file_name_;
}

void CacheableItem::CI_Put(int block, unsigned char *data, int size) {
if (block == -1)
return;
if (size == -1)
size = default_block_size;
size = default_block_size_;
if (size <= 0)
return;
for (int i = no_block; i < block; i++) { // rare case: the block numbering is not continuous
for (int i = no_block_; i < block; i++) { // rare case: the block numbering is not continuous
// create empty blocks
file_number.push_back(-1);
file_size.push_back(0);
file_start.push_back(0);
file_number_.push_back(-1);
file_size_.push_back(0);
file_start_.push_back(0);
}

try {
if (block >= no_block || size != file_size[block]) {
if (block >= no_block_ || size != file_size_[block]) {
// create a new block or reallocate the existing one
if ((long long)(size) + (long long)(max_file_pos) > 2000000000) { // the file size limit: 2 GB
if ((long long)(size) + (long long)(max_file_pos_) > 2000000000) { // the file size limit: 2 GB
// file becomes too large, start the next one!
max_file_id++;
max_file_pos = 0;
max_file_id_++;
max_file_pos_ = 0;
}
if (block >= no_block) {
file_number.push_back(max_file_id);
file_size.push_back(size);
file_start.push_back(max_file_pos);
no_block = block + 1;
if (block >= no_block_) {
file_number_.push_back(max_file_id_);
file_size_.push_back(size);
file_start_.push_back(max_file_pos_);
no_block_ = block + 1;
} else {
file_number[block] = max_file_id;
file_size[block] = size;
file_start[block] = max_file_pos;
file_number_[block] = max_file_id_;
file_size_[block] = size;
file_start_[block] = max_file_pos_;
}
cur_file_handle.Close();
SetFilename(file_number[block]);
if (max_file_pos == 0) // the new file
cur_file_handle.OpenCreateEmpty(filename);
cur_file_handle_.Close();
SetFilename(file_number_[block]);
if (max_file_pos_ == 0) // the new file
cur_file_handle_.OpenCreateEmpty(file_name_);
else
cur_file_handle.OpenReadWrite(filename);
DEBUG_ASSERT(cur_file_handle.IsOpen());
cur_file_number = file_number[block];
max_file_pos += size;
cur_file_handle_.OpenReadWrite(file_name_);
DEBUG_ASSERT(cur_file_handle_.IsOpen());
cur_file_number_ = file_number_[block];
max_file_pos_ += size;
}
// save the block
if (file_number[block] != cur_file_number) {
if (file_number_[block] != cur_file_number_) {
// open the block file
cur_file_number = file_number[block];
cur_file_handle.Close();
SetFilename(cur_file_number);
cur_file_handle.OpenReadWrite(filename);
DEBUG_ASSERT(cur_file_handle.IsOpen());
cur_file_number_ = file_number_[block];
cur_file_handle_.Close();
SetFilename(cur_file_number_);
cur_file_handle_.OpenReadWrite(file_name_);
DEBUG_ASSERT(cur_file_handle_.IsOpen());
}

#ifdef FUNCTIONS_EXECUTION_TIMES
char str[100];
if (file_size[block] >= 1_MB)
std::sprintf(str, "CacheableItem::CI_Put,write(%dMB)", (int)(file_size[block] / 1_MB));
if (file_size_[block] >= 1_MB)
std::sprintf(str, "CacheableItem::CI_Put,write(%dMB)", (int)(file_size_[block] / 1_MB));
else
std::sprintf(str, "CacheableItem::CI_Put,write(%dKB)", (int)(file_size[block] / 1_KB));
std::sprintf(str, "CacheableItem::CI_Put,write(%dKB)", (int)(file_size_[block] / 1_KB));
FETOperator feto(str);
#endif
cur_file_handle.Seek(file_start[block], SEEK_SET);
cur_file_handle.WriteExact((char *)data, file_size[block]);
cur_file_handle_.Seek(file_start_[block], SEEK_SET);
cur_file_handle_.WriteExact((char *)data, file_size_[block]);
} catch (common::DatabaseException &e) {
throw common::OutOfMemoryException(e.what());
}
}

int CacheableItem::CI_Get(int block, uchar *data, int size, int off) {
if (block >= no_block || file_size[block] <= 0 || (size >= 0 && off + size > file_size[block]))
if (block >= no_block_ || file_size_[block] <= 0 || (size >= 0 && off + size > file_size_[block]))
return -1;

try {
// open file containing the block
if (file_number[block] != cur_file_number) {
if (file_number_[block] != cur_file_number_) {
// open the block file
cur_file_number = file_number[block];
cur_file_handle.Close();
SetFilename(cur_file_number);
cur_file_handle.OpenReadWrite(filename);
DEBUG_ASSERT(cur_file_handle.IsOpen());
cur_file_number_ = file_number_[block];
cur_file_handle_.Close();
SetFilename(cur_file_number_);
cur_file_handle_.OpenReadWrite(file_name_);
DEBUG_ASSERT(cur_file_handle_.IsOpen());
}

// load the block
if (size < 0) {
size = file_size[block];
size = file_size_[block];
off = 0;
}

Expand All @@ -182,8 +201,8 @@ int CacheableItem::CI_Get(int block, uchar *data, int size, int off) {
FETOperator feto(str);
#endif

cur_file_handle.Seek(file_start[block] + off, SEEK_SET);
cur_file_handle.Read((char *)data, size);
cur_file_handle_.Seek(file_start_[block] + off, SEEK_SET);
cur_file_handle_.Read((char *)data, size);
} catch (common::DatabaseException &e) {
throw common::OutOfMemoryException(e.what());
}
Expand All @@ -192,12 +211,12 @@ int CacheableItem::CI_Get(int block, uchar *data, int size, int off) {

void CacheableItem::SetFilename(int i) // char -> void temporary change to enable compilation
{
filename[filename_n_position + 5] = (char)('0' + i % 10);
filename[filename_n_position + 4] = (char)('0' + (i / 10) % 10);
filename[filename_n_position + 3] = (char)('0' + (i / 100) % 10);
filename[filename_n_position + 2] = (char)('0' + (i / 1000) % 10);
filename[filename_n_position + 1] = (char)('0' + (i / 10000) % 10);
filename[filename_n_position] = (char)('0' + (i / 100000) % 10);
file_name_[filename_n_position_ + 5] = (char)('0' + i % 10);
file_name_[filename_n_position_ + 4] = (char)('0' + (i / 10) % 10);
file_name_[filename_n_position_ + 3] = (char)('0' + (i / 100) % 10);
file_name_[filename_n_position_ + 2] = (char)('0' + (i / 1000) % 10);
file_name_[filename_n_position_ + 1] = (char)('0' + (i / 10000) % 10);
file_name_[filename_n_position_] = (char)('0' + (i / 100000) % 10);
}

} // namespace system
Expand Down
30 changes: 15 additions & 15 deletions storage/tianmu/system/cacheable_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,32 @@ class CacheableItem {

void CI_SetDefaultSize(int size) // If the value is set (here, or in constructor),
{
default_block_size = size;
default_block_size_ = size;
} // then the size parameter in CI_Put may be omitted.

private:
void SetFilename(int i); // set the current filename to the i-th file
char *filename; // the current filename with the full path
char *file_name_; // the current filename with the full path
// the filename template is as follows:
// "..path../XXXXXXnnnnnnAAAAAAAABBBBBBBB.tianmu_tmp" where X..X is an id. of
// object manager and object type (filled with "_") n..n is the number of file
// (0..999999), A..A is the random session number (hex), B..B is the
// semi-random object number (its memory address while creating, hex)
size_t filename_n_position; // the position of the first character of
// "nnnnnn" section of the file name
size_t filename_n_position_; // the position of the first character of
// "nnnnnn" section of the file name

int max_file_id; // maximal used file number, start with 0
int max_file_pos; // the end of the last file used (here we will append)
int no_block; // the number of registered (saved) data blocks
std::vector<int> file_number; // a number of file where the i-th data block is stored
std::vector<int> file_start; // an address in the file where the i-th data block starts
std::vector<int> file_size; // a size of the i-th data block
int max_file_id_; // maximal used file number, start with 0
int max_file_pos_; // the end of the last file used (here we will append)
int no_block_; // the number of registered (saved) data blocks
std::vector<int> file_number_; // a number of file where the i-th data block is stored
std::vector<int> file_start_; // an address in the file where the i-th data block starts
std::vector<int> file_size_; // a size of the i-th data block

int default_block_size;
TianmuFile cur_file_handle;
int cur_file_number; // the number of currently opened file
void *cur_map_addr;
static const size_t cur_map_size;
int default_block_size_;
TianmuFile cur_file_handle_;
int cur_file_number_; // the number of currently opened file
void *cur_map_addr_;
static const size_t cur_map_size_;
};
} // namespace system
} // namespace Tianmu
Expand Down

0 comments on commit ce92a35

Please sign in to comment.