Skip to content

Commit

Permalink
fix(interactive): Fix submitting cpp procedure with raw input format (#…
Browse files Browse the repository at this point in the history
…3850)

- For input bytes, should not append additional format byte at server
side. It has been already added at client side.
- For output bytes, the `CppEncoder` and `CypherJson` are also
different, for `CppEncoder`, we should return the full bytes; For
`CypherJson`, since the output is written by `output.put_string()`, we
should discard the first four bytes.
- Other fixes.

---------

Co-authored-by: liulx20 <liulexiao.llx@alibaba-inc.com>
  • Loading branch information
zhanglei1949 and liulx20 committed May 30, 2024
1 parent d3fa932 commit 2cafc1a
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 24 deletions.
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

namespace gs {

ReadTransaction GraphDBSession::GetReadTransaction() {
ReadTransaction GraphDBSession::GetReadTransaction() const {
uint32_t ts = db_.version_manager_.acquire_read_timestamp();
return ReadTransaction(db_.graph_, db_.version_manager_, ts);
}
Expand Down
6 changes: 3 additions & 3 deletions flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class GraphDBSession {
}
~GraphDBSession() {}

ReadTransaction GetReadTransaction();
ReadTransaction GetReadTransaction() const;

InsertTransaction GetInsertTransaction();

Expand Down Expand Up @@ -164,7 +164,7 @@ class GraphDBSession {
} else if (input_tag == static_cast<uint8_t>(InputFormat::kCypherJson)) {
// For cypherJson there is no query-id provided. The query name is
// provided in the json string.
std::string_view str_view(input.data(), len - 2);
std::string_view str_view(input.data(), len - 1);
VLOG(10) << "string view: " << str_view;
nlohmann::json j;
try {
Expand All @@ -190,7 +190,7 @@ class GraphDBSession {
}
VLOG(10) << "Query name: " << query_name;
return std::make_pair(app_name_to_path_index.at(query_name).second,
std::string_view(str_data, len - 2));
std::string_view(str_data, len - 1));
} else if (input_tag ==
static_cast<uint8_t>(InputFormat::kCypherInternalProcedure)) {
// For cypher internal procedure, the query_name is
Expand Down
36 changes: 32 additions & 4 deletions flex/engines/http_server/handler/hqps_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(
std::unique_ptr<seastar::httpd::reply> rep) {
auto dst_executor = executor_idx_;
executor_idx_ = (executor_idx_ + 1) % shard_concurrency_;
if (req->content.size() <= 0) {
// At least one input format byte is needed
rep->set_status(seastar::httpd::reply::status_type::internal_server_error);
rep->write_body("bin", seastar::sstring("Empty request!"));
rep->done();
return seastar::make_ready_future<std::unique_ptr<seastar::httpd::reply>>(
std::move(rep));
}
uint8_t input_format =
req->content.back(); // see graph_db_session.h#parse_query_type
// TODO(zhanglei): choose read or write based on the request, after the
// read/write info is supported in physical plan
if (req->param.exists("graph_id")) {
Expand All @@ -135,9 +145,18 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(
return seastar::make_ready_future<std::unique_ptr<seastar::httpd::reply>>(
std::move(rep));
}
req->content.append(gs::GraphDBSession::kCypherJson, 1);
} else {
req->content.append(gs::GraphDBSession::kCypherInternalProcedure, 1);
// This handler with accept two kinds of queries, /v1/graph/{graph_id}/query
// and /v1/query/ The former one will have a graph_id in the request, and
// the latter one will not. For the first one, the input format is the last
// byte of the request content, and is added at client side; For the second
// one, the request is send from compiler, and currently compiler will not
// add extra bytes to the request content. So we need to add the input
// format here. Finally, we should REMOVE this adhoc appended byte, i.e. the
// input format byte should be added at compiler side
// TODO(zhanglei): remove this adhoc appended byte, add the byte at compiler
// side. Or maybe we should refine the protocol.
}
#ifdef HAVE_OPENTELEMETRY_CPP
auto tracer = otel::get_tracer("hqps_procedure_query_handler");
Expand All @@ -154,7 +173,7 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(

return executor_refs_[dst_executor]
.run_graph_db_query(query_param{std::move(req->content)})
.then([this
.then([this, input_format
#ifdef HAVE_OPENTELEMETRY_CPP
,
outer_span = outer_span
Expand All @@ -171,8 +190,17 @@ seastar::future<std::unique_ptr<seastar::httpd::reply>> hqps_ic_handler::handle(
#endif // HAVE_OPENTELEMETRY_CPP
return seastar::make_ready_future<query_param>(std::move(output));
}
return seastar::make_ready_future<query_param>(
std::move(output.content.substr(4)));
if (input_format == static_cast<uint8_t>(
gs::GraphDBSession::InputFormat::kCppEncoder)) {
return seastar::make_ready_future<query_param>(
std::move(output.content));
} else {
// For cypher input format, the results are written with
// output.put_string(), which will add extra 4 bytes. So we need to
// remove the first 4 bytes here.
return seastar::make_ready_future<query_param>(
std::move(output.content.substr(4)));
}
})
.then_wrapped([rep = std::move(rep)
#ifdef HAVE_OPENTELEMETRY_CPP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,152 @@

@TestMethodOrder(OrderAnnotation.class)
public class DriverTest {

public static class Encoder {
public Encoder(byte[] bs) {
this.bs = bs;
this.loc = 0;
}

public static int serialize_long(byte[] bytes, int offset, long value) {
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
return offset;
}

public static int serialize_double(byte[] bytes, int offset, double value) {
long long_value = Double.doubleToRawLongBits(value);
return serialize_long(bytes, offset, long_value);
}

public static int serialize_int(byte[] bytes, int offset, int value) {
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
value >>= 8;
bytes[offset++] = (byte) (value & 0xFF);
return offset;
}

public static int serialize_byte(byte[] bytes, int offset, byte value) {
bytes[offset++] = value;
return offset;
}

public static int serialize_bytes(byte[] bytes, int offset, byte[] value) {
offset = serialize_int(bytes, offset, value.length);
System.arraycopy(value, 0, bytes, offset, value.length);
return offset + value.length;
}

public void put_int(int value) {
this.loc = serialize_int(this.bs, this.loc, value);
}

public void put_byte(byte value) {
this.loc = serialize_byte(this.bs, this.loc, value);
}

public void put_long(long value) {
this.loc = serialize_long(this.bs, this.loc, value);
}

public void put_double(double value) {
this.loc = serialize_double(this.bs, this.loc, value);
}

public void put_bytes(byte[] bytes) {
this.loc = serialize_bytes(this.bs, this.loc, bytes);
}

byte[] bs;
int loc;
}

static final class Decoder {
public Decoder(byte[] bs) {
this.bs = bs;
this.loc = 0;
this.len = this.bs.length;
}

public static int get_int(byte[] bs, int loc) {
int ret = (bs[loc + 3] & 0xff);
ret <<= 8;
ret |= (bs[loc + 2] & 0xff);
ret <<= 8;
ret |= (bs[loc + 1] & 0xff);
ret <<= 8;
ret |= (bs[loc] & 0xff);
return ret;
}

public static long get_long(byte[] bs, int loc) {
long ret = (bs[loc + 7] & 0xff);
ret <<= 8;
ret |= (bs[loc + 6] & 0xff);
ret <<= 8;
ret |= (bs[loc + 5] & 0xff);
ret <<= 8;
ret |= (bs[loc + 4] & 0xff);
ret <<= 8;
ret |= (bs[loc + 3] & 0xff);
ret <<= 8;
ret |= (bs[loc + 2] & 0xff);
ret <<= 8;
ret |= (bs[loc + 1] & 0xff);
ret <<= 8;
ret |= (bs[loc] & 0xff);
return ret;
}

public long get_long() {
long ret = get_long(this.bs, this.loc);
this.loc += 8;
return ret;
}

public int get_int() {
int ret = get_int(this.bs, this.loc);
this.loc += 4;
return ret;
}

public byte get_byte() {
return (byte) (bs[loc++] & 0xFF);
}

public String get_string() {
int strlen = this.get_int();
String ret = new String(this.bs, this.loc, strlen);
this.loc += strlen;
return ret;
}

public boolean empty() {
return loc == len;
}

byte[] bs;
int loc;
int len;
}

private static final Logger logger = Logger.getLogger(DriverTest.class.getName());

private static Driver driver;
Expand All @@ -47,7 +193,8 @@ public class DriverTest {
private static String graphId;
private static String jobId;
private static String cypherProcedureId;
private static String cppProcedureId;
private static String cppProcedureId1;
private static String cppProcedureId2;

@BeforeAll
public static void beforeClass() {
Expand Down Expand Up @@ -258,9 +405,9 @@ public void test6CreateCypherProcedure() {

@Test
@Order(8)
public void test7CreateCppProcedure() {
public void test7CreateCppProcedure1() {
CreateProcedureRequest procedure = new CreateProcedureRequest();
procedure.setName("cppProcedure");
procedure.setName("cppProcedure1");
procedure.setDescription("a simple test procedure");
// sampleAppFilePath is under the resources folder,with name sample_app.cc
String sampleAppFilePath = "sample_app.cc";
Expand All @@ -285,11 +432,43 @@ public void test7CreateCppProcedure() {
procedure.setType(CreateProcedureRequest.TypeEnum.CPP);
Result<CreateProcedureResponse> resp = session.createProcedure(graphId, procedure);
assertOk(resp);
cppProcedureId = "cppProcedure";
cppProcedureId1 = "cppProcedure1";
}

@Test
@Order(9)
public void test7CreateCppProcedure2() {
CreateProcedureRequest procedure = new CreateProcedureRequest();
procedure.setName("cppProcedure2");
procedure.setDescription("a simple test procedure");
// sampleAppFilePath is under the resources folder,with name sample_app.cc
String sampleAppFilePath = "read_app_example.cc";
String sampleAppContent = "";
try {
sampleAppContent =
new String(
Files.readAllBytes(
Paths.get(
Thread.currentThread()
.getContextClassLoader()
.getResource(sampleAppFilePath)
.toURI())));
} catch (IOException | URISyntaxException e) {
e.printStackTrace();
}
if (sampleAppContent.isEmpty()) {
throw new RuntimeException("read_app_example content is empty");
}
logger.info("read_app_example content: " + sampleAppContent);
procedure.setQuery(sampleAppContent);
procedure.setType(CreateProcedureRequest.TypeEnum.CPP);
Result<CreateProcedureResponse> resp = session.createProcedure(graphId, procedure);
assertOk(resp);
cppProcedureId2 = "cppProcedure2";
}

@Test
@Order(10)
public void test8Restart() {
Result<String> resp = session.startService(new StartServiceRequest().graphId(graphId));
assertOk(resp);
Expand All @@ -303,10 +482,10 @@ public void test8Restart() {
}

@Test
@Order(10)
public void test9CallCppProcedure() {
@Order(11)
public void test9CallCppProcedure1() {
QueryRequest request = new QueryRequest();
request.setQueryName(cppProcedureId);
request.setQueryName(cppProcedureId1);
request.addArgumentsItem(
new TypedValue()
.value(1)
Expand All @@ -321,7 +500,19 @@ public void test9CallCppProcedure() {
}

@Test
@Order(11)
@Order(12)
public void test9CallCppProcedure2() {
byte[] bytes = new byte[4 + 1];
Encoder encoder = new Encoder(bytes);
encoder.put_int(1);
encoder.put_byte((byte) 1); // Assume the procedure index is 1
String encoded = new String(bytes);
Result<String> resp = session.callProcedureRaw(graphId, encoded);
assertOk(resp);
}

@Test
@Order(13)
public void test10CallCypherProcedureViaNeo4j() {
String query = "CALL " + cypherProcedureId + "() YIELD *;";
org.neo4j.driver.Result result = neo4jSession.run(query);
Expand All @@ -339,11 +530,15 @@ public static void afterClass() {
if (graphId != null) {
if (cypherProcedureId != null) {
Result<String> resp = session.deleteProcedure(graphId, cypherProcedureId);
logger.info("procedure deleted: " + resp.getValue());
logger.info("cypherProcedure deleted: " + resp.getValue());
}
if (cppProcedureId1 != null) {
Result<String> resp = session.deleteProcedure(graphId, cppProcedureId1);
logger.info("cppProcedure1 deleted: " + resp.getValue());
}
if (cppProcedureId != null) {
Result<String> resp = session.deleteProcedure(graphId, cppProcedureId);
logger.info("procedure deleted: " + resp.getValue());
if (cppProcedureId2 != null) {
Result<String> resp = session.deleteProcedure(graphId, cppProcedureId2);
logger.info("cppProcedure2 deleted: " + resp.getValue());
}
Result<String> resp = session.deleteGraph(graphId);
assertOk(resp);
Expand Down
Loading

0 comments on commit 2cafc1a

Please sign in to comment.