import { and, desc, eq } from 'drizzle-orm';
import { Router } from 'express';
import { z } from 'zod';

import { db } from '../db/client';
import { commands, deviceLinks, devices } from '../db/schema';
import { simpleStreamingEnabled } from '../media/config';
import { requireAuth } from '../middleware/auth';
import { requireDeviceAuth } from '../middleware/device-auth';
import { dispatchCommandById } from '../realtime/gateway';

/**
 * Command routes:
 *   POST /                 - create a command from a client device to a camera device.
 *   GET  /                 - list the authenticated user's commands, newest first.
 *   POST /:commandId/ack   - let the target device acknowledge or reject a command.
 *
 * Every async handler forwards unexpected failures with `next(err)` so the
 * application's error middleware produces the response instead of the request
 * being left with an unhandled promise rejection.
 */
const router = Router();

// Request body accepted by POST /.
const commandCreateSchema = z.object({
  sourceDeviceId: z.string().uuid(),
  targetDeviceId: z.string().uuid(),
  commandType: z.enum(['start_stream', 'stop_stream', 'ping', 'update_settings']),
  payload: z.record(z.string(), z.unknown()).optional(),
});

// Request body accepted by POST /:commandId/ack.
const ackSchema = z.object({
  status: z.enum(['acknowledged', 'rejected']),
  error: z.string().optional(),
});

// Route params of POST /:commandId/ack.
const commandParamSchema = z.object({
  commandId: z.string().uuid(),
});

// Query string accepted by GET /; `limit` is coerced from its string form and defaults to 25.
const querySchema = z.object({
  sourceDeviceId: z.string().uuid().optional(),
  targetDeviceId: z.string().uuid().optional(),
  status: z.string().optional(),
  limit: z.coerce.number().int().min(1).max(100).default(25),
});

/**
 * POST / - validate, persist and dispatch a new command.
 *
 * Responses: 400 (invalid body, identical devices, devices not owned by the
 * user, wrong role direction), 401 (no authenticated user), 403 (no active
 * link), 409 (start_stream while simple streaming is enabled), 500 (insert
 * returned no row), 201 (command stored and dispatch attempted).
 */
router.post('/', requireAuth, async (req, res, next) => {
  try {
    const parsed = commandCreateSchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
      return;
    }

    // Presumably populated by requireAuth; this guard covers the case where it is missing.
    const authSession = req.auth;
    if (!authSession?.user?.id) {
      res.status(401).json({ message: 'Unauthorized' });
      return;
    }

    if (simpleStreamingEnabled && parsed.data.commandType === 'start_stream') {
      res.status(409).json({
        message:
          'start_stream commands are disabled while SIMPLE_STREAMING is enabled; use /streams/request instead',
      });
      return;
    }

    if (parsed.data.sourceDeviceId === parsed.data.targetDeviceId) {
      res.status(400).json({ message: 'sourceDeviceId and targetDeviceId must differ' });
      return;
    }

    // Both lookups are scoped to the authenticated user, so a device owned by
    // someone else is indistinguishable from a missing one.
    const [sourceDevice, targetDevice] = await Promise.all([
      db.query.devices.findFirst({
        where: and(
          eq(devices.id, parsed.data.sourceDeviceId),
          eq(devices.userId, authSession.user.id),
        ),
      }),
      db.query.devices.findFirst({
        where: and(
          eq(devices.id, parsed.data.targetDeviceId),
          eq(devices.userId, authSession.user.id),
        ),
      }),
    ]);

    if (!sourceDevice || !targetDevice) {
      res
        .status(400)
        .json({ message: 'Both source and target devices must belong to the authenticated user' });
      return;
    }

    const validRoleDirection = sourceDevice.role === 'client' && targetDevice.role === 'camera';
    if (!validRoleDirection) {
      res
        .status(400)
        .json({ message: 'Commands are currently allowed only from client -> camera devices' });
      return;
    }

    // An active link between this exact camera/client pair is required.
    const link = await db.query.deviceLinks.findFirst({
      where: and(
        eq(deviceLinks.ownerUserId, authSession.user.id),
        eq(deviceLinks.cameraDeviceId, targetDevice.id),
        eq(deviceLinks.clientDeviceId, sourceDevice.id),
        eq(deviceLinks.status, 'active'),
      ),
    });

    if (!link) {
      res.status(403).json({ message: 'No active camera-client link found for command routing' });
      return;
    }

    const now = new Date();
    const [command] = await db
      .insert(commands)
      .values({
        ownerUserId: authSession.user.id,
        sourceDeviceId: sourceDevice.id,
        targetDeviceId: targetDevice.id,
        commandType: parsed.data.commandType,
        payload: parsed.data.payload ?? null,
        status: 'queued',
        lastDispatchedAt: now,
        updatedAt: now,
      })
      .returning();

    if (!command) {
      res.status(500).json({ message: 'Failed creating command' });
      return;
    }

    await dispatchCommandById(command.id);

    // Re-read the row after dispatch and prefer it in the response; fall back
    // to the inserted row if the re-read finds nothing.
    const refreshed = await db.query.commands.findFirst({ where: eq(commands.id, command.id) });

    res.status(201).json({
      message: 'Command queued',
      command: refreshed ?? command,
    });
  } catch (err) {
    next(err);
  }
});

