Skip to content

Commit

Permalink
Pickling of Frames (#1444)
Browse files Browse the repository at this point in the history
- Frame now supports pickle interface;
- when copying a Frame, its key property is no longer lost;
- `Frame.save()` can now also be called without any arguments -- it will save the Frame into memory in Jay format, and return the bytes object containing the Frame's data;
- `dt.open()` can now accept a bytes argument -- same as returned from `Frame.save()`. 

Closes #1442 
Closes #1443
  • Loading branch information
st-pasha authored Nov 23, 2018
1 parent 444678f commit b62faff
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 59 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
#### Added
- methods `Frame.to_tuples()` and `Frame.to_dict()` (#1400).
- methods `Frame.head(n)` and `Frame.tail(n)` (#1307).
- `Frame` objects are now pickle-able (#1442).

#### Fixed
- crash when an int-column row selector is applied to a Frame which already
had another row filter applied (#1437).
- Frame.copy() now retains the key (#1443).


### [v0.7.0](https://github.com/h2oai/datatable/compare/0.7.0...v0.6.0) — 2018-11-16
Expand Down
7 changes: 4 additions & 3 deletions c/datatable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ DataTable* DataTable::copy() const {
// vector can be default-copied.
newcols.push_back(col->shallowcopy());
}
return new DataTable(std::move(newcols), this);
DataTable* res = new DataTable(std::move(newcols), this);
res->nkeys = nkeys;
return res;
}


Expand Down Expand Up @@ -185,8 +187,7 @@ void DataTable::reify() {



size_t DataTable::memory_footprint()
{
size_t DataTable::memory_footprint() const {
size_t sz = 0;
sz += sizeof(*this);
sz += (ncols + 1) * sizeof(Column*);
Expand Down
14 changes: 9 additions & 5 deletions c/datatable.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class DataTable {
void rbind(std::vector<DataTable*>, std::vector<std::vector<size_t>>);
DataTable* cbind(std::vector<DataTable*>);
DataTable* copy() const;
size_t memory_footprint();
size_t memory_footprint() const;

/**
* Sort the DataTable by specified columns, and return the corresponding
Expand Down Expand Up @@ -118,6 +118,7 @@ class DataTable {
size_t get_nkeys() const;
void set_key(std::vector<size_t>& col_indices);
void clear_key();
void set_nkeys_unsafe(size_t K);

DataTable* min_datatable() const;
DataTable* max_datatable() const;
Expand All @@ -136,10 +137,8 @@ class DataTable {
static DataTable* load(DataTable* schema, size_t nrows,
const std::string& path, bool recode);

void save_jay(const std::string& path,
const std::vector<std::string>& colnames,
WritableBuffer::Strategy wstrategy);
static DataTable* open_jay(const std::string& path);
MemoryRange save_jay();
void save_jay(const std::string& path, WritableBuffer::Strategy);

private:
void _init_pynames() const;
Expand All @@ -148,13 +147,18 @@ class DataTable {
void _integrity_check_pynames() const;

DataTable* _statdt(colmakerfn f) const;
void save_jay_impl(WritableBuffer*);

#ifdef DTTEST
friend void dttest::cover_names_integrity_checks();
#endif
};


DataTable* open_jay_from_file(const std::string& path);
DataTable* open_jay_from_bytes(const char* ptr, size_t len);
DataTable* open_jay_from_mbuf(const MemoryRange&);


//==============================================================================

Expand Down
5 changes: 5 additions & 0 deletions c/frame/key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,8 @@ void DataTable::set_key(std::vector<size_t>& col_indices) {

nkeys = K;
}


void DataTable::set_nkeys_unsafe(size_t K) {
nkeys = K;
}
1 change: 1 addition & 0 deletions c/frame/py_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const char* Frame::Type::classdoc() {

void Frame::Type::init_methods_and_getsets(Methods& mm, GetSetters& gs)
{
_init_init(mm, gs);
_init_names(mm, gs);

gs.add<&Frame::get_ncols>("ncols",
Expand Down
3 changes: 3 additions & 0 deletions c/frame/py_frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Frame : public PyObject {
static void init_methods_and_getsets(Methods&, GetSetters&);
private:
static void _init_names(Methods&, GetSetters&);
static void _init_init(Methods&, GetSetters&);
};

// Internal "constructor" of Frame objects. We do not use real constructors
Expand All @@ -78,6 +79,8 @@ class Frame : public PyObject {
void m__release_buffer__(Py_buffer* buf) const;
oobj m__getitem__(robj item);
void m__setitem__(robj item, robj value);
oobj m__getstate__(const NoArgs&); // pickling support
void m__setstate__(const PKArgs&);

oobj _repr_html_(const NoArgs&);
oobj get_ncols() const;
Expand Down
49 changes: 49 additions & 0 deletions c/frame/py_frame_init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ class FrameInitializationManager {
} else {
make_datatable(srcdt);
}
if (srcdt->get_nkeys()) {
frame->dt->set_nkeys_unsafe(srcdt->get_nkeys());
}
}


Expand Down Expand Up @@ -671,6 +674,52 @@ void Frame::m__init__(PKArgs& args) {
}



//------------------------------------------------------------------------------
// pickling / unpickling
//------------------------------------------------------------------------------

static NoArgs fn___getstate__("__getstate__", nullptr);
static PKArgs fn___setstate__(1, 0, 0, false, false, {"state"},
"__setstate__", nullptr);


// TODO: add py::obytes object
oobj Frame::m__getstate__(const NoArgs&) {
MemoryRange mr = dt->save_jay();
auto data = static_cast<const char*>(mr.xptr());
auto size = static_cast<Py_ssize_t>(mr.size());
return oobj::from_new_reference(PyBytes_FromStringAndSize(data, size));
}

void Frame::m__setstate__(const PKArgs& args) {
PyObject* _state = args[0].to_borrowed_ref();
if (!PyBytes_Check(_state)) {
throw TypeError() << "`__setstate__()` expects a bytes object";
}
// Clean up any previous state of the Frame (since pickle first creates an
// empty Frame object, and then calls __setstate__ on it).
m__dealloc__();
core_dt = nullptr;
stypes = nullptr;
ltypes = nullptr;

const char* data = PyBytes_AS_STRING(_state);
size_t length = static_cast<size_t>(PyBytes_GET_SIZE(_state));
dt = open_jay_from_bytes(data, length);
PyObject* _dt = pydatatable::wrap(dt);
if (!_dt) throw PyError();
core_dt = reinterpret_cast<pydatatable::obj*>(_dt);
core_dt->_frame = this;
}


void Frame::Type::_init_init(Methods& mm, GetSetters&) {
mm.add<&Frame::m__getstate__, fn___getstate__>();
mm.add<&Frame::m__setstate__, fn___setstate__>();
}


} // namespace py


Expand Down
27 changes: 21 additions & 6 deletions c/jay/open_jay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,29 @@


// Helper functions
static Column* column_from_jay(const jay::Column* jaycol, MemoryRange& jaybuf);
static Column* column_from_jay(const jay::Column* jaycol,
const MemoryRange& jaybuf);



//------------------------------------------------------------------------------
// Open DataTable
//------------------------------------------------------------------------------

DataTable* DataTable::open_jay(const std::string& path)
DataTable* open_jay_from_file(const std::string& path) {
MemoryRange mbuf = MemoryRange::mmap(path);
return open_jay_from_mbuf(mbuf);
}

DataTable* open_jay_from_bytes(const char* ptr, size_t len) {
MemoryRange mbuf = MemoryRange::external(ptr, len);
return open_jay_from_mbuf(mbuf);
}


DataTable* open_jay_from_mbuf(const MemoryRange& mbuf)
{
std::vector<std::string> colnames;
MemoryRange mbuf = MemoryRange::mmap(path);

const uint8_t* ptr = static_cast<const uint8_t*>(mbuf.rptr());
const size_t len = mbuf.size();
Expand Down Expand Up @@ -69,7 +80,7 @@ DataTable* DataTable::open_jay(const std::string& path)
}

auto dt = new DataTable(std::move(columns), colnames);
dt->nkeys = static_cast<size_t>(frame->nkeys());
dt->set_nkeys_unsafe(static_cast<size_t>(frame->nkeys()));
return dt;
}

Expand All @@ -79,7 +90,9 @@ DataTable* DataTable::open_jay(const std::string& path)
// Open an individual column
//------------------------------------------------------------------------------

static MemoryRange extract_buffer(MemoryRange& src, const jay::Buffer* jbuf) {
static MemoryRange extract_buffer(
const MemoryRange& src, const jay::Buffer* jbuf)
{
size_t offset = jbuf->offset();
size_t length = jbuf->length();
return MemoryRange::view(src, length, offset + 8);
Expand All @@ -98,7 +111,9 @@ static void initStats(Stats* stats, const jay::Column* jcol) {
}


static Column* column_from_jay(const jay::Column* jcol, MemoryRange& jaybuf) {
static Column* column_from_jay(
const jay::Column* jcol, const MemoryRange& jaybuf)
{
jay::Type jtype = jcol->type();

Column* col = nullptr;
Expand Down
38 changes: 28 additions & 10 deletions c/jay/save_jay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ using WritableBufferPtr = std::unique_ptr<WritableBuffer>;
static jay::Type stype_to_jaytype[DT_STYPES_COUNT];
static flatbuffers::Offset<jay::Column> column_to_jay(
Column* col, const std::string& name, flatbuffers::FlatBufferBuilder& fbb,
WritableBufferPtr& wb);
static jay::Buffer saveMemoryRange(const MemoryRange*, WritableBufferPtr&);
WritableBuffer* wb);
static jay::Buffer saveMemoryRange(const MemoryRange*, WritableBuffer*);
template <typename T, typename A, typename StatBuilder>
static flatbuffers::Offset<void> saveStats(
Stats* stats, flatbuffers::FlatBufferBuilder& fbb);
Expand All @@ -27,16 +27,34 @@ static flatbuffers::Offset<void> saveStats(
// Save DataTable
//------------------------------------------------------------------------------

/**
* Save Frame in Jay format to the provided file.
*/
void DataTable::save_jay(const std::string& path,
const std::vector<std::string>& colnames,
WritableBuffer::Strategy wstrategy)
{
// Cannot store a view frame, so materialize first.
reify();

size_t sizehint = (wstrategy == WritableBuffer::Strategy::Auto)
? memory_footprint() : 0;
auto wb = WritableBuffer::create_target(path, sizehint, wstrategy);
save_jay_impl(wb.get());
}


/**
* Save Frame in Jay format to memory,
*/
MemoryRange DataTable::save_jay() {
auto wb = std::unique_ptr<MemoryWritableBuffer>(
new MemoryWritableBuffer(memory_footprint()));
save_jay_impl(wb.get());
return wb->get_mbuf();
}


void DataTable::save_jay_impl(WritableBuffer* wb) {
// Cannot store a view frame, so materialize first.
reify();

wb->write(8, "JAY1\0\0\0\0");

flatbuffers::FlatBufferBuilder fbb(1024);
Expand All @@ -45,10 +63,10 @@ void DataTable::save_jay(const std::string& path,
for (size_t i = 0; i < ncols; ++i) {
Column* col = columns[i];
if (col->stype() == SType::OBJ) {
DatatableWarning() << "Column `" << colnames[i]
DatatableWarning() << "Column `" << names[i]
<< "` of type obj64 was not saved";
} else {
auto saved_col = column_to_jay(col, colnames[i], fbb, wb);
auto saved_col = column_to_jay(col, names[i], fbb, wb);
msg_columns.push_back(saved_col);
}
}
Expand Down Expand Up @@ -82,7 +100,7 @@ void DataTable::save_jay(const std::string& path,

static flatbuffers::Offset<jay::Column> column_to_jay(
Column* col, const std::string& name, flatbuffers::FlatBufferBuilder& fbb,
WritableBufferPtr& wb)
WritableBuffer* wb)
{
jay::Stats jsttype = jay::Stats_NONE;
flatbuffers::Offset<void> jsto;
Expand Down Expand Up @@ -170,7 +188,7 @@ void init_jay() {


static jay::Buffer saveMemoryRange(
const MemoryRange* mbuf, WritableBufferPtr& wb)
const MemoryRange* mbuf, WritableBuffer* wb)
{
if (!mbuf) return jay::Buffer();
size_t len = mbuf->size();
Expand Down
14 changes: 7 additions & 7 deletions c/memrange.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
ViewedMRI* base;

public:
ViewMRI(size_t n, MemoryRange& src, size_t offset);
ViewMRI(size_t n, const MemoryRange& src, size_t offset);
virtual ~ViewMRI() override;

void resize(size_t n) override;
Expand All @@ -130,15 +130,15 @@
size_t refcount;

public:
static ViewedMRI* acquire_viewed(MemoryRange& src);
static ViewedMRI* acquire_viewed(const MemoryRange& src);
void release();

bool is_writable() const;
size_t memory_footprint() const override;
const char* name() const override { return "viewed"; }

private:
ViewedMRI(MemoryRange& src);
ViewedMRI(const MemoryRange& src);
};


Expand Down Expand Up @@ -223,7 +223,7 @@
return MemoryRange(new ExternalMRI(n, ptr, pb));
}

MemoryRange MemoryRange::view(MemoryRange& src, size_t n, size_t offset) {
MemoryRange MemoryRange::view(const MemoryRange& src, size_t n, size_t offset) {
return MemoryRange(new ViewMRI(n, src, offset));
}

Expand Down Expand Up @@ -602,7 +602,7 @@
// ViewMRI
//==============================================================================

ViewMRI::ViewMRI(size_t n, MemoryRange& src, size_t offs) {
ViewMRI::ViewMRI(size_t n, const MemoryRange& src, size_t offs) {
xassert(offs + n <= src.size());
base = ViewedMRI::acquire_viewed(src);
offset = offs;
Expand Down Expand Up @@ -647,7 +647,7 @@
// ViewedMRI
//==============================================================================

ViewedMRI::ViewedMRI(MemoryRange& src) {
ViewedMRI::ViewedMRI(const MemoryRange& src) {
BaseMRI* implptr = src.o->impl.release();
src.o->impl.reset(this);
parent = src.o; // copy std::shared_ptr
Expand All @@ -660,7 +660,7 @@
resizable = false;
}

ViewedMRI* ViewedMRI::acquire_viewed(MemoryRange& src) {
ViewedMRI* ViewedMRI::acquire_viewed(const MemoryRange& src) {
BaseMRI* implptr = src.o->impl.get();
ViewedMRI* viewedptr = dynamic_cast<ViewedMRI*>(implptr);
if (!viewedptr) {
Expand Down
2 changes: 1 addition & 1 deletion c/memrange.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class MemoryRange
static MemoryRange acquire(void* ptr, size_t n);
static MemoryRange external(const void* ptr, size_t n);
static MemoryRange external(const void* ptr, size_t n, Py_buffer* pybuf);
static MemoryRange view(MemoryRange& src, size_t n, size_t offset);
static MemoryRange view(const MemoryRange& src, size_t n, size_t offset);
static MemoryRange mmap(const std::string& path);
static MemoryRange mmap(const std::string& path, size_t n, int fd = -1);
static MemoryRange overmap(const std::string& path, size_t nextra,
Expand Down
Loading

0 comments on commit b62faff

Please sign in to comment.