Skip to content

Commit

Permalink
Merge pull request #47 from y-crdt/deep-observe
Browse files Browse the repository at this point in the history
Deep Observe
  • Loading branch information
Waidhoferj authored May 6, 2022
2 parents 524217d + 67a6593 commit b834098
Show file tree
Hide file tree
Showing 11 changed files with 426 additions and 95 deletions.
35 changes: 33 additions & 2 deletions src/shared_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,42 @@ use crate::{
y_text::YText,
y_xml::{YXmlElement, YXmlText},
};
use pyo3::prelude::*;
use pyo3::create_exception;
use pyo3::{exceptions::PyException, prelude::*};
use std::convert::TryFrom;
use yrs::types::TYPE_REFS_XML_ELEMENT;
use yrs::types::TYPE_REFS_XML_TEXT;
use yrs::types::{TypeRefs, TYPE_REFS_ARRAY, TYPE_REFS_MAP, TYPE_REFS_TEXT};
use yrs::{types::TYPE_REFS_XML_ELEMENT, SubscriptionId};

// Common errors
create_exception!(y_py, PreliminaryObservationException, PyException, "Occurs when an observer is attached to a Y type that is not integrated into a YDoc. Y types can only be observed once they have been added to a YDoc.");

/// Creates a default error with a common message string for throwing a `PyErr`.
pub(crate) trait DefaultPyErr {
/// Creates a new instance of the error with a default message.
fn default_message() -> PyErr;
}

impl DefaultPyErr for PreliminaryObservationException {
fn default_message() -> PyErr {
PreliminaryObservationException::new_err(
"Cannot observe a preliminary type. Must be added to a YDoc first",
)
}
}

#[pyclass]
#[derive(Clone, Copy)]
pub struct ShallowSubscription(pub SubscriptionId);
#[pyclass]
#[derive(Clone, Copy)]
pub struct DeepSubscription(pub SubscriptionId);

#[derive(FromPyObject)]
pub enum SubId {
Shallow(ShallowSubscription),
Deep(DeepSubscription),
}

