Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Spawn dedicated control loops per Indexer #866

Merged
merged 40 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
01acdfe
feat: Add outline for dedicated indexer control loops
morgsmccauley Jul 11, 2024
515d051
feat: Implement rough state handling
morgsmccauley Jul 12, 2024
5fdb5d2
refactor: Abstract data layer provisioning logic
morgsmccauley Jul 12, 2024
2c95925
refactor: Abstract block stream/executor synchronisation
morgsmccauley Jul 12, 2024
08c96c1
refactor: Abstract deprovisioning handling
morgsmccauley Jul 12, 2024
3a5559e
feat: Flush state on control loop finish
morgsmccauley Jul 12, 2024
1d8e888
chore: Notes
morgsmccauley Jul 12, 2024
2e947da
feat: Correctly handle non-existant indexer in registry
morgsmccauley Jul 14, 2024
d57a42b
chore: Log delays during de/provisioning data layer
morgsmccauley Jul 14, 2024
5655c66
feat: Complete sync executor implementation
morgsmccauley Jul 14, 2024
3ff0ca4
refactor: Move block sync logic to handler struct
morgsmccauley Jul 15, 2024
47e554f
refactor: Rename enum `Lifecycle` > `LifecycleStates`
morgsmccauley Jul 15, 2024
d548079
chore: Remove uncompilable synchorniser tests
morgsmccauley Jul 15, 2024
ef4962a
feat: Throttle control loop
morgsmccauley Jul 15, 2024
39a6bc1
refactor: Remove unnecessary `Deprovisioning` state
morgsmccauley Jul 15, 2024
5eaacca
refactor: Rename for clarity
morgsmccauley Jul 15, 2024
b43949a
feat: Move throttle to start of loop to ensure it always runs
morgsmccauley Jul 15, 2024
40021f6
feat: Stop lifecycle on error
morgsmccauley Jul 15, 2024
8538874
refactor: Abstract stopping of block_streams/executors
morgsmccauley Jul 15, 2024
1c6f5c8
refactor: Store `initial_config` to remove need to handle `Option`
morgsmccauley Jul 15, 2024
bbd9f7a
feat: Clean up redis state
morgsmccauley Jul 15, 2024
4165cfc
feat: Spawn lifecycle manager per indexer
morgsmccauley Jul 15, 2024
b1a4944
feat: Remove finished lifecycles
morgsmccauley Jul 15, 2024
1ef544a
chore: Remove notes
morgsmccauley Jul 16, 2024
40c471e
refactor: Remove mocks to enable cloning
morgsmccauley Jul 16, 2024
af3dc45
feat: Get block stream by ID
morgsmccauley Jul 16, 2024
2760d13
feat: Get Executor by id
morgsmccauley Jul 16, 2024
bab4878
refactor: Get stream by account id/function name
morgsmccauley Jul 16, 2024
0421fb0
refactor: Get executor by account/name
morgsmccauley Jul 16, 2024
9ea8fc0
fix: Complete stream/executor get methods
morgsmccauley Jul 16, 2024
e3ea8e6
feat: Migrate state
morgsmccauley Jul 16, 2024
219daae
chore: Add logging
morgsmccauley Jul 16, 2024
8137a46
refactor: avoid using `config` as it may be `None`
morgsmccauley Jul 16, 2024
3fc4cab
fix: Ensure lifecycle exits on delete
morgsmccauley Jul 16, 2024
36ae5f1
fix: Ensure deleting is handled correctly
morgsmccauley Jul 16, 2024
6e6b928
chore: Remove lifecycle filter
morgsmccauley Jul 16, 2024
100ea2e
test: Fix indexer state tests
morgsmccauley Jul 16, 2024
4248bfb
fix: Ensure block streams are correctly resumed
morgsmccauley Jul 16, 2024
8420b61
doc: Document `LifecycleState`
morgsmccauley Jul 16, 2024
81952da
chore: Remove unused import
morgsmccauley Jul 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ service BlockStreamer {

// Lists all current BlockStream processes
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);

