Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update array module in Rust SDK #534

Merged
merged 57 commits into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
09476e2
2nd pr
Sijie-L Aug 23, 2021
f7c07c3
2nd pr
Sijie-L Aug 23, 2021
ccc1999
Merge branch 'main' into rustling2
Sijie-L Aug 23, 2021
77e73d1
2nd pr fmt
Sijie-L Aug 23, 2021
ee5374a
Merge branch 'rustling2' of https://github.com/TREiop/v6d into rustling2
Sijie-L Aug 23, 2021
59ede16
2nd pr: update protocol
Sijie-L Aug 23, 2021
f6e9992
Object_meta wip
Sijie-L Aug 23, 2021
86dfc6f
Add common/util/uuid
Sijie-L Aug 24, 2021
d3b34d9
object_meta WIP
Sijie-L Aug 24, 2021
bb28118
Add status module
Sijie-L Aug 25, 2021
e42e30e
Object_meta 未调通
Sijie-L Aug 25, 2021
f8e9265
ObjectMeta client成员Weak指针待改
Sijie-L Aug 26, 2021
bcbfdd6
Object_meta update
Sijie-L Aug 26, 2021
7036df0
Update object_meta
Sijie-L Sep 5, 2021
d10fa85
Merge branch 'main' into rustling2
Sijie-L Sep 6, 2021
83a69cd
Solve duplicate definations
Sijie-L Sep 6, 2021
76db89c
Remove verbose imports
Sijie-L Sep 6, 2021
84574f0
Add lazy_static dependency
Sijie-L Sep 6, 2021
604a725
Add arrow dependency
Sijie-L Sep 7, 2021
9103d35
Add memory/payload module; Move Payload
Sijie-L Sep 7, 2021
6905e81
Update blob BufferSet class
Sijie-L Sep 7, 2021
73655ce
Update object_factory module
Sijie-L Sep 7, 2021
8fa2749
Update object_factory module
Sijie-L Sep 8, 2021
9f9a6ee
Update object module
Sijie-L Sep 9, 2021
1854a5b
Update object module
Sijie-L Sep 15, 2021
ebb5b2a
存档
Sijie-L Sep 16, 2021
f150d34
将IPCClient/RPCClient中的stream改为Option<RefCell<StreamKind>>,实现内部可变性
Sijie-L Sep 16, 2021
e262a39
Add singleton test
Sijie-L Sep 16, 2021
23b80d3
Merge branch 'main' into rustling2
Sijie-L Sep 16, 2021
0ac1c18
pr
Sijie-L Sep 16, 2021
203e06e
pr
Sijie-L Sep 16, 2021
e319cb7
pr
Sijie-L Sep 16, 2021
fb70aa7
pr
Sijie-L Sep 17, 2021
b02fa89
pr
Sijie-L Sep 17, 2021
ca25c22
pr
Sijie-L Sep 17, 2021
77d1ad3
pr
Sijie-L Sep 17, 2021
33e62ae
Update blob
Sijie-L Sep 18, 2021
20e5786
Add typename module
Sijie-L Sep 19, 2021
2a2dcc8
Update array
Sijie-L Sep 19, 2021
84195de
WIP Convert object struct to object trait
Sijie-L Sep 21, 2021
3f18331
WIP object trait; Add dyn_clone crate
Sijie-L Sep 22, 2021
9673d02
Update Array<T>
Sijie-L Sep 22, 2021
9bb9d98
Convert ObjectBuilder from struct to trait
Sijie-L Sep 22, 2021
67ba5c0
Update array
Sijie-L Sep 23, 2021
1975b62
pr Object trait
Sijie-L Sep 23, 2021
9b061ff
pr
Sijie-L Sep 23, 2021
bcbecd4
Merge branch 'main' into rustling2
Sijie-L Sep 23, 2021
9af796f
pr
Sijie-L Sep 23, 2021
676ad30
pr
Sijie-L Sep 23, 2021
8555c3b
Correct ObjectInitializer
Sijie-L Sep 25, 2021
0aa6759
WIP Correct mistakes that found in the meeting
Sijie-L Sep 25, 2021
37d9150
Update array
Sijie-L Sep 26, 2021
63a76cd
Update ipc_client create_buffer
Sijie-L Sep 26, 2021
06e35ad
Update ArrayBuilder
Sijie-L Sep 27, 2021
91fc904
WIP array pr
Sijie-L Sep 29, 2021
b6e05d7
Merge branch 'main' into rustling2
Sijie-L Sep 29, 2021
ef4b4ee
WIP array module
Sijie-L Sep 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions rust/vineyard/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use std::os::unix::net::UnixStream;