/**
 * GET / - list the authenticated user's commands, newest first.
 *
 * Optional `sourceDeviceId`, `targetDeviceId` and `status` filters are applied
 * in the database query so that `limit` bounds the filtered result set rather
 * than truncating the rows before filtering.
 */
router.get('/', requireAuth, async (req, res, next) => {
  try {
    const parsed = querySchema.safeParse(req.query);
    if (!parsed.success) {
      res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
      return;
    }

    const authSession = req.auth;
    if (!authSession?.user?.id) {
      res.status(401).json({ message: 'Unauthorized' });
      return;
    }

    const { sourceDeviceId, targetDeviceId, status, limit } = parsed.data;

    const conditions = [eq(commands.ownerUserId, authSession.user.id)];
    if (sourceDeviceId) {
      conditions.push(eq(commands.sourceDeviceId, sourceDeviceId));
    }
    if (targetDeviceId) {
      conditions.push(eq(commands.targetDeviceId, targetDeviceId));
    }
    // An empty `status` value is treated as "no filter".
    // NOTE(review): if commands.status is a database enum rather than text, an
    // unknown value would be rejected by the database — confirm the column type.
    if (status) {
      conditions.push(eq(commands.status, status));
    }

    const commandResults = await db.query.commands.findMany({
      where: and(...conditions),
      orderBy: [desc(commands.createdAt)],
      limit,
    });

    res.json({ count: commandResults.length, commands: commandResults });
  } catch (err) {
    next(err);
  }
});

/**
 * POST /:commandId/ack - record the target device's acknowledgement or rejection.
 *
 * Responses: 400 (invalid params or body), 401 (no device auth), 403 (caller is
 * not the command's target device), 404 (command missing), 200 (updated row).
 */
router.post('/:commandId/ack', requireDeviceAuth, async (req, res, next) => {
  try {
    const parsedParams = commandParamSchema.safeParse(req.params);
    if (!parsedParams.success) {
      res.status(400).json({ message: 'Invalid commandId', errors: parsedParams.error.flatten() });
      return;
    }

    const parsed = ackSchema.safeParse(req.body);
    if (!parsed.success) {
      res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
      return;
    }

    // Presumably populated by requireDeviceAuth; this guard covers the case where it is missing.
    const deviceAuth = req.deviceAuth;
    if (!deviceAuth) {
      res.status(401).json({ message: 'Unauthorized' });
      return;
    }

    const command = await db.query.commands.findFirst({
      where: eq(commands.id, parsedParams.data.commandId),
    });

    if (!command) {
      res.status(404).json({ message: 'Command not found' });
      return;
    }

    if (command.targetDeviceId !== deviceAuth.deviceId) {
      res.status(403).json({ message: 'Only target device can acknowledge this command' });
      return;
    }

    const now = new Date();
    const [updated] = await db
      .update(commands)
      .set({
        status: parsed.data.status,
        acknowledgedAt: now,
        updatedAt: now,
        // A rejection always stores an error text; an acknowledgement clears it.
        error: parsed.data.status === 'rejected' ? (parsed.data.error ?? 'rejected') : null,
      })
      .where(eq(commands.id, command.id))
      .returning();

    // The row can disappear between the lookup and the update; do not report
    // success with an undefined command in that case.
    if (!updated) {
      res.status(404).json({ message: 'Command not found' });
      return;
    }

    res.json({ message: 'Command acknowledged', command: updated });
  } catch (err) {
    next(err);
  }
});

export default router;