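"""Tokenization helpers.

Counts prompt tokens by sending chunks to a backend's /tokenize endpoint in
parallel, falling back to a local tiktoken estimate when a request fails.
"""
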
import concurrent.futures

import requests
import tiktoken

from llm_server.cluster.cluster_config import cluster_config
from llm_server.config.global_config import GlobalConfig
from llm_server.logging import create_logger


def tokenize(prompt: str, backend_url: str) -> int:
    """Count the tokens in `prompt` for the backend at `backend_url`.

    Prefers the backend's /tokenize endpoint and falls back to a local
    tiktoken estimate when a request fails.
    """
    assert backend_url
    assert isinstance(backend_url, str)

    if not prompt:
        # The tokenizers have issues when the prompt is None.
        return 0
    assert isinstance(prompt, str)

    logger = create_logger('tokenizer')

    # The backend could have died between when the request was
    # submitted and now, so let's double check it's still online.
    backend_url = cluster_config.validate_backend(backend_url)

    tokenizer = tiktoken.get_encoding("cl100k_base")

    # Split the prompt into 2000-character chunks.
    chunk_size = 2000
    chunks = [prompt[i:i + chunk_size] for i in range(0, len(prompt), chunk_size)]

    # Send one chunk to the backend's /tokenize endpoint; fall back to a
    # local tiktoken count if the request fails for any reason.
    def send_chunk(chunk):
        try:
            r = requests.post(
                f'{backend_url}/tokenize',
                json={'input': chunk},
                verify=GlobalConfig.get().verify_ssl,
                timeout=GlobalConfig.get().backend_generate_request_timeout,
            )
            j = r.json()
            return j['length']
        except Exception as e:
            logger.debug(f'Failed to tokenize using vLLM - {e.__class__.__name__}')
            # Pad the local estimate by 10 tokens since tiktoken's count may
            # not match the backend's tokenizer exactly.
            return len(tokenizer.encode(chunk)) + 10

    # Use a ThreadPoolExecutor to send all chunks to the server at once.
    total = 0
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_chunk = {executor.submit(send_chunk, chunk): chunk for chunk in chunks}
        for future in concurrent.futures.as_completed(future_to_chunk):
            chunk = future_to_chunk[future]
            try:
                total += future.result()
            except Exception as exc:
                # send_chunk() catches its own errors, so this is unexpected;
                # log it and fall back to a local count for this chunk.
                logger.warning('%r generated an exception: %s' % (chunk, exc))
                total += len(tokenizer.encode(chunk)) + 10
    return total
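
# Example usage, as a minimal sketch: the URL below is hypothetical, and both
# cluster_config and GlobalConfig must already be initialized by the server.
#
#   count = tokenize('Hello, world!', 'http://127.0.0.1:7000')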