Skip to content

Commit

Permalink
Add worker/queues to process notifications based on expiraton status (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SerpentBytes authored Mar 11, 2023
1 parent b61a3f0 commit 50713c7
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 32 deletions.
11 changes: 11 additions & 0 deletions app/lib/notifications.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import { createTransport } from 'nodemailer';
import secrets from '~/lib/secrets.server';
import logger from './logger.server';

import {
addCertificateExpirationNotifications,
addRecordExpirationNotifications,
} from '~/queues/notifications/expiration-notification.server';

const { NOTIFICATIONS_EMAIL_USER, NODE_ENV, SMTP_PORT } = process.env;
const { NOTIFICATIONS_USERNAME, NOTIFICATIONS_PASSWORD } = secrets;

Expand Down Expand Up @@ -46,4 +51,10 @@ const sendNotification = async (emailAddress: string, subject: string, text: str
}
};

export async function init() {
logger.debug('Notifications init: adding jobs for certificate/record expiration notices');
await addCertificateExpirationNotifications();
await addRecordExpirationNotifications();
}

export default sendNotification;
4 changes: 3 additions & 1 deletion app/models/record.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ export async function updateRecordById(
description?: Record['description'],
course?: Record['course'],
ports?: Record['ports'],
expiresAt?: Record['expiresAt']
expiresAt?: Record['expiresAt'],
lastNotified?: Record['lastNotified']
) {
return prisma.record.update({
where: { id },
Expand All @@ -45,6 +46,7 @@ export async function updateRecordById(
course,
ports,
expiresAt,
lastNotified,
},
});
}
Expand Down
177 changes: 177 additions & 0 deletions app/queues/notifications/expiration-notification.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Third party modules
import { Worker, Queue, UnrecoverableError } from 'bullmq';
import dayjs from 'dayjs';

// Internal modules
import { redis } from '~/lib/redis.server';
import logger from '~/lib/logger.server';
import { addNotification } from './notifications.server';
import { prisma } from '~/db.server';

// Types
import type { NotificationData } from './notifications.server';

enum RecordType {
Certificate = 'certificate',
DnsRecord = 'record',
}
interface ExpirationStatusPayload {
type: RecordType;
}
// constant for notification frequency in days
const NOTIFICATION_FREQUENCY = 7;
// name for the queue
const expirationNotificationQueueName = 'expiration-notification';

const updateNotificationStatus = (type: RecordType, id: number) => {
switch (type) {
case RecordType.Certificate:
return prisma.certificate.update({
where: {
id,
},
data: {
lastNotified: new Date(),
},
});
case RecordType.DnsRecord:
return prisma.record.update({
where: {
id,
},
data: {
lastNotified: new Date(),
},
});
}
};

// fetch records by status based on type
const getRecordsByExpiration = (type: RecordType) => {
switch (type) {
case RecordType.Certificate:
return prisma.certificate.findMany({
where: {
validTo: {
lte: dayjs().add(1, 'M').toDate(),
},
OR: [
{
lastNotified: null,
},
{
lastNotified: dayjs()
.subtract(NOTIFICATION_FREQUENCY * 4, 'd')
.toDate(),
},
],
},
select: {
user: true,
id: true,
lastNotified: true,
},
});
case RecordType.DnsRecord:
return prisma.record.findMany({
where: {
expiresAt: {
lte: dayjs().add(1, 'M').toDate(),
},
OR: [
{
lastNotified: null,
},
{
lastNotified: dayjs()
.subtract(NOTIFICATION_FREQUENCY * 4, 'd')
.toDate(),
},
],
},
select: {
user: true,
id: true,
lastNotified: true,
},
});
}
};
// Queue initialization
const expirationNotificationQueue = new Queue<ExpirationStatusPayload>(
expirationNotificationQueueName,
{
connection: redis,
}
);

expirationNotificationQueue.on('error', (err) => {
logger.warn(
'Notifications: Error running check for DNS Records/Certificates about to expire',
err
);
});

// function to add jobs
const addExpirationNotifications = async (type: RecordType) => {
let jobName = `${expirationNotificationQueueName}-${type}`;
return expirationNotificationQueue.add(
jobName,
{ type },
{
repeat: { every: 5 * 60 * 1000 },
}
);
};
// only way to interact add jobs
export const addRecordExpirationNotifications = async () =>
addExpirationNotifications(RecordType.DnsRecord);
export const addCertificateExpirationNotifications = async () =>
addExpirationNotifications(RecordType.Certificate);

