bullmq

JavaScript
const { Worker } = require('bullmq')
const userSchema = require('./model.js')
const { resultsPublisher } = require('./publisher')

/**
 * @description queueCreatePublisher
 */

const queueCreatePublisher = new Worker('create service', async (job) => {
	if (job.name == 'create:service') {
		queueCreatePublisher.emit('create:service', JSON.stringify({ data: job.data }))
	}
})

queueCreatePublisher.on('completed', (job) => console.log(`job create completed ${job.id}`))
queueCreatePublisher.on('waiting', (job) => console.log(`job create waiting ${job.id}`))
queueCreatePublisher.on('active', (job) => console.log(`job create active ${job.id}`))
queueCreatePublisher.on('failed', (job) => console.log(`job create failed ${job.id}`))

exports.createSubscriber = () => {
	return new Promise((resolve, reject) => {
		queueCreatePublisher.once('create:service', async (data) => {
			const response = await insertOne(JSON.parse(data).data)
			resolve(response)
		})
	})
}

function insertOne(res) {
	return new Promise(async (resolve, reject) => {
		try {
			const checkEmail = await userSchema.findOne({ email: res.email }).lean()
			if (checkEmail) {
				resolve({ statusCode: 409, message: 'email already exist' })
			}
			const saveEmail = await userSchema.create({ email: res.email })
			if (saveEmail) {
				resolve({ statusCode: 201, message: 'add new email successfully' })
			} else {
				resolve({ statusCode: 400, message: 'add new email failed' })
			}
		} catch (err) {
			reject({ statusCode: 500, message: 'internal server error' })
		}
	})
}

/**
 * @description queueResultsPublisher
 */

const queueResultsPublisher = new Worker('results service', async (job) => {
	if (job.name == 'results:service') {
		queueResultsPublisher.emit('results:service', JSON.stringify({ data: job.data }))
	}
})

queueResultsPublisher.on('completed', (job) => console.log(`job results completed ${job.id}`))
queueResultsPublisher.on('waiting', (job) => console.log(`job results waiting ${job.id}`))
queueResultsPublisher.on('active', (job) => console.log(`job results active ${job.id}`))
queueResultsPublisher.on('failed', (job) => console.log(`job results failed ${job.id}`))

exports.findAllSubscriber = async () => {
	await findAll()
	return new Promise((resolve, reject) => {
		queueResultsPublisher.once('results:service', (data) => {
			const response = JSON.parse(data).data
			resolve(response)
		})
	})
}

async function findAll() {
	try {
		const findAllEmail = await userSchema.find({}).lean()

		if (findAllEmail.length < 1) {
			await resultsPublisher({ statusCode: 404, message: 'email is not exist', data: findAllEmail })
		} else {
			await resultsPublisher({ statusCode: 200, message: 'email already to use', data: findAllEmail })
		}
	} catch (err) {
		await resultsPublisher({ statusCode: 500, message: 'internal server error' })
	}
}
example bullmq todoapp + custom bullmq pub/sub

https://github.com/restuwahyu13/express-todo-bullmq
Source

Also in JavaScript: