How to Solve Concurrency Issues With Bull Queue
Introduction
Are you looking for a way to solve your concurrency issues? Bull Queue may be the answer. Many domains have reservations built into them, and they all face the same problem: how do you deal with concurrent users attempting to reserve the same resource?
This can happen in systems such as:
Appointment with the doctor
Booking of airline tickets
Movie tickets
Hotel reservations
Tickets for the train
and so on…
Let’s understand the problem
Suppose you are planning to watch the latest hit movie. On the way to the ticket counter, someone walks past you and joins the line just ahead of you. That person gets the last ticket, and you miss the movie.
Now imagine a scam in which someone else holds a ticket for the same seat as yours. A fight is all but guaranteed.
We must defend ourselves against this race condition.
An online booking system can be flooded with thousands of users at once, just like a real queue. There are more users than resources available, and one seat or slot must go to exactly one user, so we need a proper mechanism for handling concurrent allocations.
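To make the race condition concrete, here is a minimal sketch in plain Node.js. It does not use Bull or Redis; the `naiveReserve` and `demoRace` names, the in-memory `Map`, and the 10 ms delay are all assumptions made for illustration. It shows two requests for the same seat both passing the availability check before either one has written its reservation.

```javascript
// Simulates a slow storage round trip (hypothetical 10 ms latency).
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Naive reservation: check, wait, then write. Nothing stops two requests
// from both passing the check before either one writes.
async function naiveReserve(bookedSeats, seatNumber, userId) {
  if (bookedSeats.has(seatNumber)) {
    return { status: 422, message: 'Seat is already reserved.' };
  }
  await delay(10); // another request can run during this gap
  bookedSeats.set(seatNumber, userId);
  return { status: 200, message: `Seat reserved for ${userId}.` };
}

// Two users ask for seat 7 at the same moment.
async function demoRace() {
  // In-memory stand-in for a seats table; a real system would use a database.
  const bookedSeats = new Map();
  return Promise.all([
    naiveReserve(bookedSeats, 7, 'alice'),
    naiveReserve(bookedSeats, 7, 'bob'),
  ]);
}

// Both results report status 200, although only one seat exists.
demoRace().then((results) => console.log(results));
```

Both users are told they own seat 7, and the later write silently overwrites the earlier one. This is the double booking that the rest of the article sets out to prevent.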
Solution
When purchasing a movie ticket in the real world, there is one queue. Everyone who wants a ticket joins the queue, and tickets are handed out one by one. When purchasing a ticket online, however, nothing enforces an order, so numerous users can request the same seat or different seats at the same time.
So in the online situation we also keep a queue, one per movie. Concurrent requests from users are placed in that queue, and the queue processes them one at a time, in the order they arrived. If two users request the same seat number, the first user in the queue gets the seat, and the second user gets a notice saying “seat is already reserved.”
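The idea of one queue per movie can be sketched without any library. The example below is an in-memory stand-in, not Bull itself: `enqueue`, `reserveSeat`, `demoQueue`, and the `movie:42` key are hypothetical names chosen for illustration. Each movie gets its own promise chain, so a task starts only after the previous task for the same movie has finished.

```javascript
// One promise chain per movie: each new task starts only after the
// previous task for the same movie has settled.
const chains = new Map();

function enqueue(movieId, task) {
  const previous = chains.get(movieId) ?? Promise.resolve();
  const next = previous.then(() => task());
  // Store a version that never rejects so one failure cannot block the chain.
  chains.set(movieId, next.catch(() => {}));
  return next;
}

const bookedSeats = new Map(); // movieId -> Set of booked seat numbers
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Same check-then-write logic as a naive handler, including the slow gap.
async function reserveSeat(movieId, seatNumber) {
  const seats = bookedSeats.get(movieId) ?? new Set();
  if (seats.has(seatNumber)) {
    return { status: 422, message: 'Seat is already reserved.' };
  }
  await delay(10);
  seats.add(seatNumber);
  bookedSeats.set(movieId, seats);
  return { status: 200, message: 'Ticket bought successfully.' };
}

// Two concurrent requests for the same seat of the same movie.
function demoQueue(movieId) {
  return Promise.all([
    enqueue(movieId, () => reserveSeat(movieId, 7)),
    enqueue(movieId, () => reserveSeat(movieId, 7)),
  ]);
}

// The first request gets status 200, the second gets status 422.
demoQueue('movie:42').then((results) => console.log(results));
```

The reservation logic is unchanged; only the ordering differs. Bull plays the same role as `enqueue` here, with the added benefit that the queue lives in Redis rather than in one process's memory.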
Concurrency Issue Solved With Bull Queue
Create one class that handles the queue
queue/buyTicketQueue.js

const Queue = require('bull');
const createDebug = require('debug');
const { redisCredentials: redis } = require('../config');

module.exports = class buyTicketQueue {
  // Cache of { queueInstanceName, queueInstance } entries, one per queue name.
  static instances = [];

  // Return the cached queue for this name, creating it on first use.
  static async getInstance(queueName) {
    const result = this.instances.find(
      ({ queueInstanceName }) => queueInstanceName === queueName
    );
    if (result) {
      return result.queueInstance;
    }
    return this.createInstance(queueName);
  }

  static async createInstance(queueName) {
    const result = this.instances.find(
      ({ queueInstanceName }) => queueInstanceName === queueName
    );
    if (result) {
      return result.queueInstance;
    }
    const queueInstance = await this.initializeQueue(queueName);
    this.instances.push({ queueInstanceName: queueName, queueInstance });
    return queueInstance;
  }

  static async initializeQueue(queueName) {
    const debug = createDebug(queueName);
    // Bull expects the Redis connection settings under the `redis` option.
    const queueInstance = new Queue(queueName, { redis });

    // Queue processor. With no concurrency argument, Bull runs one job at a
    // time in this process, so requests for the same queue are handled in order.
    queueInstance.process(async (job) => {
      // Replace this placeholder with your own logic. The controller adds
      // jobs as { payload: body }, so the seat number lives under payload.
      const { seatNumber } = job.data.payload;
      if (seatNumber === 1) {
        return { status: 422, message: 'Ticket already booked with this seat number.' };
      }
      return { status: 200, message: 'Ticket bought successfully.' };
    });

    queueInstance.on('completed', (job, result) => {
      debug(`${queueName} job completed with result`, result);
    });

    queueInstance.on('error', (err) => {
      debug(`${queueName} queue error`, err);
    });

    // Retry a failed job after one minute by adding its data back to the queue.
    queueInstance.on('failed', (job, err) => {
      debug(`${queueName} job failed`, err);
      queueInstance.add(job.data, { delay: 60000 });
    });

    return queueInstance;
  }

  // Forget the cached queue for this name.
  static async removeInstance(queueName) {
    const queueIndex = this.instances.findIndex(
      ({ queueInstanceName }) => queueInstanceName === queueName
    );
    if (queueIndex >= 0) {
      this.instances.splice(queueIndex, 1);
    }
  }
};
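The processor above only contains placeholder logic. As a rough sketch of what real logic might look like, the example below keeps the processor as a standalone function so it can be tested with a plain object instead of a live queue. Everything here is an assumption for illustration: the `createSeatStore` and `makeBuyTicketProcessor` helpers, the in-memory store standing in for a database, the 400 status for invalid input, and the presence of `movieId` and `seatNumber` in the request body.

```javascript
// Hypothetical in-memory seat store; a real processor would query a database.
function createSeatStore() {
  const seatsByMovie = new Map();
  return {
    isBooked: (movieId, seat) => seatsByMovie.get(movieId)?.has(seat) ?? false,
    book: (movieId, seat) => {
      if (!seatsByMovie.has(movieId)) seatsByMovie.set(movieId, new Set());
      seatsByMovie.get(movieId).add(seat);
    },
  };
}

// Returns an async (job) => result function, the shape a queue processor takes.
function makeBuyTicketProcessor(store) {
  return async function processBuyTicket(job) {
    // Jobs are assumed to be added as { payload: body }.
    const { movieId, seatNumber } = job.data.payload;
    if (!Number.isInteger(seatNumber) || seatNumber < 1) {
      return { status: 400, message: 'Invalid seat number.' };
    }
    if (store.isBooked(movieId, seatNumber)) {
      return { status: 422, message: 'Ticket already booked with this seat number.' };
    }
    store.book(movieId, seatNumber);
    return { status: 200, message: 'Ticket bought successfully.' };
  };
}

// Exercise the processor with plain objects shaped like queue jobs.
async function demoProcessor() {
  const processJob = makeBuyTicketProcessor(createSeatStore());
  const job = { data: { payload: { movieId: 42, seatNumber: 7 } } };
  const first = await processJob(job);
  const second = await processJob(job);
  return [first.status, second.status];
}

demoProcessor().then((statuses) => console.log(statuses)); // [ 200, 422 ]
```

With Bull, a function like this could be registered through `queueInstance.process(makeBuyTicketProcessor(store))` in place of the inline placeholder.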
Add Redis configuration in the config file: config/index.js
module.exports = {
  redisCredentials: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT
  }
};
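Environment variables are always strings and may be missing, so it can help to validate them once when the config is loaded. The sketch below is one possible way to do that; the `buildRedisCredentials` helper is hypothetical, and the fallbacks assume a Redis server on its conventional defaults (127.0.0.1 and port 6379).

```javascript
// Builds the Redis settings from an environment object.
// Falls back to Redis's conventional local defaults when a variable is unset.
function buildRedisCredentials(env = process.env) {
  const port = Number.parseInt(env.REDIS_PORT ?? '6379', 10);
  if (Number.isNaN(port)) {
    throw new Error(`REDIS_PORT must be a number, got "${env.REDIS_PORT}"`);
  }
  return {
    host: env.REDIS_HOST ?? '127.0.0.1',
    port,
  };
}

console.log(buildRedisCredentials({ REDIS_HOST: 'redis.internal', REDIS_PORT: '6380' }));
// { host: 'redis.internal', port: 6380 }
```

The config file could then export `{ redisCredentials: buildRedisCredentials() }`, so a mistyped port fails at startup rather than when the first job is added.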
Add user requests into a queue in buyTicket API: controller/buyTicket.js
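const buyTicketQueue = require('../queue/buyTicketQueue');

exports.buyTicket = async (req, res, next) => {
  try {
    const { body } = req;
    // One queue per movie, so requests for the same movie are processed in order.
    const queueWorker = await buyTicketQueue.getInstance(`movie:${body.movieId}`);
    const job = await queueWorker.add({ payload: body });
    // Wait until the queue processor has handled this job.
    const result = await job.finished();
    // Express has no built-in res.sendJson, so res.json is used for every result.
    // The status field in the body tells the client whether the seat was reserved.
    return res.json(result);
  } catch (err) {
    next(err);
  }
};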
Conclusion
In conclusion, a per-movie Bull queue gives us a way to handle concurrent requests when many users compete for a resource that only one person can purchase, such as a single seat.
I appreciate you taking the time to read my blog.