// Get info for an existing BlockStream process
rpc GetStream (GetStreamRequest) returns (StreamInfo);
}

// Request message for getting a BlockStream.
// A stream is looked up by the pair (account_id, function_name).
message GetStreamRequest {
// Account ID which the indexer is defined under
string account_id = 1;
// Name of the indexer
string function_name = 2;
}

// Request message for starting a BlockStream
Expand Down
95 changes: 94 additions & 1 deletion block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,38 @@ impl BlockStreamerService {

#[tonic::async_trait]
impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
/// Looks up the block stream whose indexer config matches the requested
/// account ID and function name.
///
/// Responds with the matching `StreamInfo`, a `NotFound` status when no
/// stream matches, or an `Internal` status when the `block_streams` lock
/// cannot be acquired.
#[tracing::instrument(skip(self))]
async fn get_stream(
    &self,
    request: Request<blockstreamer::GetStreamRequest>,
) -> Result<Response<blockstreamer::StreamInfo>, Status> {
    let params = request.into_inner();

    let streams = self.block_streams.lock().map_err(|err| {
        tracing::error!(?err, "Failed to acquire `block_streams` lock");
        tonic::Status::internal("Failed to acquire `block_streams` lock")
    })?;

    // Both the account and the function name must match.
    let found = streams.iter().find(|(_, candidate)| {
        candidate.indexer_config.account_id == params.account_id
            && candidate.indexer_config.function_name == params.function_name
    });

    match found {
        Some((id, matched)) => {
            let info = StreamInfo {
                stream_id: id.to_string(),
                account_id: matched.indexer_config.account_id.to_string(),
                function_name: matched.indexer_config.function_name.to_string(),
                version: matched.version,
            };

            Ok(Response::new(info))
        }
        None => Err(Status::not_found(format!(
            "Block Stream for account {} and name {} does not exist",
            params.account_id, params.function_name
        ))),
    }
}

#[tracing::instrument(skip(self))]
async fn start_stream(
&self,
Expand Down Expand Up @@ -171,7 +203,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
&self,
_request: Request<blockstreamer::ListStreamsRequest>,
) -> Result<Response<blockstreamer::ListStreamsResponse>, Status> {
let lock = self.block_streams.lock().unwrap();
let lock = self.block_streams.lock().map_err(|err| {
tracing::error!(?err, "Failed to acquire `block_streams` lock");
tonic::Status::internal("Failed to acquire `block_streams` lock")
})?;

let block_streams: Vec<StreamInfo> = lock
.values()
.map(|block_stream| StreamInfo {
Expand Down Expand Up @@ -234,6 +270,63 @@ mod tests {
)
}

#[tokio::test]
async fn get_existing_block_stream() {
    let service = create_block_streamer_service();

    // No streams are registered before the test starts one.
    assert_eq!(service.get_block_streams_lock().unwrap().len(), 0);

    let start_request = StartStreamRequest {
        start_block_height: 0,
        account_id: "morgs.near".to_string(),
        function_name: "test".to_string(),
        version: 0,
        redis_stream: "stream".to_string(),
        rule: Some(start_stream_request::Rule::ActionAnyRule(ActionAnyRule {
            affected_account_id: "queryapi.dataplatform.near".to_string(),
            status: 1,
        })),
    };
    service
        .start_stream(Request::new(start_request))
        .await
        .unwrap();

    // Fetch the stream back using the same account / function name pair.
    let get_request = GetStreamRequest {
        account_id: "morgs.near".to_string(),
        function_name: "test".to_string(),
    };
    let info = service
        .get_stream(Request::new(get_request))
        .await
        .unwrap()
        .into_inner();

    assert_eq!(info.stream_id, "16210176318434468568".to_string());
}

#[tokio::test]
async fn get_non_existant_block_stream() {
    let service = create_block_streamer_service();

    // Nothing has been started, so the lookup below cannot match anything.
    assert_eq!(service.get_block_streams_lock().unwrap().len(), 0);

    let get_request = GetStreamRequest {
        account_id: "morgs.near".to_string(),
        function_name: "test".to_string(),
    };
    let outcome = service.get_stream(Request::new(get_request)).await;

    let status = outcome.err().unwrap();
    assert_eq!(status.code(), tonic::Code::NotFound);
}

#[tokio::test]
async fn starts_a_block_stream() {
let block_streamer_service = create_block_streamer_service();
Expand Down
176 changes: 164 additions & 12 deletions coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,36 @@ pub use block_streamer::StreamInfo;
use anyhow::Context;
use block_streamer::block_streamer_client::BlockStreamerClient;
use block_streamer::{
start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, ListStreamsRequest,
StartStreamRequest, Status, StopStreamRequest,
start_stream_request::Rule, ActionAnyRule, ActionFunctionCallRule, GetStreamRequest,
ListStreamsRequest, StartStreamRequest, Status, StopStreamRequest,
};
use near_primitives::types::AccountId;
use registry_types::StartBlock;
use tonic::transport::channel::Channel;
use tonic::Request;

use crate::indexer_config::IndexerConfig;
use crate::redis::KeyProvider;
use crate::redis::{KeyProvider, RedisClient};
use crate::utils::exponential_retry;

#[cfg(not(test))]
pub use BlockStreamsHandlerImpl as BlockStreamsHandler;
#[cfg(test)]
pub use MockBlockStreamsHandlerImpl as BlockStreamsHandler;

pub struct BlockStreamsHandlerImpl {
#[derive(Clone)]
pub struct BlockStreamsHandler {
client: BlockStreamerClient<Channel>,
redis_client: RedisClient,
}

#[cfg_attr(test, mockall::automock)]
impl BlockStreamsHandlerImpl {
pub fn connect(block_streamer_url: &str) -> anyhow::Result<Self> {
impl BlockStreamsHandler {
pub fn connect(block_streamer_url: &str, redis_client: RedisClient) -> anyhow::Result<Self> {
let channel = Channel::from_shared(block_streamer_url.to_string())
.context("Block Streamer URL is invalid")?
.connect_lazy();
let client = BlockStreamerClient::new(channel);

Ok(Self { client })
Ok(Self {
client,
redis_client,
})
}

pub async fn list(&self) -> anyhow::Result<Vec<StreamInfo>> {
Expand Down Expand Up @@ -79,6 +81,26 @@ impl BlockStreamsHandlerImpl {
.into()
}

/// Fetches the block stream registered for `account_id` / `function_name`.
///
/// Returns `Ok(None)` when the Block Streamer answers with `NotFound`;
/// any other gRPC failure is returned as an error with added context.
pub async fn get(
    &self,
    account_id: AccountId,
    function_name: String,
) -> anyhow::Result<Option<StreamInfo>> {
    let request = Request::new(GetStreamRequest {
        account_id: account_id.to_string(),
        function_name: function_name.clone(),
    });

    let outcome = self.client.clone().get_stream(request).await;

    match outcome {
        Ok(response) => Ok(Some(response.into_inner())),
        Err(status) => {
            // A missing stream is an expected outcome, not a failure.
            if status.code() == tonic::Code::NotFound {
                return Ok(None);
            }

            Err(status).with_context(|| {
                format!(
                    "Failed to get stream for account {} and name {}",
                    account_id, function_name
                )
            })
        }
    }
}

pub async fn start(
&self,
start_block_height: u64,
Expand Down Expand Up @@ -139,4 +161,134 @@ impl BlockStreamsHandlerImpl {

Ok(())
}

/// Starts a block stream for an indexer whose configuration has changed.
///
/// For `Latest` and `Height` start blocks, `clear_block_stream` is called
/// on the Redis client first; `Continue` leaves it untouched and picks up
/// from the continuation height.
async fn reconfigure_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> {
    // `Continue` is the only start block that skips the clear.
    let continues_existing = matches!(config.start_block, StartBlock::Continue);
    if !continues_existing {
        self.redis_client.clear_block_stream(config).await?;
    }

    let height = match config.start_block {
        StartBlock::Continue => self.get_continuation_block_height(config).await?,
        StartBlock::Height(explicit_height) => explicit_height,
        StartBlock::Latest => config.get_registry_version(),
    };

    tracing::info!(
        start_block = ?config.start_block,
        height,
        "Starting block stream"
    );

    self.start(height, config).await
}

/// Starts a block stream for an indexer being synchronised for the first time.
///
/// `Continue` is logged with a warning and treated like `Latest`, i.e. the
/// stream starts from the registry version.
async fn start_new_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> {
    let start_height = match config.start_block {
        StartBlock::Continue => {
            tracing::warn!(
                "Attempted to start new Block Stream with CONTINUE, using LATEST instead"
            );
            config.get_registry_version()
        }
        StartBlock::Latest => config.get_registry_version(),
        StartBlock::Height(explicit_height) => explicit_height,
    };

    tracing::info!(
        start_block = ?config.start_block,
        height = start_height,
        "Starting block stream"
    );

    self.start(start_height, config).await
}

/// Determines the height a continuing block stream should start from.
///
/// This is one past the last published block reported by the Redis client.
/// When no such block is recorded, a warning is logged and the registry
/// version is used instead.
async fn get_continuation_block_height(&self, config: &IndexerConfig) -> anyhow::Result<u64> {
    let last_published = self.redis_client.get_last_published_block(config).await?;

    let next_height = match last_published {
        Some(last_height) => last_height + 1,
        None => {
            tracing::warn!(
                "Failed to get continuation block height, using registry version instead"
            );

            config.get_registry_version()
        }
    };

    Ok(next_height)
}

/// Restarts a block stream from its continuation height.
async fn resume_block_stream(&self, config: &IndexerConfig) -> anyhow::Result<()> {
    let resume_height = self.get_continuation_block_height(config).await?;

    tracing::info!(height = resume_height, "Resuming block stream");

    self.start(resume_height, config).await
}

/// Brings the block stream for `config` in line with the registry.
///
/// When a stream already exists:
/// - same version as the registry: nothing to do;
/// - different version: the stream is stopped and reconfigured.
///
/// When no stream exists, `previous_sync_version` decides what happens:
/// - `None`: a new block stream is started;
/// - `Some(v)` with `v` differing from the registry version: the stream is
///   reconfigured;
/// - `Some(v)` equal to the registry version: the stream is resumed.
///
/// # Errors
///
/// Propagates any error from looking up, stopping or starting the stream.
pub async fn synchronise_block_stream(
    &self,
    config: &IndexerConfig,
    previous_sync_version: Option<u64>,
) -> anyhow::Result<()> {
    let existing_stream = self
        .get(config.account_id.clone(), config.function_name.clone())
        .await?;

    if let Some(block_stream) = existing_stream {
        if block_stream.version == config.get_registry_version() {
            return Ok(());
        }

        tracing::info!(
            previous_version = block_stream.version,
            "Stopping outdated block stream"
        );

        // `block_stream` is owned and not used afterwards, so the ID can be
        // moved instead of cloned.
        self.stop(block_stream.stream_id).await?;

        return self.reconfigure_block_stream(config).await;
    }

    // Matching on the option replaces the `is_none()` / `unwrap()` pair.
    match previous_sync_version {
        None => self.start_new_block_stream(config).await,
        Some(synced_version) if synced_version != config.get_registry_version() => {
            self.reconfigure_block_stream(config).await
        }
        Some(_) => self.resume_block_stream(config).await,
    }
}

/// Stops the block stream for `account_id` / `function_name`, if one exists.
///
/// Does nothing when the lookup finds no stream.
pub async fn stop_if_needed(
    &self,
    account_id: AccountId,
    function_name: String,
) -> anyhow::Result<()> {
    let existing_stream = self.get(account_id, function_name).await?;

    match existing_stream {
        Some(block_stream) => {
            tracing::info!("Stopping block stream");

            self.stop(block_stream.stream_id).await
        }
        None => Ok(()),
    }
}
}
Loading
Loading