133 lines
4.1 KiB
TypeScript
133 lines
4.1 KiB
TypeScript
import { eq } from 'drizzle-orm';
|
|
import { Router } from 'express';
|
|
|
|
import { db } from '../../db/client';
|
|
import { streamSessions } from '../../db/schema';
|
|
import { simpleStreamingEnabled } from '../../media/config';
|
|
import { sfuService } from '../../media/sfu/service';
|
|
import { requireDeviceAuth } from '../../middleware/device-auth';
|
|
import { sendRealtimeToDevice } from '../../realtime/gateway';
|
|
import { createRecordingForStream } from '../../services/recordings';
|
|
import { enqueuePushNotification } from '../../services/push';
|
|
import { createStreamEndedPayload, toSimpleStreamSessionResponse } from '../../streaming/simple';
|
|
import { endStreamSchema, streamParamSchema } from './schemas';
|
|
import { ensureStreamDeviceAuth, getOwnedStreamSession, isStreamParticipant, shouldCreateRecordingPlaceholder } from './shared';
|
|
|
|
const router = Router();
|
|
|
|
router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
|
|
const parsedParams = streamParamSchema.safeParse(req.params);
|
|
|
|
if (!parsedParams.success) {
|
|
res.status(400).json({ message: 'Invalid streamSessionId', errors: parsedParams.error.flatten() });
|
|
return;
|
|
}
|
|
|
|
const parsed = endStreamSchema.safeParse(req.body ?? {});
|
|
|
|
if (!parsed.success) {
|
|
res.status(400).json({ message: 'Invalid request body', errors: parsed.error.flatten() });
|
|
return;
|
|
}
|
|
|
|
const deviceAuth = ensureStreamDeviceAuth(req, res);
|
|
if (!deviceAuth) return;
|
|
|
|
const session = await getOwnedStreamSession(parsedParams.data.streamSessionId, deviceAuth.userId);
|
|
|
|
if (!session) {
|
|
res.status(404).json({ message: 'Stream session not found' });
|
|
return;
|
|
}
|
|
|
|
if (!isStreamParticipant(session, deviceAuth.deviceId)) {
|
|
res.status(403).json({ message: 'Only requester or camera device can end this stream' });
|
|
return;
|
|
}
|
|
|
|
const now = new Date();
|
|
const nextStatus = simpleStreamingEnabled ? 'ended' : parsed.data.reason;
|
|
const nextMetadata =
|
|
simpleStreamingEnabled && parsed.data.reason !== 'completed'
|
|
? {
|
|
...(session.metadata ?? {}),
|
|
endReason: parsed.data.reason,
|
|
}
|
|
: session.metadata;
|
|
|
|
const [updated] = await db
|
|
.update(streamSessions)
|
|
.set({
|
|
status: nextStatus,
|
|
endedAt: now,
|
|
metadata: nextMetadata,
|
|
updatedAt: now,
|
|
})
|
|
.where(eq(streamSessions.id, session.id))
|
|
.returning();
|
|
|
|
if (!updated) {
|
|
res.status(500).json({ message: 'Failed to update stream session' });
|
|
return;
|
|
}
|
|
|
|
if (sfuService) {
|
|
try {
|
|
await sfuService.endSession(session.id);
|
|
} catch (error) {
|
|
console.error('Failed ending SFU session', error);
|
|
}
|
|
}
|
|
|
|
if (shouldCreateRecordingPlaceholder()) {
|
|
await createRecordingForStream(session.id);
|
|
}
|
|
|
|
const endedPayload = simpleStreamingEnabled
|
|
? createStreamEndedPayload({
|
|
streamSessionId: session.id,
|
|
cameraDeviceId: session.cameraDeviceId,
|
|
requesterDeviceId: session.requesterDeviceId,
|
|
endedAt: now,
|
|
reason: parsed.data.reason,
|
|
})
|
|
: {
|
|
streamSessionId: session.id,
|
|
status: parsed.data.reason,
|
|
endedAt: now,
|
|
};
|
|
|
|
console.info('[stream.end]', {
|
|
streamSessionId: session.id,
|
|
requesterDeviceId: session.requesterDeviceId,
|
|
cameraDeviceId: session.cameraDeviceId,
|
|
reason: parsed.data.reason,
|
|
status: simpleStreamingEnabled ? 'ended' : parsed.data.reason,
|
|
});
|
|
|
|
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', endedPayload);
|
|
const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', endedPayload);
|
|
|
|
if (!deliveredToRequester) {
|
|
await enqueuePushNotification({
|
|
ownerUserId: session.ownerUserId,
|
|
recipientDeviceId: session.requesterDeviceId,
|
|
type: 'stream_ended',
|
|
payload: endedPayload,
|
|
});
|
|
}
|
|
|
|
if (!deliveredToCamera) {
|
|
await enqueuePushNotification({
|
|
ownerUserId: session.ownerUserId,
|
|
recipientDeviceId: session.cameraDeviceId,
|
|
type: 'stream_ended',
|
|
payload: endedPayload,
|
|
});
|
|
}
|
|
|
|
res.json({ message: 'Stream ended', streamSession: toSimpleStreamSessionResponse(updated) });
|
|
});
|
|
|
|
export default router;
|