[SWIG] Add streaming data support + cpp tests (#3997)
* [feature] Add ChunkedArray to SWIG

* Add ChunkedArray
* Add ChunkedArray_API_extensions.i
* Add SWIG class wrappers

* Address some review comments

* Fix linting issues

* Move test to tests/test_ChunkedArray_manually.cpp

* Add test note

* Move ChunkedArray to include/LightGBM/utils/

* Declare more explicit types of ChunkedArray in the SWIG API.

* Port ChunkedArray tests to googletest

* Please C++ linter

* Address StrikerRUS' review comments

* Update SWIG doc & disable ChunkedArray<int64_t>

* Use CHECK_EQ instead of assert

* Change include order (linting)

* Rename ChunkedArray -> chunked_array files

* Change header guards

* Address last comments from StrikerRUS
AlbertoEAF authored Mar 21, 2021
1 parent 971b548 commit 4ded134
Showing 6 changed files with 570 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .ci/test.sh
@@ -64,7 +64,7 @@ if [[ $TASK == "lint" ]]; then
 echo "Linting R code"
 Rscript ${BUILD_DIRECTORY}/.ci/lint_r_code.R ${BUILD_DIRECTORY} || exit -1
 echo "Linting C++ code"
-cpplint --filter=-build/c++11,-build/include_subdir,-build/header_guard,-whitespace/line_length --recursive ./src ./include ./R-package || exit -1
+cpplint --filter=-build/c++11,-build/include_subdir,-build/header_guard,-whitespace/line_length --recursive ./src ./include ./R-package ./swig || exit -1
 exit 0
 fi

260 changes: 260 additions & 0 deletions include/LightGBM/utils/chunked_array.hpp
@@ -0,0 +1,260 @@
/*!
* Copyright (c) 2021 Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE file in the project root for license information.
*
* Author: Alberto Ferreira
*/
#ifndef LIGHTGBM_UTILS_CHUNKED_ARRAY_HPP_
#define LIGHTGBM_UTILS_CHUNKED_ARRAY_HPP_

#include <LightGBM/utils/log.h>

#include <stdint.h>

#include <algorithm>
#include <new>
#include <vector>


namespace LightGBM {

/**
* Container that manages a dynamic array of fixed-length chunks.
*
* The class also takes care of allocation & release of the underlying
* memory. It can be used with either a high-level or a low-level API.
*
* The high-level API allocates chunks as needed, manages addresses automatically and keeps
* track of the number of inserted elements, but is not thread-safe (this is fine, as the input is usually a streaming iterator).
* For parallel input sources the low-level API must be used.
*
* Note: When using this for `LGBM_DatasetCreateFromMats`, use a
* chunk_size that is a multiple of #num_cols of your dataset, so each chunk
* contains "complete" instances.
*
* === High-level insert API intro ===
*
* The easiest way to use it is:
* 0. ChunkedArray(chunk_size) # choose an appropriate chunk size
* 1. add(value) # as many times as you want (will allocate chunks as needed)
* 2. data() or data_as_void() # retrieves a T** or void** pointer (useful for `LGBM_DatasetCreateFromMats`).
*
* Useful query methods (all O(1)):
* - get_add_count() # total count of added elements.
* - get_chunks_count() # how many chunks are currently allocated.
* - get_last_chunk_add_count() # how many elements were added to the last chunk.
* - get_chunk_size() # the constant chunk_size set in the constructor.
*
* With those you can generate an int32_t sizes[] array. The last chunk can be smaller than chunk_size, so, for any i:
* - sizes[i < last] = get_chunk_size()
* - sizes[i == last] = get_last_chunk_add_count()
*
*
* === Low-level insert API intro ===
*
* For advanced usage - useful for inserting in parallel - one can also:
* 1. call new_chunk() at any time for as many chunks as needed. (thread-UNsafe)
* 2. call setitem(chunk, idx, value) to insert each value. (thread-safe)
*
*/
template <class T>
class ChunkedArray {
public:
explicit ChunkedArray(size_t chunk_size)
: _chunk_size(chunk_size), _last_chunk_idx(0), _last_idx_in_last_chunk(0) {
if (chunk_size == 0) {
Log::Fatal("ChunkedArray chunk size must be larger than 0!");
}
new_chunk();
}

~ChunkedArray() {
release();
}

/**
* Adds a value to the chunks sequentially.
* If the last chunk is full it creates a new one and appends to it.
*
* @param value value to insert.
*/
void add(T value) {
if (!within_bounds(_last_chunk_idx, _last_idx_in_last_chunk)) {
new_chunk();
++_last_chunk_idx;
_last_idx_in_last_chunk = 0;
}

CHECK_EQ(setitem(_last_chunk_idx, _last_idx_in_last_chunk, value), 0);
++_last_idx_in_last_chunk;
}

/**
* @return Number of add() calls.
*/
size_t get_add_count() const {
return _last_chunk_idx * _chunk_size + _last_idx_in_last_chunk;
}

/**
* @return Number of allocated chunks.
*/
size_t get_chunks_count() const {
return _chunks.size();
}

/**
* @return Number of elements add()'ed in the last chunk.
*/
size_t get_last_chunk_add_count() const {
return _last_idx_in_last_chunk;
}

/**
* Getter for the chunk size set in the constructor.
*
* @return The size of each chunk.
*/
size_t get_chunk_size() const {
return _chunk_size;
}

/**
* Returns the pointer to the raw chunks data.
*
* @return T** pointer to raw data.
*/
T **data() noexcept {
return _chunks.data();
}

/**
* Returns the pointer to the raw chunks data, but cast to void**.
* This is so ``LGBM_DatasetCreateFromMats`` accepts it.
*
* @return void** pointer to raw data.
*/
void **data_as_void() noexcept {
return reinterpret_cast<void**>(_chunks.data());
}

/**
* Coalesces (copies chunked data) to a contiguous array of the same type.
* It assumes that ``other`` has enough space to receive that data.
*
* @param other array of T with space for at least this->get_add_count() elements
* (or get_chunks_count() * get_chunk_size() elements if all_valid_addresses is true).
* @param all_valid_addresses
* If true, exports values from all allocated addresses, regardless of the add() count.
* Otherwise, exports only the first `get_add_count()` addresses.
*/
void coalesce_to(T *other, bool all_valid_addresses = false) const {
const size_t full_chunks = this->get_chunks_count() - 1;

// Copy full chunks:
size_t i = 0;
for (size_t chunk = 0; chunk < full_chunks; ++chunk) {
T* chunk_ptr = _chunks[chunk];
for (size_t in_chunk_idx = 0; in_chunk_idx < _chunk_size; ++in_chunk_idx) {
other[i++] = chunk_ptr[in_chunk_idx];
}
}
// Copy filled values from last chunk only:
const size_t last_chunk_elems_to_copy = all_valid_addresses ? _chunk_size : this->get_last_chunk_add_count();
T* chunk_ptr = _chunks[full_chunks];
for (size_t in_chunk_idx = 0; in_chunk_idx < last_chunk_elems_to_copy; ++in_chunk_idx) {
other[i++] = chunk_ptr[in_chunk_idx];
}
}

/**
* Return value from array of chunks.
*
* @param chunk_index index of the chunk
* @param index_within_chunk index within chunk
* @param on_fail_value sentinel value, returned when the address is out of bounds.
*
* @return stored value, or ``on_fail_value`` if the address is out of bounds.
*/
T getitem(size_t chunk_index, size_t index_within_chunk, T on_fail_value) const noexcept {
if (within_bounds(chunk_index, index_within_chunk))
return _chunks[chunk_index][index_within_chunk];
else
return on_fail_value;
}

/**
* Sets the value at a specific address in one of the chunks.
*
* @param chunk_index index of the chunk
* @param index_within_chunk index within chunk
* @param value value to store
*
* @return 0 = success, -1 = out of bounds access.
*/
int setitem(size_t chunk_index, size_t index_within_chunk, T value) noexcept {
if (within_bounds(chunk_index, index_within_chunk)) {
_chunks[chunk_index][index_within_chunk] = value;
return 0;
} else {
return -1;
}
}

/**
* Resets the storage: releases all existing chunks and prepares the container for reuse.
*/
void clear() noexcept {
release();
new_chunk();
}

/**
* Deletes all the allocated chunks.
* Do not use the container after calling this; use ``clear()`` instead if you intend to reuse it.
*/
void release() noexcept {
std::for_each(_chunks.begin(), _chunks.end(), [](T* c) { delete[] c; });
_chunks.clear();
_chunks.shrink_to_fit();
_last_chunk_idx = 0;
_last_idx_in_last_chunk = 0;
}

/**
* As the array is dynamic, checks whether a given address is currently within bounds.
*
* @param chunk_index index of the chunk
* @param index_within_chunk index within that chunk
* @return true if that chunk is already allocated and index_within_chunk < chunk size.
*/
inline bool within_bounds(size_t chunk_index, size_t index_within_chunk) const {
return (chunk_index < _chunks.size()) && (index_within_chunk < _chunk_size);
}

/**
* Adds a new chunk to the array of chunks. Not thread-safe.
*/
void new_chunk() {
_chunks.push_back(new (std::nothrow) T[_chunk_size]);

// Check memory allocation success:
if (!_chunks[_chunks.size()-1]) {
release();
Log::Fatal("Memory exhausted! Cannot allocate new ChunkedArray chunk.");
}
}

private:
const size_t _chunk_size;
std::vector<T*> _chunks;

// For the add() interface & some of the get_*() queries:
size_t _last_chunk_idx;  //!< Index of the last allocated chunk.
size_t _last_idx_in_last_chunk;  //!< Next insertion position within the last chunk.
};


} // namespace LightGBM

#endif // LIGHTGBM_UTILS_CHUNKED_ARRAY_HPP_
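
For reference, a minimal sketch of driving the high-level ChunkedArray API described in the header above from plain C++. The chunk size, column count, and loop values are illustrative assumptions, and the exact `LGBM_DatasetCreateFromMats` call is intentionally omitted; this is a usage illustration, not part of the wrapped API.

#include <cstdint>
#include <vector>
#include <LightGBM/utils/chunked_array.hpp>

int main() {
  const size_t num_cols = 2;
  // Pick a chunk_size that is a multiple of num_cols so every chunk holds complete rows.
  LightGBM::ChunkedArray<double> features(4 * num_cols);

  // High-level API: add() allocates new chunks on demand.
  for (int v = 0; v < 10; ++v) {
    features.add(static_cast<double>(v));
  }

  // Per-chunk sizes array, built as the class docs suggest:
  // every chunk is full except possibly the last one.
  std::vector<int32_t> sizes(features.get_chunks_count(),
                             static_cast<int32_t>(features.get_chunk_size()));
  sizes.back() = static_cast<int32_t>(features.get_last_chunk_add_count());

  // features.data_as_void() plus sizes.data() is the shape of input that an
  // API such as LGBM_DatasetCreateFromMats consumes (call not shown here).
  void** raw_chunks = features.data_as_void();
  (void)raw_chunks;

  // Alternatively, coalesce all chunks back into one contiguous buffer.
  std::vector<double> flat(features.get_add_count());
  features.coalesce_to(flat.data());
  return 0;
}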
23 changes: 23 additions & 0 deletions swig/ChunkedArray_API_extensions.i
@@ -0,0 +1,23 @@
/**
* Wrap chunked_array.hpp class for SWIG usage.
*
* Author: Alberto Ferreira
*/

%{
#include "../include/LightGBM/utils/chunked_array.hpp"
%}

%include "../include/LightGBM/utils/chunked_array.hpp"

using LightGBM::ChunkedArray;

%template(int32ChunkedArray) ChunkedArray<int32_t>;
/* Unfortunately, for the time being,
* SWIG has issues generating the overloads of coalesce_to()
* for larger integral types,
* so we won't support that for now:
*/
//%template(int64ChunkedArray) ChunkedArray<int64_t>;
%template(floatChunkedArray) ChunkedArray<float>;
%template(doubleChunkedArray) ChunkedArray<double>;
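
These templates expose the same class to SWIG clients. As a plain C++ illustration of the low-level insert path described in the header docs (pre-allocating chunks and then writing by position, e.g. from parallel producers), here is a hedged sketch; the chunk count, values, and function name are invented for the example.

#include <cstdint>
#include <LightGBM/utils/chunked_array.hpp>

// Hypothetical helper: fill a ChunkedArray by explicit (chunk, index) addresses.
void fill_by_position() {
  const size_t chunk_size = 100;
  LightGBM::ChunkedArray<int32_t> labels(chunk_size);  // constructor allocates the first chunk

  // new_chunk() is NOT thread-safe: allocate all chunks up front, single-threaded.
  labels.new_chunk();  // two chunks are now available

  // setitem() is bounds-checked (returns -1 when out of range) and, per the header
  // docs, safe to call concurrently for distinct addresses.
  for (size_t chunk = 0; chunk < labels.get_chunks_count(); ++chunk) {
    for (size_t i = 0; i < chunk_size; ++i) {
      labels.setitem(chunk, i, static_cast<int32_t>(chunk * chunk_size + i));
    }
  }
}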