diff --git a/Backend/index.ts b/Backend/index.ts index c322530..dbb3bca 100644 --- a/Backend/index.ts +++ b/Backend/index.ts @@ -11,6 +11,7 @@ import devicesRoutes from './routes/devices'; import deviceLinksRoutes from './routes/device-links'; import commandsRoutes from './routes/commands'; import eventsRoutes from './routes/events'; +import streamsRoutes from './routes/streams'; import { setupRealtimeGateway } from './realtime/gateway'; import { ensureMinioBucket } from './utils/minio'; @@ -36,6 +37,7 @@ app.use('/devices', devicesRoutes); app.use('/device-links', deviceLinksRoutes); app.use('/commands', commandsRoutes); app.use('/events', eventsRoutes); +app.use('/streams', streamsRoutes); app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => { console.error(err); diff --git a/Backend/routes/streams.ts b/Backend/routes/streams.ts new file mode 100644 index 0000000..00691e9 --- /dev/null +++ b/Backend/routes/streams.ts @@ -0,0 +1,369 @@ +import { randomUUID } from 'crypto'; + +import { and, desc, eq, or } from 'drizzle-orm'; +import { Router } from 'express'; +import { z } from 'zod'; + +import { db } from '../db/client'; +import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema'; +import { requireDeviceAuth } from '../middleware/device-auth'; +import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway'; +import { createStreamPlaybackToken } from '../utils/stream-token'; + +const router = Router(); + +const requestStreamSchema = z.object({ + cameraDeviceId: z.string().uuid(), + reason: z.enum(['on_demand', 'motion_follow_up']).default('on_demand'), + metadata: z.record(z.string(), z.unknown()).optional(), +}); + +const acceptStreamSchema = z.object({ + streamKey: z.string().trim().min(1).max(255).optional(), + metadata: z.record(z.string(), z.unknown()).optional(), +}); + +const endStreamSchema = z.object({ + reason: z.enum(['completed', 'cancelled', 'failed']).default('completed'), +}); + +const streamParamSchema = z.object({ + streamSessionId: z.string().uuid(), +}); + +const listSchema = z.object({ + status: z.string().optional(), + limit: z.coerce.number().int().min(1).max(100).default(25), +}); + +router.post('/request', requireDeviceAuth, async (req, res) => { + const parsed = requestStreamSchema.safeParse(req.body ?? {}); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const [sourceDevice, cameraDevice] = await Promise.all([ + db.query.devices.findFirst({ + where: and(eq(devices.id, deviceAuth.deviceId), eq(devices.userId, deviceAuth.userId)), + }), + db.query.devices.findFirst({ + where: and(eq(devices.id, parsed.data.cameraDeviceId), eq(devices.userId, deviceAuth.userId)), + }), + ]); + + if (!sourceDevice || !cameraDevice) { + res.status(404).json({ message: 'Source or camera device not found' }); + return; + } + + if (sourceDevice.role !== 'client') { + res.status(403).json({ message: 'Only client devices can request on-demand stream sessions' }); + return; + } + + if (cameraDevice.role !== 'camera') { + res.status(400).json({ message: 'cameraDeviceId must point to a camera device' }); + return; + } + + const link = await db.query.deviceLinks.findFirst({ + where: and( + eq(deviceLinks.ownerUserId, deviceAuth.userId), + eq(deviceLinks.cameraDeviceId, cameraDevice.id), + eq(deviceLinks.clientDeviceId, sourceDevice.id), + eq(deviceLinks.status, 'active'), + ), + }); + + if (!link) { + res.status(403).json({ message: 'No active link between requester and camera' }); + return; + } + + const now = new Date(); + + const [session] = await db + .insert(streamSessions) + .values({ + ownerUserId: deviceAuth.userId, + cameraDeviceId: cameraDevice.id, + requesterDeviceId: sourceDevice.id, + status: 'requested', + reason: parsed.data.reason, + metadata: parsed.data.metadata ?? null, + updatedAt: now, + }) + .returning(); + + if (!session) { + res.status(500).json({ message: 'Failed creating stream session' }); + return; + } + + const [command] = await db + .insert(deviceCommands) + .values({ + ownerUserId: deviceAuth.userId, + sourceDeviceId: sourceDevice.id, + targetDeviceId: cameraDevice.id, + commandType: 'start_stream', + payload: { + streamSessionId: session.id, + reason: session.reason, + }, + status: 'queued', + retryCount: 0, + lastDispatchedAt: now, + updatedAt: now, + }) + .returning(); + + if (!command) { + res.status(500).json({ message: 'Failed creating stream command' }); + return; + } + + await dispatchCommandById(command.id); + + const refreshedCommand = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) }); + + sendRealtimeToDevice(sourceDevice.id, 'stream:requested', { + streamSessionId: session.id, + cameraDeviceId: cameraDevice.id, + status: session.status, + reason: session.reason, + }); + + res.status(201).json({ + message: 'Stream request sent', + streamSession: session, + command: refreshedCommand ?? command, + }); +}); + +router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsed = acceptStreamSchema.safeParse(req.body ?? {}); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const session = await db.query.streamSessions.findFirst({ + where: and( + eq(streamSessions.id, parsedParams.data.streamSessionId), + eq(streamSessions.ownerUserId, deviceAuth.userId), + eq(streamSessions.cameraDeviceId, deviceAuth.deviceId), + ), + }); + + if (!session) { + res.status(404).json({ message: 'Stream session not found for this camera device' }); + return; + } + + if (session.status !== 'requested' && session.status !== 'starting') { + res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` }); + return; + } + + const now = new Date(); + const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`; + + const [updated] = await db + .update(streamSessions) + .set({ + status: 'streaming', + streamKey, + metadata: parsed.data.metadata ?? session.metadata, + startedAt: now, + updatedAt: now, + }) + .where(eq(streamSessions.id, session.id)) + .returning(); + + if (!updated) { + res.status(500).json({ message: 'Failed to update stream session' }); + return; + } + + sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', { + streamSessionId: updated.id, + cameraDeviceId: updated.cameraDeviceId, + status: updated.status, + startedAt: updated.startedAt, + }); + + res.json({ message: 'Stream accepted', streamSession: updated }); +}); + +router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const parsed = endStreamSchema.safeParse(req.body ?? {}); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const session = await db.query.streamSessions.findFirst({ + where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)), + }); + + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + const canEnd = session.cameraDeviceId === deviceAuth.deviceId || session.requesterDeviceId === deviceAuth.deviceId; + + if (!canEnd) { + res.status(403).json({ message: 'Only requester or camera device can end this stream' }); + return; + } + + const now = new Date(); + + const [updated] = await db + .update(streamSessions) + .set({ + status: parsed.data.reason, + endedAt: now, + updatedAt: now, + }) + .where(eq(streamSessions.id, session.id)) + .returning(); + + sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', { + streamSessionId: session.id, + status: parsed.data.reason, + endedAt: now, + }); + + sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', { + streamSessionId: session.id, + status: parsed.data.reason, + endedAt: now, + }); + + res.json({ message: 'Stream ended', streamSession: updated }); +}); + +router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, res) => { + const parsedParams = streamParamSchema.safeParse(req.params); + + if (!parsedParams.success) { + res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const session = await db.query.streamSessions.findFirst({ + where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)), + }); + + if (!session) { + res.status(404).json({ message: 'Stream session not found' }); + return; + } + + const isRequester = session.requesterDeviceId === deviceAuth.deviceId; + const isCamera = session.cameraDeviceId === deviceAuth.deviceId; + + if (!isRequester && !isCamera) { + res.status(403).json({ message: 'Device cannot request playback token for this stream' }); + return; + } + + if (!session.streamKey || session.status !== 'streaming') { + res.status(409).json({ message: 'Stream is not active yet' }); + return; + } + + const playbackToken = createStreamPlaybackToken({ + sessionId: session.id, + viewerDeviceId: deviceAuth.deviceId, + userId: deviceAuth.userId, + }); + + res.json({ + streamSessionId: session.id, + streamKey: session.streamKey, + status: session.status, + playbackToken, + expiresInSeconds: 60 * 15, + }); +}); + +router.get('/me/list', requireDeviceAuth, async (req, res) => { + const parsed = listSchema.safeParse(req.query); + + if (!parsed.success) { + res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() }); + return; + } + + const deviceAuth = req.deviceAuth; + + if (!deviceAuth) { + res.status(401).json({ message: 'Unauthorized' }); + return; + } + + const sessions = await db.query.streamSessions.findMany({ + where: and( + eq(streamSessions.ownerUserId, deviceAuth.userId), + or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)), + ), + orderBy: [desc(streamSessions.createdAt)], + limit: parsed.data.limit, + }); + + const filtered = parsed.data.status ? sessions.filter((session) => session.status === parsed.data.status) : sessions; + + res.json({ count: filtered.length, streamSessions: filtered }); +}); + +export default router; diff --git a/Backend/utils/stream-token.ts b/Backend/utils/stream-token.ts new file mode 100644 index 0000000..7469313 --- /dev/null +++ b/Backend/utils/stream-token.ts @@ -0,0 +1,31 @@ +import { createHmac } from 'crypto'; + +type StreamPlaybackPayload = { + sessionId: string; + viewerDeviceId: string; + userId: string; + exp: number; +}; + +const secret = process.env.BETTER_AUTH_SECRET; + +if (!secret) { + throw new Error('BETTER_AUTH_SECRET is required for stream playback token signing'); +} + +const sign = (data: string): string => createHmac('sha256', secret).update(data).digest('base64url'); + +export const createStreamPlaybackToken = ( + payload: Omit, + ttlSeconds = 60 * 15, +): string => { + const body: StreamPlaybackPayload = { + ...payload, + exp: Math.floor(Date.now() / 1000) + ttlSeconds, + }; + + const encodedPayload = Buffer.from(JSON.stringify(body), 'utf8').toString('base64url'); + const signature = sign(encodedPayload); + + return `${encodedPayload}.${signature}`; +};