Files
Final-Year-Project/Backend/routes/streams.ts

925 lines
28 KiB
TypeScript

import { randomUUID } from 'crypto';
import { and, desc, eq, or } from 'drizzle-orm';
import { Router } from 'express';
import { z } from 'zod';
import { db } from '../db/client';
import { mediaMode } from '../media/config';
import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema';
import { mediaProvider } from '../media/service';
import { sfuService } from '../media/sfu/service';
import { requireDeviceAuth } from '../middleware/device-auth';
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
import { writeAuditLog } from '../services/audit';
import { enqueuePushNotification } from '../services/push';
import { createRecordingForStream } from './recordings';
/** Router carrying every stream-session and SFU signalling endpoint defined in this module. */
const router = Router();

/** Free-form JSON object: string keys mapped to values of any shape. */
const looseRecordSchema = z.record(z.string(), z.unknown());

/** Non-empty identifier of an SFU transport. */
const transportIdSchema = z.string().min(1);

/** Body of `POST /request`: which camera to stream from and why. */
const requestStreamSchema = z.object({
  cameraDeviceId: z.string().uuid(),
  reason: z.enum(['on_demand', 'motion_follow_up']).default('on_demand'),
  metadata: looseRecordSchema.optional(),
});

/** Body of `POST /:streamSessionId/accept`; both fields are optional. */
const acceptStreamSchema = z.object({
  streamKey: z.string().trim().min(1).max(255).optional(),
  metadata: looseRecordSchema.optional(),
});

/** Body of `POST /:streamSessionId/end`; the reason is stored as the session's final status. */
const endStreamSchema = z.object({
  reason: z.enum(['completed', 'cancelled', 'failed']).default('completed'),
});

/** Route params shared by every `/:streamSessionId/...` endpoint. */
const streamParamSchema = z.object({
  streamSessionId: z.string().uuid(),
});

/** Body of the SFU transport-creation endpoints. */
const sfuTransportRequestSchema = z.object({
  role: z.enum(['camera', 'viewer']).optional(),
});

/** Body of the SFU transport-connect endpoints. */
const sfuTransportConnectSchema = z.object({
  transportId: transportIdSchema,
  dtlsParameters: looseRecordSchema.default({}),
});

/** Body of `POST /:streamSessionId/sfu/produce`. */
const sfuProduceSchema = z.object({
  transportId: transportIdSchema,
  kind: z.enum(['audio', 'video']).default('video'),
  rtpParameters: looseRecordSchema.default({}),
});

/** Body of `POST /:streamSessionId/sfu/consume`. */
const sfuConsumeSchema = z.object({
  transportId: transportIdSchema,
  producerId: z.string().min(1).optional(),
  rtpCapabilities: looseRecordSchema.optional(),
});

/** Query string of `GET /me/list`; `limit` is coerced from its string form and capped at 100. */
const listSchema = z.object({
  status: z.string().optional(),
  limit: z.coerce.number().int().min(1).max(100).default(25),
});
/**
 * GET /me/list — lists stream sessions owned by the caller's user in which the
 * calling device is either the requester or the camera, newest first.
 *
 * Query: `status` (optional, exact match) and `limit` (1-100, default 25).
 * Responds 400 on invalid query params, 401 when no device auth is attached.
 *
 * The status filter is part of the database query so that `limit` applies to
 * the rows that match. Filtering after the limited fetch could return fewer
 * rows than exist, or none at all, when the newest sessions have another status.
 */
router.get('/me/list', requireDeviceAuth, async (req, res) => {
  const parsed = listSchema.safeParse(req.query);
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
    return;
  }
  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  const { status, limit } = parsed.data;
  // An empty `status` is treated as "no filter", as before.
  // The query param is an arbitrary string; the assertion only satisfies the column's value type.
  // NOTE(review): if `status` is a database enum, an unknown value may be rejected by the
  // database instead of matching no rows — confirm against the schema.
  const statusFilter = status
    ? eq(streamSessions.status, status as typeof streamSessions.$inferSelect.status)
    : undefined;

  const sessions = await db.query.streamSessions.findMany({
    where: and(
      eq(streamSessions.ownerUserId, deviceAuth.userId),
      or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)),
      // `and` ignores undefined operands, so an absent filter adds no condition.
      statusFilter,
    ),
    orderBy: [desc(streamSessions.createdAt)],
    limit,
  });
  res.json({ count: sessions.length, streamSessions: sessions });
});
/**
 * Reads the device-auth context from the request.
 *
 * @returns `req.deviceAuth` when present; otherwise replies 401 `Unauthorized`
 *          and returns `null` so the caller can stop handling the request.
 */