// function to update notification and add notification jobs
const updateStatusAndNotify = async (type: RecordType, id: number, data: NotificationData) => {
const { emailAddress, subject, message } = data;
await updateNotificationStatus(type, id);
await addNotification({
emailAddress,
subject,
message,
});
};
// worker instance to process DNS Record/Certificates expiration notification jobs
const expirationNotificationWorker = new Worker<ExpirationStatusPayload>(
expirationNotificationQueueName,
async (job) => {
const { type } = job.data;
try {
logger.debug(`Notifications: processing job ${job.name}`);
let records = await getRecordsByExpiration(RecordType.DnsRecord);
await Promise.all(
records.map(async (record) => {
if (!record.lastNotified || record.lastNotified < dayjs().subtract(30, 'd').toDate()) {
await updateStatusAndNotify(type, record.id, {
emailAddress: record.user.email,
subject: `Sample ${type} expiration notice`,
message: `Sample ${type} expiration notice message`,
});
}
})
);
logger.debug(`Notifications: job ${job.name} completed`);
} catch (err) {
// fail job from repeating - encountered error:
logger.error(`Notifications: job ${job.name} failed, rethrowing error as Unrecoverable`, err);

const newError = new UnrecoverableError((err as Error).message);
newError.stack = (err as Error).stack;
throw newError;
}
},
{ connection: redis }
);

//logic to execute if worker failed to process job
expirationNotificationWorker.on('failed', (job, err) => {
logger.warn(`Notifications: job ${job?.name} failed with error: `, err);
});
File renamed without changes.
1 change: 1 addition & 0 deletions app/routes/__index/certificate/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export default function CertificateIndexRoute() {
'Private----BEGIN CERTIFICATE-----MIIFMjCCAxoCCQCVordquLnq8TANBgkqhkiG9w0BAQUFADBbMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRQwEgYDVQQDEwtleGFtcGxlLmNvbTAeFw0xNzA5MTQxNDMzMTRaFw0xODA5MTQxNDMzMTRaMFsxCzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxFDASBgNVBAMTC2V4YW1wbGUuY29tMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAwi2PYBNGl1n78niRGDKgcsWK03TcTeVbQ1HztA57Rr1iDHAZNx3Mv4E/Sha8VKbKoshcmUcOS3AlmbIZX+7+9c7lL2oD+vtUZF1YUR/69fWuO72wk6fKj/eofxH9Ud5KFje8qrYZdJWKkPMdWlYgjD6qpA5wl60NiuxmUr44ADZDytqHzNThN3wrFruz74PcMfakcSUMxkh98LuNeGtqHpEAw+wliko3oDD4PanvDvp5mRgiQVKHEGT7dm85Up+W1iJKJ65fkc/j940MaLbdISZYYCT5dtPgCGKCHgVuVrY+OXFJrD3TTm94ILsR/BkS/VSKNigGVPXg3q8tgIS++k13CzLUO0PNRMuod1RD9j5NEc2CVic9rcH06ugZyHlOcuVvvRsPGd52BPn+Jf1aePKPPQHxT9i5GOs80CJw0eduZCDZB32biRYNwUtjFkHbu8ii2IGkvhnWonjd4w5wOldG+RPr+XoFCIaHp5TszQ+HnUTLIXKtBgzzCKjK4eZqrck7xpo5B5m5V7EUxBze2LYVky+GsDsqL8CggQqJL4ZKuZVoxgPwhnDy5nMs057NCU9EnXcauMW9UEqEHu5NXnmGJrCvQ56wjYN3lgvCHEtmIpsRjCCWaBJYiawu1J5ZAf1yGTVNh8pEvO//zL9ImUxrSfOGUeFiN1tzSFlTfbcCAwEAATANBgkqhkiG9w0BAQUFAAOCAgEAdZZpgWv79CgF5ny6HmMaYgsXJKJyQE9RhJ1cmzDY8KAF+nzT7q4Pgt3WbA9bpdji7C0WqKjX7hLipqhgFnqb8qZcodEKhX788qBj4X45+4nT6QipyJlz5x6KcCn/v9gQNKks7U+dBlqquiVfbXaa1EAKMeGtqinf+Y51nR/fBcr/P9TBnSJqH61KDO3qrE5KGTwHQ9VXoeKyeppGt5sYf8G0vwoHhtPTOO8TuLEIlFcXtzbC3zAtmQj6Su//fI5yjuYTkiayxMx8nCGrQhQSXdC8gYpYd0os7UY01DVu4BTCXEvf0GYXtiGJeG8lQT/eu7WdK83uJ93U/BMYzoq4lSVcqY4LNxlfAQXKhaAbioA5XyT7co7FQ0g+s2CGBUKa11wPDe8M2GVLPsxT2bXDQap5DQyVIuTwjtgL0tykGxPJPAnL2zuUy6T3/YzrWaJ9Os+6mUCVdLnXtDgZ10Ujel7mq6wo9Ns+u07grXZkXpmJYnJXBrwOsY8KZa5vFwgJrDXhWe+Fmgt1EP5VIqRCQAxH2iYvAaELi8udbN/ZiUU3K9t79MP/M3U/tEWAubHXsaAv03jRy43X0VjlZHmagU/4dU7RBWfyuwRarYIXLNT2FCd2z4kd3fsL3rB5iI+RH0uoNuOa1+UApfFCv0O65TYkp5jEWSlU8PhKYD43nXA=-----END CERTIFICATE-----',
validFrom: currentDate,
validTo: new Date(currentDate.getFullYear(), currentDate.getMonth() + 6),
lastNotified: null,
};

