Add comments to jobs.ts

Signed-off-by: James Taylor <jamest@uk.ibm.com>
This commit is contained in:
James Taylor 2021-12-15 11:50:05 +00:00
parent 5447e3534d
commit dfe79f7fb4

View file

@ -26,7 +26,6 @@ export type JobResult = {
transactionError?: string;
};
// TODO include attempts made?
export type JobSummary = {
jobId: string;
transactionIds: string[];
@ -53,6 +52,9 @@ const connection: ConnectionOptions = {
password: config.redisPassword,
};
/*
* Set up the queue for submit jobs
*/
export const initJobQueue = (): Queue => {
const submitQueue = new Queue(config.JOB_QUEUE_NAME, {
connection,
@ -70,6 +72,10 @@ export const initJobQueue = (): Queue => {
return submitQueue;
};
/*
* Set up a worker to process submit jobs on the queue, using the
* processSubmitTransactionJob function below
*/
export const initJobQueueWorker = (app: Application): Worker => {
const worker = new Worker<JobData, JobResult>(
config.JOB_QUEUE_NAME,
@ -80,7 +86,7 @@ export const initJobQueueWorker = (app: Application): Worker => {
);
worker.on('failed', (job) => {
logger.error({ job }, 'Job failed'); // WHY?!
logger.warn({ job }, 'Job failed');
});
// Important: need to handle this error otherwise worker may stop
@ -98,130 +104,6 @@ export const initJobQueueWorker = (app: Application): Worker => {
return worker;
};
export const initJobQueueScheduler = (): QueueScheduler => {
const queueScheduler = new QueueScheduler(config.JOB_QUEUE_NAME, {
connection,
});
queueScheduler.on('failed', (jobId, failedReason) => {
// TODO when does this happen, and how should it be handled?
logger.error({ jobId, failedReason }, 'Queue sceduler failure');
});
return queueScheduler;
};
export const addSubmitTransactionJob = async (
submitQueue: Queue<JobData, JobResult>,
mspid: string,
transactionName: string,
...transactionArgs: string[]
): Promise<string> => {
const jobName = `submit ${transactionName} transaction`;
const job = await submitQueue.add(jobName, {
mspid,
transactionName,
transactionArgs: transactionArgs,
transactionIds: [],
});
if (job?.id === undefined) {
throw new Error('Submit transaction job ID not available');
}
return job.id;
};
/*
* Gets a summary for the jobs endpoint
*/
export const getJobSummary = async (
queue: Queue,
jobId: string
): Promise<JobSummary> => {
const job: Job<JobData, JobResult> | undefined = await queue.getJob(jobId);
logger.debug({ job }, 'Got job');
if (!(job && job.id != undefined)) {
throw new JobNotFoundError(`Job ${jobId} not found`, jobId);
}
let transactionIds: string[];
if (job.data && job.data.transactionIds) {
transactionIds = job.data.transactionIds;
} else {
transactionIds = [];
}
let transactionError;
let transactionPayload;
const returnValue = job.returnvalue;
if (returnValue) {
if (returnValue.transactionError) {
transactionError = returnValue.transactionError;
}
if (
returnValue.transactionPayload &&
returnValue.transactionPayload.length > 0
) {
transactionPayload = returnValue.transactionPayload.toString();
} else {
transactionPayload = '';
}
}
const jobSummary: JobSummary = {
jobId: job.id,
transactionIds,
transactionError,
transactionPayload,
};
return jobSummary;
};
export const updateJobData = async (
job: Job<JobData, JobResult>,
transaction: Transaction | undefined
): Promise<void> => {
const newData = { ...job.data };
if (transaction != undefined) {
const transationIds = ([] as string[]).concat(
newData.transactionIds,
transaction.getTransactionId()
);
newData.transactionIds = transationIds;
newData.transactionState = transaction.serialize();
} else {
newData.transactionState = undefined;
}
await job.update(newData);
};
/*
* Get the current job counts
*
* This function is used for the liveness REST endpoint
*/
export const getJobCounts = async (
queue: Queue
): Promise<{ [index: string]: number }> => {
const jobCounts = await queue.getJobCounts(
'active',
'completed',
'delayed',
'failed',
'waiting'
);
logger.debug({ jobCounts }, 'Current job counts');
return jobCounts;
};
/*
* Process a submit transaction request from the job queue
*
@ -326,3 +208,139 @@ export const processSubmitTransactionJob = async (
throw err;
}
};
/*
* Set up a scheduler for the submit job queue
*
* This manages stalled and delayed jobs and is required for retries with backoff
*/
export const initJobQueueScheduler = (): QueueScheduler => {
const queueScheduler = new QueueScheduler(config.JOB_QUEUE_NAME, {
connection,
});
queueScheduler.on('failed', (jobId, failedReason) => {
logger.error({ jobId, failedReason }, 'Queue sceduler failure');
});
return queueScheduler;
};
/*
* Helper to add a new submit transaction job to the queue
*/
export const addSubmitTransactionJob = async (
submitQueue: Queue<JobData, JobResult>,
mspid: string,
transactionName: string,
...transactionArgs: string[]
): Promise<string> => {
const jobName = `submit ${transactionName} transaction`;
const job = await submitQueue.add(jobName, {
mspid,
transactionName,
transactionArgs: transactionArgs,
transactionIds: [],
});
if (job?.id === undefined) {
throw new Error('Submit transaction job ID not available');
}
return job.id;
};
/*
* Helper to update the data for an existing job
*/
export const updateJobData = async (
job: Job<JobData, JobResult>,
transaction: Transaction | undefined
): Promise<void> => {
const newData = { ...job.data };
if (transaction != undefined) {
const transationIds = ([] as string[]).concat(
newData.transactionIds,
transaction.getTransactionId()
);
newData.transactionIds = transationIds;
newData.transactionState = transaction.serialize();
} else {
newData.transactionState = undefined;
}
await job.update(newData);
};
/*
* Gets a job summary
*
* This function is used for the jobs REST endpoint
*/
export const getJobSummary = async (
queue: Queue,
jobId: string
): Promise<JobSummary> => {
const job: Job<JobData, JobResult> | undefined = await queue.getJob(jobId);
logger.debug({ job }, 'Got job');
if (!(job && job.id != undefined)) {
throw new JobNotFoundError(`Job ${jobId} not found`, jobId);
}
let transactionIds: string[];
if (job.data && job.data.transactionIds) {
transactionIds = job.data.transactionIds;
} else {
transactionIds = [];
}
let transactionError;
let transactionPayload;
const returnValue = job.returnvalue;
if (returnValue) {
if (returnValue.transactionError) {
transactionError = returnValue.transactionError;
}
if (
returnValue.transactionPayload &&
returnValue.transactionPayload.length > 0
) {
transactionPayload = returnValue.transactionPayload.toString();
} else {
transactionPayload = '';
}
}
const jobSummary: JobSummary = {
jobId: job.id,
transactionIds,
transactionError,
transactionPayload,
};
return jobSummary;
};
/*
* Get the current job counts
*
* This function is used for the liveness REST endpoint
*/
export const getJobCounts = async (
queue: Queue
): Promise<{ [index: string]: number }> => {
const jobCounts = await queue.getJobCounts(
'active',
'completed',
'delayed',
'failed',
'waiting'
);
logger.debug({ jobCounts }, 'Current job counts');
return jobCounts;
};