const ensureDeviceAuth = (req: Parameters<typeof requireDeviceAuth>[0], res: Parameters<typeof requireDeviceAuth>[1]) => {
  const { deviceAuth } = req;
  if (deviceAuth) {
    return deviceAuth;
  }
  res.status(401).json({ message: 'Unauthorized' });
  return null;
};
/**
 * Looks up a stream session by id, restricted to sessions owned by the given user.
 *
 * @returns the first matching row, or `undefined` when none matches.
 */
const getOwnedStreamSession = async (streamSessionId: string, ownerUserId: string) => {
  const matchesId = eq(streamSessions.id, streamSessionId);
  const matchesOwner = eq(streamSessions.ownerUserId, ownerUserId);
  return await db.query.streamSessions.findFirst({ where: and(matchesId, matchesOwner) });
};
/**
 * Returns the SFU service when one is configured.
 *
 * @returns `sfuService` when it is set; otherwise replies 409, naming the
 *          current media mode, and returns `null`.
 */
const ensureSfuEnabled = (res: Parameters<typeof requireDeviceAuth>[1]) => {
  if (sfuService) {
    return sfuService;
  }
  res.status(409).json({ message: `SFU service disabled (MEDIA_MODE=${mediaMode})` });
  return null;
};
/**
 * POST /request — a client device asks one of its linked cameras to start streaming.
 *
 * Steps: validate the body, load both devices (scoped to the caller's user),
 * check their roles and the active device link, insert a `requested` stream
 * session, queue and dispatch a `start_stream` command to the camera, echo a
 * `stream:requested` event to the requester (push notification when the
 * realtime send is not delivered), reply 201, then write the audit log.
 *
 * Responds 400 (bad body / target is not a camera), 401, 403 (caller is not a
 * client device / no active link), 404 (device not found) or 500 (insert
 * returned no row).
 */
