Skip to content

Commit

Permalink
prog
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Sep 24, 2024
1 parent bdcd910 commit 919deca
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
14 changes: 13 additions & 1 deletion rust/worker/src/execution/orchestration/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub(crate) struct CountQueryOrchestrator {
blockfile_provider: BlockfileProvider,
// Result channel
result_channel: Option<tokio::sync::oneshot::Sender<Result<usize, Box<dyn ChromaError>>>>,
// Request version context
collection_version: u32,
log_position: u64,
}

#[derive(Error, Debug)]
Expand All @@ -52,6 +55,8 @@ enum CountQueryOrchestratorError {
CollectionNotFound(Uuid),
#[error("Get collection error: {0}")]
GetCollectionError(#[from] GetCollectionsError),
#[error("Collection version mismatch")]
CollectionVersionMismatch,
}

impl ChromaError for CountQueryOrchestratorError {
Expand All @@ -65,6 +70,7 @@ impl ChromaError for CountQueryOrchestratorError {
CountQueryOrchestratorError::SystemTimeError(_) => ErrorCodes::Internal,
CountQueryOrchestratorError::CollectionNotFound(_) => ErrorCodes::NotFound,
CountQueryOrchestratorError::GetCollectionError(e) => e.code(),
CountQueryOrchestratorError::CollectionVersionMismatch => ErrorCodes::VersionMismatch,
}
}
}
Expand All @@ -78,6 +84,8 @@ impl CountQueryOrchestrator {
sysdb: Box<SysDb>,
dispatcher: ComponentHandle<Dispatcher>,
blockfile_provider: BlockfileProvider,
collection_version: u32,
log_position: u64,
) -> Self {
Self {
system,
Expand All @@ -90,6 +98,8 @@ impl CountQueryOrchestrator {
dispatcher,
blockfile_provider,
result_channel: None,
collection_version,
log_position,
}
}

Expand Down Expand Up @@ -170,7 +180,9 @@ impl CountQueryOrchestrator {
let input = PullLogsInput::new(
collection.id,
// The collection log position is inclusive, and we want to start from the next log.
collection.log_position + 1,
// Note that we query using the incoming log position this is critical for correctness
// TODO: We should make all the log service code use u64 instead of i64
(self.log_position as i64) + 1,
100,
None,
Some(end_timestamp),
Expand Down
13 changes: 12 additions & 1 deletion rust/worker/src/execution/orchestration/get_vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ enum GetVectorsError {
TaskSendError(#[from] ChannelError),
#[error("System time error")]
SystemTimeError(#[from] std::time::SystemTimeError),
#[error("Collection version mismatch")]
CollectionVersionMismatch,
}

impl ChromaError for GetVectorsError {
fn code(&self) -> ErrorCodes {
match self {
GetVectorsError::TaskSendError(e) => e.code(),
GetVectorsError::SystemTimeError(_) => ErrorCodes::Internal,
GetVectorsError::CollectionVersionMismatch => ErrorCodes::VersionMismatch,
}
}
}
Expand All @@ -74,6 +77,8 @@ pub struct GetVectorsOrchestrator {
// Result channel
result_channel:
Option<tokio::sync::oneshot::Sender<Result<GetVectorsResult, Box<dyn ChromaError>>>>,
collection_version: u32,
log_position: u64,
}

impl GetVectorsOrchestrator {
Expand All @@ -86,6 +91,8 @@ impl GetVectorsOrchestrator {
sysdb: Box<SysDb>,
dispatcher: ComponentHandle<Dispatcher>,
blockfile_provider: BlockfileProvider,
collection_version: u32,
log_position: u64,
) -> Self {
Self {
state: ExecutionState::Pending,
Expand All @@ -100,6 +107,8 @@ impl GetVectorsOrchestrator {
record_segment: None,
collection: None,
result_channel: None,
collection_version,
log_position,
}
}

Expand Down Expand Up @@ -132,7 +141,9 @@ impl GetVectorsOrchestrator {
let input = PullLogsInput::new(
collection.id,
// The collection log position is inclusive, and we want to start from the next log
collection.log_position + 1,
// Note that we query using the incoming log position this is critical for correctness
// TODO: We should make all the log service code use u64 instead of i64
(self.log_position as i64) + 1,
100,
None,
Some(end_timestamp),
Expand Down

0 comments on commit 919deca

Please sign in to comment.