downgrades zmq implementation for v5.x

This commit is contained in:
nai-degen 2023-06-03 22:00:44 -05:00
parent 780defab2f
commit 367a541c9c
3 changed files with 75 additions and 48 deletions

View File

@ -1,30 +1,42 @@
import { spawn, ChildProcess } from "child_process";
import { join } from "path";
import { Dealer } from "zeromq";
import { logger } from "../logger";
type Zmq = typeof import("zeromq");
const TOKENIZER_SOCKET = "tcp://localhost:5555";
const log = logger.child({ module: "claude-ipc" });
const pythonLog = logger.child({ module: "claude-python" });
let tokenizer: ChildProcess;
let socket: Dealer;
let socket: ReturnType<Zmq["socket"]>;
export async function init() {
log.info("Initializing Claude tokenizer IPC");
try {
tokenizer = launchTokenizer();
socket = new Dealer({ sendTimeout: 500 });
const zmq = await import("zeromq");
tokenizer = await launchTokenizer();
socket = zmq.socket("dealer");
socket.connect(TOKENIZER_SOCKET);
await socket.send(["init"]);
const response = await socket.receive();
socket.send(["init"]);
const response = await new Promise<string>((resolve) => {
const timeout = setTimeout(() => resolve("timeout"), 1000);
socket.once("message", (msg) => {
clearTimeout(timeout);
resolve(msg);
});
});
if (response === "timeout") {
throw new Error("Timeout waiting for init response");
}
if (response.toString() !== "ok") {
throw new Error("Unexpected init response");
}
// Start message pump
processMessages();
socket.on("message", onMessage);
socket.on("error", (err) => {
log.error({ err }, "Claude tokenizer socket error");
});
// Test tokenizer
const result = await requestTokenCount({
@ -35,8 +47,13 @@ export async function init() {
log.error({ result }, "Unexpected test token count");
throw new Error("Unexpected test token count");
}
} catch (e) {
log.error({ e }, "Failed to initialize Claude tokenizer");
} catch (err) {
log.error({ err: err.message }, "Failed to initialize Claude tokenizer");
if (process.env.NODE_ENV !== "production") {
console.error(
`\nClaude tokenizer failed to initialize.\nIf you want to use the tokenizer, see the Optional Dependencies documentation.\n`
);
}
return false;
}
log.info("Claude tokenizer IPC ready");
@ -60,7 +77,7 @@ export async function requestTokenCount({
}
log.debug({ requestId, chars: prompt.length }, "Requesting token count");
await socket.send(["tokenize", requestId, prompt]);
socket.send(["tokenize", requestId, prompt]);
log.debug({ requestId }, "Waiting for socket response");
return new Promise<number>(async (resolve, reject) => {
@ -79,44 +96,51 @@ export async function requestTokenCount({
log.warn({ requestId }, err);
reject(new Error(err));
}
}, 250); // TODO: make this configurable, some really crappy VMs might need more time
}, 500);
});
}
async function processMessages() {
if (!socket) {
throw new Error("Claude tokenizer is not initialized");
function onMessage(requestId: Buffer, tokens: Buffer) {
const request = pendingRequests.get(requestId.toString());
if (!request) {
log.error({ requestId }, "No pending request found for incoming message");
return;
}
log.debug("Starting message loop");
for await (const [requestId, tokens] of socket) {
const request = pendingRequests.get(requestId.toString());
if (!request) {
log.error({ requestId }, "No pending request found for incoming message");
continue;
request.resolve(Number(tokens.toString()));
}
async function launchTokenizer() {
return new Promise<ChildProcess>((resolve, reject) => {
let resolved = false;
const proc = spawn("python", [
"-u",
join(__dirname, "tokenization", "claude-tokenizer.py"),
]);
if (!proc) {
reject(new Error("Failed to spawn Claude tokenizer"));
}
request.resolve(Number(tokens.toString()));
}
}
function launchTokenizer() {
const proc = spawn("python", [
"-u",
join(__dirname, "tokenization", "claude-tokenizer.py"),
]);
if (!proc) {
throw new Error("Failed to start Claude tokenizer. Is python installed?");
}
proc.stdout!.on("data", (data) => {
pythonLog.info(data.toString());
});
proc.stderr!.on("data", (data) => {
pythonLog.error(data.toString());
});
proc.on("close", (code) => {
pythonLog.info(`Claude tokenizer exited with code ${code}`);
socket.close();
socket = undefined!;
tokenizer = undefined!;
});
return proc;
proc.stdout!.on("data", (data) => {
pythonLog.info(data.toString());
});
proc.stderr!.on("data", (data) => {
pythonLog.error(data.toString());
});
proc.on("close", (code) => {
pythonLog.info(`Claude tokenizer exited with code ${code}`);
socket?.close();
socket = undefined!;
tokenizer = undefined!;
if (code !== 0 && !resolved) {
resolved = true;
reject(new Error("Claude tokenizer exited immediately"));
}
});
// Wait a moment to catch any immediate errors (missing imports, etc)
setTimeout(() => {
if (!resolved) {
resolved = true;
resolve(proc);
}
}, 100);
});
}

View File

@ -19,12 +19,14 @@ def init(socket):
while True:
message = socket.recv_multipart()
routing_id, command = message
print(f"claude-tokenizer.py: received message {message}")
print(f"claude-tokenizer.py: received command {command}")
if command == b"init":
print("claude-tokenizer.py: initialized")
socket.send_multipart([routing_id, b"ok"])
break
except Exception as e:
print("claude-tokenizer.py: failed to initialize")
# Report the caught exception text so an init failure can be diagnosed from the log.
print(f"claude-tokenizer.py: failed to initialize ({e})")
return
message_processor(socket)

View File

@ -10,7 +10,8 @@
"skipDefaultLibCheck": true,
"outDir": "build",
"sourceMap": true,
"resolveJsonModule": true
"resolveJsonModule": true,
"useUnknownInCatchVariables": false
},
"include": ["src"],
"exclude": ["node_modules"],