function onRequest() {
Expand Down
2 changes: 1 addition & 1 deletion app/routes/dev.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import invariant from 'tiny-invariant';
import { useUser } from '~/utils';
import logger from '~/lib/logger.server';
import { requireUser, requireUsername } from '~/session.server';
import { addNotification } from '~/queues/notifications.server';
import { addNotification } from '~/queues/notifications/notifications.server';
import { addCertRequest } from '~/queues/certificate/certificate-flow.server';

import { addDnsRequest, updateDnsRequest, deleteDnsRequest } from '~/queues/dns/dns-flow.server';
Expand Down
54 changes: 28 additions & 26 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,42 @@ model User {
displayName String
email String @unique
group String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
record Record[]
certificate Certificate[]
}

model Record {
id Int @id @default(autoincrement())
username String
name String
type RecordType
value String
description String?
course String?
ports String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
expiresAt DateTime
status RecordStatus
user User @relation(fields: [username], references: [username])
id Int @id @default(autoincrement())
username String
name String
type RecordType
value String
description String?
course String?
ports String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
expiresAt DateTime
status RecordStatus
lastNotified DateTime?
user User @relation(fields: [username], references: [username])
}

model Certificate {
id Int @id @default(autoincrement())
username String
domain String
orderUrl String @unique @db.VarChar(255)
privateKey String?
certificate String?
validFrom DateTime?
validTo DateTime?
status CertificateStatus @default(pending)
user User @relation(fields: [username], references: [username])
challenge Challenge[]
id Int @id @default(autoincrement())
username String
domain String
orderUrl String @unique @db.VarChar(255)
privateKey String?
certificate String?
validFrom DateTime?
validTo DateTime?
lastNotified DateTime?
status CertificateStatus @default(pending)
user User @relation(fields: [username], references: [username])
challenge Challenge[]
}

model Challenge {
Expand Down
6 changes: 4 additions & 2 deletions prisma/seed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ async function seed() {
recordExpDate.setMonth(recordExpDate.getMonth() + 6); // a record expires after 6 months
let certExpDate = new Date();
certExpDate.setDate(certExpDate.getDate() + 90); // certificate expires after 90 days
let temporaryExpDate = new Date();
temporaryExpDate.setDate(temporaryExpDate.getDate() + 7); // expiration for notifications

// cleanup the existing database; no worries if it doesn't exist yet
await prisma.record.deleteMany().catch(() => {});
Expand Down Expand Up @@ -60,7 +62,7 @@ async function seed() {
description: 'For final project.',
course: 'OSD700',
ports: '8080, 1234',
expiresAt: recordExpDate,
expiresAt: temporaryExpDate,
status: `active`,
},
// AAAA record
Expand Down Expand Up @@ -133,7 +135,7 @@ async function seed() {
privateKey:
'-----BEGIN CERTIFICATE-----ApfFCv0O65TYkp5jEWSlU8PhKYD43nXA=-----END CERTIFICATE-----',
validFrom: new Date(),
validTo: certExpDate,
validTo: temporaryExpDate,
status: 'pending',
},
});
Expand Down
6 changes: 4 additions & 2 deletions server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import helmet from 'helmet';
import cors from 'cors';

import logger, { init as loggerInit } from '~/lib/logger.server';
import { notificationsWorker } from '~/queues/notifications.server';
import { notificationsWorker } from '~/queues/notifications/notifications.server';
import { init as samlInit } from '~/lib/saml.server';
import { init as dnsInit } from '~/lib/dns.server';
import { init as notificationsInit } from '~/lib/notifications.server';

import {
orderCreatorWorker,
dnsWaiterWorker,
Expand Down Expand Up @@ -91,7 +93,7 @@ app.all(
// happen here.
async function init() {
logger.info('app initializing...');
return Promise.all([loggerInit(), samlInit(), dnsInit()]);
return Promise.all([loggerInit(), samlInit(), dnsInit(), notificationsInit()]);
}

async function start() {
Expand Down

0 comments on commit 50713c7

Please sign in to comment.