router.post('/request', requireDeviceAuth, async (req, res) => {
  const body = requestStreamSchema.safeParse(req.body ?? {});
  if (!body.success) {
    res.status(400).json({ message: 'Invalid request body', errors: body.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;

  const { userId, deviceId } = caller;
  const { cameraDeviceId, reason, metadata } = body.data;

  // Both lookups are scoped to the caller's user so foreign devices look "not found".
  const findOwnedDevice = (id: string) =>
    db.query.devices.findFirst({
      where: and(eq(devices.id, id), eq(devices.userId, userId)),
    });
  const [requester, camera] = await Promise.all([findOwnedDevice(deviceId), findOwnedDevice(cameraDeviceId)]);

  if (!(requester && camera)) {
    res.status(404).json({ message: 'Source or camera device not found' });
    return;
  }
  if (requester.role !== 'client') {
    res.status(403).json({ message: 'Only client devices can request on-demand stream sessions' });
    return;
  }
  if (camera.role !== 'camera') {
    res.status(400).json({ message: 'cameraDeviceId must point to a camera device' });
    return;
  }

  const activeLink = await db.query.deviceLinks.findFirst({
    where: and(
      eq(deviceLinks.ownerUserId, userId),
      eq(deviceLinks.cameraDeviceId, camera.id),
      eq(deviceLinks.clientDeviceId, requester.id),
      eq(deviceLinks.status, 'active'),
    ),
  });
  if (!activeLink) {
    res.status(403).json({ message: 'No active link between requester and camera' });
    return;
  }

  const createdAt = new Date();
  const insertedSessions = await db
    .insert(streamSessions)
    .values({
      ownerUserId: userId,
      cameraDeviceId: camera.id,
      requesterDeviceId: requester.id,
      status: 'requested',
      reason,
      metadata: metadata ?? null,
      mediaProvider: mediaProvider.name,
      updatedAt: createdAt,
    })
    .returning();
  const streamSession = insertedSessions[0];
  if (!streamSession) {
    res.status(500).json({ message: 'Failed creating stream session' });
    return;
  }

  const insertedCommands = await db
    .insert(deviceCommands)
    .values({
      ownerUserId: userId,
      sourceDeviceId: requester.id,
      targetDeviceId: camera.id,
      commandType: 'start_stream',
      payload: {
        streamSessionId: streamSession.id,
        reason: streamSession.reason,
      },
      status: 'queued',
      retryCount: 0,
      lastDispatchedAt: createdAt,
      updatedAt: createdAt,
    })
    .returning();
  const queuedCommand = insertedCommands[0];
  if (!queuedCommand) {
    res.status(500).json({ message: 'Failed creating stream command' });
    return;
  }

  await dispatchCommandById(queuedCommand.id);
  // Re-read the command so the response carries whatever the dispatch changed on it.
  const commandAfterDispatch = await db.query.deviceCommands.findFirst({
    where: eq(deviceCommands.id, queuedCommand.id),
  });

  const requesterNotified = sendRealtimeToDevice(requester.id, 'stream:requested', {
    streamSessionId: streamSession.id,
    cameraDeviceId: camera.id,
    status: streamSession.status,
    reason: streamSession.reason,
  });
  if (!requesterNotified) {
    await enqueuePushNotification({
      ownerUserId: requester.userId,
      recipientDeviceId: requester.id,
      type: 'stream_requested',
      payload: {
        streamSessionId: streamSession.id,
        cameraDeviceId: camera.id,
      },
    });
  }

  res.status(201).json({
    message: 'Stream request sent',
    streamSession,
    command: commandAfterDispatch ?? queuedCommand,
  });

  // The audit entry is written after the response has been sent.
  await writeAuditLog({
    ownerUserId: requester.userId,
    actorDeviceId: requester.id,
    action: 'stream.requested',
    targetType: 'stream_session',
    targetId: streamSession.id,
    metadata: { cameraDeviceId: camera.id, reason: streamSession.reason },
    ipAddress: req.ip,
  });
});
/**
 * POST /:streamSessionId/accept — the camera device accepts a pending stream request.
 *
 * Only the session's camera device may call this, and only while the session
 * is `requested` or `starting`. The handler creates a media session with the
 * media provider, moves the row to `streaming`, starts the SFU session when an
 * SFU service is configured, notifies the requester (`stream:started`, with a
 * push notification when the realtime send is not delivered), replies with the
 * updated session and finally writes the audit log.
 *
 * If the SFU session cannot be started, the row has already been moved to
 * `streaming`. It is therefore marked `failed` before the 500 is returned, so
 * it does not stay in the database as a live stream that no client can join.
 *
 * Responds 400 (bad params/body), 401, 404 (no such session for this camera),
 * 409 (wrong status) or 500 (update returned no row / SFU start failed).
 */
router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
  const parsedParams = streamParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
    return;
  }
  const parsed = acceptStreamSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }
  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  // The camera-device condition is part of the lookup, so other devices get a 404.
  const session = await db.query.streamSessions.findFirst({
    where: and(
      eq(streamSessions.id, parsedParams.data.streamSessionId),
      eq(streamSessions.ownerUserId, deviceAuth.userId),
      eq(streamSessions.cameraDeviceId, deviceAuth.deviceId),
    ),
  });
  if (!session) {
    res.status(404).json({ message: 'Stream session not found for this camera device' });
    return;
  }
  if (session.status !== 'requested' && session.status !== 'starting') {
    res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` });
    return;
  }

  const now = new Date();
  // Fall back to a generated key when the camera does not supply one.
  const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`;
  const mediaSession = await mediaProvider.createSession({
    streamSessionId: session.id,
    ownerUserId: session.ownerUserId,
    cameraDeviceId: session.cameraDeviceId,
    requesterDeviceId: session.requesterDeviceId,
  });

  const [updated] = await db
    .update(streamSessions)
    .set({
      status: 'streaming',
      streamKey,
      mediaProvider: mediaSession.provider,
      mediaSessionId: mediaSession.mediaSessionId,
      publishEndpoint: mediaSession.publishUrl,
      subscribeEndpoint: mediaSession.subscribeUrl,
      metadata: parsed.data.metadata ?? session.metadata,
      startedAt: now,
      updatedAt: now,
    })
    .where(eq(streamSessions.id, session.id))
    .returning();
  if (!updated) {
    res.status(500).json({ message: 'Failed to update stream session' });
    return;
  }

  if (sfuService) {
    try {
      await sfuService.startSession({
        streamSessionId: updated.id,
        ownerUserId: updated.ownerUserId,
        cameraDeviceId: updated.cameraDeviceId,
        requesterDeviceId: updated.requesterDeviceId,
      });
      await sfuService.setSessionState(updated.id, 'live');
    } catch (error) {
      console.error('Failed starting SFU session', error);
      // Undo the optimistic `streaming` status so the row matches reality.
      // The rollback is best-effort: the caller gets the 500 below either way.
      try {
        const failedAt = new Date();
        await db
          .update(streamSessions)
          .set({ status: 'failed', endedAt: failedAt, updatedAt: failedAt })
          .where(eq(streamSessions.id, updated.id));
      } catch (rollbackError) {
        console.error('Failed marking stream session as failed after SFU error', rollbackError);
      }
      res.status(500).json({ message: 'Failed to initialize SFU session' });
      return;
    }
  }

  const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', {
    streamSessionId: updated.id,
    cameraDeviceId: updated.cameraDeviceId,
    status: updated.status,
    startedAt: updated.startedAt,
    mediaProvider: updated.mediaProvider,
    mediaSessionId: updated.mediaSessionId,
    subscribeEndpoint: updated.subscribeEndpoint,
  });
  if (!deliveredToRequester) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.requesterDeviceId,
      type: 'stream_started',
      payload: {
        streamSessionId: updated.id,
        cameraDeviceId: updated.cameraDeviceId,
      },
    });
  }

  res.json({ message: 'Stream accepted', streamSession: updated });

  // The audit entry is written after the response has been sent.
  await writeAuditLog({
    ownerUserId: session.ownerUserId,
    actorDeviceId: session.cameraDeviceId,
    action: 'stream.accepted',
    targetType: 'stream_session',
    targetId: session.id,
    metadata: { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider },
    ipAddress: req.ip,
  });
});
/**
 * GET /:streamSessionId/publish-credentials — issues media-provider publish
 * credentials to the session's camera device.
 *
 * Requires the session to be `streaming` and to have a media session id.
 * Responds 400, 401, 403 (caller is not the camera), 404 or 409 (not ready);
 * the audit log is written after the credentials have been returned.
 */
