import { and, eq, lt } from 'drizzle-orm';
import { Server as HttpServer } from 'http';
import { Server as SocketIOServer } from 'socket.io';
import { z } from 'zod';

import { db } from '../db/client';
import { deviceCommands, devices } from '../db/schema';
import { hasRequiredTables } from '../utils/db-schema';
import { verifyDeviceToken } from '../utils/device-token';

/** Passed to Socket.IO as `pingInterval`. */
const HEARTBEAT_INTERVAL_MS = 15_000;
/** Period of the retry worker started by `setupRealtimeGateway`. */
const RETRY_INTERVAL_MS = 5_000;
/** A 'sent' command is retried only once its last dispatch is older than this. */
const RETRY_DISPATCH_DELAY_MS = 10_000;
/** Retry attempts allowed before a command is marked 'failed'. */
const MAX_RETRIES = 3;
/** Delay before a disconnected device is marked offline, so fast reconnects do not flap presence. */
const PRESENCE_GRACE_MS = 500;

/** Shape of the `command:ack` payload sent by a target device. */
const commandAckSchema = z.object({
  commandId: z.string().uuid(),
  status: z.enum(['acknowledged', 'rejected']),
  error: z.string().optional(),
});

/** Identity attached to `socket.data.deviceAuth` by the handshake middleware. */
type DeviceAuth = { userId: string; deviceId: string; role: 'camera' | 'client' };

/** Fields of a stored command that are needed to emit it to its target device. */
type DispatchableCommand = {
  id: string;
  sourceDeviceId: string;
  targetDeviceId: string;
  commandType: string;
  // NOTE(review): assumed to match the schema's payload column type — confirm.
  payload: Record<string, unknown> | null;
  createdAt: Date;
};

/** Name of the Socket.IO room that every socket of a device joins. */
const roomForDevice = (deviceId: string): string => `device:${deviceId}`;

let io: SocketIOServer | null = null;
let retryTimer: ReturnType<typeof setInterval> | null = null;

/** Number of sockets currently in the device's room; 0 when the gateway is not set up. */
const countSocketsForDevice = (deviceId: string): number => {
  if (!io) {
    return 0;
  }

  return io.sockets.adapter.rooms.get(roomForDevice(deviceId))?.size ?? 0;
};

/** Whether at least one socket is connected for the given device. */
export const isDeviceOnline = (deviceId: string): boolean => countSocketsForDevice(deviceId) > 0;

/**
 * Emits an arbitrary event to every socket of a device.
 *
 * @returns `false` when the gateway is not set up or the device has no
 *   connected socket (nothing is emitted); `true` after emitting.
 */
export const sendRealtimeToDevice = (
  deviceId: string,
  eventName: string,
  payload: Record<string, unknown>,
): boolean => {
  if (!io || !isDeviceOnline(deviceId)) {
    return false;
  }

  io.to(roomForDevice(deviceId)).emit(eventName, payload);
  return true;
};

/** Writes the device's presence status and refreshes `lastSeenAt` / `updatedAt`. */
const markDevicePresence = async (
  deviceId: string,
  status: 'online' | 'offline',
): Promise<void> => {
  const now = new Date();

  await db
    .update(devices)
    .set({
      status,
      lastSeenAt: now,
      updatedAt: now,
    })
    .where(eq(devices.id, deviceId));
};

/**
 * Emits `command:received` to the command's target device.
 *
 * @returns `false` when the gateway is not set up or the target has no
 *   connected socket; `true` after emitting.
 */
const emitCommand = (command: DispatchableCommand): boolean => {
  if (!io) {
    return false;
  }

  if (countSocketsForDevice(command.targetDeviceId) === 0) {
    return false;
  }

  io.to(roomForDevice(command.targetDeviceId)).emit('command:received', {
    commandId: command.id,
    sourceDeviceId: command.sourceDeviceId,
    commandType: command.commandType,
    payload: command.payload,
    createdAt: command.createdAt,
  });

  return true;
};

/**
 * Loads a command by id, tries to emit it to its target device and records
 * the outcome: 'sent' when emitted, 'queued' with an error note when the
 * target is offline. Does nothing when the command does not exist.
 *
 * NOTE(review): the command's current status is not checked before it is
 * overwritten — confirm callers only dispatch commands that are still pending.
 */
