Skip to content

Commit

Permalink
feat: read_span_flatbuffer() memory use and correctness fixes (#4684)
Browse files Browse the repository at this point in the history
* read_span_flatbuffer: support cleaning up spare examples correctly

Consumers of VW as a library can provide their own event pools, etc. Previous parsers were always able to predict when an even would be needed ahead of time, so would only allocate when necessary. This was done by relying on a single incoming event preallocation to let the external host deallocate in the case of nothing to be parsed.

This does not work for the FB parser due to how it handles re-entrancy, and we do not want to spend the time re-architecting it to avoid this. The fix, in this case, is to expand the API to include a callback to return spare events back to the host's event pool.

* support header for using VW and RL api_status.h side-by-side
* Add additional error reporting to read_span_flatbuffer

* Reset parser when re-entering after bad parse

When re-using the flatbuffer parser across multiple invocations, the parser state could become invalid (retain references to deleted objects)

* Add more tests for bad inputs
* Add comments about what is going on in read_span_flatbuffer
* Fix a place where the parser was returning the semantically incorrect error code
* Remove dead code
  • Loading branch information
lokitoth authored Feb 15, 2024
1 parent 7c2963e commit 305a795
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 59 deletions.
3 changes: 3 additions & 0 deletions vowpalwabbit/core/include/vw/core/error_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ ERROR_CODE_DEFINITION(
ERROR_CODE_DEFINITION(
13, fb_parser_size_mismatch_ft_names_ft_values, "Size of feature names and feature values do not match. ")
ERROR_CODE_DEFINITION(14, unknown_label_type, "Label type in Flatbuffer not understood. ")
ERROR_CODE_DEFINITION(15, fb_parser_span_misaligned, "Input Flatbuffer span is not aligned to an 8-byte boundary. ")
ERROR_CODE_DEFINITION(
16, fb_parser_span_length_mismatch, "Input Flatbuffer span does not match flatbuffer size prefix. ")

// TODO: This is temporary until we switch to the new error handling mechanism.
ERROR_CODE_DEFINITION(10000, vw_exception, "vw_exception: ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include "vw/core/api_status.h"
#include "vw/core/example.h"
#include "vw/core/multi_ex.h"
#include "vw/core/shared_data.h"
Expand All @@ -14,15 +13,21 @@
namespace VW
{

namespace experimental
{
class api_status;
}

using example_sink_f = std::function<void(VW::multi_ex&& spare_examples)>;

namespace parsers
{
namespace flatbuffer
{
int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& examples);
bool read_span_flatbuffer(
VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples);

int read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory,
VW::multi_ex& examples, example_sink_f example_sink = nullptr, VW::experimental::api_status* status = nullptr);

class parser
{
Expand Down Expand Up @@ -57,6 +62,19 @@ class parser
VW::experimental::api_status* status = nullptr);
int get_namespace_index(const Namespace* ns, namespace_index& ni, VW::experimental::api_status* status = nullptr);

inline void reset_active_multi_ex()
{
_multi_ex_index = 0;
_active_multi_ex = false;
_multi_example_object = nullptr;
}

inline void reset_active_collection()
{
_example_index = 0;
_active_collection = false;
}

void parse_simple_label(shared_data* sd, polylabel* l, reduction_features* red_features, const SimpleLabel* label);
void parse_cb_label(polylabel* l, const CBLabel* label);
void parse_ccb_label(polylabel* l, const CCBLabel* label);
Expand Down
130 changes: 80 additions & 50 deletions vowpalwabbit/fb_parser/src/parse_example_flatbuffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include "vw/fb_parser/parse_example_flatbuffer.h"

#include "vw/core/action_score.h"
#include "vw/core/api_status.h"
#include "vw/core/best_constant.h"
#include "vw/core/cb.h"
#include "vw/core/constant.h"
#include "vw/core/error_constants.h"
#include "vw/core/global_data.h"
#include "vw/core/parser.h"
#include "vw/core/scope_exit.h"
#include "vw/core/vw.h"

#include <cfloat>
Expand Down Expand Up @@ -43,8 +45,8 @@ int flatbuffer_to_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl
return static_cast<int>(status.get_error_code() == VW::experimental::error_code::success);
}

bool read_span_flatbuffer(
VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory, VW::multi_ex& examples)
int read_span_flatbuffer(VW::workspace* all, const uint8_t* span, size_t length, example_factory_t example_factory,
VW::multi_ex& examples, example_sink_f example_sink, VW::experimental::api_status* status)
{
// we expect context to contain a size_prefixed flatbuffer (technically a binary string)
// which means:
Expand All @@ -59,16 +61,15 @@ bool read_span_flatbuffer(
// thus context.size() = sizeof(length) + length
io_buf unused;

// TODO: How do we report errors out of here? (This is a general API problem with the parsers)
size_t address = reinterpret_cast<size_t>(span);
if (address % 8 != 0)
{
std::stringstream sstream;
sstream << "fb_parser error: flatbuffer data not aligned to 8 bytes" << std::endl;
sstream << " span => @" << std::hex << address << std::dec << " % " << 8 << " = " << address % 8
<< " (vs desired = " << 0 << ")";
THROW(sstream.str());
return false;

RETURN_ERROR_LS(status, fb_parser_span_misaligned) << sstream.str();
}

flatbuffers::uoffset_t flatbuffer_object_size =
Expand All @@ -79,42 +80,80 @@ bool read_span_flatbuffer(
sstream << "fb_parser error: flatbuffer size prefix does not match actual size" << std::endl;
sstream << " span => @" << std::hex << address << std::dec << " size_prefix = " << flatbuffer_object_size
<< " length = " << length;
THROW(sstream.str());
return false;

RETURN_ERROR_LS(status, fb_parser_span_length_mismatch) << sstream.str();
}

VW::multi_ex temp_ex;
temp_ex.push_back(&example_factory());

// Use scope_exit because the parser reports errors by throwing exceptions (the code path in the vw driver
// uses the return value to signal completion, not errors).
auto scope_guard = VW::scope_exit(
[&temp_ex, &all, &example_sink]()
{
if (example_sink == nullptr) { VW::finish_example(*all, temp_ex); }
else { example_sink(std::move(temp_ex)); }
});

// There is a bit of unhappiness with the interface of the read_XYZ_<format>() functions, because they often
// expect the input multi_ex to have a single "empty" example there. This contributes, in part, to the large
// proliferation of entry points into the JSON parser(s). We want to avoid exposing that insofar as possible,
// so we will check whether we already received a perfectly good example and use that, or create a new one if
// needed.
if (examples.size() > 0)
{
assert(examples.size() == 1);
temp_ex.push_back(examples[0]);
examples.pop_back();
}
else { temp_ex.push_back(&example_factory()); }

bool has_more = true;
VW::experimental::api_status status;
do {
switch (all->parser_runtime.flat_converter->parse_examples(all, unused, temp_ex, span, &status))
switch (int result = all->parser_runtime.flat_converter->parse_examples(all, unused, temp_ex, span, status))
{
case VW::experimental::error_code::success:
has_more = true;
break;
// Because nothing_to_parse is not an error we have to filter it out here, otherwise
// we could simply do RETURN_IF_FAIL(result) and let the macro handle it.
case VW::experimental::error_code::nothing_to_parse:
has_more = false;
break;
default:
std::stringstream sstream;
sstream << "Error parsing examples: " << std::endl;
THROW(sstream.str());
return false;
RETURN_IF_FAIL(result);
}

// The underlying parser will emit a newline example when terminating the parsing
// of a multi_ex block. Since we are collecting it into a multi_ex, we want to
// swallow it here, but should the parser not have followed its contract w.r.t.
// the return value, we should use the presence of the newline example to override
// has_more.
has_more &= !temp_ex[0]->is_newline;

// If this is a real example, we need to move it to the output multi_ex; we also
// need to create a new example to replace it for the next run through the parser.
if (!temp_ex[0]->is_newline)
{
examples.push_back(&example_factory());
std::swap(examples[examples.size() - 1], temp_ex[0]);
// We avoid doing moves or copy construction here because multi_ex contains
// example pointers. The compile-time code here is meant to call attention
// to here if the underlying type changes.
using temp_ex_element_t = std::remove_reference<decltype(temp_ex[0])>::type;
using examples_element_t = std::remove_reference<decltype(examples[0])>::type;

static_assert(std::is_same<temp_ex_element_t, examples_element_t>::value &&
std::is_same<temp_ex_element_t, VW::example*>::value,
"temp_ex and example must be vector-like over VW::example*");

examples.push_back(temp_ex[0]);

// Since we are using a vector of pointers, we can simply reassign the slot to
// the pointer of the newly created destination example for the parser.
temp_ex[0] = &example_factory();
}
} while (has_more);

VW::finish_example(*all, temp_ex);
return true;
return VW::experimental::error_code::success;
}

const VW::parsers::flatbuffer::ExampleRoot* parser::data() { return _data; }
Expand Down Expand Up @@ -198,16 +237,17 @@ int parser::process_collection_item(VW::workspace* all, VW::multi_ex& examples,
{
_active_multi_ex = true;
_multi_example_object = _data->example_obj_as_ExampleCollection()->multi_examples()->Get(_example_index);

// read from active multi_ex
RETURN_IF_FAIL(parse_multi_example(all, examples[0], _multi_example_object, status));
// read from active collection

// if we are done with the multi example, move to the next one, or finish the collection
if (!_active_multi_ex)
{
_example_index++;
if (_example_index == _data->example_obj_as_ExampleCollection()->multi_examples()->size())
{
_example_index = 0;
_active_collection = false;
reset_active_collection();
}
}
}
Expand All @@ -216,11 +256,7 @@ int parser::process_collection_item(VW::workspace* all, VW::multi_ex& examples,
const auto ex = _data->example_obj_as_ExampleCollection()->examples()->Get(_example_index);
RETURN_IF_FAIL(parse_example(all, examples[0], ex, status));
_example_index++;
if (_example_index == _data->example_obj_as_ExampleCollection()->examples()->size())
{
_example_index = 0;
_active_collection = false;
}
if (_example_index == _data->example_obj_as_ExampleCollection()->examples()->size()) { reset_active_collection(); }
}
return VW::experimental::error_code::success;
}
Expand All @@ -231,6 +267,20 @@ int parser::parse_examples(VW::workspace* all, io_buf& buf, VW::multi_ex& exampl
#define RETURN_SUCCESS_FINISHED() \
return buffer_pointer ? VW::experimental::error_code::nothing_to_parse : VW::experimental::error_code::success;

// If we are re-using a single parser instance across multiple invocations, we need to reset
// the state when we get a new buffer_pointer. Otherwise we may be in the middle of a multi_ex
// or example_collection, and the following parse will attempt to reuse the object references
// from the previous buffer, which may have been deallocated.
// TODO: Rewrite the parser to avoid this convoluted, re-entrant logic.
if (buffer_pointer && _flatbuffer_pointer != buffer_pointer)
{
reset_active_multi_ex();
reset_active_collection();
}

// The ExampleCollection processing code owns dispatching to parse_multi_example to handle
// iteration through the outer collection correctly, thus it must have the first chance to
// incoming parse request.
if (_active_collection)
{
RETURN_IF_FAIL(process_collection_item(all, examples, status));
Expand Down Expand Up @@ -307,9 +357,7 @@ int parser::parse_multi_example(
{
// done with multi example, send a newline example and reset
ae->is_newline = true;
_multi_ex_index = 0;
_active_multi_ex = false;
_multi_example_object = nullptr;
reset_active_multi_ex();
return VW::experimental::error_code::success;
}

Expand All @@ -325,30 +373,11 @@ int parser::get_namespace_index(const Namespace* ns, namespace_index& ni, VW::ex
ni = static_cast<uint8_t>(ns->name()->c_str()[0]);
return VW::experimental::error_code::success;
}
else if (flatbuffers::IsFieldPresent(ns, Namespace::VT_HASH))
else
{
ni = ns->hash();
return VW::experimental::error_code::success;
}

if (_active_collection && _active_multi_ex)
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index in collection item with example "
"index "
<< _example_index << "and multi example index " << _multi_ex_index;
}
else if (_active_multi_ex)
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index in multi example index "
<< _multi_ex_index;
}
else
{
RETURN_ERROR_LS(status, fb_parser_name_hash_missing)
<< "Either name or hash field must be specified to get the namespace index";
}
}

bool get_namespace_hash(VW::workspace* all, const Namespace* ns, uint64_t& hash)
Expand Down Expand Up @@ -462,7 +491,7 @@ int parser::parse_namespaces(VW::workspace* all, example* ae, const Namespace* n
}
else
{
if (!has_hashes) { RETURN_NS_PARSER_ERROR(status, fb_parser_name_hash_missing) }
if (!has_hashes) { RETURN_NS_PARSER_ERROR(status, fb_parser_feature_hashes_names_missing) }

if (ns->feature_hashes()->size() != ns->feature_values()->size())
{
Expand Down Expand Up @@ -541,6 +570,7 @@ int parser::parse_flat_label(
break;
}
case Label_NONE:
case Label_no_label:
break;
default:
if (_active_collection && _active_multi_ex)
Expand Down
1 change: 1 addition & 0 deletions vowpalwabbit/fb_parser/src/parse_label.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// license as described in the file LICENSE.

#include "vw/core/action_score.h"
#include "vw/core/api_status.h"
#include "vw/core/best_constant.h"
#include "vw/core/cb.h"
#include "vw/core/constant.h"
Expand Down
Loading

0 comments on commit 305a795

Please sign in to comment.