router.get('/:streamSessionId/publish-credentials', requireDeviceAuth, async (req, res) => {
  const params = streamParamSchema.safeParse(req.params);
  if (!params.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: params.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;

  const stream = await getOwnedStreamSession(params.data.streamSessionId, caller.userId);
  if (!stream) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  const callerIsCamera = stream.cameraDeviceId === caller.deviceId;
  if (!callerIsCamera) {
    res.status(403).json({ message: 'Only camera device can request publish credentials' });
    return;
  }

  const { mediaSessionId } = stream;
  const isStreaming = stream.status === 'streaming';
  if (!mediaSessionId || !isStreaming) {
    res.status(409).json({ message: 'Stream session is not ready for publish credentials' });
    return;
  }

  const publishCredentials = await mediaProvider.issuePublishCredentials({
    mediaSessionId,
    cameraDeviceId: stream.cameraDeviceId,
    ownerUserId: stream.ownerUserId,
  });
  res.json(publishCredentials);

  await writeAuditLog({
    ownerUserId: stream.ownerUserId,
    actorDeviceId: stream.cameraDeviceId,
    action: 'stream.publish_credentials_issued',
    targetType: 'stream_session',
    targetId: stream.id,
    metadata: { mediaProvider: publishCredentials.provider },
    ipAddress: req.ip,
  });
});
/**
 * GET /:streamSessionId/subscribe-credentials — issues media-provider subscribe
 * credentials to either participant (requester or camera) of the session.
 *
 * Requires the session to be `streaming` and to have a media session id.
 * Responds 400, 401, 403 (caller is not a participant), 404 or 409 (not
 * active); the audit log is written after the credentials have been returned.
 */
router.get('/:streamSessionId/subscribe-credentials', requireDeviceAuth, async (req, res) => {
  const params = streamParamSchema.safeParse(req.params);
  if (!params.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: params.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;

  const stream = await getOwnedStreamSession(params.data.streamSessionId, caller.userId);
  if (!stream) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  const participantIds = [stream.requesterDeviceId, stream.cameraDeviceId];
  if (!participantIds.includes(caller.deviceId)) {
    res.status(403).json({ message: 'Device cannot request subscribe credentials for this stream' });
    return;
  }

  const { mediaSessionId } = stream;
  const isStreaming = stream.status === 'streaming';
  if (!mediaSessionId || !isStreaming) {
    res.status(409).json({ message: 'Stream is not active yet' });
    return;
  }

  const subscribeCredentials = await mediaProvider.issueSubscribeCredentials({
    mediaSessionId,
    viewerDeviceId: caller.deviceId,
    ownerUserId: stream.ownerUserId,
  });
  res.json(subscribeCredentials);

  await writeAuditLog({
    ownerUserId: stream.ownerUserId,
    actorDeviceId: caller.deviceId,
    action: 'stream.subscribe_credentials_issued',
    targetType: 'stream_session',
    targetId: stream.id,
    metadata: { mediaProvider: subscribeCredentials.provider },
    ipAddress: req.ip,
  });
});
/**
 * GET /:streamSessionId/sfu/session — returns the SFU view of a stream: the SFU
 * session plus its transports, producers and consumers, fetched concurrently.
 *
 * Only the requester or camera device of the session may call it.
 * Responds 400, 401, 403, 404, or 409 when no SFU service is configured.
 */
router.get('/:streamSessionId/sfu/session', requireDeviceAuth, async (req, res) => {
  const params = streamParamSchema.safeParse(req.params);
  if (!params.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: params.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;
  const sfu = ensureSfuEnabled(res);
  if (!sfu) return;

  const stream = await getOwnedStreamSession(params.data.streamSessionId, caller.userId);
  if (!stream) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  const callerIsRequester = stream.requesterDeviceId === caller.deviceId;
  const callerIsCamera = stream.cameraDeviceId === caller.deviceId;
  if (!callerIsRequester && !callerIsCamera) {
    res.status(403).json({ message: 'Device cannot access SFU session details for this stream' });
    return;
  }

  const streamSessionId = stream.id;
  const [sfuSession, transports, producers, consumers] = await Promise.all([
    sfu.getSession(streamSessionId),
    sfu.listTransports(streamSessionId),
    sfu.listProducers(streamSessionId),
    sfu.listConsumers(streamSessionId),
  ]);

  res.json({ streamSessionId, mediaMode, sfuSession, transports, producers, consumers });
});
/**
 * POST /:streamSessionId/sfu/publish-transport — creates the SFU transport the
 * camera device will publish media on, then marks the SFU session `live`.
 *
 * The session must be `streaming` and the caller must be its camera device.
 * The body is validated against the transport-request schema, but its `role`
 * field is not otherwise used here.
 *
 * Errors thrown by the SFU service are reported as 409 with the error's
 * message, the same way the connect/produce/consume handlers in this router
 * report them, instead of escaping the handler.
 *
 * Responds 400, 401, 403, 404 or 409 (SFU disabled / stream not active / SFU error).
 */
router.post('/:streamSessionId/sfu/publish-transport', requireDeviceAuth, async (req, res) => {
  const parsedParams = streamParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
    return;
  }
  const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {});
  if (!parsedBody.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() });
    return;
  }
  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;
  const sfu = ensureSfuEnabled(res);
  if (!sfu) return;

  const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }
  if (session.status !== 'streaming') {
    res.status(409).json({ message: 'Stream must be active before creating publish transport' });
    return;
  }
  if (session.cameraDeviceId !== deviceAuth.deviceId) {
    res.status(403).json({ message: 'Only camera device can create publish transport' });
    return;
  }

  try {
    const transport = await sfu.createPublishTransport({
      streamSessionId: session.id,
      cameraDeviceId: deviceAuth.deviceId,
    });
    await sfu.setSessionState(session.id, 'live');
    res.json({
      streamSessionId: session.id,
      mediaMode,
      transport,
    });
  } catch (error) {
    res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to create publish transport' });
  }
});
/**
 * POST /:streamSessionId/sfu/subscribe-transport — creates an SFU transport on
 * which the calling device will receive media, then marks the SFU session `live`.
 *
 * The session must be `streaming` and the caller must be its requester or
 * camera device. The body is validated against the transport-request schema,
 * but its `role` field is not otherwise used here.
 *
 * Errors thrown by the SFU service are reported as 409 with the error's
 * message, the same way the connect/produce/consume handlers in this router
 * report them, instead of escaping the handler.
 *
 * Responds 400, 401, 403, 404 or 409 (SFU disabled / stream not active / SFU error).
 */
