feat(push): add phase7 offline push queue, worker, APIs, and simulator inbox
This commit is contained in:
@@ -135,6 +135,21 @@ export const notifications = pgTable('notifications', {
|
|||||||
isRead: boolean('is_read').default(false).notNull(),
|
isRead: boolean('is_read').default(false).notNull(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
export const pushNotifications = pgTable('push_notifications', {
|
||||||
|
id: uuid('id').defaultRandom().primaryKey(),
|
||||||
|
ownerUserId: uuid('owner_user_id').notNull().references(() => users.id),
|
||||||
|
recipientDeviceId: uuid('recipient_device_id').notNull().references(() => devices.id),
|
||||||
|
type: varchar('type', { length: 64 }).notNull(),
|
||||||
|
payload: jsonb('payload').$type<Record<string, unknown> | null>().default(null),
|
||||||
|
status: varchar('status', { length: 32 }).default('queued').notNull(),
|
||||||
|
attempts: integer('attempts').default(0).notNull(),
|
||||||
|
lastError: text('last_error'),
|
||||||
|
sentAt: timestamp('sent_at', { withTimezone: true }),
|
||||||
|
nextAttemptAt: timestamp('next_attempt_at', { withTimezone: true }).defaultNow().notNull(),
|
||||||
|
createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(),
|
||||||
|
updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
|
||||||
|
});
|
||||||
|
|
||||||
export const accounts = pgTable('account', {
|
export const accounts = pgTable('account', {
|
||||||
id: uuid('id').defaultRandom().primaryKey(),
|
id: uuid('id').defaultRandom().primaryKey(),
|
||||||
userId: uuid('user_id').notNull().references(() => users.id),
|
userId: uuid('user_id').notNull().references(() => users.id),
|
||||||
@@ -181,6 +196,7 @@ export const schema = {
|
|||||||
events,
|
events,
|
||||||
videos,
|
videos,
|
||||||
notifications,
|
notifications,
|
||||||
|
pushNotifications,
|
||||||
accounts,
|
accounts,
|
||||||
sessions,
|
sessions,
|
||||||
verifications,
|
verifications,
|
||||||
|
|||||||
17
Backend/drizzle/0010_push_notifications_queue.sql
Normal file
17
Backend/drizzle/0010_push_notifications_queue.sql
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
CREATE TABLE "push_notifications" (
|
||||||
|
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
|
||||||
|
"owner_user_id" uuid NOT NULL,
|
||||||
|
"recipient_device_id" uuid NOT NULL,
|
||||||
|
"type" varchar(64) NOT NULL,
|
||||||
|
"payload" jsonb DEFAULT 'null'::jsonb,
|
||||||
|
"status" varchar(32) DEFAULT 'queued' NOT NULL,
|
||||||
|
"attempts" integer DEFAULT 0 NOT NULL,
|
||||||
|
"last_error" text,
|
||||||
|
"sent_at" timestamp with time zone,
|
||||||
|
"next_attempt_at" timestamp with time zone DEFAULT now() NOT NULL,
|
||||||
|
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
|
||||||
|
"updated_at" timestamp with time zone DEFAULT now() NOT NULL
|
||||||
|
);
|
||||||
|
--> statement-breakpoint
|
||||||
|
ALTER TABLE "push_notifications" ADD CONSTRAINT "push_notifications_owner_user_id_users_id_fk" FOREIGN KEY ("owner_user_id") REFERENCES "public"."users"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
|
||||||
|
ALTER TABLE "push_notifications" ADD CONSTRAINT "push_notifications_recipient_device_id_devices_id_fk" FOREIGN KEY ("recipient_device_id") REFERENCES "public"."devices"("id") ON DELETE no action ON UPDATE no action;
|
||||||
@@ -71,6 +71,13 @@
|
|||||||
"when": 1770416956419,
|
"when": 1770416956419,
|
||||||
"tag": "0009_recordings_pipeline",
|
"tag": "0009_recordings_pipeline",
|
||||||
"breakpoints": true
|
"breakpoints": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"idx": 10,
|
||||||
|
"version": "7",
|
||||||
|
"when": 1770417956419,
|
||||||
|
"tag": "0010_push_notifications_queue",
|
||||||
|
"breakpoints": true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,9 +13,11 @@ import commandsRoutes from './routes/commands';
|
|||||||
import eventsRoutes from './routes/events';
|
import eventsRoutes from './routes/events';
|
||||||
import streamsRoutes from './routes/streams';
|
import streamsRoutes from './routes/streams';
|
||||||
import recordingsRoutes from './routes/recordings';
|
import recordingsRoutes from './routes/recordings';
|
||||||
|
import pushNotificationsRoutes from './routes/push-notifications';
|
||||||
import { setupRealtimeGateway } from './realtime/gateway';
|
import { setupRealtimeGateway } from './realtime/gateway';
|
||||||
import { ensureMinioBucket } from './utils/minio';
|
import { ensureMinioBucket } from './utils/minio';
|
||||||
import { startRecordingsWorker } from './workers/recordings';
|
import { startRecordingsWorker } from './workers/recordings';
|
||||||
|
import { startPushWorker } from './services/push';
|
||||||
|
|
||||||
const app = express();
|
const app = express();
|
||||||
const openApiDocument = buildOpenApiDocument();
|
const openApiDocument = buildOpenApiDocument();
|
||||||
@@ -42,6 +44,7 @@ app.use('/commands', commandsRoutes);
|
|||||||
app.use('/events', eventsRoutes);
|
app.use('/events', eventsRoutes);
|
||||||
app.use('/streams', streamsRoutes);
|
app.use('/streams', streamsRoutes);
|
||||||
app.use('/recordings', recordingsRoutes);
|
app.use('/recordings', recordingsRoutes);
|
||||||
|
app.use('/push-notifications', pushNotificationsRoutes);
|
||||||
|
|
||||||
app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
|
app.use((err: unknown, _req: express.Request, res: express.Response, _next: express.NextFunction) => {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
@@ -61,6 +64,7 @@ const start = async () => {
|
|||||||
|
|
||||||
setupRealtimeGateway(server);
|
setupRealtimeGateway(server);
|
||||||
startRecordingsWorker();
|
startRecordingsWorker();
|
||||||
|
startPushWorker();
|
||||||
|
|
||||||
server.listen(port, () => {
|
server.listen(port, () => {
|
||||||
console.log(`Server is running on port ${port}`);
|
console.log(`Server is running on port ${port}`);
|
||||||
|
|||||||
@@ -134,6 +134,9 @@
|
|||||||
<option value="camera">camera</option>
|
<option value="camera">camera</option>
|
||||||
</select>
|
</select>
|
||||||
|
|
||||||
|
<label>Push Token (simulated)</label>
|
||||||
|
<input id="pushToken" placeholder="optional push token for offline delivery" />
|
||||||
|
|
||||||
<div class="row">
|
<div class="row">
|
||||||
<button id="registerBtn">Register Device</button>
|
<button id="registerBtn">Register Device</button>
|
||||||
<button id="loadSavedBtn" class="alt">Load Saved</button>
|
<button id="loadSavedBtn" class="alt">Load Saved</button>
|
||||||
@@ -189,6 +192,7 @@
|
|||||||
lastMotionEventId: null,
|
lastMotionEventId: null,
|
||||||
lastStreamSessionId: null,
|
lastStreamSessionId: null,
|
||||||
lastRecordingId: null,
|
lastRecordingId: null,
|
||||||
|
latestPushNotificationId: null,
|
||||||
};
|
};
|
||||||
|
|
||||||
const $ = (id) => document.getElementById(id);
|
const $ = (id) => document.getElementById(id);
|
||||||
@@ -208,7 +212,11 @@
|
|||||||
$('deviceState').textContent = JSON.stringify({ device: state.device, hasToken: Boolean(state.deviceToken) }, null, 2);
|
$('deviceState').textContent = JSON.stringify({ device: state.device, hasToken: Boolean(state.deviceToken) }, null, 2);
|
||||||
$('clientState').textContent = JSON.stringify({ lastStreamSessionId: state.lastStreamSessionId }, null, 2);
|
$('clientState').textContent = JSON.stringify({ lastStreamSessionId: state.lastStreamSessionId }, null, 2);
|
||||||
$('cameraState').textContent = JSON.stringify(
|
$('cameraState').textContent = JSON.stringify(
|
||||||
{ lastMotionEventId: state.lastMotionEventId, lastRecordingId: state.lastRecordingId },
|
{
|
||||||
|
lastMotionEventId: state.lastMotionEventId,
|
||||||
|
lastRecordingId: state.lastRecordingId,
|
||||||
|
latestPushNotificationId: state.latestPushNotificationId,
|
||||||
|
},
|
||||||
null,
|
null,
|
||||||
2,
|
2,
|
||||||
);
|
);
|
||||||
@@ -338,6 +346,7 @@
|
|||||||
name: name || undefined,
|
name: name || undefined,
|
||||||
platform: 'web',
|
platform: 'web',
|
||||||
appVersion: 'sim-1',
|
appVersion: 'sim-1',
|
||||||
|
pushToken: $('pushToken').value.trim() || undefined,
|
||||||
}),
|
}),
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -361,6 +370,21 @@
|
|||||||
log('loaded saved device', parsed);
|
log('loaded saved device', parsed);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
$('loadSavedBtn').addEventListener('click', async () => {
|
||||||
|
try {
|
||||||
|
if (!state.device?.id) return;
|
||||||
|
const token = $('pushToken').value.trim();
|
||||||
|
if (!token) return;
|
||||||
|
await authFetch(`/devices/${state.device.id}`, {
|
||||||
|
method: 'PATCH',
|
||||||
|
body: JSON.stringify({ pushToken: token }),
|
||||||
|
});
|
||||||
|
log('push token updated', { deviceId: state.device.id });
|
||||||
|
} catch (error) {
|
||||||
|
log('push token update failed', { error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
$('connectBtn').addEventListener('click', () => {
|
$('connectBtn').addEventListener('click', () => {
|
||||||
try {
|
try {
|
||||||
connectSocket();
|
connectSocket();
|
||||||
@@ -534,6 +558,55 @@
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const pushPanel = document.createElement('section');
|
||||||
|
pushPanel.className = 'panel';
|
||||||
|
pushPanel.style.marginTop = '16px';
|
||||||
|
pushPanel.innerHTML = `
|
||||||
|
<h2>Push Inbox (Offline Fallback)</h2>
|
||||||
|
<div class="row">
|
||||||
|
<button id="dispatchPushWorkerBtn" class="alt">Dispatch Push Worker</button>
|
||||||
|
<button id="pollPushInboxBtn" class="alt">Poll Push Inbox</button>
|
||||||
|
</div>
|
||||||
|
<button id="markLatestPushReadBtn" class="alt">Mark Latest Push Read</button>
|
||||||
|
`;
|
||||||
|
document.querySelector('.page').appendChild(pushPanel);
|
||||||
|
|
||||||
|
$('dispatchPushWorkerBtn').addEventListener('click', async () => {
|
||||||
|
try {
|
||||||
|
const payload = await deviceFetch('/push-notifications/worker/dispatch', { method: 'POST', body: JSON.stringify({}) });
|
||||||
|
log('push worker dispatch', payload);
|
||||||
|
} catch (error) {
|
||||||
|
log('push worker dispatch failed', { error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
$('pollPushInboxBtn').addEventListener('click', async () => {
|
||||||
|
try {
|
||||||
|
const payload = await deviceFetch('/push-notifications/me');
|
||||||
|
const latest = payload.notifications?.[0];
|
||||||
|
if (latest) {
|
||||||
|
state.latestPushNotificationId = latest.id;
|
||||||
|
}
|
||||||
|
render();
|
||||||
|
log('push inbox', payload);
|
||||||
|
} catch (error) {
|
||||||
|
log('poll push inbox failed', { error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
$('markLatestPushReadBtn').addEventListener('click', async () => {
|
||||||
|
try {
|
||||||
|
if (!state.latestPushNotificationId) throw new Error('No push notification selected');
|
||||||
|
const payload = await deviceFetch(`/push-notifications/${state.latestPushNotificationId}/read`, {
|
||||||
|
method: 'POST',
|
||||||
|
body: JSON.stringify({}),
|
||||||
|
});
|
||||||
|
log('push marked read', payload);
|
||||||
|
} catch (error) {
|
||||||
|
log('mark push read failed', { error: error.message });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
render();
|
render();
|
||||||
</script>
|
</script>
|
||||||
</body>
|
</body>
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import { deviceLinks, devices, events, notifications } from '../db/schema';
|
|||||||
import { requireAuth } from '../middleware/auth';
|
import { requireAuth } from '../middleware/auth';
|
||||||
import { requireDeviceAuth } from '../middleware/device-auth';
|
import { requireDeviceAuth } from '../middleware/device-auth';
|
||||||
import { sendRealtimeToDevice } from '../realtime/gateway';
|
import { sendRealtimeToDevice } from '../realtime/gateway';
|
||||||
|
import { enqueuePushNotification } from '../services/push';
|
||||||
|
|
||||||
const router = Router();
|
const router = Router();
|
||||||
|
|
||||||
@@ -105,6 +106,19 @@ router.post('/motion/start', requireDeviceAuth, async (req, res) => {
|
|||||||
isRead: false,
|
isRead: false,
|
||||||
sentAt: now,
|
sentAt: now,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (!delivered) {
|
||||||
|
await enqueuePushNotification({
|
||||||
|
ownerUserId: deviceAuth.userId,
|
||||||
|
recipientDeviceId: link.clientDeviceId,
|
||||||
|
type: 'motion_detected',
|
||||||
|
payload: {
|
||||||
|
eventId: event.id,
|
||||||
|
cameraDeviceId: cameraDevice.id,
|
||||||
|
startedAt: event.startedAt.toISOString(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res.status(201).json({
|
res.status(201).json({
|
||||||
@@ -172,13 +186,27 @@ router.post('/:eventId/motion/end', requireDeviceAuth, async (req, res) => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
for (const link of activeLinks) {
|
for (const link of activeLinks) {
|
||||||
sendRealtimeToDevice(link.clientDeviceId, 'motion:ended', {
|
const delivered = sendRealtimeToDevice(link.clientDeviceId, 'motion:ended', {
|
||||||
eventId: event.id,
|
eventId: event.id,
|
||||||
cameraDeviceId: deviceAuth.deviceId,
|
cameraDeviceId: deviceAuth.deviceId,
|
||||||
status: parsed.data.status,
|
status: parsed.data.status,
|
||||||
endedAt: now,
|
endedAt: now,
|
||||||
videoUrl: parsed.data.videoUrl ?? event.videoUrl,
|
videoUrl: parsed.data.videoUrl ?? event.videoUrl,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (!delivered) {
|
||||||
|
await enqueuePushNotification({
|
||||||
|
ownerUserId: deviceAuth.userId,
|
||||||
|
recipientDeviceId: link.clientDeviceId,
|
||||||
|
type: 'motion_ended',
|
||||||
|
payload: {
|
||||||
|
eventId: event.id,
|
||||||
|
cameraDeviceId: deviceAuth.deviceId,
|
||||||
|
status: parsed.data.status,
|
||||||
|
endedAt: now.toISOString(),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res.json({ message: 'Motion event ended', event: updated, notifiedClients: activeLinks.length });
|
res.json({ message: 'Motion event ended', event: updated, notifiedClients: activeLinks.length });
|
||||||
|
|||||||
100
Backend/routes/push-notifications.ts
Normal file
100
Backend/routes/push-notifications.ts
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
import { and, desc, eq } from 'drizzle-orm';
|
||||||
|
import { Router } from 'express';
|
||||||
|
import { z } from 'zod';
|
||||||
|
|
||||||
|
import { db } from '../db/client';
|
||||||
|
import { pushNotifications } from '../db/schema';
|
||||||
|
import { requireDeviceAuth } from '../middleware/device-auth';
|
||||||
|
import { dispatchPushQueueOnce } from '../services/push';
|
||||||
|
|
||||||
|
const router = Router();
|
||||||
|
|
||||||
|
const listSchema = z.object({
|
||||||
|
status: z.string().optional(),
|
||||||
|
limit: z.coerce.number().int().min(1).max(100).default(25),
|
||||||
|
});
|
||||||
|
|
||||||
|
const notificationParamSchema = z.object({
|
||||||
|
notificationId: z.string().uuid(),
|
||||||
|
});
|
||||||
|
|
||||||
|
router.get('/me', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsed = listSchema.safeParse(req.query);
|
||||||
|
|
||||||
|
if (!parsed.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid query params', errors: parsed.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await db.query.pushNotifications.findMany({
|
||||||
|
where: and(
|
||||||
|
eq(pushNotifications.ownerUserId, deviceAuth.userId),
|
||||||
|
eq(pushNotifications.recipientDeviceId, deviceAuth.deviceId),
|
||||||
|
),
|
||||||
|
orderBy: [desc(pushNotifications.createdAt)],
|
||||||
|
limit: parsed.data.limit,
|
||||||
|
});
|
||||||
|
|
||||||
|
const filtered = parsed.data.status ? result.filter((item) => item.status === parsed.data.status) : result;
|
||||||
|
|
||||||
|
res.json({ count: filtered.length, notifications: filtered });
|
||||||
|
});
|
||||||
|
|
||||||
|
router.post('/:notificationId/read', requireDeviceAuth, async (req, res) => {
|
||||||
|
const parsedParams = notificationParamSchema.safeParse(req.params);
|
||||||
|
|
||||||
|
if (!parsedParams.success) {
|
||||||
|
res.status(400).json({ message: 'Invalid notificationId', errors: parsedParams.error.flatten() });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [updated] = await db
|
||||||
|
.update(pushNotifications)
|
||||||
|
.set({
|
||||||
|
status: 'read',
|
||||||
|
updatedAt: new Date(),
|
||||||
|
})
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(pushNotifications.id, parsedParams.data.notificationId),
|
||||||
|
eq(pushNotifications.ownerUserId, deviceAuth.userId),
|
||||||
|
eq(pushNotifications.recipientDeviceId, deviceAuth.deviceId),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.returning();
|
||||||
|
|
||||||
|
if (!updated) {
|
||||||
|
res.status(404).json({ message: 'Notification not found' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
res.json({ message: 'Notification marked as read', notification: updated });
|
||||||
|
});
|
||||||
|
|
||||||
|
router.post('/worker/dispatch', requireDeviceAuth, async (req, res) => {
|
||||||
|
const deviceAuth = req.deviceAuth;
|
||||||
|
|
||||||
|
if (!deviceAuth) {
|
||||||
|
res.status(401).json({ message: 'Unauthorized' });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const processed = await dispatchPushQueueOnce();
|
||||||
|
res.json({ message: 'Push queue dispatch completed', processed });
|
||||||
|
});
|
||||||
|
|
||||||
|
export default router;
|
||||||
@@ -9,6 +9,7 @@ import { deviceCommands, deviceLinks, devices, streamSessions } from '../db/sche
|
|||||||
import { mediaProvider } from '../media/service';
|
import { mediaProvider } from '../media/service';
|
||||||
import { requireDeviceAuth } from '../middleware/device-auth';
|
import { requireDeviceAuth } from '../middleware/device-auth';
|
||||||
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
|
import { dispatchCommandById, sendRealtimeToDevice } from '../realtime/gateway';
|
||||||
|
import { enqueuePushNotification } from '../services/push';
|
||||||
import { createRecordingForStream } from './recordings';
|
import { createRecordingForStream } from './recordings';
|
||||||
|
|
||||||
const router = Router();
|
const router = Router();
|
||||||
@@ -167,13 +168,25 @@ router.post('/request', requireDeviceAuth, async (req, res) => {
|
|||||||
|
|
||||||
const refreshedCommand = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) });
|
const refreshedCommand = await db.query.deviceCommands.findFirst({ where: eq(deviceCommands.id, command.id) });
|
||||||
|
|
||||||
sendRealtimeToDevice(sourceDevice.id, 'stream:requested', {
|
const deliveredToRequester = sendRealtimeToDevice(sourceDevice.id, 'stream:requested', {
|
||||||
streamSessionId: session.id,
|
streamSessionId: session.id,
|
||||||
cameraDeviceId: cameraDevice.id,
|
cameraDeviceId: cameraDevice.id,
|
||||||
status: session.status,
|
status: session.status,
|
||||||
reason: session.reason,
|
reason: session.reason,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (!deliveredToRequester) {
|
||||||
|
await enqueuePushNotification({
|
||||||
|
ownerUserId: sourceDevice.userId,
|
||||||
|
recipientDeviceId: sourceDevice.id,
|
||||||
|
type: 'stream_requested',
|
||||||
|
payload: {
|
||||||
|
streamSessionId: session.id,
|
||||||
|
cameraDeviceId: cameraDevice.id,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
res.status(201).json({
|
res.status(201).json({
|
||||||
message: 'Stream request sent',
|
message: 'Stream request sent',
|
||||||
streamSession: session,
|
streamSession: session,
|
||||||
@@ -251,7 +264,7 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', {
|
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:started', {
|
||||||
streamSessionId: updated.id,
|
streamSessionId: updated.id,
|
||||||
cameraDeviceId: updated.cameraDeviceId,
|
cameraDeviceId: updated.cameraDeviceId,
|
||||||
status: updated.status,
|
status: updated.status,
|
||||||
@@ -261,6 +274,18 @@ router.post('/:streamSessionId/accept', requireDeviceAuth, async (req, res) => {
|
|||||||
subscribeEndpoint: updated.subscribeEndpoint,
|
subscribeEndpoint: updated.subscribeEndpoint,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (!deliveredToRequester) {
|
||||||
|
await enqueuePushNotification({
|
||||||
|
ownerUserId: session.ownerUserId,
|
||||||
|
recipientDeviceId: session.requesterDeviceId,
|
||||||
|
type: 'stream_started',
|
||||||
|
payload: {
|
||||||
|
streamSessionId: updated.id,
|
||||||
|
cameraDeviceId: updated.cameraDeviceId,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
res.json({ message: 'Stream accepted', streamSession: updated });
|
res.json({ message: 'Stream accepted', streamSession: updated });
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -405,18 +430,42 @@ router.post('/:streamSessionId/end', requireDeviceAuth, async (req, res) => {
|
|||||||
|
|
||||||
await createRecordingForStream(session.id);
|
await createRecordingForStream(session.id);
|
||||||
|
|
||||||
sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', {
|
const deliveredToRequester = sendRealtimeToDevice(session.requesterDeviceId, 'stream:ended', {
|
||||||
streamSessionId: session.id,
|
streamSessionId: session.id,
|
||||||
status: parsed.data.reason,
|
status: parsed.data.reason,
|
||||||
endedAt: now,
|
endedAt: now,
|
||||||
});
|
});
|
||||||
|
|
||||||
sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', {
|
const deliveredToCamera = sendRealtimeToDevice(session.cameraDeviceId, 'stream:ended', {
|
||||||
streamSessionId: session.id,
|
streamSessionId: session.id,
|
||||||
status: parsed.data.reason,
|
status: parsed.data.reason,
|
||||||
endedAt: now,
|
endedAt: now,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (!deliveredToRequester) {
|
||||||
|
await enqueuePushNotification({
|
||||||
|
ownerUserId: session.ownerUserId,
|
||||||
|
recipientDeviceId: session.requesterDeviceId,
|
||||||
|
type: 'stream_ended',
|
||||||
|
payload: {
|
||||||
|
streamSessionId: session.id,
|
||||||
|
status: parsed.data.reason,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!deliveredToCamera) {
|
||||||
|
await enqueuePushNotification({
|
||||||
|
ownerUserId: session.ownerUserId,
|
||||||
|
recipientDeviceId: session.cameraDeviceId,
|
||||||
|
type: 'stream_ended',
|
||||||
|
payload: {
|
||||||
|
streamSessionId: session.id,
|
||||||
|
status: parsed.data.reason,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
res.json({ message: 'Stream ended', streamSession: updated });
|
res.json({ message: 'Stream ended', streamSession: updated });
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
92
Backend/services/push.ts
Normal file
92
Backend/services/push.ts
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
import { and, eq, lte } from 'drizzle-orm';
|
||||||
|
|
||||||
|
import { db } from '../db/client';
|
||||||
|
import { devices, pushNotifications } from '../db/schema';
|
||||||
|
|
||||||
|
const MAX_ATTEMPTS = Number(process.env.PUSH_MAX_ATTEMPTS ?? 5);
|
||||||
|
|
||||||
|
export const enqueuePushNotification = async (input: {
|
||||||
|
ownerUserId: string;
|
||||||
|
recipientDeviceId: string;
|
||||||
|
type: string;
|
||||||
|
payload?: Record<string, unknown>;
|
||||||
|
}): Promise<void> => {
|
||||||
|
await db.insert(pushNotifications).values({
|
||||||
|
ownerUserId: input.ownerUserId,
|
||||||
|
recipientDeviceId: input.recipientDeviceId,
|
||||||
|
type: input.type,
|
||||||
|
payload: input.payload ?? null,
|
||||||
|
status: 'queued',
|
||||||
|
attempts: 0,
|
||||||
|
nextAttemptAt: new Date(),
|
||||||
|
updatedAt: new Date(),
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
const deliverPush = async (notificationId: string): Promise<void> => {
|
||||||
|
const notification = await db.query.pushNotifications.findFirst({ where: eq(pushNotifications.id, notificationId) });
|
||||||
|
|
||||||
|
if (!notification || notification.status === 'delivered' || notification.status === 'failed') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const recipientDevice = await db.query.devices.findFirst({ where: eq(devices.id, notification.recipientDeviceId) });
|
||||||
|
|
||||||
|
const now = new Date();
|
||||||
|
const attempts = notification.attempts + 1;
|
||||||
|
|
||||||
|
if (!recipientDevice || !recipientDevice.pushToken) {
|
||||||
|
const nextDelaySeconds = Math.min(60 * attempts, 60 * 10);
|
||||||
|
const shouldFail = attempts >= MAX_ATTEMPTS;
|
||||||
|
|
||||||
|
await db
|
||||||
|
.update(pushNotifications)
|
||||||
|
.set({
|
||||||
|
attempts,
|
||||||
|
status: shouldFail ? 'failed' : 'queued',
|
||||||
|
lastError: 'missing push token',
|
||||||
|
nextAttemptAt: new Date(now.getTime() + nextDelaySeconds * 1000),
|
||||||
|
updatedAt: now,
|
||||||
|
})
|
||||||
|
.where(eq(pushNotifications.id, notification.id));
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mock push provider: consider "delivered" when token exists.
|
||||||
|
await db
|
||||||
|
.update(pushNotifications)
|
||||||
|
.set({
|
||||||
|
attempts,
|
||||||
|
status: 'delivered',
|
||||||
|
sentAt: now,
|
||||||
|
lastError: null,
|
||||||
|
updatedAt: now,
|
||||||
|
})
|
||||||
|
.where(eq(pushNotifications.id, notification.id));
|
||||||
|
};
|
||||||
|
|
||||||
|
export const dispatchPushQueueOnce = async (): Promise<number> => {
|
||||||
|
const now = new Date();
|
||||||
|
|
||||||
|
const queued = await db.query.pushNotifications.findMany({
|
||||||
|
where: and(eq(pushNotifications.status, 'queued'), lte(pushNotifications.nextAttemptAt, now)),
|
||||||
|
limit: 100,
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const item of queued) {
|
||||||
|
await deliverPush(item.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
return queued.length;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const startPushWorker = (): void => {
|
||||||
|
const intervalMs = Number(process.env.PUSH_WORKER_INTERVAL_MS ?? 10_000);
|
||||||
|
|
||||||
|
setInterval(() => {
|
||||||
|
dispatchPushQueueOnce().catch((error) => {
|
||||||
|
console.error('push worker failed', error);
|
||||||
|
});
|
||||||
|
}, intervalMs);
|
||||||
|
};
|
||||||
Reference in New Issue
Block a user