import { and, eq } from 'drizzle-orm';
import { Router } from 'express';

import { db } from '../../db/client';
import { commands, deviceLinks, devices, streamSessions } from '../../db/schema';
import { simpleStreamingEnabled } from '../../media/config';
import { mediaProvider } from '../../media/service';
import { requireDeviceAuth } from '../../middleware/device-auth';
import { dispatchCommandById, sendRealtimeToDevice } from '../../realtime/gateway';
import { writeAuditLog } from '../../services/audit';
import { enqueuePushNotification } from '../../services/push';
import { createStreamRequestedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple';
import { requestStreamSchema } from './schemas';
import { ensureStreamDeviceAuth } from './shared';

const router = Router();

/**
 * Writes the `stream.requested` audit entry for a newly created stream session.
 *
 * @param {object} req - Express request; only `req.ip` is read.
 * @param {object} sourceDevice - Requesting device row (`id`, `userId` are read).
 * @param {object} cameraDevice - Target camera device row (`id` is read).
 * @param {object} session - Inserted stream session row (`id`, `reason` are read).
 * @param {object} [extraMetadata] - Extra keys appended after `cameraDeviceId` and `reason`.
 * @returns {Promise<void>}
 */
async function recordStreamRequestedAudit(req, sourceDevice, cameraDevice, session, extraMetadata = {}) {
  await writeAuditLog({
    ownerUserId: sourceDevice.userId,
    actorDeviceId: sourceDevice.id,
    action: 'stream.requested',
    targetType: 'stream_session',
    targetId: session.id,
    metadata: { cameraDeviceId: cameraDevice.id, reason: session.reason, ...extraMetadata },
    ipAddress: req.ip,
  });
}

/**
 * Core logic of `POST /request`: validates the body, checks that the
 * authenticated device is a `client` with an active link to the requested
 * `camera` device, inserts a stream session, and notifies the devices.
 *
 * Responses written here:
 * - 400 invalid body, or `cameraDeviceId` is not a camera device
 * - 403 requester is not a client device, or no active link exists
 * - 404 source or camera device not found for the authenticated user
 * - 500 an insert returned no row
 * - 201 stream request sent
 *
 * Any rejection from an awaited call propagates to the caller.
 *
 * @param {object} req - Express request.
 * @param {object} res - Express response.
 * @returns {Promise<void>}
 */
async function handleStreamRequest(req, res) {
  const parsed = requestStreamSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }

  // NOTE(review): a falsy result presumably means ensureStreamDeviceAuth has
  // already written the response itself -- confirm in ./shared.
  const deviceAuth = ensureStreamDeviceAuth(req, res);
  if (!deviceAuth) return;

  // Both lookups are scoped to the authenticated user so a device id that
  // belongs to another account is reported as "not found".
  const [sourceDevice, cameraDevice] = await Promise.all([
    db.query.devices.findFirst({
      where: and(eq(devices.id, deviceAuth.deviceId), eq(devices.userId, deviceAuth.userId)),
    }),
    db.query.devices.findFirst({
      where: and(eq(devices.id, parsed.data.cameraDeviceId), eq(devices.userId, deviceAuth.userId)),
    }),
  ]);

  if (!sourceDevice || !cameraDevice) {
    res.status(404).json({ message: 'Source or camera device not found' });
    return;
  }

  if (sourceDevice.role !== 'client') {
    res.status(403).json({ message: 'Only client devices can request on-demand stream sessions' });
    return;
  }

  if (cameraDevice.role !== 'camera') {
    res.status(400).json({ message: 'cameraDeviceId must point to a camera device' });
    return;
  }

  const link = await db.query.deviceLinks.findFirst({
    where: and(
      eq(deviceLinks.ownerUserId, deviceAuth.userId),
      eq(deviceLinks.cameraDeviceId, cameraDevice.id),
      eq(deviceLinks.clientDeviceId, sourceDevice.id),
      eq(deviceLinks.status, 'active'),
    ),
  });

  if (!link) {
    res.status(403).json({ message: 'No active link between requester and camera' });
    return;
  }

  const now = new Date();

  const [session] = await db
    .insert(streamSessions)
    .values({
      ownerUserId: deviceAuth.userId,
      cameraDeviceId: cameraDevice.id,
      requesterDeviceId: sourceDevice.id,
      status: 'requested',
      reason: parsed.data.reason,
      metadata: parsed.data.metadata ?? null,
      mediaProvider: mediaProvider.name,
      updatedAt: now,
    })
    .returning();

  if (!session) {
    res.status(500).json({ message: 'Failed creating stream session' });
    return;
  }

  if (simpleStreamingEnabled) {
    // Simple mode: the same payload goes to the camera and to the requester
    // over realtime; no command row is created.
    const requestPayload = createStreamRequestedPayload(session);
    const deliveredToCamera = sendRealtimeToDevice(cameraDevice.id, 'stream:requested', requestPayload);

    console.info('[stream.request]', {
      streamSessionId: session.id,
      requesterDeviceId: sourceDevice.id,
      cameraDeviceId: cameraDevice.id,
      mode: 'simple',
    });

    sendRealtimeToDevice(sourceDevice.id, 'stream:requested', requestPayload);

    // Queue a push notification for the camera when the realtime send
    // reported a falsy result.
    if (!deliveredToCamera) {
      await enqueuePushNotification({
        ownerUserId: cameraDevice.userId,
        recipientDeviceId: cameraDevice.id,
        type: 'stream_requested',
        payload: requestPayload,
      });
    }

    res.status(201).json({
      message: 'Stream request sent',
      streamSession: toSimpleStreamSessionResponse(session),
    });

    // The audit entry is written after the response has been sent.
    await recordStreamRequestedAudit(req, sourceDevice, cameraDevice, session, { transport: 'webrtc' });
    return;
  }

  // Legacy mode: persist a `start_stream` command for the camera and dispatch it.
  const [command] = await db
    .insert(commands)
    .values({
      ownerUserId: deviceAuth.userId,
      sourceDeviceId: sourceDevice.id,
      targetDeviceId: cameraDevice.id,
      commandType: 'start_stream',
      payload: {
        streamSessionId: session.id,
        reason: session.reason,
      },
      status: 'queued',
      retryCount: 0,
      lastDispatchedAt: now,
      updatedAt: now,
    })
    .returning();

  if (!command) {
    res.status(500).json({ message: 'Failed creating stream command' });
    return;
  }

  await dispatchCommandById(command.id);

  console.info('[stream.request]', {
    streamSessionId: session.id,
    requesterDeviceId: sourceDevice.id,
    cameraDeviceId: cameraDevice.id,
    mode: 'legacy',
    commandId: command.id,
  });

  // Re-read the command after dispatch; the response falls back to the
  // inserted row if the lookup returns nothing.
  const refreshedCommand = await db.query.commands.findFirst({ where: eq(commands.id, command.id) });

  const deliveredToRequester = sendRealtimeToDevice(sourceDevice.id, 'stream:requested', {
    streamSessionId: session.id,
    cameraDeviceId: cameraDevice.id,
    status: session.status,
    reason: session.reason,
  });

  if (!deliveredToRequester) {
    await enqueuePushNotification({
      ownerUserId: sourceDevice.userId,
      recipientDeviceId: sourceDevice.id,
      type: 'stream_requested',
      payload: {
        streamSessionId: session.id,
        cameraDeviceId: cameraDevice.id,
      },
    });
  }

  res.status(201).json({
    message: 'Stream request sent',
    streamSession: session,
    command: refreshedCommand ?? command,
  });

  // The audit entry is written after the response has been sent.
  await recordStreamRequestedAudit(req, sourceDevice, cameraDevice, session);
}

/**
 * POST /request -- a client device asks a linked camera device to start an
 * on-demand stream session. Runs behind `requireDeviceAuth`.
 */
router.post('/request', requireDeviceAuth, async (req, res, next) => {
  try {
    await handleStreamRequest(req, res);
  } catch (error) {
    // Express 4 does not route a rejected promise from an async handler to
    // the error middleware, so forward it explicitly (Express 5 does this on
    // its own). This also covers failures after the response was written,
    // e.g. the audit-log write; error middleware should check res.headersSent.
    next(error);
  }
});

export default router;