feat(router): ask hf.co for pipeline_tag to decide on compat_return_full_text (#89)

OlivierDehaene, 2023-02-28 10:19:32 +01:00 (committed via GitHub)
parent 21340f24ba
commit 4e685d907e
5 changed files with 87 additions and 29 deletions
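In short: at startup the router asks the Hugging Face Hub for the model's `pipeline_tag` and only defaults `return_full_text` to `true` for plain `text-generation` models. Below is a minimal, offline sketch of that decision; the JSON literals and model ids are illustrative, while the router fetches the real payload from `https://huggingface.co/api/models/<model id>` (see the `fn main()` hunk further down).

```rust
// Offline sketch of the compat decision rule. Assumed dependency: serde_json.
fn compat_return_full_text(model_info: &serde_json::Value) -> bool {
    match model_info.get("pipeline_tag") {
        // No pipeline tag published for the model: keep the old behaviour
        None => false,
        // Only plain text-generation models echo the prompt back by default
        Some(tag) => tag.as_str() == Some("text-generation"),
    }
}

fn main() {
    // Illustrative payloads, not real Hub responses
    let text_gen = serde_json::json!({ "id": "some-org/some-model", "pipeline_tag": "text-generation" });
    let no_tag = serde_json::json!({ "id": "some-org/other-model" });

    assert!(compat_return_full_text(&text_gen));
    assert!(!compat_return_full_text(&no_tag));
}
```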

Cargo.lock (generated)

@@ -2268,6 +2268,7 @@ dependencies = [
 "opentelemetry-otlp",
 "parking_lot",
 "rand",
+"reqwest",
 "serde",
 "serde_json",
 "text-generation-client",


@@ -26,6 +26,7 @@ opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
 opentelemetry-otlp = "0.11.0"
 parking_lot = "0.12.1"
 rand = "0.8.5"
+reqwest = { version = "0.11.14", features = [] }
 serde = "1.0.152"
 serde_json = "1.0.93"
 thiserror = "1.0.38"


@@ -40,13 +40,16 @@ pub(crate) struct GenerateParameters {
         example = 0.95
     )]
     pub top_p: Option<f32>,
-    #[serde(default = "default_do_sample")]
+    #[serde(default)]
     #[schema(default = "false", example = true)]
     pub do_sample: bool,
     #[serde(default = "default_max_new_tokens")]
     #[schema(exclusive_minimum = 0, exclusive_maximum = 512, default = "20")]
     pub max_new_tokens: u32,
     #[serde(default)]
+    #[schema(default = "None", example = false)]
+    pub return_full_text: Option<bool>,
+    #[serde(default)]
     #[schema(inline, max_items = 4, example = json ! (["photographer"]))]
     pub stop: Vec<String>,
     #[serde(default)]
@@ -56,10 +59,6 @@ pub(crate) struct GenerateParameters {
     pub seed: Option<u64>,
 }
 
-fn default_do_sample() -> bool {
-    false
-}
-
 fn default_max_new_tokens() -> u32 {
     20
 }
@@ -70,8 +69,9 @@ fn default_parameters() -> GenerateParameters {
         repetition_penalty: None,
         top_k: None,
         top_p: None,
-        do_sample: default_do_sample(),
+        do_sample: false,
         max_new_tokens: default_max_new_tokens(),
+        return_full_text: None,
         stop: vec![],
         details: false,
         seed: None,
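The new field is an `Option<bool>` with `#[serde(default)]`, so a request that omits it deserializes to `None` and the pipeline-tag-derived default can be applied later in the compat route. A minimal sketch of that behaviour, assuming a stripped-down struct without the utoipa schema attributes:

```rust
// Simplified stand-in for GenerateParameters (the real struct has many more
// fields and utoipa annotations). Assumed dependencies: serde (derive), serde_json.
use serde::Deserialize;

#[derive(Deserialize, Debug, PartialEq)]
struct Params {
    #[serde(default)]
    return_full_text: Option<bool>,
}

fn main() {
    // Field omitted: stays None, so the compat default decides later.
    let omitted: Params = serde_json::from_str(r#"{}"#).unwrap();
    assert_eq!(omitted.return_full_text, None);

    // Field set explicitly: the client's choice is preserved.
    let explicit: Params = serde_json::from_str(r#"{"return_full_text": false}"#).unwrap();
    assert_eq!(explicit.return_full_text, Some(false));
}
```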


@@ -87,7 +87,7 @@ fn main() -> Result<(), std::io::Error> {
     // This will only be used to validate payloads
     //
     // We need to download it outside of the Tokio runtime
-    let tokenizer = Tokenizer::from_pretrained(tokenizer_name, None).unwrap();
+    let tokenizer = Tokenizer::from_pretrained(tokenizer_name.clone(), None).unwrap();
 
     // Launch Tokio runtime
     tokio::runtime::Builder::new_multi_thread()
@@ -97,6 +97,27 @@ fn main() -> Result<(), std::io::Error> {
         .block_on(async {
             init_logging(otlp_endpoint, json_output);
 
+            // Get pipeline tag
+            let model_info = reqwest::get(format!(
+                "https://huggingface.co/api/models/{tokenizer_name}"
+            ))
+            .await
+            .expect("Could not connect to hf.co")
+            .text()
+            .await
+            .expect("error when retrieving model info from hf.co");
+            let model_info: serde_json::Value =
+                serde_json::from_str(&model_info).expect("unable to parse model info");
+
+            // if pipeline-tag == text-generation we default to return_full_text = true
+            let compat_return_full_text = match model_info.get("pipeline_tag") {
+                None => {
+                    tracing::warn!("no pipeline tag found for model {tokenizer_name}");
+                    false
+                }
+                Some(pipeline_tag) => pipeline_tag.as_str() == Some("text-generation"),
+            };
+
             // Instantiate sharded client from the master unix socket
             let mut sharded_client = ShardedClient::connect_uds(master_shard_uds_path)
                 .await
@@ -113,6 +134,7 @@ fn main() -> Result<(), std::io::Error> {
             // Run server
             server::run(
+                compat_return_full_text,
                 max_concurrent_requests,
                 max_stop_sequences,
                 max_input_length,


@@ -29,11 +29,18 @@ use utoipa_swagger_ui::SwaggerUi;
 /// Compatibility route with api-inference and AzureML
 #[instrument(skip(infer))]
 async fn compat_generate(
+    default_return_full_text: Extension<bool>,
     infer: Extension<Infer>,
     req: Json<CompatGenerateRequest>,
 ) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> {
+    let mut req = req.0;
+
+    // default return_full_text given the pipeline_tag
+    if req.parameters.return_full_text.is_none() {
+        req.parameters.return_full_text = Some(default_return_full_text.0)
+    }
+
     // switch on stream
-    let req = req.0;
     if req.stream {
         Ok(generate_stream(infer, Json(req.into()))
             .await
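The precedence here is: an explicit `return_full_text` in the request always wins, and only an omitted field falls back to the pipeline-tag-derived default. A tiny sketch of that rule in isolation (the helper name is hypothetical, not part of the commit):

```rust
// Hypothetical helper mirroring the defaulting rule in compat_generate.
fn effective_return_full_text(requested: Option<bool>, compat_default: bool) -> bool {
    requested.unwrap_or(compat_default)
}

fn main() {
    // text-generation model (compat default = true), client said nothing
    assert!(effective_return_full_text(None, true));
    // client explicitly opts out, even for a text-generation model
    assert!(!effective_return_full_text(Some(false), true));
    // non text-generation model, client explicitly opts in
    assert!(effective_return_full_text(Some(true), false));
}
```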
@@ -63,6 +70,7 @@ async fn health(infer: Extension<Infer>) -> Result<(), (StatusCode, Json<ErrorRe
             top_p: None,
             do_sample: false,
             max_new_tokens: 1,
+            return_full_text: None,
             stop: Vec::new(),
             details: false,
             seed: None,
@@ -81,13 +89,13 @@ async fn health(infer: Extension<Infer>) -> Result<(), (StatusCode, Json<ErrorRe
     responses(
         (status = 200, description = "Generated Text", body = GenerateResponse),
         (status = 424, description = "Generation Error", body = ErrorResponse,
-            example = json!({"error": "Request failed during generation"})),
+            example = json ! ({"error": "Request failed during generation"})),
         (status = 429, description = "Model is overloaded", body = ErrorResponse,
-            example = json!({"error": "Model is overloaded"})),
+            example = json ! ({"error": "Model is overloaded"})),
         (status = 422, description = "Input validation error", body = ErrorResponse,
-            example = json!({"error": "Input validation error"})),
+            example = json ! ({"error": "Input validation error"})),
         (status = 500, description = "Incomplete generation", body = ErrorResponse,
-            example = json!({"error": "Incomplete generation"})),
+            example = json ! ({"error": "Incomplete generation"})),
     )
 )]
 #[instrument(
@@ -108,8 +116,14 @@ async fn generate(
     let span = tracing::Span::current();
     let start_time = Instant::now();
 
-    // Inference
+    let mut add_prompt = None;
+    if req.0.parameters.return_full_text.unwrap_or(false) {
+        add_prompt = Some(req.0.inputs.clone());
+    }
+
     let details = req.0.parameters.details;
+
+    // Inference
     let response = infer.generate(req.0).await?;
 
     // Token details
@@ -176,8 +190,13 @@ async fn generate(
     );
 
     // Send response
+    let mut output_text = response.generated_text.text;
+    if let Some(prompt) = add_prompt {
+        output_text = prompt + &output_text;
+    }
+
     let response = GenerateResponse {
-        generated_text: response.generated_text.text,
+        generated_text: output_text,
         details,
     };
     Ok((headers, Json(response)))
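With `return_full_text` resolved to `true`, the prompt is simply prepended to the generated text before the response is built. A compact sketch of that assembly (the function name is hypothetical):

```rust
// Hypothetical helper mirroring the response assembly above: when full text is
// requested, the original inputs are prepended to the model's generated text.
fn assemble_output(inputs: &str, generated: String, return_full_text: Option<bool>) -> String {
    if return_full_text.unwrap_or(false) {
        inputs.to_string() + &generated
    } else {
        generated
    }
}

fn main() {
    assert_eq!(
        assemble_output("My name is", " Olivier".to_string(), Some(true)),
        "My name is Olivier"
    );
    assert_eq!(
        assemble_output("My name is", " Olivier".to_string(), None),
        " Olivier"
    );
}
```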
@@ -191,19 +210,19 @@ async fn generate(
     request_body = GenerateRequest,
     responses(
         (status = 200, description = "Generated Text", body = StreamResponse,
-            content_type="text/event-stream"),
+            content_type = "text/event-stream"),
         (status = 424, description = "Generation Error", body = ErrorResponse,
-            example = json!({"error": "Request failed during generation"}),
-            content_type="text/event-stream"),
+            example = json ! ({"error": "Request failed during generation"}),
+            content_type = "text/event-stream"),
         (status = 429, description = "Model is overloaded", body = ErrorResponse,
-            example = json!({"error": "Model is overloaded"}),
-            content_type="text/event-stream"),
+            example = json ! ({"error": "Model is overloaded"}),
+            content_type = "text/event-stream"),
         (status = 422, description = "Input validation error", body = ErrorResponse,
-            example = json!({"error": "Input validation error"}),
-            content_type="text/event-stream"),
+            example = json ! ({"error": "Input validation error"}),
+            content_type = "text/event-stream"),
         (status = 500, description = "Incomplete generation", body = ErrorResponse,
-            example = json!({"error": "Incomplete generation"}),
-            content_type="text/event-stream"),
+            example = json ! ({"error": "Incomplete generation"}),
+            content_type = "text/event-stream"),
     )
 )]
 #[instrument(
@@ -228,6 +247,11 @@ async fn generate_stream(
     // Inference
     let mut end_reached = false;
     let mut error = false;
+
+    let mut add_prompt = None;
+    if req.0.parameters.return_full_text.unwrap_or(false) {
+        add_prompt = Some(req.0.inputs.clone());
+    }
     let details = req.0.parameters.details;
 
     match infer.generate_stream(req.0).instrument(info_span!(parent: &span, "async_stream")).await {
@@ -294,20 +318,28 @@ async fn generate_stream(
                                 // StreamResponse
                                 end_reached = true;
 
+                                let mut output_text = generated_text.text;
+                                if let Some(prompt) = add_prompt {
+                                    output_text = prompt + &output_text;
+                                }
+
                                 let stream_token = StreamResponse {
                                     token,
-                                    generated_text: Some(generated_text.text),
+                                    generated_text: Some(output_text),
                                     details
                                 };
 
-                                yield Ok(Event::default().json_data(stream_token).unwrap())
+                                yield Ok(Event::default().json_data(stream_token).unwrap());
+                                break;
                             }
                         }
                     }
                     // yield error
                     Err(err) => {
                         error = true;
-                        yield Ok(Event::from(err))
+                        yield Ok(Event::from(err));
+                        break;
                     }
                 }
             }
@@ -315,7 +347,7 @@ async fn generate_stream(
             // yield error
             Err(err) => {
                 error = true;
-                yield Ok(Event::from(err))
+                yield Ok(Event::from(err));
             }
         }
         // Check if generation reached the end
@@ -324,7 +356,7 @@ async fn generate_stream(
             let err = InferError::IncompleteGeneration;
             metrics::increment_counter!("tgi_request_failure", "err" => "incomplete");
             tracing::error!("{err}");
-            yield Ok(Event::from(err))
+            yield Ok(Event::from(err));
         }
     };
@@ -345,6 +377,7 @@ async fn metrics(prom_handle: Extension<PrometheusHandle>) -> String {
 /// Serving method
 #[allow(clippy::too_many_arguments)]
 pub async fn run(
+    compat_return_full_text: bool,
     max_concurrent_requests: usize,
     max_stop_sequences: usize,
     max_input_length: usize,
@@ -429,8 +462,9 @@ pub async fn run(
         .route("/generate_stream", post(generate_stream))
         .route("/", get(health))
         .route("/health", get(health))
-        .layer(Extension(infer))
         .route("/metrics", get(metrics))
+        .layer(Extension(compat_return_full_text))
+        .layer(Extension(infer))
         .layer(Extension(prom_handle))
         .layer(opentelemetry_tracing_layer())
         .layer(cors_layer);
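In axum, `Router::layer` only wraps routes registered before it, so the `Extension` layers are now added once, after all routes, and every handler can extract both the shared `Infer` and the new compat flag. A standalone sketch of that pattern (assumed axum 0.6-style API; the handler name and bind address are illustrative, not part of the commit):

```rust
// Sketch of sharing a boolean flag through an axum Extension layer, as the
// router does with compat_return_full_text. Assumed dependencies: axum 0.6, tokio.
use axum::{routing::get, Extension, Router};

async fn show_flag(Extension(compat_return_full_text): Extension<bool>) -> String {
    format!("compat_return_full_text = {compat_return_full_text}")
}

#[tokio::main]
async fn main() {
    let app = Router::new()
        .route("/", get(show_flag))
        .route("/flag", get(show_flag))
        // Layers wrap the routes added above them, so the flag is visible to both handlers.
        .layer(Extension(true));

    axum::Server::bind(&"127.0.0.1:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
```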