diff --git a/rust/vineyard/src/client/client.rs b/rust/vineyard/src/client/client.rs index 93b351b5..9eb9cd28 100644 --- a/rust/vineyard/src/client/client.rs +++ b/rust/vineyard/src/client/client.rs @@ -20,8 +20,6 @@ use std::os::unix::net::UnixStream; use serde_json::Value; -use super::ipc_client::IPCClient; -use super::rpc_client::RPCClient; use super::rust_io::*; use super::ObjectMeta; diff --git a/rust/vineyard/src/client/ds/blob.rs b/rust/vineyard/src/client/ds/blob.rs index 1c66b2ef..9a97e7da 100644 --- a/rust/vineyard/src/client/ds/blob.rs +++ b/rust/vineyard/src/client/ds/blob.rs @@ -18,12 +18,10 @@ use std::rc::{Rc, Weak}; use std::sync::{Arc, Mutex}; use arrow::buffer as arrow; -use lazy_static::lazy_static; use serde_json::json; use super::object::{Object, ObjectBase, ObjectBuilder, Registered}; -use super::object_factory::ObjectFactory; use super::object_meta::ObjectMeta; use super::payload::Payload; use super::status::*; @@ -161,9 +159,7 @@ impl Blob { pub fn dump() {} // Question: VLOG(); VLOG_IS_ON() - // Question: It will consume a client since IPCClient cannot implement clone - // trait(UnixStream). - pub fn make_empty(client: IPCClient) -> Rc { + pub fn make_empty(client: Rc) -> Rc { let mut empty_blob = Blob::default(); empty_blob.id = empty_blob_id(); empty_blob.size = 0; @@ -183,7 +179,9 @@ impl Blob { empty_blob .meta .add_json_key_value(&"transient".to_string(), &json!(true)); - let tmp: Rc = Rc::new(client); // Needs clone trait here + let tmp = Rc::clone(&client); // Needs clone trait here + let tmp = tmp as Rc; + empty_blob.meta.set_client(Some(Rc::downgrade(&tmp))); Rc::new(empty_blob) @@ -318,6 +316,10 @@ impl BufferSet { &self.buffers } + pub fn all_buffers_mut(&mut self) -> &mut HashMap>> { + &mut self.buffers + } + pub fn emplace_null_buffer(&mut self, id: ObjectID) -> io::Result<()> { if let Some(buf) = self.buffers.get(&id) { if let Some(_) = buf { diff --git a/rust/vineyard/src/client/ds/object.rs b/rust/vineyard/src/client/ds/object.rs index 75e7fdad..3d8a4ed6 100644 --- a/rust/vineyard/src/client/ds/object.rs +++ b/rust/vineyard/src/client/ds/object.rs @@ -1,3 +1,4 @@ +use std::any::Any; /** Copyright 2020-2021 Alibaba Group Holding Limited. Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,7 +20,6 @@ use dyn_clone::DynClone; use serde_json::json; -use super::blob::Blob; use super::object_meta::ObjectMeta; use super::status::*; use super::uuid::ObjectID; @@ -35,7 +35,7 @@ pub trait ObjectBase { } } -pub trait Object: ObjectBase + Send + DynClone { +pub trait Object: ObjectBase + Send + std::fmt::Debug + DynClone { fn meta(&self) -> &ObjectMeta; fn meta_mut(&mut self) -> &mut ObjectMeta; @@ -46,6 +46,13 @@ pub trait Object: ObjectBase + Send + DynClone { fn set_meta(&mut self, meta: &ObjectMeta); + fn as_any(self: &'_ Self) -> &'_ dyn Any + where + Self: Sized + 'static, + { + self + } + fn nbytes(&self) -> usize { self.meta().get_nbytes() } @@ -69,7 +76,7 @@ pub trait Object: ObjectBase + Send + DynClone { .get_key_value(&"transient".to_string()) .as_bool() .unwrap()); - if (!persist) { + if !persist { let client = self.meta().get_client().unwrap().upgrade().unwrap(); VINEYARD_CHECK_OK(client.if_persist(self.id())); let persist = client.if_persist(self.id()).unwrap(); @@ -88,6 +95,8 @@ pub trait Object: ObjectBase + Send + DynClone { dyn_clone::clone_trait_object!(Object); +pub trait GlobalObject {} + pub trait ObjectBuilder: ObjectBase { fn sealed(&self) -> bool; diff --git a/rust/vineyard/src/client/ds/object_factory.rs b/rust/vineyard/src/client/ds/object_factory.rs index 75ba558f..238c4cdf 100644 --- a/rust/vineyard/src/client/ds/object_factory.rs +++ b/rust/vineyard/src/client/ds/object_factory.rs @@ -25,7 +25,7 @@ use super::typename::type_name; pub struct ObjectFactory {} -type ObjectInitializer = Box; +type ObjectInitializer = fn() -> Box; pub trait Create { fn create() -> &'static Arc>>; @@ -36,9 +36,12 @@ impl ObjectFactory { let typename = type_name::(); println!("Register data type: {}", typename); let KNOWN_TYPES = ObjectFactory::get_known_types(); - let tmp: Box = (*T::create().lock().unwrap()).clone(); - KNOWN_TYPES.lock().unwrap().insert(typename, tmp); - // Question: T::create() + let closure: ObjectInitializer = || (T::create().lock().unwrap()).clone(); + // 如果create返回不是引用的话:Arc::try_unwrap(*T::create()).unwrap().into_inner().unwrap() + // dyn_clone. Otherwise: + // cannot move out of dereference of `MutexGuard<'_, Box>` + KNOWN_TYPES.lock().unwrap().insert(typename, closure); + true } @@ -51,9 +54,7 @@ impl ObjectFactory { "Failed to create an instance due to the unknown typename: {}", type_name ), - Some(initialized_object) => Ok((*initialized_object).clone()), - // Question: Add dyn_clone crate - // 应该是个closure + Some(initialized_object) => Ok((*initialized_object)()), } } @@ -71,9 +72,8 @@ impl ObjectFactory { type_name ), Some(target) => { - // Question: 闭包返回一个新的实例 + let mut target = (target)(); - let mut target = (*target).clone(); target.construct(&metadata); return Ok(target); } diff --git a/rust/vineyard/src/client/ds/object_meta.rs b/rust/vineyard/src/client/ds/object_meta.rs index e3179e98..cc142933 100644 --- a/rust/vineyard/src/client/ds/object_meta.rs +++ b/rust/vineyard/src/client/ds/object_meta.rs @@ -15,7 +15,6 @@ See the License for the specific language governing permissions and limitations under the License. */ use std::io; -use std::ops; use std::rc::{Rc, Weak}; use arrow::buffer as arrow; @@ -197,7 +196,7 @@ impl ObjectMeta { .extend(&member.buffer_set.borrow()); } - pub fn add_member_with_object(&mut self, name: &String, member: &Object) { + pub fn add_member_with_object(&mut self, name: &String, member: &dyn Object) { self.add_member_with_meta(name, member.meta()); } @@ -231,8 +230,9 @@ impl ObjectMeta { ret.set_meta_data(self.client.as_ref().unwrap(), &child_meta); let all_buffer_set = self.buffer_set.borrow(); let all_blobs = all_buffer_set.all_buffers(); - let ret_blobs = ret.buffer_set.borrow().all_buffers().clone(); // 去掉 - for (key, _) in ret_blobs.iter() { + // let ret_buffer_set = ret.buffer_set.borrow_mut(); + // let ret_blobs = ret_buffer_set.all_buffers();//.clone(); // 去掉 + for (key, _) in ret.buffer_set.clone().borrow_mut().all_buffers().iter() { if let Some(value) = all_blobs.get(key) { ret.set_buffer(*key, &value); // clone } @@ -290,8 +290,6 @@ impl ObjectMeta { &self.buffer_set } - // TODO: Check logic - pub fn find_all_blobs(&mut self, tree: &Value) { if tree.is_null() { return; @@ -311,7 +309,6 @@ impl ObjectMeta { cond2 = true; } if cond2 || cond1 { - // QUESTION // TODO VINEYARD_CHECK_OK(self.buffer_set.borrow_mut().emplace_null_buffer(member_id)); } } else { diff --git a/rust/vineyard/src/client/ipc_client.rs b/rust/vineyard/src/client/ipc_client.rs index 2261d5f7..e5e3c5e6 100644 --- a/rust/vineyard/src/client/ipc_client.rs +++ b/rust/vineyard/src/client/ipc_client.rs @@ -15,7 +15,7 @@ limitations under the License. */ use std::io; use std::io::prelude::*; -use std::rc::{Rc, Weak}; +use std::rc::Rc; use serde_json::Value; @@ -62,7 +62,7 @@ impl Default for IPCClient { } impl IPCClient { - pub fn create_blob(&mut self, size: usize, blob: Box) -> Result<(), bool> { + pub fn create_blob(&self, size: usize, blob: &Box) -> io::Result<()> { ENSURE_CONNECTED(self.connected()); let object_id = invalid_object_id(); let mut object: Payload; @@ -72,12 +72,31 @@ impl IPCClient { } pub fn create_buffer( + &mut self, size: usize, id: ObjectID, payload: &mut Payload, - buffer: Option>, - ) -> Result<(), bool> { - panic!(); //TODO + ) -> io::Result>> { + ENSURE_CONNECTED(self.connected()); + let mut stream = self.get_stream()?; + let message_out = write_create_remote_buffer_request(size); + do_write(&mut stream, &message_out)?; + let mut message_in = String::new(); + do_read(&mut stream, &mut message_in)?; + let message_in: Value = serde_json::from_str(&message_in)?; + let (id, payload) = read_create_buffer_reply(message_in)?; + + let shared: *const u8 = std::ptr::null(); + if payload.data_size > 0 { + RETURN_ON_ERROR( + //TODO: mmapToClient(payload.store_fd, payload.map_size, false, true, &shared) + Ok(()), + ); + } + //let buffer = std::make_shared(shared + payload.data_offset, + // payload.data_size); + + panic!(); } pub fn drop_buffer(&mut self, id: ObjectID, fd: i32) -> Result<(), bool> { diff --git a/rust/vineyard/src/common/memory/payload.rs b/rust/vineyard/src/common/memory/payload.rs index 2239f772..f392884b 100644 --- a/rust/vineyard/src/common/memory/payload.rs +++ b/rust/vineyard/src/common/memory/payload.rs @@ -1,5 +1,3 @@ -use std::ptr; - use serde_json::{json, Value}; use super::uuid::*; @@ -11,7 +9,7 @@ pub struct Payload { arena_fd: i32, data_offset: isize, - data_size: i64, + pub data_size: i64, map_size: i64, pointer: *const u8, // TODO: Check if this is right for nullptr } @@ -25,7 +23,7 @@ impl Default for Payload { data_offset: 0, data_size: 0, map_size: 0, - pointer: ptr::null(), // nullptr + pointer: std::ptr::null(), // nullptr } } } @@ -51,6 +49,6 @@ impl Payload { self.data_offset = tree["data_offset"].as_i64().unwrap() as isize; self.data_size = tree["data_size"].as_i64().unwrap(); self.map_size = tree["map_size"].as_i64().unwrap(); - self.pointer = ptr::null(); // nullptr + self.pointer = std::ptr::null(); // nullptr } } diff --git a/rust/vineyard/src/common/util/protocol.rs b/rust/vineyard/src/common/util/protocol.rs index 22ad43af..6723c2d7 100644 --- a/rust/vineyard/src/common/util/protocol.rs +++ b/rust/vineyard/src/common/util/protocol.rs @@ -213,7 +213,7 @@ pub fn write_persist_request(id: ObjectID) -> String { encode_msg(msg) } -pub fn read_persist_reply(root: Value) -> io::Result<(())> { +pub fn read_persist_reply(root: Value) -> io::Result<()> { CHECK_IPC_ERROR(&root, "persist_reply"); Ok(()) } @@ -223,7 +223,7 @@ pub fn write_if_persist_request(id: ObjectID) -> String { encode_msg(msg) } -pub fn read_if_persist_reply(root: Value) -> io::Result<(bool)> { +pub fn read_if_persist_reply(root: Value) -> io::Result { CHECK_IPC_ERROR(&root, "if_persist_reply"); let persist = root["persist"].as_bool().unwrap_or(false); Ok(persist) diff --git a/rust/vineyard/src/common/util/status.rs b/rust/vineyard/src/common/util/status.rs index cb99ac51..b24d9de7 100644 --- a/rust/vineyard/src/common/util/status.rs +++ b/rust/vineyard/src/common/util/status.rs @@ -2,22 +2,28 @@ use std::io; pub fn VINEYARD_CHECK_OK(status: io::Result) { if let Err(_) = status { - println!("Error occurs") + panic!("Error occurs.") } } pub fn VINEYARD_ASSERT(condition: bool) { - if (!condition) { + if !condition { panic!() } } pub fn RETURN_ON_ASSERT(b: bool) { - if (!b) { + if !b { panic!("On assert failed."); } } +pub fn RETURN_ON_ERROR(status: io::Result) { + if let Err(_) = status { + panic!("Error occurs.") + } +} + // Question pub fn CHECK(condition: bool) { if !condition { diff --git a/rust/vineyard/src/module/array.rs b/rust/vineyard/src/module/array.rs index 855c5482..b2a676b9 100644 --- a/rust/vineyard/src/module/array.rs +++ b/rust/vineyard/src/module/array.rs @@ -1,4 +1,6 @@ use std::any::Any; +use std::cell::RefCell; + use std::io; use std::marker::PhantomData; use std::mem; @@ -6,6 +8,7 @@ use std::rc::Rc; use std::sync::{Arc, Mutex}; use lazy_static::lazy_static; +use serde_json::json; use super::status::*; use super::typename::type_name; @@ -15,7 +18,7 @@ use super::IPCClient; use super::ObjectMeta; use super::ENSURE_NOT_SEALED; use super::{Blob, BlobWriter}; -use super::{Object, ObjectBase, ObjectBuilder, Registered}; +use super::{GlobalObject, Object, ObjectBase, ObjectBuilder, Registered}; #[derive(Debug, Clone)] pub struct Array { @@ -23,17 +26,19 @@ pub struct Array { id: ObjectID, registered: bool, size: usize, - buffer: Rc, // Question: unsafe Send // 不行用Arc - phantom: PhantomData, // Question: if this is correct? + buffer: Rc, // Question: unsafe Send // 不行用Arc + phantom: PhantomData, } unsafe impl Send for Array {} impl Create for Array { fn create() -> &'static Arc>> { + // TODO: Drop reference lazy_static! { static ref SHARED_ARRAY: Arc>> = - Arc::new(Mutex::new(Box::new(Array::default() as Array))); // Question + Arc::new(Mutex::new(Box::new(Array::default() as Array))); // FIXME + } &SHARED_ARRAY } @@ -82,9 +87,9 @@ impl Array { } } -impl Registered for Array {} +impl Registered for Array {} -impl Object for Array { +impl Object for Array { fn meta(&self) -> &ObjectMeta { &self.meta } @@ -132,10 +137,23 @@ pub trait ArrayBaseBuilder: ObjectBuilder { value .meta .set_type_name(&type_name::>().to_string()); - // if (std::is_base_of>::value) + // if (std::is_base_of>::value) TODO + if true { + value.meta.set_global(true); + } + value.size = self.size(); + value + .meta + .add_json_key_value(&"size_".to_string(), &json!(value.size)); + // Question: using __buffer__value_type = typename decltype(__value->buffer_)::element_type; + // auto __value_buffer_ = std::dynamic_pointer_cast<__buffer__value_type>( + // buffer_->_Seal(client)); + panic!(); } + fn size(&self) -> usize; + fn set_size(&mut self, size: usize); fn set_buffer(&mut self, buffer: &Rc); } @@ -149,6 +167,9 @@ pub struct ArrayBuilder { } impl ArrayBaseBuilder for ArrayBuilder { + fn size(&self) -> usize { + self.size + } fn set_size(&mut self, size: usize) { self.size = size; } @@ -168,16 +189,60 @@ impl ObjectBuilder for ArrayBuilder { } } -impl ObjectBase for ArrayBuilder {} +impl ObjectBase for ArrayBuilder { + fn build(&mut self, client: &IPCClient) -> io::Result<()> { + self.set_size(self.size); + //self.set_buffer(&Rc::from(self.buffer_writer)); + Ok(()) + } +} + +impl ArrayBuilder { + pub fn from(&mut self, client: &IPCClient, size: usize) { + ArrayBaseBuilder::from(self, client); + self.size = size; + VINEYARD_CHECK_OK(client.create_blob(self.size * mem::size_of::(), &self.buffer_writer)); + let data = unsafe { *self.buffer_writer.data() }; + //let data: T = data; // Question: Cannot coerce T to u8 + //self.data = data; + } -impl ArrayBuilder { - // pub fn from(client: &impl Client, size: usize) -> ArrayBuilder { - // VINEYARD_CHECK_OK(client.create_blob(size * mem::size_of(), buffer_writer)); - // ArrayBuilder{ - // size: size, - // data: buffer_writer.data() //TODO - // } - // } + pub fn from_vec(&mut self, client: &IPCClient, vec: Vec) { + self.from(client, vec.len()); + let dest: *mut T = std::ptr::null_mut(); + unsafe { + std::ptr::copy_nonoverlapping(vec.as_ptr(), dest, self.size * mem::size_of::()); + } + let data = unsafe { &*dest }; + self.data = (*data).clone(); + // Raw pointers don't move ownership. + // Compiler cannot protect against bugs like use-after-free; + } + + pub fn from_array(&mut self, client: &IPCClient, data: *const T, size: usize) { + self.from(client, size); + let dest: *mut T = std::ptr::null_mut(); + unsafe { + std::ptr::copy_nonoverlapping(data, dest, self.size * mem::size_of::()); + } + let data = unsafe { &*dest }; + self.data = (*data).clone(); + } + + pub fn size(&self) -> usize { + self.size + } + + // TODO: + // T& operator[](size_t idx) { return data_[idx]; } + + pub fn data_mut(&mut self) -> *mut T { + &mut self.data + } + + pub fn data(&self) -> *const T { + &self.data + } } pub struct ResizableArrayBuilder { @@ -188,6 +253,10 @@ pub struct ResizableArrayBuilder { } impl ArrayBaseBuilder for ResizableArrayBuilder { + fn size(&self) -> usize { + self.size + } + fn set_size(&mut self, size: usize) { self.size = size; } diff --git a/rust/vineyard/src/module/mod.rs b/rust/vineyard/src/module/mod.rs index 8903a92d..d4f13e43 100644 --- a/rust/vineyard/src/module/mod.rs +++ b/rust/vineyard/src/module/mod.rs @@ -7,7 +7,7 @@ pub use crate::client::rpc_client::RPCClient; pub use crate::client::ds::blob::{Blob, BlobWriter}; pub use crate::client::ds::object::{ - Object, ObjectBase, ObjectBuilder, Registered, ENSURE_NOT_SEALED, + GlobalObject, Object, ObjectBase, ObjectBuilder, Registered, ENSURE_NOT_SEALED, }; pub use crate::client::ds::object_factory::{Create, ObjectFactory}; pub use crate::client::ds::object_meta::ObjectMeta;