hf_text-generation-inference/load_tests/load_test.py

import os
import time
import traceback
from benchmarks.engine import TGIDockerRunner
from benchmarks.k6 import K6Config, K6Benchmark, K6ConstantArrivalRateExecutor, K6ConstantVUsExecutor, ExecutorInputType
from loguru import logger
import pandas as pd
import GPUtil
from parse_load_test import TestType, parse_json_files, plot_metrics
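

# Sweep both load profiles against the running engine: a constant arrival rate
# ramp (1 -> 200 req/s) and a constant-VUs ramp (1 -> 1024 concurrent users),
# each step running with a '60s' duration and repeated for the ShareGPT
# conversations and constant-tokens input types.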
def run_full_test(engine_name: str):
    vus_concurrences = list(range(0, 1024, 40))
    vus_concurrences[0] = 1
    vus_concurrences.append(1024)
    arrival_rates = list(range(0, 200, 10))
    arrival_rates[0] = 1
    arrival_rates.append(200)
    for input_type in [ExecutorInputType.SHAREGPT_CONVERSATIONS, ExecutorInputType.CONSTANT_TOKENS]:
        for c in arrival_rates:
            logger.info(f'Running k6 with constant arrival rate for {c} req/s with input type {input_type.value}')
            k6_executor = K6ConstantArrivalRateExecutor(2000, c, '60s', input_type)
            k6_config = K6Config(f'{engine_name}', k6_executor, input_num_tokens=200)
            benchmark = K6Benchmark(k6_config, f'results/{input_type.value}/')
            benchmark.run()
        for c in vus_concurrences:
            logger.info(f'Running k6 with constant VUs with concurrency {c} with input type {input_type.value}')
            k6_executor = K6ConstantVUsExecutor(c, '60s', input_type)
            k6_config = K6Config(f'{engine_name}', k6_executor, input_num_tokens=200)
            benchmark = K6Benchmark(k6_config, f'results/{input_type.value}/')
            benchmark.run()
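

# Append results from a previous run (if a CSV exists at csv_path) so current
# and past versions can be compared side by side; the previous engine rows are
# renamed 'tgi_<version_id>' to keep the series distinguishable.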
def merge_previous_results(csv_path: str, df: pd.DataFrame, version_id: str) -> pd.DataFrame:
    if os.path.exists(csv_path):
        previous_df = pd.read_csv(csv_path)
        previous_df['name'] = previous_df['name'].str.replace('tgi', f'tgi_{version_id}')
        df = pd.concat([previous_df, df])
    return df
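

# Symmetric percentage difference between the two smallest values in x:
# |x1 - x0| / ((x1 + x0) / 2) * 100. For example, percentage_diff([90, 110])
# returns 20.0; fewer than two values (or a zero sum) yields 0.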
def percentage_diff(x):
    # in case we have no value to compare
    if len(x) < 2:
        return 0
    xsum = x[1] + x[0]
    if xsum == 0:
        return 0
    return abs(x[1] - x[0]) / (xsum / 2) * 100
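

# For a given metric, group rows by the test parameter (VUs or arrival rate),
# compute the percentage difference between the (typically two) runs in each
# group and return the mean delta across groups; unknown test types yield 0.0.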
def compute_avg_delta(df: pd.DataFrame, metric: str, test_type: TestType) -> float:
    if test_type == TestType.CONSTANT_VUS:
        param = 'vus'
    elif test_type == TestType.CONSTANT_ARRIVAL_RATE:
        param = 'rate'
    else:
        return 0.0
    filtered = df[df[param].notna()].groupby(param)[metric]
    return filtered.apply(lambda x: percentage_diff(sorted(x.values))).mean()
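

# Build a summary of average deltas per metric, input type and test type for
# the engines under comparison (names starting with 'tgi' or 'v') and write it
# as a GitHub-flavored markdown table to output/benchmark_avg_delta.md.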
def compute_avg_table(df: pd.DataFrame):
    # only keep the current version and semver rows for comparison
    df = df[df['name'].str.startswith(('tgi', 'v'))]
    # compute the average delta for each metric and test type
    avg_table = pd.DataFrame()
    for input_type in [ExecutorInputType.SHAREGPT_CONVERSATIONS, ExecutorInputType.CONSTANT_TOKENS]:
        df_avg = df[df['input_type'] == input_type.value]
        for test_type in [TestType.CONSTANT_VUS, TestType.CONSTANT_ARRIVAL_RATE]:
            for metric in df.columns:
                if metric in ['inter_token_latency', 'time_to_first_token', 'end_to_end_latency',
                              'tokens_throughput', 'requests_ok', 'error_rate']:
                    avg_delta = compute_avg_delta(df_avg, metric, test_type)
                    avg_table = pd.concat([avg_table, pd.DataFrame(
                        {'metric': metric, 'input_type': input_type.value, 'test_type': test_type.value,
                         'avg_delta': avg_delta}, index=[0])])
    # write the result to a markdown formatted table in a file
    path = os.path.join(os.getcwd(), 'output', 'benchmark_avg_delta.md')
    avg_table.to_markdown(path, index=False, tablefmt='github',
                          headers=['Metric', 'Input Type', 'Test Type', 'Avg Delta (%)'])
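

# Start TGI in Docker for the benchmark model, run the full k6 sweep, then
# parse the k6 JSON results into CSVs, merge any previous results found under
# /tmp/artifacts, plot each metric and emit the average-delta summary table.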
def main():
    model = 'Qwen/Qwen2-7B'
    runner = TGIDockerRunner(model)
    max_concurrent_requests = 8000
    # run TGI
    try:
        logger.info('Running TGI')
        runner.run([('max-concurrent-requests', max_concurrent_requests)])
        logger.info('TGI is running')
        run_full_test('tgi')
    except Exception as e:
        logger.error(f'Error: {e}')
        # print the stack trace
        print(traceback.format_exc())
    finally:
        runner.stop()
        time.sleep(5)
    all_dfs = pd.DataFrame()
    for input_type in [ExecutorInputType.SHAREGPT_CONVERSATIONS, ExecutorInputType.CONSTANT_TOKENS]:
        for test_type in [TestType.CONSTANT_VUS, TestType.CONSTANT_ARRIVAL_RATE]:
            directory = os.path.join('results', input_type.value.lower(), test_type.value.lower())
            # check if the results directory exists
            if not os.path.exists(directory):
                logger.error(f'Directory {directory} does not exist')
                continue
            dfs = parse_json_files(directory, test_type)
            # create the output directory if it does not exist
            os.makedirs('output', exist_ok=True)
            # save the data to a csv file
            path = os.path.join(os.getcwd(), 'output', f'{input_type.value.lower()}_{test_type.value.lower()}.csv')
            dfs.to_csv(path)
            # check for previous results CSV files in the /tmp/artifacts/<version> directories
            # and merge them if they exist
            prev_root = '/tmp/artifacts'
            try:
                if os.path.exists(prev_root):
                    directories = [item for item in os.listdir(prev_root)
                                   if os.path.isdir(os.path.join(prev_root, item))]
                    for d in directories:
                        for f in os.listdir(f'{prev_root}/{d}'):
                            if f.endswith(f'{input_type.value.lower()}_{test_type.value.lower()}.csv'):
                                csv_path = os.path.join(prev_root, d, f)
                                # only keep the short commit hash
                                if len(d) > 7:
                                    d = d[:7]
                                dfs = merge_previous_results(csv_path, dfs, d)
            except Exception as e:
                logger.error(f'Error while merging previous results, skipping: {e}')
            plot_metrics(f'{model} {get_gpu_names()}', dfs, test_type,
                         f'output/{input_type.value.lower()}_{test_type.value.lower()}')
            all_dfs = pd.concat([all_dfs, dfs])
    compute_avg_table(all_dfs)
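

# Summarize the attached GPUs as '<count>x<model name>' for use in plot titles;
# returns an empty string when GPUtil reports no GPUs.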
def get_gpu_names() -> str:
    gpus = GPUtil.getGPUs()
    if len(gpus) == 0:
        return ''
    return f'{len(gpus)}x{gpus[0].name}'


if __name__ == '__main__':
    main()