router.post('/:streamSessionId/sfu/subscribe-transport', requireDeviceAuth, async (req, res) => {
  const parsedParams = streamParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
    return;
  }
  const parsedBody = sfuTransportRequestSchema.safeParse(req.body ?? {});
  if (!parsedBody.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsedBody.error.flatten() });
    return;
  }
  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;
  const sfu = ensureSfuEnabled(res);
  if (!sfu) return;

  const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }
  if (session.status !== 'streaming') {
    res.status(409).json({ message: 'Stream must be active before creating subscribe transport' });
    return;
  }
  const isParticipant = session.requesterDeviceId === deviceAuth.deviceId || session.cameraDeviceId === deviceAuth.deviceId;
  if (!isParticipant) {
    res.status(403).json({ message: 'Device cannot create subscribe transport for this stream' });
    return;
  }

  try {
    const transport = await sfu.createSubscribeTransport({
      streamSessionId: session.id,
      viewerDeviceId: deviceAuth.deviceId,
    });
    await sfu.setSessionState(session.id, 'live');
    res.json({
      streamSessionId: session.id,
      mediaMode,
      transport,
    });
  } catch (error) {
    res.status(409).json({ message: error instanceof Error ? error.message : 'Unable to create subscribe transport' });
  }
});
/**
 * POST /:streamSessionId/sfu/publish-transport/connect — connects the camera's
 * publish transport using the supplied DTLS parameters, then marks the SFU
 * session `live`.
 *
 * Only the session's camera device may call it. Errors thrown by the SFU
 * service are reported as 409 with the error's message.
 * Responds 400, 401, 403, 404 or 409.
 */
