fix(workers): skip background jobs when required tables are missing
This commit is contained in:
@@ -5,6 +5,7 @@ import { z } from 'zod';
|
|||||||
|
|
||||||
import { db } from '../db/client';
|
import { db } from '../db/client';
|
||||||
import { deviceCommands, devices } from '../db/schema';
|
import { deviceCommands, devices } from '../db/schema';
|
||||||
|
import { hasRequiredTables } from '../utils/db-schema';
|
||||||
import { verifyDeviceToken } from '../utils/device-token';
|
import { verifyDeviceToken } from '../utils/device-token';
|
||||||
|
|
||||||
const HEARTBEAT_INTERVAL_MS = 15_000;
|
const HEARTBEAT_INTERVAL_MS = 15_000;
|
||||||
@@ -296,11 +297,25 @@ export const setupRealtimeGateway = (server: HttpServer): SocketIOServer => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (!retryTimer) {
|
if (!retryTimer) {
|
||||||
retryTimer = setInterval(() => {
|
const requiredTables = ['device_commands'];
|
||||||
retryPendingCommands().catch((error) => {
|
|
||||||
console.error('Failed retrying pending commands', error);
|
void (async () => {
|
||||||
});
|
const ready = await hasRequiredTables(requiredTables);
|
||||||
}, RETRY_INTERVAL_MS);
|
if (!ready) {
|
||||||
|
console.warn(
|
||||||
|
`[command retry] skipped startup because required tables are missing (${requiredTables.join(', ')}). Run migrations and restart.`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
retryTimer = setInterval(() => {
|
||||||
|
retryPendingCommands().catch((error) => {
|
||||||
|
console.error('Failed retrying pending commands', error);
|
||||||
|
});
|
||||||
|
}, RETRY_INTERVAL_MS);
|
||||||
|
})().catch((error) => {
|
||||||
|
console.error('Failed initializing command retry worker', error);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
return io;
|
return io;
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import { and, eq, lte } from 'drizzle-orm';
|
|||||||
|
|
||||||
import { db } from '../db/client';
|
import { db } from '../db/client';
|
||||||
import { devices, pushNotifications } from '../db/schema';
|
import { devices, pushNotifications } from '../db/schema';
|
||||||
|
import { hasRequiredTables } from '../utils/db-schema';
|
||||||
|
|
||||||
const MAX_ATTEMPTS = Number(process.env.PUSH_MAX_ATTEMPTS ?? 5);
|
const MAX_ATTEMPTS = Number(process.env.PUSH_MAX_ATTEMPTS ?? 5);
|
||||||
|
|
||||||
@@ -83,10 +84,23 @@ export const dispatchPushQueueOnce = async (): Promise<number> => {
|
|||||||
|
|
||||||
export const startPushWorker = (): void => {
|
export const startPushWorker = (): void => {
|
||||||
const intervalMs = Number(process.env.PUSH_WORKER_INTERVAL_MS ?? 10_000);
|
const intervalMs = Number(process.env.PUSH_WORKER_INTERVAL_MS ?? 10_000);
|
||||||
|
const requiredTables = ['push_notifications', 'devices'];
|
||||||
|
|
||||||
setInterval(() => {
|
void (async () => {
|
||||||
dispatchPushQueueOnce().catch((error) => {
|
const ready = await hasRequiredTables(requiredTables);
|
||||||
console.error('push worker failed', error);
|
if (!ready) {
|
||||||
});
|
console.warn(
|
||||||
}, intervalMs);
|
`[push worker] skipped startup because required tables are missing (${requiredTables.join(', ')}). Run migrations and restart.`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
setInterval(() => {
|
||||||
|
dispatchPushQueueOnce().catch((error) => {
|
||||||
|
console.error('push worker failed', error);
|
||||||
|
});
|
||||||
|
}, intervalMs);
|
||||||
|
})().catch((error) => {
|
||||||
|
console.error('push worker failed to initialize', error);
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|||||||
24
Backend/utils/db-schema.ts
Normal file
24
Backend/utils/db-schema.ts
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
import { pool } from '../db/client';
|
||||||
|
|
||||||
|
const PUBLIC_SCHEMA = 'public';
|
||||||
|
|
||||||
|
export const tableExists = async (tableName: string): Promise<boolean> => {
|
||||||
|
const result = await pool.query<{ exists: boolean }>(
|
||||||
|
`
|
||||||
|
select exists (
|
||||||
|
select 1
|
||||||
|
from information_schema.tables
|
||||||
|
where table_schema = $1
|
||||||
|
and table_name = $2
|
||||||
|
) as "exists"
|
||||||
|
`,
|
||||||
|
[PUBLIC_SCHEMA, tableName],
|
||||||
|
);
|
||||||
|
|
||||||
|
return result.rows[0]?.exists === true;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const hasRequiredTables = async (tableNames: string[]): Promise<boolean> => {
|
||||||
|
const checks = await Promise.all(tableNames.map((tableName) => tableExists(tableName)));
|
||||||
|
return checks.every(Boolean);
|
||||||
|
};
|
||||||
@@ -2,17 +2,31 @@ import { and, eq, lt } from 'drizzle-orm';
|
|||||||
|
|
||||||
import { db } from '../db/client';
|
import { db } from '../db/client';
|
||||||
import { recordings } from '../db/schema';
|
import { recordings } from '../db/schema';
|
||||||
|
import { hasRequiredTables } from '../utils/db-schema';
|
||||||
|
|
||||||
const STALE_RECORDING_SECONDS = Number(process.env.RECORDING_STALE_SECONDS ?? 60 * 30);
|
const STALE_RECORDING_SECONDS = Number(process.env.RECORDING_STALE_SECONDS ?? 60 * 30);
|
||||||
|
|
||||||
export const startRecordingsWorker = (): void => {
|
export const startRecordingsWorker = (): void => {
|
||||||
const intervalMs = Number(process.env.RECORDING_WORKER_INTERVAL_MS ?? 30_000);
|
const intervalMs = Number(process.env.RECORDING_WORKER_INTERVAL_MS ?? 30_000);
|
||||||
|
const requiredTables = ['recordings'];
|
||||||
|
|
||||||
setInterval(() => {
|
void (async () => {
|
||||||
reconcileStaleRecordings().catch((error) => {
|
const ready = await hasRequiredTables(requiredTables);
|
||||||
console.error('recordings worker failed', error);
|
if (!ready) {
|
||||||
});
|
console.warn(
|
||||||
}, intervalMs);
|
`[recordings worker] skipped startup because required tables are missing (${requiredTables.join(', ')}). Run migrations and restart.`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
setInterval(() => {
|
||||||
|
reconcileStaleRecordings().catch((error) => {
|
||||||
|
console.error('recordings worker failed', error);
|
||||||
|
});
|
||||||
|
}, intervalMs);
|
||||||
|
})().catch((error) => {
|
||||||
|
console.error('recordings worker failed to initialize', error);
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
const reconcileStaleRecordings = async (): Promise<void> => {
|
const reconcileStaleRecordings = async (): Promise<void> => {
|
||||||
|
|||||||
Reference in New Issue
Block a user