/**
 * /server/index.js
 *
 * VozTranslate backend:
 *  - Health check
 *  - Voice enrollment (qwen-voice-enrollment) with audio transcoding to WAV PCM16 mono 24k
 *  - Realtime speech translation (Qwen live-translate) over WebSocket
 *  - TTS (system or cloned voice) over a realtime WebSocket
 *
 * Env:
 *  DASHSCOPE_API_KEY  (required)
 *  DASHSCOPE_REGION   intl|cn (default intl)
 *  PORT               (default 8080)
 *  CORS_ORIGIN        (default http://localhost:3000)
 *  FFMPEG_PATH        optional explicit path to an ffmpeg binary
 */
import "dotenv/config";
import http from "node:http";
import os from "node:os";
import path from "node:path";
import crypto from "node:crypto";
import { spawn } from "node:child_process";
import fs from "node:fs/promises";
import { existsSync } from "node:fs";
import express from "express";
import cors from "cors";
import helmet from "helmet";
import multer from "multer";
import pino from "pino";
import pinoHttp from "pino-http";
import { WebSocketServer, WebSocket } from "ws";
import { z } from "zod";
import { fetch } from "undici";

const log = pino({ level: process.env.LOG_LEVEL || "info" });

// Last-resort logging: keep the process alive so in-flight sessions survive.
process.on("unhandledRejection", (err) => {
  log.error({ err }, "unhandledRejection");
});
process.on("uncaughtException", (err) => {
  log.error({ err }, "uncaughtException");
});

const ENV = {
  apiKey: process.env.DASHSCOPE_API_KEY || "",
  region: (process.env.DASHSCOPE_REGION || "intl").toLowerCase(),
  port: Number(process.env.PORT || 8080),
  corsOrigin: process.env.CORS_ORIGIN || "http://localhost:3000",
};

if (!ENV.apiKey || !ENV.apiKey.startsWith("sk-")) {
  log.warn('DASHSCOPE_API_KEY missing or does not look like a key (expected sk-...)');
}

// DashScope endpoints differ between mainland China and international regions.
const DASH = {
  httpBase:
    ENV.region === "cn"
      ? "https://dashscope.aliyuncs.com"
      : "https://dashscope-intl.aliyuncs.com",
  wsBase:
    ENV.region === "cn"
      ? "wss://dashscope.aliyuncs.com"
      : "wss://dashscope-intl.aliyuncs.com",
};

const SYSTEM_VOICES = ["Ethan", "Kai", "Aiden", "Chelsie", "Jennifer"];

const MODELS = {
  liveTranslate: "qwen3-livetranslate-flash-realtime",
  asrRealtime: "qwen3-asr-flash-realtime",
  ttsSystemRealtime: "qwen3-tts-flash-realtime",
  ttsClonedRealtime: "qwen3-tts-vc-realtime-2026-01-15",
  voiceEnrollment: "qwen-voice-enrollment",
};

const app = express();
app.disable("x-powered-by");
app.use(helmet());
app.use(cors({ origin: ENV.corsOrigin, credentials: true }));
app.use(express.json({ limit: "2mb" }));
app.use(pinoHttp({ logger: log }));

app.get("/api/health", (_req, res) => {
  res.json({
    ok: true,
    region: ENV.region,
    dash_http_base: DASH.httpBase,
    dash_ws_base: DASH.wsBase,
    system_voices: SYSTEM_VOICES,
    now: new Date().toISOString(),
  });
});

/* ----------------------------- FFmpeg resolve ----------------------------- */

/**
 * Locate an ffmpeg binary: FFMPEG_PATH env var first, then the optional
 * ffmpeg-static package, then common system install locations.
 * @returns {Promise<string|null>} absolute path, or null if none found.
 */
