feat(streams): add on-demand live request accept end and playback token APIs
This commit is contained in:
@@ -11,6 +11,7 @@ import devicesRoutes from './routes/devices';
|
|||||||
import deviceLinksRoutes from './routes/device-links';
|
import deviceLinksRoutes from './routes/device-links';
|
||||||
import commandsRoutes from './routes/commands';
|
import commandsRoutes from './routes/commands';
|
||||||
import eventsRoutes from './routes/events';
|
import eventsRoutes from './routes/events';
|
||||||
|
import streamsRoutes from './routes/streams';
|
||||||
import { setupRealtimeGateway } from './realtime/gateway';
|
import { setupRealtimeGateway } from './realtime/gateway';
|
||||||
import { ensureMinioBucket } from './utils/minio';
|
import { ensureMinioBucket } from './utils/minio';
|
||||||
|
|
||||||
@@ -36,6 +37,7 @@ app.use('/devices', devicesRoutes);
|
|||||||
app.use('/device-links', deviceLinksRoutes);
|
app.use('/device-links', deviceLinksRoutes);
|
||||||
app.use('/commands', commandsRoutes);
|
app.use('/commands', commandsRoutes);
|
||||||
app.use('/events', eventsRoutes);
|
app.use('/events', eventsRoutes);
|
||||||
|
app.use('/streams', streamsRoutes);
|
||||||
|
|
||||||
app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
|
app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
|
|||||||
369
Backend/routes/streams.ts
Normal file
369
Backend/routes/streams.ts
Normal file
@@ -0,0 +1,369 @@
|
|||||||
|
import { randomUUID } from 'crypto';
|
||||||
|
|
||||||
|
import { and, desc, eq, or } from 'drizzle-orm';
|
||||||
|
import { Router } from 'express';
|
||||||
|
import { z } from 'zod';
|
||||||
|
|
||||||
|
import { db } from '../db/client';
|
||||||
|
import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/schema';
|
||||||
|
import { requireDeviceAuth } from '../middleware/device-auth';
|
||||||
|
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
|
||||||
|
import { createStreamPlaybackToken } from '../utils/stream-token';
|
||||||
|
|
||||||
|
const router = Router();
|
||||||
|
|
||||||
|
const requestStreamSchema = z.object({
|
||||||
|
cameraDeviceId: z.string().uuid(),
|
||||||
|
reason: z.enum(['on_demand', 'motion_follow_up']).default('on_demand'),
|
||||||
|
metadata: z.record(z.string(), z.unknown()).optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const acceptStreamSchema = z.object({
|
||||||
|
streamKey: z.string().trim().min(1).max(255).optional(),
|
||||||
|
metadata: z.record(z.string(), z.unknown()).optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const endStreamSchema = z.object({
|
||||||
|
reason: z.enum(['completed', 'cancelled', 'failed']).default('completed'),
|
||||||
|
});
|
||||||
|
|
||||||
|
const streamParamSchema = z.object({
|
||||||
|
streamSessionId: z.string().uuid(),
|
||||||
|
});
|
||||||
|
|
||||||
|
const listSchema = z.object({
|
||||||
|
status: z.string().optional(),
|
||||||
|
limit: z.coerce.number().int().min(1).max(100).default(25),
|
||||||
|
});
|
||||||
|
|
||||||
|
router.post('/request', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsed = requestStreamSchema.safeParse(req.body ?? {});
|
||||||
|
|
||||||
|
if (!parsed.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [sourceDevice, cameraDevice] = await Promise.all([
|
||||||
|
db.query.devices.findFirst({
|
||||||
|
where: and(eq(devices.id, deviceAuth.deviceId), eq(devices.userId, deviceAuth.userId)),
|
||||||
|
}),
|
||||||
|
db.query.devices.findFirst({
|
||||||
|
where: and(eq(devices.id, parsed.data.cameraDeviceId), eq(devices.userId, deviceAuth.userId)),
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
|
||||||
|
if (!sourceDevice || !cameraDevice) {
|
||||||
|
res.status(404).json({ message: 'Source or camera device not found' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sourceDevice.role !== 'client') {
|
||||||
|
res.status(403).json({ message: 'Only client devices can request on-demand stream sessions' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cameraDevice.role !== 'camera') {
|
||||||
|
res.status(400).json({ message: 'cameraDeviceId must point to a camera device' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const link = await db.query.deviceLinks.findFirst({
|
||||||
|
where: and(
|
||||||
|
eq(deviceLinks.ownerUserId, deviceAuth.userId),
|
||||||
|
eq(deviceLinks.cameraDeviceId, cameraDevice.id),
|
||||||
|
eq(deviceLinks.clientDeviceId, sourceDevice.id),
|
||||||
|
eq(deviceLinks.status, 'active'),
|
||||||
|
),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!link) {
|
||||||
|
res.status(403).json({ message: 'No active link between requester and camera' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const now = new Date();
|
||||||
|
|
||||||
|
const [session] = await db
|
||||||
|
.insert(streamSessions)
|
||||||
|
.values({
|
||||||
|
ownerUserId: deviceAuth.userId,
|
||||||
|
cameraDeviceId: cameraDevice.id,
|
||||||
|
requesterDeviceId: sourceDevice.id,
|
||||||
|
status: 'requested',
|
||||||
|
reason: parsed.data.reason,
|
||||||
|
metadata: parsed.data.metadata ?? null,
|
||||||
|
updatedAt: now,
|
||||||
|
})
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
if (!session) {
|
||||||
|
res.status(500).json({ message: 'Failed creating stream session' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [command] = await db
|
||||||
|
.insert(deviceCommands)
|
||||||
|
.values({
|
||||||
|
ownerUserId: deviceAuth.userId,
|
||||||
|
sourceDeviceId: sourceDevice.id,
|
||||||
|
targetDeviceId: cameraDevice.id,
|
||||||
|
commandType: 'start_stream',
|
||||||
|
payload: {
|
||||||
|
streamSessionId: session.id,
|
||||||
|
reason: session.reason,
|
||||||
|
},
|
||||||
|
status: 'queued',
|
||||||
|
retryCount: 0,
|
||||||
|
lastDispatchedAt: now,
|
||||||
|
updatedAt: now,
|
||||||
|
})
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
if (!command) {
|
||||||
|
res.status(500).json({ message: 'Failed creating stream command' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await dispatchCommandById(command.id);
|
||||||
|
|
||||||
|
const refreshedCommand = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) });
|
||||||
|
|
||||||
|
sendRealtimeToDevice(sourceDevice.id, 'stream:requested', {
|
||||||
|
streamSessionId: session.id,
|
||||||
|
cameraDeviceId: cameraDevice.id,
|
||||||
|
status: session.status,
|
||||||
|
reason: session.reason,
|
||||||
|
});
|
||||||
|
|
||||||
|
res.status(201).json({
|
||||||
|
message: 'Stream request sent',
|
||||||
|
streamSession: session,
|
||||||
|
command: refreshedCommand ?? command,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsedParams = streamParamSchema.safeParse(req.params);
|
||||||
|
|
||||||
|
if (!parsedParams.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const parsed = acceptStreamSchema.safeParse(req.body ?? {});
|
||||||
|
|
||||||
|
if (!parsed.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const session = await db.query.streamSessions.findFirst({
|
||||||
|
where: and(
|
||||||
|
eq(streamSessions.id, parsedParams.data.streamSessionId),
|
||||||
|
eq(streamSessions.ownerUserId, deviceAuth.userId),
|
||||||
|
eq(streamSessions.cameraDeviceId, deviceAuth.deviceId),
|
||||||
|
),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!session) {
|
||||||
|
res.status(404).json({ message: 'Stream session not found for this camera device' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (session.status !== 'requested' && session.status !== 'starting') {
|
||||||
|
res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const now = new Date();
|
||||||
|
const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`;
|
||||||
|
|
||||||
|
const [updated] = await db
|
||||||
|
.update(streamSessions)
|
||||||
|
.set({
|
||||||
|
status: 'streaming',
|
||||||
|
streamKey,
|
||||||
|
metadata: parsed.data.metadata ?? session.metadata,
|
||||||
|
startedAt: now,
|
||||||
|
updatedAt: now,
|
||||||
|
})
|
||||||
|
.where(eq(streamSessions.id, session.id))
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
if (!updated) {
|
||||||
|
res.status(500).json({ message: 'Failed to update stream session' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', {
|
||||||
|
streamSessionId: updated.id,
|
||||||
|
cameraDeviceId: updated.cameraDeviceId,
|
||||||
|
status: updated.status,
|
||||||
|
startedAt: updated.startedAt,
|
||||||
|
});
|
||||||
|
|
||||||
|
res.json({ message: 'Stream accepted', streamSession: updated });
|
||||||
|
});
|
||||||
|
|
||||||
|
router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsedParams = streamParamSchema.safeParse(req.params);
|
||||||
|
|
||||||
|
if (!parsedParams.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const parsed = endStreamSchema.safeParse(req.body ?? {});
|
||||||
|
|
||||||
|
if (!parsed.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const session = await db.query.streamSessions.findFirst({
|
||||||
|
where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!session) {
|
||||||
|
res.status(404).json({ message: 'Stream session not found' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const canEnd = session.cameraDeviceId === deviceAuth.deviceId || session.requesterDeviceId === deviceAuth.deviceId;
|
||||||
|
|
||||||
|
if (!canEnd) {
|
||||||
|
res.status(403).json({ message: 'Only requester or camera device can end this stream' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const now = new Date();
|
||||||
|
|
||||||
|
const [updated] = await db
|
||||||
|
.update(streamSessions)
|
||||||
|
.set({
|
||||||
|
status: parsed.data.reason,
|
||||||
|
endedAt: now,
|
||||||
|
updatedAt: now,
|
||||||
|
})
|
||||||
|
.where(eq(streamSessions.id, session.id))
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', {
|
||||||
|
streamSessionId: session.id,
|
||||||
|
status: parsed.data.reason,
|
||||||
|
endedAt: now,
|
||||||
|
});
|
||||||
|
|
||||||
|
sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', {
|
||||||
|
streamSessionId: session.id,
|
||||||
|
status: parsed.data.reason,
|
||||||
|
endedAt: now,
|
||||||
|
});
|
||||||
|
|
||||||
|
res.json({ message: 'Stream ended', streamSession: updated });
|
||||||
|
});
|
||||||
|
|
||||||
|
router.get('/:streamSessionId/playback-token', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsedParams = streamParamSchema.safeParse(req.params);
|
||||||
|
|
||||||
|
if (!parsedParams.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const session = await db.query.streamSessions.findFirst({
|
||||||
|
where: and(eq(streamSessions.id, parsedParams.data.streamSessionId), eq(streamSessions.ownerUserId, deviceAuth.userId)),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!session) {
|
||||||
|
res.status(404).json({ message: 'Stream session not found' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const isRequester = session.requesterDeviceId === deviceAuth.deviceId;
|
||||||
|
const isCamera = session.cameraDeviceId === deviceAuth.deviceId;
|
||||||
|
|
||||||
|
if (!isRequester && !isCamera) {
|
||||||
|
res.status(403).json({ message: 'Device cannot request playback token for this stream' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!session.streamKey || session.status !== 'streaming') {
|
||||||
|
res.status(409).json({ message: 'Stream is not active yet' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const playbackToken = createStreamPlaybackToken({
|
||||||
|
sessionId: session.id,
|
||||||
|
viewerDeviceId: deviceAuth.deviceId,
|
||||||
|
userId: deviceAuth.userId,
|
||||||
|
});
|
||||||
|
|
||||||
|
res.json({
|
||||||
|
streamSessionId: session.id,
|
||||||
|
streamKey: session.streamKey,
|
||||||
|
status: session.status,
|
||||||
|
playbackToken,
|
||||||
|
expiresInSeconds: 60 * 15,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
router.get('/me/list', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsed = listSchema.safeParse(req.query);
|
||||||
|
|
||||||
|
if (!parsed.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessions = await db.query.streamSessions.findMany({
|
||||||
|
where: and(
|
||||||
|
eq(streamSessions.ownerUserId, deviceAuth.userId),
|
||||||
|
or(eq(streamSessions.requesterDeviceId, deviceAuth.deviceId), eq(streamSessions.cameraDeviceId, deviceAuth.deviceId)),
|
||||||
|
),
|
||||||
|
orderBy: [desc(streamSessions.createdAt)],
|
||||||
|
limit: parsed.data.limit,
|
||||||
|
});
|
||||||
|
|
||||||
|
const filtered = parsed.data.status ? sessions.filter((session) => session.status === parsed.data.status) : sessions;
|
||||||
|
|
||||||
|
res.json({ count: filtered.length, streamSessions: filtered });
|
||||||
|
});
|
||||||
|
|
||||||
|
export default router;
|
||||||
31
Backend/utils/stream-token.ts
Normal file
31
Backend/utils/stream-token.ts
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
import { createHmac } from 'crypto';
|
||||||
|
|
||||||
|
type StreamPlaybackPayload = {
|
||||||
|
sessionId: string;
|
||||||
|
viewerDeviceId: string;
|
||||||
|
userId: string;
|
||||||
|
exp: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
const secret = process.env.BETTER_AUTH_SECRET;
|
||||||
|
|
||||||
|
if (!secret) {
|
||||||
|
throw new Error('BETTER_AUTH_SECRET is required for stream playback token signing');
|
||||||
|
}
|
||||||
|
|
||||||
|
const sign = (data: string): string => createHmac('sha256', secret).update(data).digest('base64url');
|
||||||
|
|
||||||
|
export const createStreamPlaybackToken = (
|
||||||
|
payload: Omit<StreamPlaybackPayload, 'exp'>,
|
||||||
|
ttlSeconds = 60 * 15,
|
||||||
|
): string => {
|
||||||
|
const body: StreamPlaybackPayload = {
|
||||||
|
...payload,
|
||||||
|
exp: Math.floor(Date.now() / 1000) + ttlSeconds,
|
||||||
|
};
|
||||||
|
|
||||||
|
const encodedPayload = Buffer.from(JSON.stringify(body), 'utf8').toString('base64url');
|
||||||
|
const signature = sign(encodedPayload);
|
||||||
|
|
||||||
|
return `${encodedPayload}.${signature}`;
|
||||||
|
};
|
||||||
Reference in New Issue
Block a user