Skip to content

Commit

Permalink
separated observers into a separate observe and observe_deep func…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
Waidhoferj committed May 6, 2022
1 parent 3fc52b9 commit b6fe296
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 177 deletions.
35 changes: 33 additions & 2 deletions src/shared_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,42 @@ use crate::{
y_text::YText,
y_xml::{YXmlElement, YXmlText},
};
use pyo3::prelude::*;
use pyo3::create_exception;
use pyo3::{exceptions::PyException, prelude::*};
use std::convert::TryFrom;
use yrs::types::TYPE_REFS_XML_ELEMENT;
use yrs::types::TYPE_REFS_XML_TEXT;
use yrs::types::{TypeRefs, TYPE_REFS_ARRAY, TYPE_REFS_MAP, TYPE_REFS_TEXT};
use yrs::{types::TYPE_REFS_XML_ELEMENT, SubscriptionId};

// Common errors
create_exception!(y_py, PreliminaryObservationException, PyException, "Occurs when an observer is attached to a Y type that is not integrated into a YDoc. Y types can only be observed once they have been added to a YDoc.");

/// Creates a default error with a common message string for throwing a `PyErr`.
pub(crate) trait DefaultPyErr {
/// Creates a new instance of the error with a default message.
fn default_message() -> PyErr;
}

impl DefaultPyErr for PreliminaryObservationException {
fn default_message() -> PyErr {
PreliminaryObservationException::new_err(
"Cannot observe a preliminary type. Must be added to a YDoc first",
)
}
}

#[pyclass]
#[derive(Clone, Copy)]
pub struct ShallowSubscription(pub SubscriptionId);
#[pyclass]
#[derive(Clone, Copy)]
pub struct DeepSubscription(pub SubscriptionId);

#[derive(FromPyObject)]
pub enum SubId {
Shallow(ShallowSubscription),
Deep(DeepSubscription),
}