#[derive(Clone)]
pub enum SharedType<T, P> {
Expand Down
20 changes: 20 additions & 0 deletions src/type_conversions.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
use lib0::any::Any;
use pyo3::prelude::*;
use pyo3::types as pytypes;
use pyo3::types::PyList;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::ops::Deref;
use yrs::block::{ItemContent, Prelim};
use yrs::types::Events;
use yrs::types::{Attrs, Branch, BranchPtr, Change, Delta, EntryChange, Value};
use yrs::{Array, Map, Text, Transaction};

use crate::shared_types::{Shared, SharedType};
use crate::y_array::YArray;
use crate::y_array::YArrayEvent;
use crate::y_map::YMap;
use crate::y_map::YMapEvent;
use crate::y_text::YText;
use crate::y_text::YTextEvent;
use crate::y_xml::YXmlEvent;
use crate::y_xml::YXmlTextEvent;
use crate::y_xml::{YXmlElement, YXmlText};

pub trait ToPython {
Expand Down Expand Up @@ -324,6 +331,19 @@ impl ToPython for Value {
}
}

pub(crate) fn events_into_py(txn: &Transaction, events: &Events) -> PyObject {
Python::with_gil(|py| {
let py_events = events.iter().map(|event| match event {
yrs::types::Event::Text(e_txt) => YTextEvent::new(e_txt, txn).into_py(py),
yrs::types::Event::Array(e_arr) => YArrayEvent::new(e_arr, txn).into_py(py),
yrs::types::Event::Map(e_map) => YMapEvent::new(e_map, txn).into_py(py),
yrs::types::Event::XmlElement(e_xml) => YXmlEvent::new(e_xml, txn).into_py(py),
yrs::types::Event::XmlText(e_xml) => YXmlTextEvent::new(e_xml, txn).into_py(py),
});
PyList::new(py, py_events).into()
})
}

pub struct PyValueWrapper(pub PyObject);

impl Prelim for PyValueWrapper {
Expand Down
67 changes: 44 additions & 23 deletions src/y_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ use std::convert::TryInto;
use std::mem::ManuallyDrop;
use std::ops::DerefMut;

use crate::type_conversions::insert_at;
use crate::shared_types::{
DeepSubscription, DefaultPyErr, PreliminaryObservationException, ShallowSubscription, SubId,
};
use crate::type_conversions::{events_into_py, insert_at};
use crate::y_transaction::YTransaction;

use super::shared_types::SharedType;
use crate::type_conversions::ToPython;
use pyo3::exceptions::{PyIndexError, PyTypeError};
use pyo3::exceptions::PyIndexError;
use pyo3::prelude::*;
use pyo3::types::{PyList, PySlice, PySliceIndices};
use yrs::types::array::{ArrayEvent, ArrayIter};
use yrs::types::DeepObservable;
use yrs::{Array, SubscriptionId, Transaction};

/// A collection used to store data in an indexed sequence structure. This type is internally
Expand Down Expand Up @@ -186,35 +190,52 @@ impl YArray {
/// Subscribes to all operations happening over this instance of `YArray`. All changes are
/// batched and eventually triggered during transaction commit phase.
/// Returns a `SubscriptionId` which can be used to cancel the callback with `unobserve`.
pub fn observe(&mut self, f: PyObject) -> PyResult<SubscriptionId> {
pub fn observe(&mut self, f: PyObject) -> PyResult<ShallowSubscription> {
match &mut self.0 {
SharedType::Integrated(array) => {
let subscription = array.observe(move |txn, e| {
Python::with_gil(|py| {
let event = YArrayEvent::new(e, txn);
if let Err(err) = f.call1(py, (event,)) {
err.restore(py)
}
let sub: SubscriptionId = array
.observe(move |txn, e| {
Python::with_gil(|py| {
let event = YArrayEvent::new(e, txn);
if let Err(err) = f.call1(py, (event,)) {
err.restore(py)
}
})
})
});
Ok(subscription.into())
.into();
Ok(ShallowSubscription(sub))
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot observe a preliminary type. Must be added to a YDoc first",
)),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
/// Observes YArray events and events of all child elements.
pub fn observe_deep(&mut self, f: PyObject) -> PyResult<DeepSubscription> {
match &mut self.0 {
SharedType::Integrated(array) => {
let sub: SubscriptionId = array
.observe_deep(move |txn, events| {
Python::with_gil(|py| {
let events = events_into_py(txn, events);
if let Err(err) = f.call1(py, (events,)) {
err.restore(py)
}
})
})
.into();
Ok(DeepSubscription(sub))
}
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}

/// Cancels the callback of an observer using the Subscription ID returned from the `observe` method.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) -> PyResult<()> {
pub fn unobserve(&mut self, subscription_id: SubId) -> PyResult<()> {
match &mut self.0 {
SharedType::Integrated(v) => {
v.unobserve(subscription_id);
Ok(())
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot call unobserve on a preliminary type. Must be added to a YDoc first",
)),
SharedType::Integrated(arr) => Ok(match subscription_id {
SubId::Shallow(ShallowSubscription(id)) => arr.unobserve(id),
SubId::Deep(DeepSubscription(id)) => arr.unobserve_deep(id),
}),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
}
Expand Down Expand Up @@ -356,7 +377,7 @@ pub struct YArrayEvent {
}

impl YArrayEvent {
fn new(event: &ArrayEvent, txn: &Transaction) -> Self {
pub fn new(event: &ArrayEvent, txn: &Transaction) -> Self {
let inner = event as *const ArrayEvent;
let txn = txn as *const Transaction;
YArrayEvent {
Expand Down
68 changes: 45 additions & 23 deletions src/y_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::ops::DerefMut;
use yrs::types::map::{MapEvent, MapIter};
use yrs::types::DeepObservable;
use yrs::{Map, SubscriptionId, Transaction};

use crate::shared_types::SharedType;
use crate::type_conversions::{PyValueWrapper, ToPython};
use crate::shared_types::{
DeepSubscription, DefaultPyErr, PreliminaryObservationException, ShallowSubscription,
SharedType, SubId,
};
use crate::type_conversions::{events_into_py, PyValueWrapper, ToPython};
use crate::y_transaction::YTransaction;

/// Collection used to store key-value entries in an unordered manner. Keys are always represented
Expand Down Expand Up @@ -205,33 +209,51 @@ impl YMap {
YMapKeyIterator(self.items())
}

pub fn observe(&mut self, f: PyObject) -> PyResult<SubscriptionId> {
pub fn observe(&mut self, f: PyObject) -> PyResult<ShallowSubscription> {
match &mut self.0 {
SharedType::Integrated(v) => Ok(v
.observe(move |txn, e| {
Python::with_gil(|py| {
let e = YMapEvent::new(e, txn);
if let Err(err) = f.call1(py, (e,)) {
err.restore(py)
}
SharedType::Integrated(v) => {
let sub_id: SubscriptionId = v
.observe(move |txn, e| {
Python::with_gil(|py| {
let e = YMapEvent::new(e, txn);
if let Err(err) = f.call1(py, (e,)) {
err.restore(py)
}
})
})
})
.into()),
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot observe a preliminary type. Must be added to a YDoc first",
)),
.into();
Ok(ShallowSubscription(sub_id))
}
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) -> PyResult<()> {

pub fn observe_deep(&mut self, f: PyObject) -> PyResult<DeepSubscription> {
match &mut self.0 {
SharedType::Integrated(map) => {
map.unobserve(subscription_id);
Ok(())
let sub: SubscriptionId = map
.observe_deep(move |txn, events| {
Python::with_gil(|py| {
let events = events_into_py(txn, events);
if let Err(err) = f.call1(py, (events,)) {
err.restore(py)
}
})
})
.into();
Ok(DeepSubscription(sub))
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot unobserve a preliminary type. Must be added to a YDoc first",
)),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubId) -> PyResult<()> {
match &mut self.0 {
SharedType::Integrated(map) => Ok(match subscription_id {
SubId::Shallow(ShallowSubscription(id)) => map.unobserve(id),
SubId::Deep(DeepSubscription(id)) => map.unobserve_deep(id),
}),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
}
Expand Down Expand Up @@ -296,7 +318,7 @@ pub struct YMapEvent {
}

impl YMapEvent {
fn new(event: &MapEvent, txn: &Transaction) -> Self {
pub fn new(event: &MapEvent, txn: &Transaction) -> Self {
let inner = event as *const MapEvent;
let txn = txn as *const Transaction;
YMapEvent {
Expand Down
Loading

0 comments on commit b834098

Please sign in to comment.