Pr 2352 ci branch (#2382)
* Fix unsigned integer underflow Passing --max-batch-size to the launcher actually had no effect because after a few requests the max_size passed to State::next_batch would underflow becoming a largo positive number. In the scheduler, as soon as the cached batch size reached the max_batch_size the max_size passed to next_batch becomes 0. Since the only check in that funcion is ``` if Some(batch_requests.len()) == max_size { break; } ``` and it's called after the `batch_requests.len()` has become 1, it doesn't do anything to prevent more than 0 requests from being batched. Now we have cached batch in the server that is large than max_batch_size and `max_size - batch_size as usize` underflows. Signed-off-by: Max de Bayser <mbayser@br.ibm.com> * fix: update v3 scheduler and ensure max_batch_size > 0 --------- Signed-off-by: Max de Bayser <mbayser@br.ibm.com> Co-authored-by: Max de Bayser <mbayser@br.ibm.com>
This commit is contained in:
parent
cb3ae30284
commit
6d06473cf4
|
@ -168,7 +168,8 @@ pub(crate) async fn batching_task(
|
||||||
};
|
};
|
||||||
|
|
||||||
let token_budget = max_batch_total_tokens.saturating_sub(batch_max_tokens);
|
let token_budget = max_batch_total_tokens.saturating_sub(batch_max_tokens);
|
||||||
let max_size = max_batch_size.map(|max_size| max_size - batch_size as usize);
|
let max_size =
|
||||||
|
max_batch_size.map(|max_size| max_size.saturating_sub(batch_size as usize));
|
||||||
|
|
||||||
// Try to get a new batch
|
// Try to get a new batch
|
||||||
if let Some((mut new_entries, new_batch, span)) = queue
|
if let Some((mut new_entries, new_batch, span)) = queue
|
||||||
|
|
|
@ -150,6 +150,14 @@ async fn main() -> Result<(), RouterError> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(max_batch_size) = max_batch_size {
|
||||||
|
if max_batch_size == 0 {
|
||||||
|
return Err(RouterError::ArgumentValidation(
|
||||||
|
"`max_batch_size` must be > 0".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let (backend, _backend_info) = connect_backend(
|
let (backend, _backend_info) = connect_backend(
|
||||||
max_input_tokens,
|
max_input_tokens,
|
||||||
max_total_tokens,
|
max_total_tokens,
|
||||||
|
|
|
@ -226,6 +226,13 @@ impl State {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(max_size) = max_size {
|
||||||
|
if max_size == 0 {
|
||||||
|
tracing::debug!("No capacity");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Pad prefill_token_budget to be a multiple of block size
|
// Pad prefill_token_budget to be a multiple of block size
|
||||||
let prefill_token_budget =
|
let prefill_token_budget =
|
||||||
((prefill_token_budget + self.block_size - 1) / self.block_size) * self.block_size;
|
((prefill_token_budget + self.block_size - 1) / self.block_size) * self.block_size;
|
||||||
|
|
|
@ -205,6 +205,13 @@ impl State {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(max_size) = max_size {
|
||||||
|
if max_size == 0 {
|
||||||
|
tracing::debug!("No capacity");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Pad prefill_token_budget to be a multiple of block size
|
// Pad prefill_token_budget to be a multiple of block size
|
||||||
let prefill_token_budget =
|
let prefill_token_budget =
|
||||||
((prefill_token_budget + self.block_size - 1) / self.block_size) * self.block_size;
|
((prefill_token_budget + self.block_size - 1) / self.block_size) * self.block_size;
|
||||||
|
|
|
@ -161,8 +161,8 @@ pub(crate) async fn batching_task(
|
||||||
};
|
};
|
||||||
|
|
||||||
let token_budget = max_batch_total_tokens.saturating_sub(batch_max_tokens);
|
let token_budget = max_batch_total_tokens.saturating_sub(batch_max_tokens);
|
||||||
let max_size = max_batch_size.map(|max_size| max_size - batch_size as usize);
|
let max_size =
|
||||||
|
max_batch_size.map(|max_size| max_size.saturating_sub(batch_size as usize));
|
||||||
// Try to get a new batch
|
// Try to get a new batch
|
||||||
if let Some((mut new_entries, new_batch, span)) = queue
|
if let Some((mut new_entries, new_batch, span)) = queue
|
||||||
.next_batch(min_size, max_size, max_batch_prefill_tokens, token_budget)
|
.next_batch(min_size, max_size, max_batch_prefill_tokens, token_budget)
|
||||||
|
|
Loading…
Reference in New Issue