Files
Final-Year-Project/Backend/services/push.ts

107 lines
3.1 KiB
TypeScript

import { and, eq, lte } from 'drizzle-orm';
import { db } from '../db/client';
import { devices, pushNotifications } from '../db/schema';
import { hasRequiredTables } from '../utils/db-schema';
const MAX_ATTEMPTS = Number(process.env.PUSH_MAX_ATTEMPTS ?? 5);
export const enqueuePushNotification = async (input: {
ownerUserId: string;
recipientDeviceId: string;
type: string;
payload?: Record<string, unknown>;
}): Promise<void> => {
await db.insert(pushNotifications).values({
ownerUserId: input.ownerUserId,
recipientDeviceId: input.recipientDeviceId,
type: input.type,
payload: input.payload ?? null,
status: 'queued',
attempts: 0,
nextAttemptAt: new Date(),
updatedAt: new Date(),
});
};
const deliverPush = async (notificationId: string): Promise<void> => {
const notification = await db.query.pushNotifications.findFirst({ where: eq(pushNotifications.id, notificationId) });
if (!notification || notification.status === 'delivered' || notification.status === 'failed') {
return;
}
const recipientDevice = await db.query.devices.findFirst({ where: eq(devices.id, notification.recipientDeviceId) });
const now = new Date();
const attempts = notification.attempts + 1;
if (!recipientDevice || !recipientDevice.pushToken) {
const nextDelaySeconds = Math.min(60 * attempts, 60 * 10);
const shouldFail = attempts >= MAX_ATTEMPTS;
await db
.update(pushNotifications)
.set({
attempts,
status: shouldFail ? 'failed' : 'queued',
lastError: 'missing push token',
nextAttemptAt: new Date(now.getTime() + nextDelaySeconds * 1000),
updatedAt: now,
})
.where(eq(pushNotifications.id, notification.id));
return;
}
// Mock push provider: consider "delivered" when token exists.
await db
.update(pushNotifications)
.set({
attempts,
status: 'delivered',
sentAt: now,
lastError: null,
updatedAt: now,
})
.where(eq(pushNotifications.id, notification.id));
};
export const dispatchPushQueueOnce = async (): Promise<number> => {
const now = new Date();
const queued = await db.query.pushNotifications.findMany({
where: and(eq(pushNotifications.status, 'queued'), lte(pushNotifications.nextAttemptAt, now)),
limit: 100,
});
for (const item of queued) {
await deliverPush(item.id);
}
return queued.length;
};
export const startPushWorker = (): void => {
const intervalMs = Number(process.env.PUSH_WORKER_INTERVAL_MS ?? 10_000);
const requiredTables = ['push_notifications', 'devices'];
void (async () => {
const ready = await hasRequiredTables(requiredTables);
if (!ready) {
console.warn(
`[push worker] skipped startup because required tables are missing (${requiredTables.join(', ')}). Run migrations and restart.`,
);
return;
}
setInterval(() => {
dispatchPushQueueOnce().catch((error) => {
console.error('push worker failed', error);
});
}, intervalMs);
})().catch((error) => {
console.error('push worker failed to initialize', error);
});
};