router.post('/:streamSessionId/sfu/publish-transport/connect', requireDeviceAuth, async (req, res) => {
  const params = streamParamSchema.safeParse(req.params);
  if (!params.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: params.error.flatten() });
    return;
  }
  const body = sfuTransportConnectSchema.safeParse(req.body ?? {});
  if (!body.success) {
    res.status(400).json({ message: 'Invalid request body', errors: body.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;
  const sfu = ensureSfuEnabled(res);
  if (!sfu) return;

  const stream = await getOwnedStreamSession(params.data.streamSessionId, caller.userId);
  if (!stream) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }
  const callerIsCamera = stream.cameraDeviceId === caller.deviceId;
  if (!callerIsCamera) {
    res.status(403).json({ message: 'Only camera device can connect publish transport' });
    return;
  }

  const { transportId, dtlsParameters } = body.data;
  const streamSessionId = stream.id;
  try {
    const transport = await sfu.connectPublishTransport({
      streamSessionId,
      transportId,
      deviceId: caller.deviceId,
      dtlsParameters,
    });
    await sfu.setSessionState(streamSessionId, 'live');
    res.json({ streamSessionId, mediaMode, transport });
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Unable to connect publish transport';
    res.status(409).json({ message });
  }
});
/**
 * POST /:streamSessionId/sfu/subscribe-transport/connect — connects the calling
 * device's subscribe transport using the supplied DTLS parameters, then marks
 * the SFU session `live`.
 *
 * The caller must be the session's requester or camera device. Errors thrown
 * by the SFU service are reported as 409 with the error's message.
 * Responds 400, 401, 403, 404 or 409.
 */
