Queue Provider
The Queue provider allows you to enqueue large amounts of tasks, and then dequeue at a specified rate. You can specify the maximum number of tasks happening simultaneously, as well as the number of tasks you want to be dequeued per second. Tasks which fail will be added to a separate dead letter queue where you can dequeue and retry.
How it works
When you create a queue, using createQueue, you specify the maxInflight and the msgPerSecond parameters. Using those in conjunction will allow you to control the dequeue rate.
For example, if you created a queue with maxInflight set to 1 and msgPerSecond was also set to 1, a task will be sent to your callback endpoint every second, providing you return a 2xx status to let the queue provider know you have received the task. If the provider does not get a 2xx response, it will retry 3 times before marking the task as failed. Failed tasks will be added to the dead letter queue.
In this case, because maxInflight was set to one, the queue will wait for the retries before moving to the next task. A higher maxInflight value would allow the queue provider to send and wait for additional tasks simultaneously.
Note: The Queue provider has a payload limit of 8000 bytes.
Functions
list- this allows you to list the queues you have created. See the code snippet here.createQueue- use this to control the number of tasks you concurrently execute, along with the dequeue rate. See the code snippet here.getQueueDetails- this retrieves details and statistics about a queue. See the code snippet here.updateQueue- use this to update a previously created queue. See the code snippet here.deleteQueue- use this to delete a previously created queue. See the code snippet here.pauseQueue- use this to pause a previously created queue. See the code snippet here.resumeQueue- this will resume a previously paused queue or a queue that was created withisActiveset tofalse. See the code snippet here.enqueue- enqueue a list of tasks to a previously paused queue. See the code snippet here.enqueueSingle- enqueue a single task to a previously paused queue. See the code snippet here.deadLetterList- list the tasks in the dead letter queue. See the code snippet here.deadLetterDequeue- dequeue the failed tasks for you to handle. See the code snippet here.
Note: Use the vcr.verifyAuth method to verify that callbacks originate from the Cloud Runtime Platform.
Initializing the Queue Provider
To use the Queue Provider you need to create an instance of the provider using a session:
const session = vcr.createSession();
const queue = new Queue(session);
Use Case
For example, to use the queue provider to queue and dequeue tasks:
app.post('/queue', async (req, res, next) => {
await queue.createQueue(req.body.name, 'execute', {
maxInflight: req.body.maxInflight,
msgPerSecond: req.body.msgPerSecond,
active: true
});
res.sendStatus(200);
});
This creates an active queue, meaning that as soon as tasks are enqueued, the queue will start processing them straight away. An inactive queue will need to be started using resumeQueue.
app.post('/enqueue', async (req, res, next) => {
await queue.enqueueSingle(req.body.name, req.body.data);
res.sendStatus(200);
});
You can enqueue tasks by calling enqueueSingle or enqueue for a list of tasks. The task is expected to be a JSON object which will be sent as the payload to the callback specified when the queue was created.
app.post('/execute', async (req, res, next) => {
await processTask(req.body);
res.sendStatus(200);
Make sure to return a 2xx status if you have handled your task correctly.