async function resolveFfmpegPath() {
  const envPath = process.env.FFMPEG_PATH;
  if (envPath && existsSync(envPath)) return envPath;
  try {
    // Optional dependency; may not be installed.
    const mod = await import("ffmpeg-static");
    const p = (mod && (mod.default ?? mod)) || null;
    if (typeof p === "string" && existsSync(p)) return p;
  } catch {
    // ffmpeg-static not installed -- fall through to system candidates.
  }
  const candidates = ["/usr/bin/ffmpeg", "/usr/local/bin/ffmpeg", "/snap/bin/ffmpeg"];
  for (const p of candidates) if (existsSync(p)) return p;
  return null;
}

// Resolved lazily on first transcode; re-resolved if the path goes stale.
let ffmpegPathCached = null;

/* ----------------------------- Upload / enroll ---------------------------- */

const upload = multer({
  storage: multer.diskStorage({
    destination: (_req, _file, cb) => cb(null, os.tmpdir()),
    filename: (_req, file, cb) => {
      // Random name in tmpdir; keep the original extension so ffmpeg can sniff it.
      const ext = (path.extname(file.originalname || "") || ".bin").toLowerCase();
      cb(null, `enroll-${crypto.randomUUID()}${ext}`);
    },
  }),
  limits: { fileSize: 25 * 1024 * 1024 },
});

const EnrollResponseSchema = z.object({
  output: z.object({
    voice: z.string(),
    target_model: z.string().optional(),
  }),
  request_id: z.string().optional(),
});

/** Build a unique temp-file path with the given extension. */
function mkTmpPath(ext) {
  return path.join(
    os.tmpdir(),
    `tmp-${crypto.randomUUID()}${ext.startsWith(".") ? ext : `.${ext}`}`
  );
}

/**
 * Transcode any input audio file to WAV PCM16 mono 24 kHz (the format the
 * enrollment API expects). Rejects with ffmpeg's stderr tail on failure.
 * @param {string} inputPath  source audio file
 * @param {string} outputPath destination .wav path
 */
async function runFfmpegToWavPcm16Mono24k(inputPath, outputPath) {
  if (!ffmpegPathCached || !existsSync(ffmpegPathCached)) {
    ffmpegPathCached = await resolveFfmpegPath();
  }
  if (!ffmpegPathCached || !existsSync(ffmpegPathCached)) {
    throw new Error("FFmpeg not found. Set FFMPEG_PATH or install ffmpeg on the server.");
  }
  const args = ["-y", "-i", inputPath, "-ac", "1", "-ar", "24000", "-sample_fmt", "s16", outputPath];
  await new Promise((resolve, reject) => {
    const p = spawn(ffmpegPathCached, args, { stdio: ["ignore", "ignore", "pipe"] });
    let stderr = "";
    p.stderr.on("data", (d) => (stderr += d.toString()));
    p.on("close", (code) => {
      if (code === 0) resolve();
      else reject(new Error(`ffmpeg failed (${code}): ${stderr.slice(0, 2000)}`));
    });
    p.on("error", reject);
  });
}

/**
 * POST /api/voice/enroll
 * Multipart upload (field "audio") -> transcode to WAV -> DashScope voice
 * enrollment. Responds with { voice, target_model }. Temp files are always
 * removed in the finally block.
 */
