Solving Concurrency the Bull Queue way!

How to Solve Concurrency Issues With Bull Queue

Quick Summary: This blog walks through concurrency issues in Node.js and shows how to handle them with Bull Queue, offering practical solutions and insights for smoother asynchronous operations.

Introduction

Concurrency is one of the major challenges in building robust and efficient Node.js applications. As your application scales and becomes more complex, managing concurrency issues becomes crucial to avoid race conditions, resource contention, deadlocks, and other common pitfalls.

When you seek assistance from an expert software solutions company, you can mitigate Node.js database concurrency issues. Their expertise, tools, and strategies help in optimizing code and ensuring seamless concurrent operations.

In this blog, we will explore how to solve data concurrency issues in Node.js with the help of Bull Queue, a Redis-backed library for managing background jobs. If you want a deeper understanding of Redis, a powerful data store and caching system, read our blog that provides an in-depth overview of it.

The Concurrency Problem

Concurrency issues arise when multiple threads or processes access the same resources at the same time. In Node.js, this usually means that several asynchronous operations are in flight at once and touch the same data. While Node.js’s event-driven, non-blocking architecture offers excellent performance, it also opens the door to database concurrency problems.

Race Conditions

Race conditions occur when the outcome of a program depends on the relative timing of events, which can lead to unpredictable behavior. For example, when two asynchronous operations read and then update the same value concurrently, one update can overwrite the other, resulting in unexpected values or errors.
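
The following is a minimal sketch of such a race, not code from a real booking system. The `db` object, the `buyTicketUnsafe` function, and the 10 ms delay are all hypothetical stand-ins for a database and its round-trip time.

```javascript
// Hypothetical in-memory "database" with a single seat left.
const db = { seatsLeft: 1 };

// Simulates an asynchronous database round trip.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Unsafe: the read and the write are separated by an await,
// so another request can run in between.
async function buyTicketUnsafe(user) {
  const seatsLeft = db.seatsLeft; // read
  await delay(10);                // other requests run here
  if (seatsLeft > 0) {
    db.seatsLeft = seatsLeft - 1; // write based on a stale read
    return `${user}: booked`;
  }
  return `${user}: sold out`;
}

async function demoRace() {
  db.seatsLeft = 1;
  // Both requests read seatsLeft === 1 before either one writes.
  return Promise.all([buyTicketUnsafe('alice'), buyTicketUnsafe('bob')]);
}
```

Calling `demoRace()` resolves with both users reporting a booking, even though only one seat existed.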

Resource Contention

Resource contention arises when multiple parts of your application compete for limited resources, like database connections, API calls, or CPU cycles. Without proper coordination, contention causes bottlenecks and slow performance.
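
One common way to coordinate access is to cap how many operations run at once. The sketch below is an illustration under assumptions: `runWithLimit` and `demoLimit` are hypothetical helper names, and the 10 ms timer stands in for a database query or API call.

```javascript
// Runs async tasks with at most `limit` of them in flight at any time.
async function runWithLimit(tasks, limit) {
  const results = new Array(tasks.length);
  let nextIndex = 0;

  async function worker() {
    while (nextIndex < tasks.length) {
      const index = nextIndex++; // claimed synchronously, so no two workers share an index
      results[index] = await tasks[index]();
    }
  }

  const workerCount = Math.min(limit, tasks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

async function demoLimit() {
  let active = 0;
  let peak = 0;
  const makeTask = (id) => async () => {
    active += 1;
    peak = Math.max(peak, active); // track the highest number of simultaneous tasks
    await new Promise((resolve) => setTimeout(resolve, 10)); // simulated query
    active -= 1;
    return id;
  };
  const results = await runWithLimit([1, 2, 3, 4, 5].map(makeTask), 2);
  return { results, peak };
}
```

With a limit of 2, `demoLimit()` never has more than two simulated queries running together, while the results keep the order of the input tasks.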

Deadlocks

Deadlocks occur when two or more processes each wait for the other to release a resource, so none of them can proceed. This brings your application to a standstill, causing poor user experiences or crashes.

Task Prioritization

In some cases, you must execute certain tasks with higher priority than others. Without a proper mechanism for task prioritization, your application might fail to meet critical requirements.
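
As a rough illustration, the in-memory sketch below orders tasks by a numeric priority where a lower number means more urgent, which is the same convention Bull uses for its `priority` job option. The `PriorityTaskQueue` class and the task names are hypothetical and exist only for this example.

```javascript
// Minimal in-memory priority queue, for illustration only.
// Lower number = higher priority.
class PriorityTaskQueue {
  constructor() {
    this.tasks = [];
    this.sequence = 0; // tie-breaker so equal priorities stay first-in, first-out
  }

  add(name, priority) {
    this.tasks.push({ name, priority, order: this.sequence++ });
    this.tasks.sort((a, b) => a.priority - b.priority || a.order - b.order);
  }

  // Removes and returns the name of the most urgent task, or undefined if empty.
  next() {
    const task = this.tasks.shift();
    return task ? task.name : undefined;
  }
}

function demoPriority() {
  const queue = new PriorityTaskQueue();
  queue.add('send newsletter', 10);
  queue.add('confirm payment', 1);
  queue.add('generate report', 10);
  return [queue.next(), queue.next(), queue.next()];
}
```

Here `demoPriority()` returns the payment confirmation first, followed by the two lower-priority tasks in the order they were added.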

Let’s understand the problem again

Suppose you are on your way to watch the latest hit movie. As you walk to the cinema, someone overtakes you and joins the ticket line just ahead of you. You miss the opportunity to watch the movie because the person before you gets the last ticket.

Now imagine something worse: the same ticket has been sold twice, and someone else holds a ticket for your seat. The likelihood of a fight is high. This is the race condition we must defend against.

There can be thousands of people in an online queue, just like in a real queue. The problem is that there are more users than resources available. We must implement proper mechanisms to handle concurrent allocations since one seat/slot should only be available to one user.

Solution

When purchasing a ticket for a movie in the real world, there is one queue. Everyone who wants a ticket enters the queue, and tickets are handed out one by one. However, when purchasing a ticket online, there is no queue that enforces a sequence, so numerous users can request the same seat or different seats at the same time.

So, in the online situation, we also keep a queue, one per movie. Concurrent user requests for that movie are placed in the queue, and the queue processes them sequentially, one at a time. If two users request the same seat number, the first user in the queue gets the seat, and the second user gets a notice saying “seat is already reserved.”
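
Before moving to Bull, the idea can be sketched in plain JavaScript with no Redis involved. This is an in-memory illustration only: `enqueue`, `reserveSeat`, `buyTicket`, and the two `Map` objects are hypothetical names, and the 10 ms pause stands in for a database call.

```javascript
// Hypothetical in-memory stand-ins; a real system would use Bull and a database.
const reservedSeats = new Map(); // movieId -> Set of reserved seat numbers
const queueTails = new Map();    // movieId -> promise for the last queued request

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Runs `task` only after every earlier task for the same movie has finished.
function enqueue(movieId, task) {
  const tail = queueTails.get(movieId) || Promise.resolve();
  const result = tail.then(task);
  // Keep the chain usable even if a task throws.
  queueTails.set(movieId, result.catch(() => {}));
  return result;
}

async function reserveSeat(movieId, seatNumber, user) {
  const seats = reservedSeats.get(movieId) || new Set();
  reservedSeats.set(movieId, seats);
  const taken = seats.has(seatNumber); // check
  await sleep(10);                     // simulated database round trip
  if (taken) {
    return { status: 422, message: 'Seat is already reserved.', user };
  }
  seats.add(seatNumber);               // reserve
  return { status: 200, message: 'Ticket booked.', user };
}

function buyTicket(movieId, seatNumber, user) {
  return enqueue(movieId, () => reserveSeat(movieId, seatNumber, user));
}

async function demoQueue() {
  return Promise.all([
    buyTicket('movie-1', 7, 'alice'),
    buyTicket('movie-1', 7, 'bob'),
    buyTicket('movie-1', 8, 'carol'),
  ]);
}
```

Because each request for `movie-1` starts only after the previous one has finished, the second request for seat 7 sees the reservation and is rejected. Calling `reserveSeat` directly for both users, without the queue, would let both pass the check.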

Concurrency Issue Solved With Bull Queue

  • Create one class that handles the queue

queue/buyTicketQueue.js

    const Queue = require('bull');
    const _ = require('lodash');
    const createDebug = require('debug');

    const { redisCredentials: redis } = require('../config');

    // Keeps one Bull queue per queue name (for example, one per movie).
    module.exports = class buyTicketQueue {
      static instances = [];

      // Returns the existing queue for this name, or creates it on first use.
      static async getInstance(queueName) {
        const result = this.instances.find(({ queueInstanceName }) => queueInstanceName === queueName);
        if (result) {
          return result.queueInstance;
        }
        return this.createInstance(queueName);
      }

      static async createInstance(queueName) {
        const result = this.instances.find(({ queueInstanceName }) => queueInstanceName === queueName);
        if (result) {
          return result.queueInstance;
        }
        const queueInstance = this.initializeQueue(queueName);
        this.instances.push({ queueInstanceName: queueName, queueInstance });
        return queueInstance;
      }

      // Synchronous on purpose: the queue is registered before any other request can run.
      static initializeQueue(queueName) {
        const debug = createDebug(queueName);
        // Bull expects the connection settings under the `redis` option.
        const queueInstance = new Queue(queueName, { redis });

        // Queue processor. With the default concurrency of 1, this worker handles
        // one job at a time, in the order the jobs were added.
        queueInstance.process(async (job) => {
          // Placeholder logic: replace it with your own seat lookup and booking.
          // The controller adds jobs as { payload: body }.
          const { seatNumber } = job.data.payload;
          if (seatNumber === 1) {
            return { status: 422, message: 'Ticket already booked with this seat number.' };
          }
          return { status: 200, message: 'Ticket bought successfully.' };
        });

        queueInstance.on('completed', (job, result) => {
          debug(`${queueName} job completed with result:`, result);
        });
        queueInstance.on('error', (err) => {
          debug(`${queueName} queue error:`, err);
        });
        queueInstance.on('failed', (job, err) => {
          debug(`${queueName} job failed:`, err);
          // Add the same data again so it is retried after 60 seconds.
          queueInstance.add(job.data, { delay: 60000 });
        });
        return queueInstance;
      }

      static async removeInstance(queueName) {
        const queueIndex = this.instances.findIndex(({ queueInstanceName }) => queueInstanceName === queueName);
        if (queueIndex >= 0) _.pullAt(this.instances, queueIndex);
      }
    };

  • Add Redis configuration in the config file: config/index.js

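    module.exports = {
      redisCredentials: {
        host: process.env.REDIS_HOST,
        // Environment variables are strings, so convert the port to a number.
        // 6379 is the default Redis port.
        port: Number(process.env.REDIS_PORT) || 6379,
      },
    };
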
  • Add user requests into a queue in buyTicket API: controller/buyTicket.js

    const buyTicketQueue = require('../queue/buyTicketQueue');

    exports.buyTicket = async (req, res, next) => {
      try {
        const { body } = req;

        // One queue per movie, so requests for the same movie are processed one at a time.
        const queueWorker = await buyTicketQueue.getInstance(`movie:${body.movieId}`);
        const job = await queueWorker.add({ payload: body });

        // Wait for the processor's return value and mirror its status in the HTTP response.
        const result = await job.finished();
        return res.status(result.status).json(result);
      } catch (err) {
        next(err);
      }
    };

Conclusion

In conclusion, queuing requests per movie with Bull Queue gives us a way to handle concurrent requests when seats are limited and only one person can purchase a given ticket.
I appreciate you taking the time to read my blog.

FAQ

Database concurrency refers to the simultaneous execution of multiple SQL transactions or operations on a database. It can lead to conflicts when multiple users or processes attempt to access or modify the same data concurrently.

Concurrency issues are resolved through locking, isolation levels, optimistic concurrency control, and using concurrency control mechanisms provided by databases or programming frameworks. These methods ensure data consistency and prevent conflicts.

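It is common to use optimistic or pessimistic concurrency control when dealing with database concurrency issues. Pessimistic concurrency control locks data up front so that other transactions must wait. Optimistic concurrency control, including in SQL Server, assumes that conflicts among concurrent transactions are rare: it does not lock data when it is read, and instead checks at update time whether the data has changed since it was read.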
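
The optimistic approach is often implemented with a version number on each row. The sketch below imitates that check in memory. It is an illustration under assumptions rather than SQL Server code: `seatRow`, `readSeat`, and `updateSeat` are hypothetical names, and the SQL in the comment only indicates the general shape of such a statement.

```javascript
// Hypothetical in-memory row; `version` increases on every successful update.
const seatRow = { seatNumber: 7, reservedBy: null, version: 0 };

// Returns a snapshot of the row, as a SELECT would.
function readSeat() {
  return { ...seatRow };
}

// Applies the update only if nobody changed the row since `expectedVersion` was read.
// Comparable in spirit to: UPDATE seats SET ... WHERE id = ? AND version = ?
function updateSeat(expectedVersion, user) {
  if (seatRow.version !== expectedVersion) {
    return false; // conflict: the caller should re-read and decide again
  }
  seatRow.reservedBy = user;
  seatRow.version += 1;
  return true;
}

function demoOptimistic() {
  const aliceView = readSeat(); // both users read version 0
  const bobView = readSeat();
  const aliceOk = updateSeat(aliceView.version, 'alice'); // succeeds, version becomes 1
  const bobOk = updateSeat(bobView.version, 'bob');       // rejected, bob's version is stale
  return { aliceOk, bobOk, reservedBy: seatRow.reservedBy };
}
```

No lock is held while the users are deciding. The conflict is detected only when the second update arrives with an outdated version.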

Avoid concurrency issues in SQL Server by using transactions, row-level locking, appropriate isolation levels (e.g., READ COMMITTED), and implementing proper application logic for conflict resolution.

Databases handle concurrency by using mechanisms such as locking, timestamps, or versioning to control access to data, ensuring that multiple transactions do not interfere with each other, thus maintaining data integrity.