export const dispatchCommandById = async (commandId: string): Promise<void> => {
  const command = await db.query.deviceCommands.findFirst({
    where: eq(deviceCommands.id, commandId),
  });

  if (!command) {
    return;
  }

  const now = new Date();
  const delivered = emitCommand(command);

  // retryCount is intentionally left untouched: a first dispatch is not a retry.
  await db
    .update(deviceCommands)
    .set({
      status: delivered ? 'sent' : 'queued',
      lastDispatchedAt: now,
      updatedAt: now,
      error: delivered ? null : 'target device offline',
    })
    .where(eq(deviceCommands.id, command.id));
};

/**
 * Re-emits up to 100 of the oldest commands that are still 'sent' (i.e. not
 * acknowledged) and whose last dispatch is older than RETRY_DISPATCH_DELAY_MS.
 * A command whose next attempt would exceed MAX_RETRIES is marked 'failed'.
 *
 * NOTE(review): commands that end up 'queued' are not selected by this query
 * and nothing in this file re-dispatches them — confirm another code path does.
 */
const retryPendingCommands = async (): Promise<void> => {
  const threshold = new Date(Date.now() - RETRY_DISPATCH_DELAY_MS);

  const pending = await db.query.deviceCommands.findMany({
    where: and(eq(deviceCommands.status, 'sent'), lt(deviceCommands.lastDispatchedAt, threshold)),
    limit: 100,
    orderBy: (fields, operators) => [operators.asc(fields.createdAt)],
  });

  for (const command of pending) {
    const now = new Date();
    const nextRetryCount = command.retryCount + 1;

    if (nextRetryCount > MAX_RETRIES) {
      await db
        .update(deviceCommands)
        .set({
          status: 'failed',
          updatedAt: now,
          error: 'max retries exceeded',
        })
        .where(eq(deviceCommands.id, command.id));
      continue;
    }

    const delivered = emitCommand(command);

    await db
      .update(deviceCommands)
      .set({
        status: delivered ? 'sent' : 'queued',
        lastDispatchedAt: now,
        retryCount: nextRetryCount,
        updatedAt: now,
        error: delivered ? null : 'target device offline',
      })
      .where(eq(deviceCommands.id, command.id));
  }
};

/**
 * Runs an async task in the background and logs a rejection instead of
 * letting it surface as an unhandled promise rejection.
 */
const runGuarded = (label: string, task: () => Promise<void>): void => {
  task().catch((error: unknown) => {
    console.error(`[realtime] ${label} failed`, error);
  });
};

/**
 * Creates the Socket.IO gateway on the given HTTP server, or returns the
 * existing instance when it was already created.
 *
 * Sets up:
 * - handshake authentication from a device token (`auth.token` or a
 *   `Bearer` Authorization header);
 * - per-socket handlers for `heartbeat`, `command:ack` and `disconnect`;
 * - the periodic retry worker, provided the required tables exist.
 */