router.post('/:streamSessionId/sfu/subscribe-transport/connect', requireDeviceAuth, async (req, res) => {
  const params = streamParamSchema.safeParse(req.params);
  if (!params.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: params.error.flatten() });
    return;
  }
  const body = sfuTransportConnectSchema.safeParse(req.body ?? {});
  if (!body.success) {
    res.status(400).json({ message: 'Invalid request body', errors: body.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;
  const sfu = ensureSfuEnabled(res);
  if (!sfu) return;

  const stream = await getOwnedStreamSession(params.data.streamSessionId, caller.userId);
  if (!stream) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }
  const callerIsRequester = stream.requesterDeviceId === caller.deviceId;
  const callerIsCamera = stream.cameraDeviceId === caller.deviceId;
  if (!callerIsRequester && !callerIsCamera) {
    res.status(403).json({ message: 'Device cannot connect subscribe transport for this stream' });
    return;
  }

  const { transportId, dtlsParameters } = body.data;
  const streamSessionId = stream.id;
  try {
    const transport = await sfu.connectSubscribeTransport({
      streamSessionId,
      transportId,
      deviceId: caller.deviceId,
      dtlsParameters,
    });
    await sfu.setSessionState(streamSessionId, 'live');
    res.json({ streamSessionId, mediaMode, transport });
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Unable to connect subscribe transport';
    res.status(409).json({ message });
  }
});
/**
 * POST /:streamSessionId/sfu/produce — registers a media producer (audio or
 * video, default video) on one of the camera's transports, then marks the SFU
 * session `live`.
 *
 * Only the session's camera device may call it. Errors thrown by the SFU
 * service are reported as 409 with the error's message.
 * Responds 400, 401, 403, 404 or 409.
 */
router.post('/:streamSessionId/sfu/produce', requireDeviceAuth, async (req, res) => {
  const params = streamParamSchema.safeParse(req.params);
  if (!params.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: params.error.flatten() });
    return;
  }
  const body = sfuProduceSchema.safeParse(req.body ?? {});
  if (!body.success) {
    res.status(400).json({ message: 'Invalid request body', errors: body.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;
  const sfu = ensureSfuEnabled(res);
  if (!sfu) return;

  const stream = await getOwnedStreamSession(params.data.streamSessionId, caller.userId);
  if (!stream) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }
  const callerIsCamera = stream.cameraDeviceId === caller.deviceId;
  if (!callerIsCamera) {
    res.status(403).json({ message: 'Only camera device can publish media' });
    return;
  }

  const { transportId, kind, rtpParameters } = body.data;
  const streamSessionId = stream.id;
  try {
    const producer = await sfu.produce({
      streamSessionId,
      transportId,
      cameraDeviceId: caller.deviceId,
      kind,
      rtpParameters,
    });
    await sfu.setSessionState(streamSessionId, 'live');
    res.json({ streamSessionId, mediaMode, producer });
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Unable to produce media';
    res.status(409).json({ message });
  }
});
/**
 * POST /:streamSessionId/sfu/consume — creates a media consumer for the calling
 * device on one of its transports, then marks the SFU session `live`.
 * `producerId` and `rtpCapabilities` are optional and passed through as given.
 *
 * The caller must be the session's requester or camera device. Errors thrown
 * by the SFU service are reported as 409 with the error's message.
 * Responds 400, 401, 403, 404 or 409.
 */
router.post('/:streamSessionId/sfu/consume', requireDeviceAuth, async (req, res) => {
  const params = streamParamSchema.safeParse(req.params);
  if (!params.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: params.error.flatten() });
    return;
  }
  const body = sfuConsumeSchema.safeParse(req.body ?? {});
  if (!body.success) {
    res.status(400).json({ message: 'Invalid request body', errors: body.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;
  const sfu = ensureSfuEnabled(res);
  if (!sfu) return;

  const stream = await getOwnedStreamSession(params.data.streamSessionId, caller.userId);
  if (!stream) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }
  const callerIsRequester = stream.requesterDeviceId === caller.deviceId;
  const callerIsCamera = stream.cameraDeviceId === caller.deviceId;
  if (!callerIsRequester && !callerIsCamera) {
    res.status(403).json({ message: 'Device cannot consume media for this stream' });
    return;
  }

  const { transportId, producerId, rtpCapabilities } = body.data;
  const streamSessionId = stream.id;
  try {
    const consumer = await sfu.consume({
      streamSessionId,
      transportId,
      viewerDeviceId: caller.deviceId,
      producerId,
      rtpCapabilities,
    });
    await sfu.setSessionState(streamSessionId, 'live');
    res.json({ streamSessionId, mediaMode, consumer });
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Unable to consume media';
    res.status(409).json({ message });
  }
});
/**
 * POST /:streamSessionId/end — ends a stream session on behalf of its requester
 * or camera device.
 *
 * The body's `reason` (`completed` | `cancelled` | `failed`, default
 * `completed`) is stored as the session's final status together with
 * `endedAt`. Afterwards the handler ends the SFU session (best-effort: a
 * failure is only logged), calls `createRecordingForStream`, and sends
 * `stream:ended` to both participants, falling back to a push notification
 * for whichever realtime send is not delivered.
 *
 * A session whose status is already one of the three final values is rejected
 * with 409. Without this guard a second call would overwrite the recorded
 * status and `endedAt` and repeat the recording and notification steps.
 *
 * Responds 400, 401, 403 (caller is not a participant), 404, 409 (already
 * ended) or 500 (update returned no row).
 */
