Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tianmu): fix invalid write/read issue. (#817) #902

Merged
merged 2 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions storage/tianmu/core/engine_results.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
namespace Tianmu {
namespace core {

// stonedb8 List -> mem_root_deque
void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int, Item *> &items_backup) {
Item *item;
Field *f;
Expand All @@ -36,7 +35,8 @@ void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int,
Item::Type item_type;
Item_sum::Sumfunctype sum_type;

buf_lens = new uint[CountVisibleFields(fields)]; // stonedb8
buf_lens = new uint[fields.size()];
RingsC marked this conversation as resolved.
Show resolved Hide resolved
memset(buf_lens, 0, sizeof(uint) * fields.size());
uint item_id = 0;
uint field_length = 0;
uint total_length = 0;
Expand Down Expand Up @@ -81,7 +81,7 @@ void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int,
tmp->collation.set(item->collation);
tmp->value_.set_charset(item->collation.collation);
tmp->set_data_type(item->data_type());
fields[item_id] = tmp; // stonedb8
fields[item_id] = tmp;
break;
}
case Item::SUM_FUNC_ITEM: {
Expand All @@ -100,15 +100,15 @@ void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int,
isum_hybrid_rcbase->hybrid_field_type_ = is->data_type();
isum_hybrid_rcbase->collation.set(is->collation);
isum_hybrid_rcbase->value_.set_charset(is->collation.collation);
fields[item_id] = isum_hybrid_rcbase; // stonedb8
fields[item_id] = isum_hybrid_rcbase;
buf_lens[item_id] = 0;
break;
} else if (sum_type == Item_sum::COUNT_FUNC || sum_type == Item_sum::COUNT_DISTINCT_FUNC ||
sum_type == Item_sum::SUM_BIT_FUNC) {
isum_int_rcbase = new types::Item_sum_int_rcbase();
isum_int_rcbase->unsigned_flag = is->unsigned_flag;
items_backup[item_id] = item;
fields[item_id] = isum_int_rcbase; // stonedb8
fields[item_id] = isum_int_rcbase;
buf_lens[item_id] = 0;
break;
// we can use the same type for SUM,AVG and SUM DIST, AVG DIST
Expand Down Expand Up @@ -138,7 +138,7 @@ void scan_fields(mem_root_deque<Item *> &fields, uint *&buf_lens, std::map<int,

item_id = 0;

for (auto it = fields.begin(); it != fields.end(); ++it) { // stonedb8
for (auto it = fields.begin(); it != fields.end(); ++it) {
item = *it;
item_type = item->type();
switch (item_type) {
Expand Down
207 changes: 111 additions & 96 deletions storage/tianmu/system/cacheable_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,153 +23,168 @@

namespace Tianmu {
namespace system {
CacheableItem::CacheableItem(char const *owner_name, char const *object_id, int _default_block_size) {
default_block_size = _default_block_size;
CacheableItem::CacheableItem(char const *owner_name, char const *object_id, int default_block_size)
: default_block_size_(default_block_size) {
DEBUG_ASSERT(owner_name != nullptr);
DEBUG_ASSERT(object_id != nullptr);
// copy the temporary folder first
filename = nullptr;

{
// read the configuration parameter
std::string temp_filename = tianmu_sysvar_cachefolder;
filename_n_position = temp_filename.length();
filename = new char[filename_n_position + 37]; // "...path.../XXXXXXnnnnnnAAAAAAAABBBBBBBB.tianmu_tmp"
std::strcpy(filename, temp_filename.c_str());
if (filename[filename_n_position - 1] != '/' && filename[filename_n_position - 1] != '\\') {
filename[filename_n_position] = '/';
filename_n_position++;
}
filename_n_position += 6;
}

if (filename == nullptr) {
// if the temporary path is not set, use the current folder
filename = new char[36]; // "XXXXXXnnnnnnAAAAAAAABBBBBBBB.tianmu_tmp"
filename_n_position = 6;
constexpr size_t kMinOwnerNameLen = 3;
constexpr size_t kOwnerAndObjectLen = 6;
constexpr size_t kNumberFillLen = 6;
constexpr size_t kRandomLen = 8;
constexpr size_t kObjectPtrLen = 8;
constexpr size_t kSuffixLen = 11;
size_t filename_offset = 0;

// read the configuration parameter
std::string temp_filename = tianmu_sysvar_cachefolder;
char last_char = temp_filename[temp_filename.size() - 1];
if (last_char != '/' && last_char != '\\') {
temp_filename += "/";
}
max_file_id = 0;
max_file_pos = 0;
no_block = 0;
cur_file_number = -1;

// copy the temporary folder first
// "...path.../XXXXXXnnnnnnAAAAAAAABBBBBBBB.tianmu_tmp"
size_t total_length =
temp_filename.length() + kOwnerAndObjectLen + kNumberFillLen + kRandomLen + kObjectPtrLen + kSuffixLen + 1;
file_name_ = new char[total_length];
file_name_[total_length - 1] = 0;
std::strcpy(file_name_, temp_filename.c_str());
filename_offset = temp_filename.length();

// fill the file name
int i = 0, j = 0;
while (owner_name[j] != 0 && i < 6) filename[filename_n_position - 6 + (i++)] = owner_name[j++];
while (i < 3) filename[filename_n_position - 6 + (i++)] = '_';

while (owner_name[j] != 0 && i < kOwnerAndObjectLen) file_name_[filename_offset + (i++)] = owner_name[j++];
while (i < kMinOwnerNameLen) file_name_[filename_offset + (i++)] = '_';
j = 0;
while (object_id[j] != 0 && i < 6) filename[filename_n_position - 6 + (i++)] = object_id[j++];
while (i < 6) filename[filename_n_position - 6 + (i++)] = '_';
std::strcpy(filename + filename_n_position, "000000");
char buf[30];
while (object_id[j] != 0 && i < kOwnerAndObjectLen) file_name_[filename_offset + (i++)] = object_id[j++];
while (i < kOwnerAndObjectLen) file_name_[filename_offset + (i++)] = '_';
filename_offset += kOwnerAndObjectLen;

filename_n_position_ = filename_offset;

snprintf(file_name_ + filename_offset, kNumberFillLen + 1, "%s", "000000");
filename_offset += kNumberFillLen;

char buf[30] = {};
unsigned int random_number = 0;
random_number |= ((rand() % 1024) << 21);
random_number |= ((rand() % 1024) << 11);
random_number |= (rand() % 2048);
std::sprintf(buf, "%X", random_number);
std::strcpy(filename + filename_n_position + 6 + (8 - std::strlen(buf)), buf);
if (std::strlen(buf) < 8)
std::memset(filename + filename_n_position + 6, '0', 8 - std::strlen(buf));
std::sprintf(buf, "%p", this);
std::strcpy(filename + filename_n_position + 14 + (8 - std::strlen(buf)), buf);
if (std::strlen(buf) < 8)
std::memset(filename + filename_n_position + 14, '0', 8 - std::strlen(buf));
std::strcpy(filename + filename_n_position + 22, ".tianmu_tmp");
std::snprintf(file_name_ + filename_offset, kRandomLen + 1, "%08X", random_number);
filename_offset += kRandomLen;

std::snprintf(buf, sizeof(buf), "%p", this);
auto object_len = std::strlen(buf);
if (object_len >= kObjectPtrLen)
std::memcpy(file_name_ + filename_offset, buf, kObjectPtrLen);
else {
std::strcpy(file_name_ + filename_offset + (kObjectPtrLen - object_len), buf);
std::memset(file_name_ + filename_offset, '0', kObjectPtrLen - object_len);
}
filename_offset += kObjectPtrLen;

std::snprintf(file_name_ + filename_offset, kSuffixLen + 1, "%s", ".tianmu_tmp");
filename_offset += kSuffixLen;

DEBUG_ASSERT(file_name_[filename_offset] == 0);
}

CacheableItem::~CacheableItem() {
cur_file_handle.Close();
for (int i = 0; i <= max_file_id; i++) {
cur_file_handle_.Close();
for (int i = 0; i <= max_file_id_; i++) {
SetFilename(i); // delete all files
RemoveFile(filename);
RemoveFile(file_name_);
}
delete[] filename;
delete[] file_name_;
}

void CacheableItem::CI_Put(int block, unsigned char *data, int size) {
if (block == -1)
return;
if (size == -1)
size = default_block_size;
size = default_block_size_;
if (size <= 0)
return;
for (int i = no_block; i < block; i++) { // rare case: the block numbering is not continuous
for (int i = no_block_; i < block; i++) { // rare case: the block numbering is not continuous
// create empty blocks
file_number.push_back(-1);
file_size.push_back(0);
file_start.push_back(0);
file_number_.push_back(-1);
file_size_.push_back(0);
file_start_.push_back(0);
}

try {
if (block >= no_block || size != file_size[block]) {
if (block >= no_block_ || size != file_size_[block]) {
// create a new block or reallocate the existing one
if ((long long)(size) + (long long)(max_file_pos) > 2000000000) { // the file size limit: 2 GB
if ((long long)(size) + (long long)(max_file_pos_) > 2000000000) { // the file size limit: 2 GB
// file becomes too large, start the next one!
max_file_id++;
max_file_pos = 0;
max_file_id_++;
max_file_pos_ = 0;
}
if (block >= no_block) {
file_number.push_back(max_file_id);
file_size.push_back(size);
file_start.push_back(max_file_pos);
no_block = block + 1;
if (block >= no_block_) {
file_number_.push_back(max_file_id_);
file_size_.push_back(size);
file_start_.push_back(max_file_pos_);
no_block_ = block + 1;
} else {
file_number[block] = max_file_id;
file_size[block] = size;
file_start[block] = max_file_pos;
file_number_[block] = max_file_id_;
file_size_[block] = size;
file_start_[block] = max_file_pos_;
}
cur_file_handle.Close();
SetFilename(file_number[block]);
if (max_file_pos == 0) // the new file
cur_file_handle.OpenCreateEmpty(filename);
cur_file_handle_.Close();
SetFilename(file_number_[block]);
if (max_file_pos_ == 0) // the new file
cur_file_handle_.OpenCreateEmpty(file_name_);
else
cur_file_handle.OpenReadWrite(filename);
DEBUG_ASSERT(cur_file_handle.IsOpen());
cur_file_number = file_number[block];
max_file_pos += size;
cur_file_handle_.OpenReadWrite(file_name_);
DEBUG_ASSERT(cur_file_handle_.IsOpen());
cur_file_number_ = file_number_[block];
max_file_pos_ += size;
}
// save the block
if (file_number[block] != cur_file_number) {
if (file_number_[block] != cur_file_number_) {
// open the block file
cur_file_number = file_number[block];
cur_file_handle.Close();
SetFilename(cur_file_number);
cur_file_handle.OpenReadWrite(filename);
DEBUG_ASSERT(cur_file_handle.IsOpen());
cur_file_number_ = file_number_[block];
cur_file_handle_.Close();
SetFilename(cur_file_number_);
cur_file_handle_.OpenReadWrite(file_name_);
DEBUG_ASSERT(cur_file_handle_.IsOpen());
}

#ifdef FUNCTIONS_EXECUTION_TIMES
char str[100];
if (file_size[block] >= 1_MB)
std::sprintf(str, "CacheableItem::CI_Put,write(%dMB)", (int)(file_size[block] / 1_MB));
if (file_size_[block] >= 1_MB)
std::sprintf(str, "CacheableItem::CI_Put,write(%dMB)", (int)(file_size_[block] / 1_MB));
else
std::sprintf(str, "CacheableItem::CI_Put,write(%dKB)", (int)(file_size[block] / 1_KB));
std::sprintf(str, "CacheableItem::CI_Put,write(%dKB)", (int)(file_size_[block] / 1_KB));
FETOperator feto(str);
#endif
cur_file_handle.Seek(file_start[block], SEEK_SET);
cur_file_handle.WriteExact((char *)data, file_size[block]);
cur_file_handle_.Seek(file_start_[block], SEEK_SET);
cur_file_handle_.WriteExact((char *)data, file_size_[block]);
} catch (common::DatabaseException &e) {
throw common::OutOfMemoryException(e.what());
}
}

int CacheableItem::CI_Get(int block, uchar *data, int size, int off) {
if (block >= no_block || file_size[block] <= 0 || (size >= 0 && off + size > file_size[block]))
if (block >= no_block_ || file_size_[block] <= 0 || (size >= 0 && off + size > file_size_[block]))
return -1;

try {
// open file containing the block
if (file_number[block] != cur_file_number) {
if (file_number_[block] != cur_file_number_) {
// open the block file
cur_file_number = file_number[block];
cur_file_handle.Close();
SetFilename(cur_file_number);
cur_file_handle.OpenReadWrite(filename);
DEBUG_ASSERT(cur_file_handle.IsOpen());
cur_file_number_ = file_number_[block];
cur_file_handle_.Close();
SetFilename(cur_file_number_);
cur_file_handle_.OpenReadWrite(file_name_);
DEBUG_ASSERT(cur_file_handle_.IsOpen());
}

// load the block
if (size < 0) {
size = file_size[block];
size = file_size_[block];
off = 0;
}

Expand All @@ -182,8 +197,8 @@ int CacheableItem::CI_Get(int block, uchar *data, int size, int off) {
FETOperator feto(str);
#endif

cur_file_handle.Seek(file_start[block] + off, SEEK_SET);
cur_file_handle.Read((char *)data, size);
cur_file_handle_.Seek(file_start_[block] + off, SEEK_SET);
cur_file_handle_.Read((char *)data, size);
} catch (common::DatabaseException &e) {
throw common::OutOfMemoryException(e.what());
}
Expand All @@ -192,12 +207,12 @@ int CacheableItem::CI_Get(int block, uchar *data, int size, int off) {

void CacheableItem::SetFilename(int i) // char -> void temporary change to enable compilation
{
filename[filename_n_position + 5] = (char)('0' + i % 10);
filename[filename_n_position + 4] = (char)('0' + (i / 10) % 10);
filename[filename_n_position + 3] = (char)('0' + (i / 100) % 10);
filename[filename_n_position + 2] = (char)('0' + (i / 1000) % 10);
filename[filename_n_position + 1] = (char)('0' + (i / 10000) % 10);
filename[filename_n_position] = (char)('0' + (i / 100000) % 10);
file_name_[filename_n_position_ + 5] = (char)('0' + i % 10);
file_name_[filename_n_position_ + 4] = (char)('0' + (i / 10) % 10);
file_name_[filename_n_position_ + 3] = (char)('0' + (i / 100) % 10);
file_name_[filename_n_position_ + 2] = (char)('0' + (i / 1000) % 10);
file_name_[filename_n_position_ + 1] = (char)('0' + (i / 10000) % 10);
file_name_[filename_n_position_] = (char)('0' + (i / 100000) % 10);
}

} // namespace system
Expand Down
32 changes: 15 additions & 17 deletions storage/tianmu/system/cacheable_item.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,30 @@ class CacheableItem {

void CI_SetDefaultSize(int size) // If the value is set (here, or in constructor),
{
default_block_size = size;
default_block_size_ = size;
} // then the size parameter in CI_Put may be omitted.

private:
void SetFilename(int i); // set the current filename to the i-th file
char *filename; // the current filename with the full path
void SetFilename(int i); // set the current filename to the i-th file
char *file_name_ = nullptr; // the current filename with the full path
// the filename template is as follows:
// "..path../XXXXXXnnnnnnAAAAAAAABBBBBBBB.tianmu_tmp" where X..X is an id. of
// object manager and object type (filled with "_") n..n is the number of file
// (0..999999), A..A is the random session number (hex), B..B is the
// semi-random object number (its memory address while creating, hex)
size_t filename_n_position; // the position of the first character of
// "nnnnnn" section of the file name
size_t filename_n_position_ = 0; // the position of the first character of
// "nnnnnn" section of the file name
int max_file_id_ = 0; // maximal used file number, start with 0
int max_file_pos_ = 0; // the end of the last file used (here we will append)
int no_block_ = 0; // the number of registered (saved) data blocks
std::vector<int> file_number_; // a number of file where the i-th data block is stored
std::vector<int> file_start_; // an address in the file where the i-th data block starts
std::vector<int> file_size_; // a size of the i-th data block

int max_file_id; // maximal used file number, start with 0
int max_file_pos; // the end of the last file used (here we will append)
int no_block; // the number of registered (saved) data blocks
std::vector<int> file_number; // a number of file where the i-th data block is stored
std::vector<int> file_start; // an address in the file where the i-th data block starts
std::vector<int> file_size; // a size of the i-th data block

int default_block_size;
TianmuFile cur_file_handle;
int cur_file_number; // the number of currently opened file
void *cur_map_addr;
static const size_t cur_map_size;
int default_block_size_ = 0;
TianmuFile cur_file_handle_;
int cur_file_number_ = -1; // the number of currently opened file
void *cur_map_addr_ = nullptr;
};
} // namespace system
} // namespace Tianmu
Expand Down