323 lines
8.8 KiB
TypeScript
323 lines
8.8 KiB
TypeScript
import { and, eq, lt } from 'drizzle-orm';
|
|
import { Server as HttpServer } from 'http';
|
|
import { Server as SocketIOServer } from 'socket.io';
|
|
import { z } from 'zod';
|
|
|
|
import { db } from '../db/client';
|
|
import { deviceCommands, devices } from '../db/schema';
|
|
import { hasRequiredTables } from '../utils/db-schema';
|
|
import { verifyDeviceToken } from '../utils/device-token';
|
|
|
|
const HEARTBEAT_INTERVAL_MS = 15_000;
|
|
const RETRY_INTERVAL_MS = 5_000;
|
|
const RETRY_DISPATCH_DELAY_MS = 10_000;
|
|
const MAX_RETRIES = 3;
|
|
|
|
const commandAckSchema = z.object({
|
|
commandId: z.string().uuid(),
|
|
status: z.enum(['acknowledged', 'rejected']),
|
|
error: z.string().optional(),
|
|
});
|
|
|
|
const roomForDevice = (deviceId: string): string => `device:${deviceId}`;
|
|
|
|
let io: SocketIOServer | null = null;
|
|
let retryTimer: ReturnType<typeof setInterval> | null = null;
|
|
|
|
const countSocketsForDevice = (deviceId: string): number => {
|
|
if (!io) {
|
|
return 0;
|
|
}
|
|
|
|
return io.sockets.adapter.rooms.get(roomForDevice(deviceId))?.size ?? 0;
|
|
};
|
|
|
|
export const isDeviceOnline = (deviceId: string): boolean => countSocketsForDevice(deviceId) > 0;
|
|
|
|
export const sendRealtimeToDevice = (
|
|
deviceId: string,
|
|
eventName: string,
|
|
payload: Record<string, unknown>,
|
|
): boolean => {
|
|
if (!io || !isDeviceOnline(deviceId)) {
|
|
return false;
|
|
}
|
|
|
|
io.to(roomForDevice(deviceId)).emit(eventName, payload);
|
|
return true;
|
|
};
|
|
|
|
const markDevicePresence = async (deviceId: string, status: 'online' | 'offline') => {
|
|
const now = new Date();
|
|
|
|
await db
|
|
.update(devices)
|
|
.set({
|
|
status,
|
|
lastSeenAt: now,
|
|
updatedAt: now,
|
|
})
|
|
.where(eq(devices.id, deviceId));
|
|
};
|
|
|
|
const emitCommand = (command: {
|
|
id: string;
|
|
sourceDeviceId: string;
|
|
targetDeviceId: string;
|
|
commandType: string;
|
|
payload: Record<string, unknown> | null;
|
|
createdAt: Date;
|
|
}): boolean => {
|
|
if (!io) {
|
|
return false;
|
|
}
|
|
|
|
if (countSocketsForDevice(command.targetDeviceId) === 0) {
|
|
return false;
|
|
}
|
|
|
|
io.to(roomForDevice(command.targetDeviceId)).emit('command:received', {
|
|
commandId: command.id,
|
|
sourceDeviceId: command.sourceDeviceId,
|
|
commandType: command.commandType,
|
|
payload: command.payload,
|
|
createdAt: command.createdAt,
|
|
});
|
|
|
|
return true;
|
|
};
|
|
|
|
export const dispatchCommandById = async (commandId: string): Promise<void> => {
|
|
const command = await db.query.deviceCommands.findFirst({
|
|
where: eq(deviceCommands.id, commandId),
|
|
});
|
|
|
|
if (!command) {
|
|
return;
|
|
}
|
|
|
|
const now = new Date();
|
|
|
|
const delivered = emitCommand({
|
|
id: command.id,
|
|
sourceDeviceId: command.sourceDeviceId,
|
|
targetDeviceId: command.targetDeviceId,
|
|
commandType: command.commandType,
|
|
payload: command.payload,
|
|
createdAt: command.createdAt,
|
|
});
|
|
|
|
await db
|
|
.update(deviceCommands)
|
|
.set({
|
|
status: delivered ? 'sent' : 'queued',
|
|
lastDispatchedAt: now,
|
|
retryCount: delivered ? command.retryCount : command.retryCount,
|
|
updatedAt: now,
|
|
error: delivered ? null : 'target device offline',
|
|
})
|
|
.where(eq(deviceCommands.id, command.id));
|
|
};
|
|
|
|
const retryPendingCommands = async () => {
|
|
const threshold = new Date(Date.now() - RETRY_DISPATCH_DELAY_MS);
|
|
|
|
const pending = await db.query.deviceCommands.findMany({
|
|
where: and(eq(deviceCommands.status, 'sent'), lt(deviceCommands.lastDispatchedAt, threshold)),
|
|
limit: 100,
|
|
orderBy: (fields, operators) => [operators.asc(fields.createdAt)],
|
|
});
|
|
|
|
for (const command of pending) {
|
|
const now = new Date();
|
|
const nextRetryCount = command.retryCount + 1;
|
|
|
|
if (nextRetryCount > MAX_RETRIES) {
|
|
await db
|
|
.update(deviceCommands)
|
|
.set({
|
|
status: 'failed',
|
|
updatedAt: now,
|
|
error: 'max retries exceeded',
|
|
})
|
|
.where(eq(deviceCommands.id, command.id));
|
|
continue;
|
|
}
|
|
|
|
const delivered = emitCommand({
|
|
id: command.id,
|
|
sourceDeviceId: command.sourceDeviceId,
|
|
targetDeviceId: command.targetDeviceId,
|
|
commandType: command.commandType,
|
|
payload: command.payload,
|
|
createdAt: command.createdAt,
|
|
});
|
|
|
|
await db
|
|
.update(deviceCommands)
|
|
.set({
|
|
status: delivered ? 'sent' : 'queued',
|
|
lastDispatchedAt: now,
|
|
retryCount: nextRetryCount,
|
|
updatedAt: now,
|
|
error: delivered ? null : 'target device offline',
|
|
})
|
|
.where(eq(deviceCommands.id, command.id));
|
|
}
|
|
};
|
|
|
|
export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
|
|
if (io) {
|
|
return io;
|
|
}
|
|
|
|
io = new SocketIOServer(server, {
|
|
cors: {
|
|
origin: process.env.BETTER_AUTH_TRUSTED_ORIGINS?.split(',').map((origin) => origin.trim()).filter(Boolean) ?? true,
|
|
credentials: true,
|
|
},
|
|
pingInterval: HEARTBEAT_INTERVAL_MS,
|
|
});
|
|
|
|
io.use(async (socket, next) => {
|
|
try {
|
|
const tokenFromAuth = typeof socket.handshake.auth?.token === 'string' ? socket.handshake.auth.token : null;
|
|
const authorization = socket.handshake.headers.authorization;
|
|
const tokenFromHeader =
|
|
typeof authorization === 'string' && authorization.startsWith('Bearer ')
|
|
? authorization.slice('Bearer '.length).trim()
|
|
: null;
|
|
|
|
const token = tokenFromAuth ?? tokenFromHeader;
|
|
|
|
if (!token) {
|
|
next(new Error('Missing device token'));
|
|
return;
|
|
}
|
|
|
|
const payload = verifyDeviceToken(token);
|
|
|
|
if (!payload) {
|
|
next(new Error('Invalid device token'));
|
|
return;
|
|
}
|
|
|
|
const device = await db.query.devices.findFirst({
|
|
where: and(eq(devices.id, payload.deviceId), eq(devices.userId, payload.userId)),
|
|
});
|
|
|
|
if (!device) {
|
|
next(new Error('Device not found'));
|
|
return;
|
|
}
|
|
|
|
if (device.role !== payload.role) {
|
|
next(new Error('Token role does not match device role'));
|
|
return;
|
|
}
|
|
|
|
socket.data.deviceAuth = payload;
|
|
next();
|
|
} catch (error) {
|
|
next(error instanceof Error ? error : new Error('Authentication failed'));
|
|
}
|
|
});
|
|
|
|
io.on('connection', async (socket) => {
|
|
const auth = socket.data.deviceAuth as { userId: string; deviceId: string; role: 'camera' | 'client' };
|
|
const deviceRoom = roomForDevice(auth.deviceId);
|
|
|
|
socket.join(deviceRoom);
|
|
await markDevicePresence(auth.deviceId, 'online');
|
|
|
|
socket.emit('connected', {
|
|
deviceId: auth.deviceId,
|
|
role: auth.role,
|
|
serverTime: new Date().toISOString(),
|
|
});
|
|
|
|
socket.on('heartbeat', async () => {
|
|
await markDevicePresence(auth.deviceId, 'online');
|
|
socket.emit('heartbeat:ack', { at: new Date().toISOString() });
|
|
});
|
|
|
|
socket.on('command:ack', async (input) => {
|
|
const parsed = commandAckSchema.safeParse(input);
|
|
|
|
if (!parsed.success) {
|
|
socket.emit('error:command_ack', {
|
|
message: 'Invalid command ack payload',
|
|
errors: parsed.error.flatten(),
|
|
});
|
|
return;
|
|
}
|
|
|
|
const command = await db.query.deviceCommands.findFirst({
|
|
where: eq(deviceCommands.id, parsed.data.commandId),
|
|
});
|
|
|
|
if (!command) {
|
|
socket.emit('error:command_ack', { message: 'Command not found' });
|
|
return;
|
|
}
|
|
|
|
if (command.targetDeviceId !== auth.deviceId) {
|
|
socket.emit('error:command_ack', { message: 'Not authorized to ack this command' });
|
|
return;
|
|
}
|
|
|
|
const now = new Date();
|
|
|
|
await db
|
|
.update(deviceCommands)
|
|
.set({
|
|
status: parsed.data.status,
|
|
acknowledgedAt: now,
|
|
updatedAt: now,
|
|
error: parsed.data.status === 'rejected' ? parsed.data.error ?? 'rejected' : null,
|
|
})
|
|
.where(eq(deviceCommands.id, command.id));
|
|
|
|
io?.to(roomForDevice(command.sourceDeviceId)).emit('command:status', {
|
|
commandId: command.id,
|
|
status: parsed.data.status,
|
|
acknowledgedAt: now.toISOString(),
|
|
error: parsed.data.error,
|
|
});
|
|
});
|
|
|
|
socket.on('disconnect', async () => {
|
|
// Small delay allows fast reconnects to reuse presence without flapping.
|
|
setTimeout(async () => {
|
|
if (countSocketsForDevice(auth.deviceId) === 0) {
|
|
await markDevicePresence(auth.deviceId, 'offline');
|
|
}
|
|
}, 500);
|
|
});
|
|
});
|
|
|
|
if (!retryTimer) {
|
|
const requiredTables = ['device_commands'];
|
|
|
|
void (async () => {
|
|
const ready = await hasRequiredTables(requiredTables);
|
|
if (!ready) {
|
|
console.warn(
|
|
`[command retry] skipped startup because required tables are missing (${requiredTables.join(', ')}). Run migrations and restart.`,
|
|
);
|
|
return;
|
|
}
|
|
|
|
retryTimer = setInterval(() => {
|
|
retryPendingCommands().catch((error) => {
|
|
console.error('Failed retrying pending commands', error);
|
|
});
|
|
}, RETRY_INTERVAL_MS);
|
|
})().catch((error) => {
|
|
console.error('Failed initializing command retry worker', error);
|
|
});
|
|
}
|
|
|
|
return io;
|
|
};
|