router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
  const parsedParams = streamParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
    return;
  }
  const parsed = endStreamSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }
  const deviceAuth = ensureDeviceAuth(req, res);
  if (!deviceAuth) return;

  const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
  if (!session) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }
  const canEnd = session.cameraDeviceId === deviceAuth.deviceId || session.requesterDeviceId === deviceAuth.deviceId;
  if (!canEnd) {
    res.status(403).json({ message: 'Only requester or camera device can end this stream' });
    return;
  }

  // These are exactly the statuses this handler writes; once set, the session is final.
  const alreadyEnded = session.status === 'completed' || session.status === 'cancelled' || session.status === 'failed';
  if (alreadyEnded) {
    res.status(409).json({ message: `Stream session already ended with status ${session.status}` });
    return;
  }

  const now = new Date();
  const [updated] = await db
    .update(streamSessions)
    .set({
      status: parsed.data.reason,
      endedAt: now,
      updatedAt: now,
    })
    .where(eq(streamSessions.id, session.id))
    .returning();
  if (!updated) {
    // No row came back, so nothing was ended; do not notify or record.
    res.status(500).json({ message: 'Failed to update stream session' });
    return;
  }

  if (sfuService) {
    try {
      await sfuService.endSession(session.id);
    } catch (error) {
      // SFU teardown is best-effort; the session is already ended in the database.
      console.error('Failed ending SFU session', error);
    }
  }
  await createRecordingForStream(session.id);

  const endedPayload = {
    streamSessionId: session.id,
    status: parsed.data.reason,
    endedAt: now,
  };
  const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', endedPayload);
  const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', endedPayload);

  if (!deliveredToRequester) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.requesterDeviceId,
      type: 'stream_ended',
      payload: {
        streamSessionId: session.id,
        status: parsed.data.reason,
      },
    });
  }
  if (!deliveredToCamera) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.cameraDeviceId,
      type: 'stream_ended',
      payload: {
        streamSessionId: session.id,
        status: parsed.data.reason,
      },
    });
  }

  res.json({ message: 'Stream ended', streamSession: updated });
});
/**
 * GET /:streamSessionId/playback-token — returns a playback token for a live
 * stream to its requester or camera device.
 *
 * The token is the `subscribeToken` of freshly issued media-provider subscribe
 * credentials, returned alongside the stream key and subscribe URL. The
 * session must be `streaming` and have both a stream key and a media session id.
 * Responds 400, 401, 403 (caller is not a participant), 404 or 409 (not active).
 */
router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, res) => {
  const params = streamParamSchema.safeParse(req.params);
  if (!params.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: params.error.flatten() });
    return;
  }
  const caller = ensureDeviceAuth(req, res);
  if (!caller) return;

  const stream = await getOwnedStreamSession(params.data.streamSessionId, caller.userId);
  if (!stream) {
    res.status(404).json({ message: 'Stream session not found' });
    return;
  }

  const participantIds = [stream.requesterDeviceId, stream.cameraDeviceId];
  if (!participantIds.includes(caller.deviceId)) {
    res.status(403).json({ message: 'Device cannot request playback token for this stream' });
    return;
  }

  const { streamKey, mediaSessionId, status } = stream;
  if (!streamKey || !mediaSessionId || status !== 'streaming') {
    res.status(409).json({ message: 'Stream is not active yet' });
    return;
  }

  const { subscribeToken, subscribeUrl, provider, expiresInSeconds } = await mediaProvider.issueSubscribeCredentials({
    mediaSessionId,
    viewerDeviceId: caller.deviceId,
    ownerUserId: caller.userId,
  });

  res.json({
    streamSessionId: stream.id,
    streamKey,
    status,
    playbackToken: subscribeToken,
    subscribeUrl,
    mediaProvider: provider,
    expiresInSeconds,
  });
});
export default router;