app.post("/api/voice/enroll", upload.single("audio"), async (req, res) => {
  const uploadedPath = req.file?.path;
  let wavPath = null;
  try {
    if (!ENV.apiKey) return res.status(500).json({ error: "Server not configured: DASHSCOPE_API_KEY missing" });
    if (!uploadedPath) return res.status(400).json({ error: 'Missing audio file field "audio"' });

    // Sanitize the display name: max 16 chars, alphanumerics/underscore only.
    const preferred_name = String(req.body?.preferred_name || "user_voice")
      .slice(0, 16)
      .replace(/[^0-9A-Za-z_]/g, "_");
    const language = req.body?.language ? String(req.body.language) : undefined;
    const target_model = MODELS.ttsClonedRealtime;

    wavPath = mkTmpPath(".wav");
    await runFfmpegToWavPcm16Mono24k(uploadedPath, wavPath);

    // The enrollment API takes the sample inline as a data URI.
    const wavBuf = await fs.readFile(wavPath);
    const dataUri = `data:audio/wav;base64,${wavBuf.toString("base64")}`;

    const payload = {
      model: MODELS.voiceEnrollment,
      input: {
        action: "create",
        target_model,
        preferred_name,
        audio: { data: dataUri },
        ...(language ? { language } : {}),
      },
    };

    const url = `${DASH.httpBase}/api/v1/services/audio/tts/customization`;
    const resp = await fetch(url, {
      method: "POST",
      headers: { Authorization: `Bearer ${ENV.apiKey}`, "Content-Type": "application/json" },
      body: JSON.stringify(payload),
    });

    const text = await resp.text();
    if (!resp.ok) {
      log.warn({ status: resp.status, body: text }, "voice enroll failed");
      return res.status(502).json({ error: "DashScope error", status: resp.status, details: text });
    }

    const data = EnrollResponseSchema.parse(JSON.parse(text));
    res.json({ voice: data.output.voice, target_model: data.output.target_model || target_model });
  } catch (err) {
    log.error({ err }, "voice enroll exception");
    res.status(500).json({ error: "Internal error", details: String(err?.message || err) });
  } finally {
    // Best-effort cleanup of both temp files.
    try { if (uploadedPath) await fs.unlink(uploadedPath); } catch {}
    try { if (wavPath) await fs.unlink(wavPath); } catch {}
  }
});

/* -------------------------- Dashscope realtime WS ------------------------- */

const server = http.createServer(app);

/** Parse JSON, returning null (instead of throwing) on malformed input. */
function safeJsonParse(str) {
  try {
    return JSON.parse(str);
  } catch {
    return null;
  }
}

/**
 * Send a JSON message over a ws socket only if it is still OPEN.
 * Guards against "WebSocket is not open" errors from late events arriving
 * after either side of the proxy has disconnected.
 */
function sendJson(sock, obj) {
  if (sock && sock.readyState === WebSocket.OPEN) {
    sock.send(JSON.stringify(obj));
  }
}

function dashscopeRealtimeUrl(model) {
  return `${DASH.wsBase}/api-ws/v1/realtime?model=${encodeURIComponent(model)}`;
}

function dashscopeRealtimeHeaders() {
  return {
    Authorization: `Bearer ${ENV.apiKey}`,
    "OpenAI-Beta": "realtime=v1",
  };
}

/**
 * Open a DashScope realtime WebSocket for the given model and send the
 * session.update payload as soon as the socket opens.
 */
function connectDashscopeRealtime({ model, sessionUpdate }) {
  const ws = new WebSocket(dashscopeRealtimeUrl(model), { headers: dashscopeRealtimeHeaders() });
  ws.on("open", () => ws.send(JSON.stringify(sessionUpdate)));
  return ws;
}

/**
 * Compute "only the new suffix" to avoid duplicate UI growth.
 *  - If curr startsWith(prev): delta = curr.slice(prev.length)
 *  - Else: treat as reset (delta = curr)
 */
function computeDelta(prev, curr) {
  if (!curr) return { next: prev, delta: "" };
  if (!prev) return { next: curr, delta: curr };
  if (curr.startsWith(prev)) return { next: curr, delta: curr.slice(prev.length) };
  return { next: curr, delta: curr }; // reset
}

/**
 * Bound accumulated text, keeping the most recent tail.
 * NOTE(review): once a buffer is clamped, computeDelta's startsWith check can
 * no longer match and falls into the "reset" branch -- confirm that is the
 * intended behavior for very long utterances.
 */
function clampText(s, maxLen) {
  if (!s) return "";
  return s.length <= maxLen ? s : s.slice(-maxLen);
}

/**
 * One-shot TTS: open a realtime TTS session, stream the given text, and
 * forward base64 PCM16 audio deltas to the caller via onAudioDelta.
 * NOTE(review): event/message names follow the DashScope realtime TTS
 * protocol -- verify against current API docs if behavior drifts.
 */