use serde_json::Value;

use super::ipc_client::IPCClient;
use super::rpc_client::RPCClient;
use super::rust_io::*;
use super::ObjectMeta;

Expand Down
14 changes: 8 additions & 6 deletions rust/vineyard/src/client/ds/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ use std::rc::{Rc, Weak};
use std::sync::{Arc, Mutex};

use arrow::buffer as arrow;
use lazy_static::lazy_static;
use serde_json::json;

use super::object::{Object, ObjectBase, ObjectBuilder, Registered};

use super::object_factory::ObjectFactory;
use super::object_meta::ObjectMeta;
use super::payload::Payload;
use super::status::*;
Expand Down Expand Up @@ -161,9 +159,7 @@ impl Blob {

pub fn dump() {} // Question: VLOG(); VLOG_IS_ON()

// Question: It will consume a client since IPCClient cannot implement clone
// trait(UnixStream).
pub fn make_empty(client: IPCClient) -> Rc<Blob> {
pub fn make_empty(client: Rc<IPCClient>) -> Rc<Blob> {
let mut empty_blob = Blob::default();
empty_blob.id = empty_blob_id();
empty_blob.size = 0;
Expand All @@ -183,7 +179,9 @@ impl Blob {
empty_blob
.meta
.add_json_key_value(&"transient".to_string(), &json!(true));
let tmp: Rc<dyn Client> = Rc::new(client); // Needs clone trait here
let tmp = Rc::clone(&client); // Needs clone trait here
let tmp = tmp as Rc<dyn Client>;

empty_blob.meta.set_client(Some(Rc::downgrade(&tmp)));

Rc::new(empty_blob)
Expand Down Expand Up @@ -318,6 +316,10 @@ impl BufferSet {
&self.buffers
}

pub fn all_buffers_mut(&mut self) -> &mut HashMap<ObjectID, Option<Rc<arrow::Buffer>>> {
&mut self.buffers
}

pub fn emplace_null_buffer(&mut self, id: ObjectID) -> io::Result<()> {
if let Some(buf) = self.buffers.get(&id) {
if let Some(_) = buf {
Expand Down
15 changes: 12 additions & 3 deletions rust/vineyard/src/client/ds/object.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::any::Any;
/** Copyright 2020-2021 Alibaba Group Holding Limited.

Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -19,7 +20,6 @@ use dyn_clone::DynClone;

use serde_json::json;

use super::blob::Blob;
use super::object_meta::ObjectMeta;
use super::status::*;
use super::uuid::ObjectID;
Expand All @@ -35,7 +35,7 @@ pub trait ObjectBase {
}
}

pub trait Object: ObjectBase + Send + DynClone {
pub trait Object: ObjectBase + Send + std::fmt::Debug + DynClone {
fn meta(&self) -> &ObjectMeta;

fn meta_mut(&mut self) -> &mut ObjectMeta;
Expand All @@ -46,6 +46,13 @@ pub trait Object: ObjectBase + Send + DynClone {

fn set_meta(&mut self, meta: &ObjectMeta);

fn as_any(self: &'_ Self) -> &'_ dyn Any
where
Self: Sized + 'static,
{
self
}

fn nbytes(&self) -> usize {
self.meta().get_nbytes()
}
Expand All @@ -69,7 +76,7 @@ pub trait Object: ObjectBase + Send + DynClone {
.get_key_value(&"transient".to_string())
.as_bool()
.unwrap());
if (!persist) {
if !persist {
let client = self.meta().get_client().unwrap().upgrade().unwrap();
VINEYARD_CHECK_OK(client.if_persist(self.id()));
let persist = client.if_persist(self.id()).unwrap();
Expand All @@ -88,6 +95,8 @@ pub trait Object: ObjectBase + Send + DynClone {

dyn_clone::clone_trait_object!(Object);

pub trait GlobalObject {}

pub trait ObjectBuilder: ObjectBase {
fn sealed(&self) -> bool;

Expand Down
18 changes: 9 additions & 9 deletions rust/vineyard/src/client/ds/object_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::typename::type_name;

pub struct ObjectFactory {}

type ObjectInitializer = Box<dyn Object>;
type ObjectInitializer = fn() -> Box<dyn Object>;

pub trait Create {
fn create() -> &'static Arc<Mutex<Box<dyn Object>>>;
Expand All @@ -36,9 +36,12 @@ impl ObjectFactory {
let typename = type_name::<T>();
println!("Register data type: {}", typename);
let KNOWN_TYPES = ObjectFactory::get_known_types();
let tmp: Box<dyn Object> = (*T::create().lock().unwrap()).clone();
KNOWN_TYPES.lock().unwrap().insert(typename, tmp);
// Question: T::create()
let closure: ObjectInitializer = || (T::create().lock().unwrap()).clone();
// 如果create返回不是引用的话:Arc::try_unwrap(*T::create()).unwrap().into_inner().unwrap()
// dyn_clone. Otherwise:
// cannot move out of dereference of `MutexGuard<'_, Box<dyn object::Object>>`
KNOWN_TYPES.lock().unwrap().insert(typename, closure);

true
}

Expand All @@ -51,9 +54,7 @@ impl ObjectFactory {
"Failed to create an instance due to the unknown typename: {}",
type_name
),
Some(initialized_object) => Ok((*initialized_object).clone()),
// Question: Add dyn_clone crate
// 应该是个closure
Some(initialized_object) => Ok((*initialized_object)()),
}
}

Expand All @@ -71,9 +72,8 @@ impl ObjectFactory {
type_name
),
Some(target) => {
// Question: 闭包返回一个新的实例
let mut target = (target)();

let mut target = (*target).clone();
target.construct(&metadata);
return Ok(target);
}
Expand Down
11 changes: 4 additions & 7 deletions rust/vineyard/src/client/ds/object_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
use std::io;
use std::ops;
use std::rc::{Rc, Weak};

use arrow::buffer as arrow;
Expand Down Expand Up @@ -197,7 +196,7 @@ impl ObjectMeta {
.extend(&member.buffer_set.borrow());
}

pub fn add_member_with_object(&mut self, name: &String, member: &Object) {
pub fn add_member_with_object(&mut self, name: &String, member: &dyn Object) {
self.add_member_with_meta(name, member.meta());
}

Expand Down Expand Up @@ -231,8 +230,9 @@ impl ObjectMeta {
ret.set_meta_data(self.client.as_ref().unwrap(), &child_meta);
let all_buffer_set = self.buffer_set.borrow();
let all_blobs = all_buffer_set.all_buffers();
let ret_blobs = ret.buffer_set.borrow().all_buffers().clone(); // 去掉
for (key, _) in ret_blobs.iter() {
// let ret_buffer_set = ret.buffer_set.borrow_mut();
// let ret_blobs = ret_buffer_set.all_buffers();//.clone(); // 去掉
for (key, _) in ret.buffer_set.clone().borrow_mut().all_buffers().iter() {
if let Some(value) = all_blobs.get(key) {
ret.set_buffer(*key, &value); // clone
}
Expand Down Expand Up @@ -290,8 +290,6 @@ impl ObjectMeta {
&self.buffer_set
}

// TODO: Check logic

pub fn find_all_blobs(&mut self, tree: &Value) {
if tree.is_null() {
return;
Expand All @@ -311,7 +309,6 @@ impl ObjectMeta {
cond2 = true;
}
if cond2 || cond1 {
// QUESTION // TODO
VINEYARD_CHECK_OK(self.buffer_set.borrow_mut().emplace_null_buffer(member_id));
}
} else {
Expand Down
29 changes: 24 additions & 5 deletions rust/vineyard/src/client/ipc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ limitations under the License.
*/
use std::io;
use std::io::prelude::*;
use std::rc::{Rc, Weak};
use std::rc::Rc;

use serde_json::Value;

Expand Down Expand Up @@ -62,7 +62,7 @@ impl Default for IPCClient {
}

impl IPCClient {
pub fn create_blob(&mut self, size: usize, blob: Box<BlobWriter>) -> Result<(), bool> {
pub fn create_blob(&self, size: usize, blob: &Box<BlobWriter>) -> io::Result<()> {
ENSURE_CONNECTED(self.connected());
let object_id = invalid_object_id();
let mut object: Payload;
Expand All @@ -72,12 +72,31 @@ impl IPCClient {
}

pub fn create_buffer(
&mut self,
size: usize,
id: ObjectID,
payload: &mut Payload,
buffer: Option<Rc<arrow::MutableBuffer>>,
) -> Result<(), bool> {
panic!(); //TODO
) -> io::Result<Option<Rc<arrow::MutableBuffer>>> {
ENSURE_CONNECTED(self.connected());
let mut stream = self.get_stream()?;
let message_out = write_create_remote_buffer_request(size);
do_write(&mut stream, &message_out)?;
let mut message_in = String::new();
do_read(&mut stream, &mut message_in)?;
let message_in: Value = serde_json::from_str(&message_in)?;
let (id, payload) = read_create_buffer_reply(message_in)?;

let shared: *const u8 = std::ptr::null();
if payload.data_size > 0 {
RETURN_ON_ERROR(
//TODO: mmapToClient(payload.store_fd, payload.map_size, false, true, &shared)
Ok(()),
);
}
//let buffer = std::make_shared<arrow::MutableBuffer>(shared + payload.data_offset,
// payload.data_size);

panic!();
}

pub fn drop_buffer(&mut self, id: ObjectID, fd: i32) -> Result<(), bool> {
Expand Down
8 changes: 3 additions & 5 deletions rust/vineyard/src/common/memory/payload.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::ptr;

use serde_json::{json, Value};

use super::uuid::*;
Expand All @@ -11,7 +9,7 @@ pub struct Payload {

arena_fd: i32,
data_offset: isize,
data_size: i64,
pub data_size: i64,
map_size: i64,
pointer: *const u8, // TODO: Check if this is right for nullptr
}
Expand All @@ -25,7 +23,7 @@ impl Default for Payload {
data_offset: 0,
data_size: 0,
map_size: 0,
pointer: ptr::null(), // nullptr
pointer: std::ptr::null(), // nullptr
}
}
}
Expand All @@ -51,6 +49,6 @@ impl Payload {
self.data_offset = tree["data_offset"].as_i64().unwrap() as isize;
self.data_size = tree["data_size"].as_i64().unwrap();
self.map_size = tree["map_size"].as_i64().unwrap();
self.pointer = ptr::null(); // nullptr
self.pointer = std::ptr::null(); // nullptr
}
}
4 changes: 2 additions & 2 deletions rust/vineyard/src/common/util/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ pub fn write_persist_request(id: ObjectID) -> String {
encode_msg(msg)
}

pub fn read_persist_reply(root: Value) -> io::Result<(())> {
pub fn read_persist_reply(root: Value) -> io::Result<()> {
CHECK_IPC_ERROR(&root, "persist_reply");
Ok(())
}
Expand All @@ -223,7 +223,7 @@ pub fn write_if_persist_request(id: ObjectID) -> String {
encode_msg(msg)
}

pub fn read_if_persist_reply(root: Value) -> io::Result<(bool)> {
pub fn read_if_persist_reply(root: Value) -> io::Result<bool> {
CHECK_IPC_ERROR(&root, "if_persist_reply");
let persist = root["persist"].as_bool().unwrap_or(false);
Ok(persist)
Expand Down
12 changes: 9 additions & 3 deletions rust/vineyard/src/common/util/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,28 @@ use std::io;

pub fn VINEYARD_CHECK_OK<T>(status: io::Result<T>) {
if let Err(_) = status {
println!("Error occurs")
panic!("Error occurs.")
}
}

pub fn VINEYARD_ASSERT(condition: bool) {
if (!condition) {
if !condition {
panic!()
}
}

pub fn RETURN_ON_ASSERT(b: bool) {
if (!b) {
if !b {
panic!("On assert failed.");
}
}

pub fn RETURN_ON_ERROR<T>(status: io::Result<T>) {
if let Err(_) = status {
panic!("Error occurs.")
}
}

// Question
pub fn CHECK(condition: bool) {
if !condition {
Expand Down
Loading