import { randomUUID } from 'crypto'; import { eq } from 'drizzle-orm'; import { Router } from 'express'; import { db } from '../../db/client'; import { streamSessions } from '../../db/schema'; import { createLiveMediaSession } from '../../media/service'; import { sfuService } from '../../media/sfu/service'; import { requireDeviceAuth } from '../../middleware/device-auth'; import { sendRealtimeToDevice } from '../../realtime/gateway'; import { createRecordingForStream } from '../../services/recordings'; import { writeAuditLog } from '../../services/audit'; import { enqueuePushNotification } from '../../services/push'; import { createStreamStartedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple'; import { acceptStreamSchema, streamParamSchema } from './schemas'; import { ensureStreamDeviceAuth, shouldCreateRecordingPlaceholder } from './shared'; const router = Router(); router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { const parsedParams = streamParamSchema.safeParse(req.params); if (!parsedParams.success) { res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); return; } const parsed = acceptStreamSchema.safeParse(req.body ?? {}); if (!parsed.success) { res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); return; } const deviceAuth = ensureStreamDeviceAuth(req, res); if (!deviceAuth) return; const session = await db.query.streamSessions.findFirst({ where: eq(streamSessions.id, parsedParams.data.streamSessionId), }); if (!session || session.ownerUserId !== deviceAuth.userId || session.cameraDeviceId !== deviceAuth.deviceId) { res.status(404).json({ message: 'Stream session not found for this camera device' }); return; } if (session.status !== 'requested' && session.status !== 'starting') { res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` }); return; } const now = new Date(); const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`; const mediaSession = await createLiveMediaSession({ streamSessionId: session.id, ownerUserId: session.ownerUserId, cameraDeviceId: session.cameraDeviceId, requesterDeviceId: session.requesterDeviceId, }); const [updated] = await db .update(streamSessions) .set({ status: 'streaming', streamKey: mediaSession ? streamKey : null, mediaProvider: mediaSession?.provider ?? 'simple', mediaSessionId: mediaSession?.mediaSessionId ?? null, publishEndpoint: mediaSession?.publishUrl ?? null, subscribeEndpoint: mediaSession?.subscribeUrl ?? null, metadata: parsed.data.metadata ?? session.metadata, startedAt: now, updatedAt: now, }) .where(eq(streamSessions.id, session.id)) .returning(); if (!updated) { res.status(500).json({ message: 'Failed to update stream session' }); return; } if (shouldCreateRecordingPlaceholder()) { await createRecordingForStream(updated.id); } if (sfuService) { try { await sfuService.startSession({ streamSessionId: updated.id, ownerUserId: updated.ownerUserId, cameraDeviceId: updated.cameraDeviceId, requesterDeviceId: updated.requesterDeviceId, }); await sfuService.setSessionState(updated.id, 'live'); } catch (error) { console.error('Failed starting SFU session', error); res.status(500).json({ message: 'Failed to initialize SFU session' }); return; } } const startedPayload = createStreamStartedPayload(updated); console.info('[stream.accept]', { streamSessionId: updated.id, requesterDeviceId: updated.requesterDeviceId, cameraDeviceId: updated.cameraDeviceId, mode: mediaSession ? 'legacy' : 'simple', }); const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload); if (!deliveredToRequester) { await enqueuePushNotification({ ownerUserId: session.ownerUserId, recipientDeviceId: session.requesterDeviceId, type: 'stream_started', payload: startedPayload, }); } res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) }); await writeAuditLog({ ownerUserId: session.ownerUserId, actorDeviceId: session.cameraDeviceId, action: 'stream.accepted', targetType: 'stream_session', targetId: session.id, metadata: mediaSession ? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider } : { transport: 'webrtc' }, ipAddress: req.ip, }); }); export default router;