Batching example #694

Merged
merged 3 commits on Aug 20, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion mistralrs-core/src/paged_attention/block_engine.rs
@@ -207,7 +207,7 @@ impl BlockEngine {
        let num_required_blocks = seq.get_logical_token_blocks();
        let num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks();

-        if self.num_gpu_blocks > *num_free_gpu_blocks + num_required_blocks {
+        if *num_free_gpu_blocks < num_required_blocks {
            AllocStatus::Later
        } else if self.num_gpu_blocks < num_required_blocks {
            AllocStatus::Impossible
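To see the new admission check in isolation, here is a minimal standalone sketch of how it reads after this change. It uses a hypothetical free-standing function and an assumed Ok fall-through branch; the real method lives on BlockEngine and reads the allocator state itself.

// Hypothetical, simplified mirror of the check above (not the crate's exact types).
enum AllocStatus {
    Ok,         // enough free blocks to allocate right now
    Later,      // not enough free blocks yet; retry after running sequences finish
    Impossible, // the sequence needs more blocks than the GPU has in total
}

fn can_allocate(num_gpu_blocks: usize, num_free_gpu_blocks: usize, num_required_blocks: usize) -> AllocStatus {
    if num_free_gpu_blocks < num_required_blocks {
        AllocStatus::Later
    } else if num_gpu_blocks < num_required_blocks {
        AllocStatus::Impossible
    } else {
        AllocStatus::Ok
    }
}

fn main() {
    // 16 total GPU blocks, 4 currently free, 6 required: defer the sequence for now.
    assert!(matches!(can_allocate(16, 4, 6), AllocStatus::Later));
    // 16 total, 10 free, 6 required: allocate immediately.
    assert!(matches!(can_allocate(16, 10, 6), AllocStatus::Ok));
}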
2 changes: 2 additions & 0 deletions mistralrs-core/src/paged_attention/scheduler.rs
@@ -222,6 +222,7 @@ impl PagedAttentionScheduler {
                true
            }
        });
+
        for id in to_free_ids {
            self._free(id);
        }
@@ -337,6 +338,7 @@ impl PagedAttentionScheduler {

impl Scheduler for PagedAttentionScheduler {
    fn add_seq(&mut self, seq: Sequence) {
+        println!("Adding sequence {}", seq.id());
        self.waiting.push_back(Arc::new(Mutex::new(seq)));
    }
    fn schedule(&mut self) -> SchedulerOutput<'_> {
5 changes: 5 additions & 0 deletions mistralrs/Cargo.toml
@@ -21,6 +21,7 @@ serde.workspace = true
image.workspace = true
indexmap.workspace = true
either.workspace = true
+futures.workspace = true

[features]
cuda = ["mistralrs-core/cuda"]
@@ -93,3 +94,7 @@ required-features = []
[[example]]
name = "tools"
required-features = []
+
+[[example]]
+name = "batching"
+required-features = []
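With the example registered here, it should be runnable with something like cargo run --release --example batching -p mistralrs --features cuda. The -p flag and feature choice are assumptions about the workspace layout and backend; the example itself requests a CUDA device via Device::cuda_if_available(0) and enables PagedAttention.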
133 changes: 133 additions & 0 deletions mistralrs/examples/batching/main.rs
@@ -0,0 +1,133 @@
use either::Either;
use indexmap::IndexMap;
use std::sync::Arc;
use tokio::sync::mpsc::channel;

use mistralrs::{
    initialize_logging, ChatCompletionResponse, Constraint, Device, DeviceMapMetadata,
    GGUFLoaderBuilder, MemoryGpuConfig, MistralRs, MistralRsBuilder, ModelDType, NormalRequest,
    PagedAttentionConfig, Request, RequestMessage, Response, SamplingParams, SchedulerConfig,
    TokenSource, Usage,
};

async fn setup() -> anyhow::Result<Arc<MistralRs>> {
    // Select a Mistral model.
    // This uses a model, tokenizer, and chat template from the HF hub.
    let loader = GGUFLoaderBuilder::new(
        None,
        Some("mistralai/Mistral-7B-Instruct-v0.1".to_string()),
        "TheBloke/Mistral-7B-Instruct-v0.1-GGUF".to_string(),
        vec!["mistral-7b-instruct-v0.1.Q4_K_M.gguf".to_string()],
        None,
    )
    .build();
    // Load the model into a Pipeline.
    let pipeline = loader.load_model_from_hf(
        None,
        TokenSource::CacheToken,
        &ModelDType::default(),
        &Device::cuda_if_available(0)?,
        false,
        DeviceMapMetadata::dummy(),
        None,
        Some(PagedAttentionConfig::new(
            Some(32),
            500,
            MemoryGpuConfig::Utilization(0.9),
        )?), // Enable PagedAttention.
    )?;
    let config = pipeline
        .lock()
        .await
        .get_metadata()
        .cache_config
        .as_ref()
        .unwrap()
        .clone();
    // Create the MistralRs, which is a runner.
    Ok(MistralRsBuilder::new(
        pipeline,
        SchedulerConfig::PagedAttentionMeta {
            max_num_seqs: 500,
            config,
        },
    )
    .with_throughput_logging()
    .build())
}

async fn bench_mistralrs(n_requests: usize) -> anyhow::Result<()> {
    initialize_logging();
    let mistralrs = setup().await?;

    // Send every request up front so the scheduler can batch the sequences together.
    let mut handles = Vec::new();
    for _ in 0..n_requests {
        let (tx, rx) = channel(10_000);
        let request = Request::Normal(NormalRequest {
            messages: RequestMessage::Chat(vec![IndexMap::from([
                ("role".to_string(), Either::Left("user".to_string())),
                (
                    "content".to_string(),
                    Either::Left("What is graphene".to_string()),
                ),
            ])]),
            sampling_params: SamplingParams::default(),
            response: tx,
            return_logprobs: false,
            is_streaming: false,
            id: 0,
            constraint: Constraint::None,
            suffix: None,
            adapters: None,
            tools: None,
            tool_choice: None,
        });
        mistralrs.get_sender()?.send(request).await?;
        handles.push(rx);
    }

    // Wait for all responses; each receiver yields one completed chat response.
    let responses = futures::future::join_all(handles.iter_mut().map(|x| x.recv())).await;

    let mut max_prompt = f32::MIN;
    let mut max_completion = f32::MIN;

    for response in responses {
        let Response::Done(ChatCompletionResponse {
            usage:
                Usage {
                    avg_compl_tok_per_sec,
                    avg_prompt_tok_per_sec,
                    ..
                },
            ..
        }) = response.as_ref().unwrap()
        else {
            match response.as_ref().unwrap() {
                Response::InternalError(e) => panic!("Internal error: {e}"),
                Response::ValidationError(e) => panic!("Validation error: {e}"),
                Response::ModelError(e, c) => panic!(
                    "Model error: {e}. Response: Text: {}, Prompt T/s: {}, Completion T/s: {}",
                    c.choices[0].message.content.as_ref().unwrap(),
                    c.usage.avg_prompt_tok_per_sec,
                    c.usage.avg_compl_tok_per_sec
                ),
                _ => unreachable!(),
            }
        };
        dbg!(avg_compl_tok_per_sec, avg_prompt_tok_per_sec);
        if *avg_prompt_tok_per_sec > max_prompt {
            max_prompt = *avg_prompt_tok_per_sec;
        }
        if *avg_compl_tok_per_sec > max_completion {
            max_completion = *avg_compl_tok_per_sec;
        }
    }
    println!("Individual sequence stats: {max_prompt} max PP T/s, {max_completion} max TG T/s");

    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    bench_mistralrs(10).await
}
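Because every request is sent before any response is awaited, the PagedAttention scheduler can batch the waiting sequences together (up to max_num_seqs), so the per-sequence throughput reported at the end reflects batched execution rather than sequential decoding.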