export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
  if (io) {
    return io;
  }

  const trustedOrigins = process.env.BETTER_AUTH_TRUSTED_ORIGINS?.split(',')
    .map((origin) => origin.trim())
    .filter(Boolean);

  const gateway = new SocketIOServer(server, {
    cors: {
      origin: trustedOrigins ?? true,
      credentials: true,
    },
    pingInterval: HEARTBEAT_INTERVAL_MS,
  });
  io = gateway;

  gateway.use(async (socket, next) => {
    try {
      const tokenFromAuth =
        typeof socket.handshake.auth?.token === 'string' ? socket.handshake.auth.token : null;
      const authorization = socket.handshake.headers.authorization;
      const tokenFromHeader =
        typeof authorization === 'string' && authorization.startsWith('Bearer ')
          ? authorization.slice('Bearer '.length).trim()
          : null;

      const token = tokenFromAuth ?? tokenFromHeader;
      if (!token) {
        next(new Error('Missing device token'));
        return;
      }

      const payload = verifyDeviceToken(token);
      if (!payload) {
        next(new Error('Invalid device token'));
        return;
      }

      // The device must exist and belong to the user named in the token.
      const device = await db.query.devices.findFirst({
        where: and(eq(devices.id, payload.deviceId), eq(devices.userId, payload.userId)),
      });

      if (!device) {
        next(new Error('Device not found'));
        return;
      }

      if (device.role !== payload.role) {
        next(new Error('Token role does not match device role'));
        return;
      }

      socket.data.deviceAuth = payload;
      next();
    } catch (error) {
      next(error instanceof Error ? error : new Error('Authentication failed'));
    }
  });

  gateway.on('connection', (socket) => {
    const auth = socket.data.deviceAuth as DeviceAuth;
    void socket.join(roomForDevice(auth.deviceId));

    /** Validates an ack from the target device, stores it and notifies the source device. */
    const handleCommandAck = async (input: unknown): Promise<void> => {
      const parsed = commandAckSchema.safeParse(input);
      if (!parsed.success) {
        socket.emit('error:command_ack', {
          message: 'Invalid command ack payload',
          errors: parsed.error.flatten(),
        });
        return;
      }

      const command = await db.query.deviceCommands.findFirst({
        where: eq(deviceCommands.id, parsed.data.commandId),
      });

      if (!command) {
        socket.emit('error:command_ack', { message: 'Command not found' });
        return;
      }

      // Only the device the command was addressed to may acknowledge it.
      if (command.targetDeviceId !== auth.deviceId) {
        socket.emit('error:command_ack', { message: 'Not authorized to ack this command' });
        return;
      }

      const now = new Date();
      await db
        .update(deviceCommands)
        .set({
          status: parsed.data.status,
          acknowledgedAt: now,
          updatedAt: now,
          error: parsed.data.status === 'rejected' ? parsed.data.error ?? 'rejected' : null,
        })
        .where(eq(deviceCommands.id, command.id));

      gateway.to(roomForDevice(command.sourceDeviceId)).emit('command:status', {
        commandId: command.id,
        status: parsed.data.status,
        acknowledgedAt: now.toISOString(),
        error: parsed.data.error,
      });
    };

    // Listeners are attached synchronously, before any await, so that events
    // sent right after the handshake are not dropped and `disconnect` is
    // always handled even when the presence update below fails.
    socket.on('heartbeat', () => {
      runGuarded('heartbeat', async () => {
        await markDevicePresence(auth.deviceId, 'online');
        socket.emit('heartbeat:ack', { at: new Date().toISOString() });
      });
    });

    socket.on('command:ack', (input: unknown) => {
      handleCommandAck(input).catch((error: unknown) => {
        console.error('[realtime] command:ack failed', error);
        socket.emit('error:command_ack', { message: 'Failed to process command ack' });
      });
    });

    socket.on('disconnect', () => {
      // Small delay allows fast reconnects to reuse presence without flapping.
      setTimeout(() => {
        runGuarded('presence update on disconnect', async () => {
          if (countSocketsForDevice(auth.deviceId) === 0) {
            await markDevicePresence(auth.deviceId, 'offline');
          }
        });
      }, PRESENCE_GRACE_MS);
    });

    runGuarded('connection setup', async () => {
      try {
        await markDevicePresence(auth.deviceId, 'online');
      } catch (error) {
        // Presence is best-effort: the socket is usable even if this write fails.
        console.error('[realtime] presence update on connect failed', error);
      }

      socket.emit('connected', {
        deviceId: auth.deviceId,
        role: auth.role,
        serverTime: new Date().toISOString(),
      });
    });
  });

  if (!retryTimer) {
    const requiredTables = ['device_commands'];

    void (async () => {
      const ready = await hasRequiredTables(requiredTables);
      if (!ready) {
        console.warn(
          `[command retry] skipped startup because required tables are missing (${requiredTables.join(', ')}). Run migrations and restart.`,
        );
        return;
      }

      retryTimer = setInterval(() => {
        retryPendingCommands().catch((error) => {
          console.error('Failed retrying pending commands', error);
        });
      }, RETRY_INTERVAL_MS);
    })().catch((error) => {
      console.error('Failed initializing command retry worker', error);
    });
  }

  return gateway;
};