136 lines
4.7 KiB
TypeScript
136 lines
4.7 KiB
TypeScript
import { randomUUID } from 'crypto';
|
|
import { eq } from 'drizzle-orm';
|
|
import { Router } from 'express';
|
|
|
|
import { db } from '../../db/client';
|
|
import { streamSessions } from '../../db/schema';
|
|
import { createLiveMediaSession } from '../../media/service';
|
|
import { sfuService } from '../../media/sfu/service';
|
|
import { requireDeviceAuth } from '../../middleware/device-auth';
|
|
import { sendRealtimeToDevice } from '../../realtime/gateway';
|
|
import { createRecordingForStream } from '../../services/recordings';
|
|
import { writeAuditLog } from '../../services/audit';
|
|
import { enqueuePushNotification } from '../../services/push';
|
|
import { createStreamStartedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple';
|
|
import { acceptStreamSchema, streamParamSchema } from './schemas';
|
|
import { ensureStreamDeviceAuth, shouldCreateRecordingPlaceholder } from './shared';
|
|
|
|
// Express router that carries the stream "accept" endpoint registered in this module.
const router = Router();
|
|
|
|
/**
 * POST /:streamSessionId/accept
 *
 * Called by the camera device to accept a pending stream session.
 *
 * Flow:
 *  1. Validate route params and request body (400 on failure).
 *  2. Resolve the device auth context via `ensureStreamDeviceAuth`; a falsy
 *     result ends the request here (presumably the helper has already written
 *     the response — confirm in ./shared).
 *  3. Load the session and require that it belongs to the authenticated user
 *     AND that the authenticated device is its camera device (404 otherwise).
 *  4. Only sessions in status `requested` or `starting` may be accepted (409).
 *  5. Create the live media session, persist the session as `streaming`,
 *     optionally create a recording placeholder, and start the SFU session
 *     when an SFU service is configured (500 on update/SFU failure).
 *  6. Notify the requester device over realtime, falling back to a push
 *     notification when realtime delivery reports failure.
 *  7. Respond with the updated session, then write the audit log entry.
 */
router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
  const parsedParams = streamParamSchema.safeParse(req.params);
  if (!parsedParams.success) {
    res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
    return;
  }

  // A missing body is treated as an empty object so the schema decides what is required.
  const parsed = acceptStreamSchema.safeParse(req.body ?? {});
  if (!parsed.success) {
    res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
    return;
  }

  const deviceAuth = ensureStreamDeviceAuth(req, res);
  if (!deviceAuth) return;

  const session = await db.query.streamSessions.findFirst({
    where: eq(streamSessions.id, parsedParams.data.streamSessionId),
  });

  // Ownership mismatches are reported as 404 (same as "not found") rather than 403.
  if (!session || session.ownerUserId !== deviceAuth.userId || session.cameraDeviceId !== deviceAuth.deviceId) {
    res.status(404).json({ message: 'Stream session not found for this camera device' });
    return;
  }

  if (session.status !== 'requested' && session.status !== 'starting') {
    res.status(409).json({ message: `Stream session cannot be accepted from status ${session.status}` });
    return;
  }

  const now = new Date();
  const streamKey = parsed.data.streamKey ?? `stream_${session.id}_${randomUUID()}`;
  const mediaSession = await createLiveMediaSession({
    streamSessionId: session.id,
    ownerUserId: session.ownerUserId,
    cameraDeviceId: session.cameraDeviceId,
    requesterDeviceId: session.requesterDeviceId,
  });

  // NOTE(review): the status check above and this UPDATE are not atomic — the
  // WHERE clause only matches on id, so two concurrent accepts could both pass
  // the 409 guard. Consider adding the expected status to the WHERE clause.
  const [updated] = await db
    .update(streamSessions)
    .set({
      status: 'streaming',
      // Without a media session the stream key and media endpoints are stored as null
      // and the provider falls back to 'simple'.
      streamKey: mediaSession ? streamKey : null,
      mediaProvider: mediaSession?.provider ?? 'simple',
      mediaSessionId: mediaSession?.mediaSessionId ?? null,
      publishEndpoint: mediaSession?.publishUrl ?? null,
      subscribeEndpoint: mediaSession?.subscribeUrl ?? null,
      metadata: parsed.data.metadata ?? session.metadata,
      startedAt: now,
      updatedAt: now,
    })
    .where(eq(streamSessions.id, session.id))
    .returning();

  if (!updated) {
    res.status(500).json({ message: 'Failed to update stream session' });
    return;
  }

  if (shouldCreateRecordingPlaceholder()) {
    await createRecordingForStream(updated.id);
  }

  if (sfuService) {
    try {
      await sfuService.startSession({
        streamSessionId: updated.id,
        ownerUserId: updated.ownerUserId,
        cameraDeviceId: updated.cameraDeviceId,
        requesterDeviceId: updated.requesterDeviceId,
      });
      await sfuService.setSessionState(updated.id, 'live');
    } catch (error) {
      // NOTE(review): the row has already been persisted as 'streaming' at this
      // point and is not rolled back here — confirm whether cleanup happens elsewhere.
      console.error('Failed starting SFU session', error);
      res.status(500).json({ message: 'Failed to initialize SFU session' });
      return;
    }
  }

  const startedPayload = createStreamStartedPayload(updated);
  console.info('[stream.accept]', {
    streamSessionId: updated.id,
    requesterDeviceId: updated.requesterDeviceId,
    cameraDeviceId: updated.cameraDeviceId,
    mode: mediaSession ? 'legacy' : 'simple',
  });

  // Prefer realtime delivery; fall back to a push notification when it reports failure.
  const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', startedPayload);
  if (!deliveredToRequester) {
    await enqueuePushNotification({
      ownerUserId: session.ownerUserId,
      recipientDeviceId: session.requesterDeviceId,
      type: 'stream_started',
      payload: startedPayload,
    });
  }

  res.json({ message: 'Stream accepted', streamSession: toSimpleStreamSessionResponse(updated) });

  // The response has already been sent, so an audit failure can no longer be
  // reported to the client. Catch it here so it does not escape the handler as
  // an unhandled rejection (or reach error middleware with headers already sent).
  try {
    await writeAuditLog({
      ownerUserId: session.ownerUserId,
      actorDeviceId: session.cameraDeviceId,
      action: 'stream.accepted',
      targetType: 'stream_session',
      targetId: session.id,
      metadata: mediaSession
        ? { mediaSessionId: updated.mediaSessionId, mediaProvider: updated.mediaProvider }
        : { transport: 'webrtc' },
      ipAddress: req.ip,
    });
  } catch (error) {
    console.error('Failed writing audit log for stream.accepted', error);
  }
});
|
|
|
|
// Default export: the router instance that the accept route above was registered on.
export default router;
|