function ttsStreamOnce({ text, model, voice, onAudioDelta, onDone, onError }) {
  const sessionUpdate = {
    event_id: `event_${Date.now()}`,
    type: "session.update",
    session: {
      mode: "server_commit",
      voice,
      output_audio_format: "pcm16",
    },
  };
  const ws = connectDashscopeRealtime({ model, sessionUpdate });
  ws.on("message", (raw) => {
    const msg = safeJsonParse(String(raw));
    const type = msg?.type;
    if (type === "response.audio.delta") {
      const delta = msg?.delta;
      if (delta) onAudioDelta(delta);
      return;
    }
    if (type === "response.done") {
      onDone?.();
      ws.close();
      return;
    }
    if (type === "error") {
      onError?.(msg);
      ws.close();
    }
  });
  // Both "open" handlers run in registration order: session.update first
  // (from connectDashscopeRealtime), then the text + commit + finish below.
  ws.on("open", () => {
    ws.send(JSON.stringify({ type: "input_text_buffer.append", text }));
    ws.send(JSON.stringify({ type: "input_text_buffer.commit" }));
    ws.send(JSON.stringify({ type: "session.finish" }));
  });
  ws.on("error", (e) => onError?.(e));
}

/* ------------------------------ Client schemas ---------------------------- */

const ClientStartSchema = z
  .object({
    type: z.literal("start"),
    sourceLang: z.string().min(2).max(16),
    targetLang: z.string().min(2).max(16),
    voiceMode: z.enum(["system", "cloned"]).default("system"),
    systemVoice: z.enum(SYSTEM_VOICES).default("Chelsie"),
    clonedVoice: z.string().optional(),
    audio: z.object({
      sampleRate: z.number().int().min(8000).max(48000),
      format: z.literal("pcm16"),
    }),
  })
  .superRefine((v, ctx) => {
    if (v.voiceMode === "cloned" && !v.clonedVoice) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "clonedVoice is required when voiceMode is 'cloned'. Enroll first.",
        path: ["clonedVoice"],
      });
    }
  });

const ClientAudioSchema = z.object({
  type: z.literal("audio"),
  chunk: z.string().min(1),
});

const ClientStopSchema = z.object({
  type: z.literal("stop"),
});

/* ------------------------------ WS /ws/translate -------------------------- */

const wss = new WebSocketServer({ server, path: "/ws/translate" });

/**
 * Per-connection proxy: the browser sends {start, audio..., stop}; we relay
 * audio to the live-translate model, stream ASR/MT text deltas back, and on
 * upstream close run one TTS pass over the final translation.
 */
