import { eq } from 'drizzle-orm';
import { Router } from 'express';

import { db } from '../../db/client';
import { streamSessions } from '../../db/schema';
import { simpleStreamingEnabled } from '../../media/config';
import { sfuService } from '../../media/sfu/service';
import { requireDeviceAuth } from '../../middleware/device-auth';
import { sendRealtimeToDevice } from '../../realtime/gateway';
import { createRecordingForStream } from '../../services/recordings';
import { enqueuePushNotification } from '../../services/push';
import { createStreamEndedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple';
import { endStreamSchema, streamParamSchema } from './schemas';
import {
  ensureStreamDeviceAuth,
  getOwnedStreamSession,
  isStreamParticipant,
  shouldCreateRecordingPlaceholder,
} from './shared';

const router = Router();

// POST /:streamSessionId/end
// Ends a stream session, tears down the SFU session, and notifies both participants.
router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
  // Validate route params and request body before touching the database.
  const parsedParams = streamParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
    return;
  }

  const parsed = endStreamSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }

  // ensureStreamDeviceAuth writes the error response itself when auth is missing.
  const deviceAuth = ensureStreamDeviceAuth(req, res);
  if (!deviceAuth) return;

  const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  if (!isStreamParticipant(session, deviceAuth.deviceId)) {
    res.status(403).json({ message: 'Only requester or camera device can end this stream' });
    return;
  }

  const now = new Date();

  // In simple streaming mode the status is always 'ended'; a reason other than
  // 'completed' is kept in metadata. Otherwise the reason itself is the status.
  const nextStatus = simpleStreamingEnabled ? 'ended' : parsed.data.reason;
  const nextMetadata =
    simpleStreamingEnabled && parsed.data.reason !== 'completed'
      ? {
          ...(session.metadata ?? {}),
          endReason: parsed.data.reason,
        }
      : session.metadata;

  const [updated] = await db
    .update(streamSessions)
    .set({
      status: nextStatus,
      endedAt: now,
      metadata: nextMetadata,
      updatedAt: now,
    })
    .where(eq(streamSessions.id, session.id))
    .returning();

  if (!updated) {
    res.status(500).json({ message: 'Failed to update stream session' });
    return;
  }

  // SFU teardown is best effort: a failure is logged and does not fail the request.
  if (sfuService) {
    try {
      await sfuService.endSession(session.id);
    } catch (error) {
      console.error('Failed ending SFU session', error);
    }
  }

  if (shouldCreateRecordingPlaceholder()) {
    await createRecordingForStream(session.id);
  }

  const endedPayload = simpleStreamingEnabled
    ? createStreamEndedPayload({
        streamSessionId: session.id,
        cameraDeviceId: session.cameraDeviceId,
        requesterDeviceId: session.requesterDeviceId,
        endedAt: now,
        reason: parsed.data.reason,
      })
    : {
        streamSessionId: session.id,
        status: parsed.data.reason,
        endedAt: now,
      };

  console.info('[stream.end]', {
    streamSessionId: session.id,
    requesterDeviceId: session.requesterDeviceId,
    cameraDeviceId: session.cameraDeviceId,
    reason: parsed.data.reason,
    status: nextStatus,
  });

  // Notify both participants over the realtime channel first.
  const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', endedPayload);
  const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', endedPayload);

  // Fall back to a push notification for any device the realtime send did not reach.
  if (!deliveredToRequester) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.requesterDeviceId,
      type: 'stream_ended',
      payload: endedPayload,
    });
  }

  if (!deliveredToCamera) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.cameraDeviceId,
      type: 'stream_ended',
      payload: endedPayload,
    });
  }

  res.json({ message: 'Stream ended', streamSession: toSimpleStreamSessionResponse(updated) });
});

export default router;