From dfe79f7fb48c1033511a5ad02bd2cb7e18ee77dc Mon Sep 17 00:00:00 2001 From: James Taylor Date: Wed, 15 Dec 2021 11:50:05 +0000 Subject: [PATCH] Add comments to jobs.ts Signed-off-by: James Taylor --- .../rest-api-typescript/src/jobs.ts | 270 ++++++++++-------- 1 file changed, 144 insertions(+), 126 deletions(-) diff --git a/asset-transfer-basic/rest-api-typescript/src/jobs.ts b/asset-transfer-basic/rest-api-typescript/src/jobs.ts index de2ab458..c46af0e5 100644 --- a/asset-transfer-basic/rest-api-typescript/src/jobs.ts +++ b/asset-transfer-basic/rest-api-typescript/src/jobs.ts @@ -26,7 +26,6 @@ export type JobResult = { transactionError?: string; }; -// TODO include attempts made? export type JobSummary = { jobId: string; transactionIds: string[]; @@ -53,6 +52,9 @@ const connection: ConnectionOptions = { password: config.redisPassword, }; +/* + * Set up the queue for submit jobs + */ export const initJobQueue = (): Queue => { const submitQueue = new Queue(config.JOB_QUEUE_NAME, { connection, @@ -70,6 +72,10 @@ export const initJobQueue = (): Queue => { return submitQueue; }; +/* + * Set up a worker to process submit jobs on the queue, using the + * processSubmitTransactionJob function below + */ export const initJobQueueWorker = (app: Application): Worker => { const worker = new Worker( config.JOB_QUEUE_NAME, @@ -80,7 +86,7 @@ export const initJobQueueWorker = (app: Application): Worker => { ); worker.on('failed', (job) => { - logger.error({ job }, 'Job failed'); // WHY?! + logger.warn({ job }, 'Job failed'); }); // Important: need to handle this error otherwise worker may stop @@ -98,130 +104,6 @@ export const initJobQueueWorker = (app: Application): Worker => { return worker; }; -export const initJobQueueScheduler = (): QueueScheduler => { - const queueScheduler = new QueueScheduler(config.JOB_QUEUE_NAME, { - connection, - }); - - queueScheduler.on('failed', (jobId, failedReason) => { - // TODO when does this happen, and how should it be handled? - logger.error({ jobId, failedReason }, 'Queue sceduler failure'); - }); - - return queueScheduler; -}; - -export const addSubmitTransactionJob = async ( - submitQueue: Queue, - mspid: string, - transactionName: string, - ...transactionArgs: string[] -): Promise => { - const jobName = `submit ${transactionName} transaction`; - const job = await submitQueue.add(jobName, { - mspid, - transactionName, - transactionArgs: transactionArgs, - transactionIds: [], - }); - - if (job?.id === undefined) { - throw new Error('Submit transaction job ID not available'); - } - - return job.id; -}; - -/* - * Gets a summary for the jobs endpoint - */ -export const getJobSummary = async ( - queue: Queue, - jobId: string -): Promise => { - const job: Job | undefined = await queue.getJob(jobId); - logger.debug({ job }, 'Got job'); - - if (!(job && job.id != undefined)) { - throw new JobNotFoundError(`Job ${jobId} not found`, jobId); - } - - let transactionIds: string[]; - if (job.data && job.data.transactionIds) { - transactionIds = job.data.transactionIds; - } else { - transactionIds = []; - } - - let transactionError; - let transactionPayload; - const returnValue = job.returnvalue; - if (returnValue) { - if (returnValue.transactionError) { - transactionError = returnValue.transactionError; - } - - if ( - returnValue.transactionPayload && - returnValue.transactionPayload.length > 0 - ) { - transactionPayload = returnValue.transactionPayload.toString(); - } else { - transactionPayload = ''; - } - } - - const jobSummary: JobSummary = { - jobId: job.id, - transactionIds, - transactionError, - transactionPayload, - }; - - return jobSummary; -}; - -export const updateJobData = async ( - job: Job, - transaction: Transaction | undefined -): Promise => { - const newData = { ...job.data }; - - if (transaction != undefined) { - const transationIds = ([] as string[]).concat( - newData.transactionIds, - transaction.getTransactionId() - ); - newData.transactionIds = transationIds; - - newData.transactionState = transaction.serialize(); - } else { - newData.transactionState = undefined; - } - - await job.update(newData); -}; - -/* - * Get the current job counts - * - * This function is used for the liveness REST endpoint - */ -export const getJobCounts = async ( - queue: Queue -): Promise<{ [index: string]: number }> => { - const jobCounts = await queue.getJobCounts( - 'active', - 'completed', - 'delayed', - 'failed', - 'waiting' - ); - logger.debug({ jobCounts }, 'Current job counts'); - - return jobCounts; -}; - /* * Process a submit transaction request from the job queue * @@ -326,3 +208,139 @@ export const processSubmitTransactionJob = async ( throw err; } }; + +/* + * Set up a scheduler for the submit job queue + * + * This manages stalled and delayed jobs and is required for retries with backoff + */ +export const initJobQueueScheduler = (): QueueScheduler => { + const queueScheduler = new QueueScheduler(config.JOB_QUEUE_NAME, { + connection, + }); + + queueScheduler.on('failed', (jobId, failedReason) => { + logger.error({ jobId, failedReason }, 'Queue sceduler failure'); + }); + + return queueScheduler; +}; + +/* + * Helper to add a new submit transaction job to the queue + */ +export const addSubmitTransactionJob = async ( + submitQueue: Queue, + mspid: string, + transactionName: string, + ...transactionArgs: string[] +): Promise => { + const jobName = `submit ${transactionName} transaction`; + const job = await submitQueue.add(jobName, { + mspid, + transactionName, + transactionArgs: transactionArgs, + transactionIds: [], + }); + + if (job?.id === undefined) { + throw new Error('Submit transaction job ID not available'); + } + + return job.id; +}; + +/* + * Helper to update the data for an existing job + */ +export const updateJobData = async ( + job: Job, + transaction: Transaction | undefined +): Promise => { + const newData = { ...job.data }; + + if (transaction != undefined) { + const transationIds = ([] as string[]).concat( + newData.transactionIds, + transaction.getTransactionId() + ); + newData.transactionIds = transationIds; + + newData.transactionState = transaction.serialize(); + } else { + newData.transactionState = undefined; + } + + await job.update(newData); +}; + +/* + * Gets a job summary + * + * This function is used for the jobs REST endpoint + */ +export const getJobSummary = async ( + queue: Queue, + jobId: string +): Promise => { + const job: Job | undefined = await queue.getJob(jobId); + logger.debug({ job }, 'Got job'); + + if (!(job && job.id != undefined)) { + throw new JobNotFoundError(`Job ${jobId} not found`, jobId); + } + + let transactionIds: string[]; + if (job.data && job.data.transactionIds) { + transactionIds = job.data.transactionIds; + } else { + transactionIds = []; + } + + let transactionError; + let transactionPayload; + const returnValue = job.returnvalue; + if (returnValue) { + if (returnValue.transactionError) { + transactionError = returnValue.transactionError; + } + + if ( + returnValue.transactionPayload && + returnValue.transactionPayload.length > 0 + ) { + transactionPayload = returnValue.transactionPayload.toString(); + } else { + transactionPayload = ''; + } + } + + const jobSummary: JobSummary = { + jobId: job.id, + transactionIds, + transactionError, + transactionPayload, + }; + + return jobSummary; +}; + +/* + * Get the current job counts + * + * This function is used for the liveness REST endpoint + */ +export const getJobCounts = async ( + queue: Queue +): Promise<{ [index: string]: number }> => { + const jobCounts = await queue.getJobCounts( + 'active', + 'completed', + 'delayed', + 'failed', + 'waiting' + ); + logger.debug({ jobCounts }, 'Current job counts'); + + return jobCounts; +};