#[derive(Clone)]
pub enum SharedType<T, P> {
Expand Down
70 changes: 39 additions & 31 deletions src/y_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ use std::convert::TryInto;
use std::mem::ManuallyDrop;
use std::ops::DerefMut;

use crate::shared_types::{
DeepSubscription, DefaultPyErr, PreliminaryObservationException, ShallowSubscription, SubId,
};
use crate::type_conversions::{events_into_py, insert_at};
use crate::y_transaction::YTransaction;

use super::shared_types::SharedType;
use crate::type_conversions::ToPython;
use pyo3::exceptions::{PyIndexError, PyTypeError};
use pyo3::exceptions::PyIndexError;
use pyo3::prelude::*;
use pyo3::types::{PyList, PySlice, PySliceIndices};
use yrs::types::array::{ArrayEvent, ArrayIter};
Expand Down Expand Up @@ -187,47 +190,52 @@ impl YArray {
/// Subscribes to all operations happening over this instance of `YArray`. All changes are
/// batched and eventually triggered during transaction commit phase.
/// Returns a `SubscriptionId` which can be used to cancel the callback with `unobserve`.
pub fn observe(&mut self, f: PyObject, deep: Option<bool>) -> PyResult<SubscriptionId> {
let deep = deep.unwrap_or(false);
pub fn observe(&mut self, f: PyObject) -> PyResult<ShallowSubscription> {
match &mut self.0 {
SharedType::Integrated(array) if deep => {
let sub = array.observe_deep(move |txn, events| {
Python::with_gil(|py| {
let events = events_into_py(txn, events);
if let Err(err) = f.call1(py, (events,)) {
err.restore(py)
}
SharedType::Integrated(array) => {
let sub: SubscriptionId = array
.observe(move |txn, e| {
Python::with_gil(|py| {
let event = YArrayEvent::new(e, txn);
if let Err(err) = f.call1(py, (event,)) {
err.restore(py)
}
})
})
});
Ok(sub.into())
.into();
Ok(ShallowSubscription(sub))
}
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
/// Observes YArray events and events of all child elements.
pub fn observe_deep(&mut self, f: PyObject) -> PyResult<DeepSubscription> {
match &mut self.0 {
SharedType::Integrated(array) => {
let sub = array.observe(move |txn, e| {
Python::with_gil(|py| {
let event = YArrayEvent::new(e, txn);
if let Err(err) = f.call1(py, (event,)) {
err.restore(py)
}
let sub: SubscriptionId = array
.observe_deep(move |txn, events| {
Python::with_gil(|py| {
let events = events_into_py(txn, events);
if let Err(err) = f.call1(py, (events,)) {
err.restore(py)
}
})
})
});
Ok(sub.into())
.into();
Ok(DeepSubscription(sub))
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot observe a preliminary type. Must be added to a YDoc first",
)),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}

/// Cancels the callback of an observer using the Subscription ID returned from the `observe` method.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) -> PyResult<()> {
pub fn unobserve(&mut self, subscription_id: SubId) -> PyResult<()> {
match &mut self.0 {
SharedType::Integrated(v) => {
v.unobserve(subscription_id);
Ok(())
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot call unobserve on a preliminary type. Must be added to a YDoc first",
)),
SharedType::Integrated(arr) => Ok(match subscription_id {
SubId::Shallow(ShallowSubscription(id)) => arr.unobserve(id),
SubId::Deep(DeepSubscription(id)) => arr.unobserve_deep(id),
}),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
}
Expand Down
73 changes: 41 additions & 32 deletions src/y_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use yrs::types::map::{MapEvent, MapIter};
use yrs::types::DeepObservable;
use yrs::{Map, SubscriptionId, Transaction};

use crate::shared_types::SharedType;
use crate::shared_types::{
DeepSubscription, DefaultPyErr, PreliminaryObservationException, ShallowSubscription,
SharedType, SubId,
};
use crate::type_conversions::{events_into_py, PyValueWrapper, ToPython};
use crate::y_transaction::YTransaction;

Expand Down Expand Up @@ -206,45 +209,51 @@ impl YMap {
YMapKeyIterator(self.items())
}

pub fn observe(&mut self, f: PyObject, deep: Option<bool>) -> PyResult<SubscriptionId> {
let deep = deep.unwrap_or(false);
pub fn observe(&mut self, f: PyObject) -> PyResult<ShallowSubscription> {
match &mut self.0 {
SharedType::Integrated(map) if deep => {
let sub = map.observe_deep(move |txn, events| {
Python::with_gil(|py| {
let events = events_into_py(txn, events);
if let Err(err) = f.call1(py, (events,)) {
err.restore(py)
}
SharedType::Integrated(v) => {
let sub_id: SubscriptionId = v
.observe(move |txn, e| {
Python::with_gil(|py| {
let e = YMapEvent::new(e, txn);
if let Err(err) = f.call1(py, (e,)) {
err.restore(py)
}
})
})
});
Ok(sub.into())
.into();
Ok(ShallowSubscription(sub_id))
}
SharedType::Integrated(v) => Ok(v
.observe(move |txn, e| {
Python::with_gil(|py| {
let e = YMapEvent::new(e, txn);
if let Err(err) = f.call1(py, (e,)) {
err.restore(py)
}
})
})
.into()),
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot observe a preliminary type. Must be added to a YDoc first",
)),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) -> PyResult<()> {

pub fn observe_deep(&mut self, f: PyObject) -> PyResult<DeepSubscription> {
match &mut self.0 {
SharedType::Integrated(map) => {
map.unobserve(subscription_id);
Ok(())
let sub: SubscriptionId = map
.observe_deep(move |txn, events| {
Python::with_gil(|py| {
let events = events_into_py(txn, events);
if let Err(err) = f.call1(py, (events,)) {
err.restore(py)
}
})
})
.into();
Ok(DeepSubscription(sub))
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot unobserve a preliminary type. Must be added to a YDoc first",
)),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubId) -> PyResult<()> {
match &mut self.0 {
SharedType::Integrated(map) => Ok(match subscription_id {
SubId::Shallow(ShallowSubscription(id)) => map.unobserve(id),
SubId::Deep(DeepSubscription(id)) => map.unobserve_deep(id),
}),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
}
Expand Down
77 changes: 44 additions & 33 deletions src/y_text.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::shared_types::SharedType;
use crate::shared_types::{
DeepSubscription, DefaultPyErr, PreliminaryObservationException, ShallowSubscription,
SharedType, SubId,
};
use crate::type_conversions::py_into_any;
use crate::type_conversions::{events_into_py, ToPython};
use crate::y_transaction::YTransaction;
Expand All @@ -11,7 +14,7 @@ use std::rc::Rc;
use yrs::types::text::TextEvent;
use yrs::types::Attrs;
use yrs::types::DeepObservable;
use yrs::{SubscriptionId, Text, Transaction};
use yrs::{Text, Transaction};

/// A shared data type used for collaborative text editing. It enables multiple users to add and
/// remove chunks of text in efficient manner. This type is internally represented as a mutable
Expand Down Expand Up @@ -192,45 +195,53 @@ impl YText {
}
}

pub fn observe(&mut self, f: PyObject, deep: Option<bool>) -> PyResult<SubscriptionId> {
let deep = deep.unwrap_or(false);
/// Observes updates from the `YText` instance.
pub fn observe(&mut self, f: PyObject) -> PyResult<ShallowSubscription> {
match &mut self.0 {
SharedType::Integrated(text) if deep => {
let sub = text.observe_deep(move |txn, events| {
Python::with_gil(|py| {
let events = events_into_py(txn, events);
if let Err(err) = f.call1(py, (events,)) {
err.restore(py)
}
SharedType::Integrated(text) => {
let sub_id = text
.observe(move |txn, e| {
Python::with_gil(|py| {
let e = YTextEvent::new(e, txn);
if let Err(err) = f.call1(py, (e,)) {
err.restore(py)
}
});
})
});
Ok(sub.into())
.into();
Ok(ShallowSubscription(sub_id))
}
SharedType::Integrated(v) => Ok(v
.observe(move |txn, e| {
Python::with_gil(|py| {
let e = YTextEvent::new(e, txn);
if let Err(err) = f.call1(py, (e,)) {
err.restore(py)
}
});
})
.into()),
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot observe a preliminary type. Must be added to a YDoc first",
)),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubscriptionId) -> PyResult<()> {

/// Observes updates from the `YText` instance and all of its nested children.
pub fn observe_deep(&mut self, f: PyObject) -> PyResult<DeepSubscription> {
match &mut self.0 {
SharedType::Integrated(text) => {
text.unobserve(subscription_id);
Ok(())
let sub = text
.observe_deep(move |txn, events| {
Python::with_gil(|py| {
let events = events_into_py(txn, events);
if let Err(err) = f.call1(py, (events,)) {
err.restore(py)
}
})
})
.into();
Ok(DeepSubscription(sub))
}
SharedType::Prelim(_) => Err(PyTypeError::new_err(
"Cannot unobserve a preliminary type. Must be added to a YDoc first",
)),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
/// Cancels the observer callback associated with the `subscripton_id`.
pub fn unobserve(&mut self, subscription_id: SubId) -> PyResult<()> {
match &mut self.0 {
SharedType::Integrated(text) => Ok(match subscription_id {
SubId::Shallow(ShallowSubscription(id)) => text.unobserve(id),
SubId::Deep(DeepSubscription(id)) => text.unobserve_deep(id),
}),
SharedType::Prelim(_) => Err(PreliminaryObservationException::default_message()),
}
}
}
Expand Down
Loading

0 comments on commit b6fe296

Please sign in to comment.