wss.on("connection", (client) => {
  let dashWs = null;
  let started = false;
  let cfg = null;

  // Rolling partial/final transcripts for delta computation.
  let asrPartialFull = "";
  let mtPartialFull = "";
  let asrFinal = "";
  let mtFinal = "";

  const cleanup = () => {
    try { dashWs?.close(); } catch {}
    dashWs = null;
  };
  client.on("close", cleanup);
  client.on("error", (e) => log.warn({ e }, "client ws error"));

  client.on("message", (raw) => {
    const msg = safeJsonParse(String(raw));
    if (!msg) return;

    if (!started) {
      // First message must be a valid "start".
      const parsed = ClientStartSchema.safeParse(msg);
      if (!parsed.success) {
        sendJson(client, { type: "client.error", error: parsed.error.flatten() });
        client.close();
        return;
      }
      if (!ENV.apiKey) {
        sendJson(client, { type: "client.error", error: "Server not configured (missing API key)" });
        client.close();
        return;
      }
      cfg = parsed.data;
      started = true;
      asrPartialFull = "";
      mtPartialFull = "";
      asrFinal = "";
      mtFinal = "";

      const sessionUpdate = {
        event_id: `event_${Date.now()}`,
        type: "session.update",
        session: {
          modalities: ["text"],
          input_audio_format: "pcm16",
          output_audio_format: "pcm16",
          input_audio_transcription: {
            model: MODELS.asrRealtime,
            language: cfg.sourceLang,
          },
          translation: { language: cfg.targetLang },
        },
      };
      dashWs = connectDashscopeRealtime({ model: MODELS.liveTranslate, sessionUpdate });

      // NOTE(review): the event type names below follow the DashScope
      // live-translate realtime protocol -- confirm against current API docs.
      dashWs.on("message", (dashRaw) => {
        const event = safeJsonParse(String(dashRaw));
        if (!event?.type) return;
        const t = event.type;

        if (t === "conversation.item.input_audio_transcription.text") {
          // Partial ASR: forward only the new suffix.
          const curr = String(event.text || "");
          const { next, delta } = computeDelta(asrPartialFull, curr);
          asrPartialFull = clampText(next, 6000);
          if (delta) sendJson(client, { type: "asr.delta", delta });
          return;
        }
        if (t === "conversation.item.input_audio_transcription.completed") {
          const finalText = String(event.text || event.transcript || asrPartialFull || "");
          asrFinal = clampText(finalText, 6000);
          asrPartialFull = "";
          sendJson(client, { type: "asr.final", text: asrFinal });
          return;
        }
        if (t === "response.text.delta") {
          // Incremental translation text.
          const delta = String(event.delta || "");
          if (delta) sendJson(client, { type: "mt.delta", delta });
          mtPartialFull = clampText(mtPartialFull + delta, 8000);
          return;
        }
        if (t === "response.text.partial" || t === "response.text") {
          // Full-partial translation snapshots: diff against what we have.
          const curr = String(event.text || "");
          const { next, delta } = computeDelta(mtPartialFull, curr);
          mtPartialFull = clampText(next, 8000);
          if (delta) sendJson(client, { type: "mt.delta", delta });
          return;
        }
        if (t === "response.text.done") {
          const finalText = String(event.text || mtPartialFull || "");
          mtFinal = clampText(finalText, 8000);
          mtPartialFull = "";
          sendJson(client, { type: "mt.final", text: mtFinal });
          return;
        }
        if (t === "error") {
          sendJson(client, { type: "dash.error", error: event });
          client.close();
        }
      });

      dashWs.on("open", () =>
        sendJson(client, { type: "session.ready", system_voices: SYSTEM_VOICES })
      );

      // When the translate session ends, speak the final translation (if any).
      dashWs.on("close", () => {
        if (!mtFinal) return;
        const isCloned = cfg.voiceMode === "cloned";
        const model = isCloned ? MODELS.ttsClonedRealtime : MODELS.ttsSystemRealtime;
        const voice = isCloned ? cfg.clonedVoice : cfg.systemVoice;
        ttsStreamOnce({
          text: mtFinal,
          model,
          voice,
          onAudioDelta: (delta) => sendJson(client, { type: "audio.delta", pcm16_b64: delta }),
          onDone: () => sendJson(client, { type: "audio.done" }),
          onError: (e) => sendJson(client, { type: "tts.error", error: e }),
        });
      });

      dashWs.on("error", (e) => {
        sendJson(client, { type: "dash.error", error: String(e?.message || e) });
        client.close();
      });
      return;
    }

    if (msg.type === "audio") {
      const parsed = ClientAudioSchema.safeParse(msg);
      if (!parsed.success) return;
      sendJson(dashWs, {
        event_id: `event_${Date.now()}`,
        type: "input_audio_buffer.append",
        audio: parsed.data.chunk,
      });
      return;
    }

    if (msg.type === "stop") {
      const parsed = ClientStopSchema.safeParse(msg);
      if (!parsed.success) return;
      // Commit buffered audio, then close the upstream session.
      // NOTE(review): closing immediately after commit can race the final
      // ASR/MT events; TTS only fires if mt.final arrived before close.
      sendJson(dashWs, { type: "input_audio_buffer.commit" });
      try { dashWs?.close(); } catch {}
    }
  });
});

/**
 * Bind explicitly to 127.0.0.1 so the service matches an Nginx
 * `proxy_pass http://127.0.0.1:<PORT>` upstream (and local curl checks),
 * and so we never get an IPv6-only listen that 127.0.0.1 can't reach.
 * The server is not reachable directly from outside the host.
 */
server.listen(ENV.port, "127.0.0.1", () => {
  log.info({ port: ENV.port, cors: ENV.corsOrigin, region: ENV.region }, "backend listening");
});