feat(streaming): implement frame relay functionality for real-time video streaming and enhance client stream visibility

This commit is contained in:
2026-02-03 14:20:00 +00:00
parent a2f6a22f97
commit ef74b5ca19
2 changed files with 115 additions and 0 deletions

View File

@@ -26,6 +26,13 @@ const webrtcSignalSchema = z.object({
data: z.record(z.string(), z.unknown()).nullable().optional(),
});
const streamFrameSchema = z.object({
toDeviceId: z.string().uuid(),
streamSessionId: z.string().uuid(),
frame: z.string().min(32),
capturedAt: z.string().optional(),
});
const roomForDevice = (deviceId: string): string => `device:${deviceId}`;
let io: SocketIOServer | null = null;
@@ -233,6 +240,7 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
io.on('connection', async (socket) => {
const auth = socket.data.deviceAuth as { userId: string; deviceId: string; role: 'camera' | 'client' };
const deviceRoom = roomForDevice(auth.deviceId);
const verifiedRelayTargets = new Set<string>();
socket.join(deviceRoom);
await markDevicePresence(auth.deviceId, 'online');
@@ -321,6 +329,38 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
});
});
socket.on('stream:frame', async (input) => {
const parsed = streamFrameSchema.safeParse(input);
if (!parsed.success) {
socket.emit('error:stream_frame', {
message: 'Invalid stream frame payload',
errors: parsed.error.flatten(),
});
return;
}
if (!verifiedRelayTargets.has(parsed.data.toDeviceId)) {
const targetDevice = await db.query.devices.findFirst({
where: and(eq(devices.id, parsed.data.toDeviceId), eq(devices.userId, auth.userId)),
});
if (!targetDevice) {
socket.emit('error:stream_frame', { message: 'Target device not found for this account' });
return;
}
verifiedRelayTargets.add(parsed.data.toDeviceId);
}
io?.to(roomForDevice(parsed.data.toDeviceId)).emit('stream:frame', {
fromDeviceId: auth.deviceId,
streamSessionId: parsed.data.streamSessionId,
frame: parsed.data.frame,
capturedAt: parsed.data.capturedAt ?? new Date().toISOString(),
});
});
socket.on('disconnect', async () => {
// Small delay allows fast reconnects to reuse presence without flapping.
setTimeout(async () => {