Queue
Queues are useful when you have thousands of tasks that would otherwise block the main thread needlessly, since they can be processed in the background instead.
The default driver provides an in-memory queue that is capable of processing thousands of jobs, but using your own queue driver is just as easy.
Queue Usage
Create an instance
import { Contracts, Identifiers } from "@mainsail/contracts";

const queue: Contracts.Kernel.Queue = app.get<Contracts.Kernel.QueueFactory>(Identifiers.Services.Queue.Factory)();
Start the queue
queue.start();
Stop the queue
queue.stop();
Pause the queue
queue.pause();
Resume the queue
queue.resume();
Clear the queue
queue.clear();
Push a new job onto the default queue
queue.push(() => console.log("Hello World"));
Push a new job onto the default queue after a delay
queue.later(60, () => console.log("Hello World"));
Push an array of jobs onto the default queue
queue.bulk([
    () => console.log("Hello World"),
    () => console.log("Hello World"),
]);
Get the size of the given queue
queue.size();
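To make the lifecycle calls above concrete, here is a minimal sketch of how they could interact. `SketchQueue` is a hypothetical stand-in written for this example, not the Mainsail driver, and it runs jobs synchronously to keep the behavior easy to follow; the real queue may process jobs asynchronously and differ in details.

```typescript
// Hypothetical stand-in for the queue service; not the Mainsail implementation.
type Job = () => void;

class SketchQueue {
    private jobs: Job[] = [];
    private running = false;

    public start(): void {
        this.running = true;
        this.drain();
    }

    public pause(): void {
        this.running = false;
    }

    public resume(): void {
        this.start();
    }

    public clear(): void {
        this.jobs = [];
    }

    public push(job: Job): void {
        this.jobs.push(job);
        this.drain();
    }

    public bulk(jobs: Job[]): void {
        this.jobs.push(...jobs);
        this.drain();
    }

    public size(): number {
        return this.jobs.length;
    }

    // Run waiting jobs in insertion order for as long as the queue is running.
    private drain(): void {
        while (this.running && this.jobs.length > 0) {
            const job = this.jobs.shift() as Job;
            job();
        }
    }
}

const log: string[] = [];
const sketch = new SketchQueue();

sketch.push(() => log.push("first"));
sketch.bulk([() => log.push("second"), () => log.push("third")]);
const waitingBeforeStart = sketch.size(); // jobs wait until start() is called

sketch.start(); // drains the waiting jobs
sketch.pause();
sketch.push(() => log.push("fourth"));
const waitingWhilePaused = sketch.size(); // the new job is held back by pause()

sketch.resume(); // runs the held job
```

In this sketch, jobs pushed before `start()` or during a pause simply accumulate, which is why `size()` is useful for checking how much work is still waiting.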
Extending
As explained in a previous article, Core services can be extended because they are built on a Manager pattern. Let's go over a quick example of how you could implement your own queue.
Implementing the Driver
Implementing a new driver is as simple as importing the queue contract that needs to be satisfied and implementing the methods specified in it.
Information
In this example we will use p-queue, a promise-based queue with concurrency control.
import { Contracts } from "@mainsail/contracts";
import PQueue from "p-queue";

export class MemoryQueue implements Contracts.Kernel.Queue {
    // autoStart: false keeps jobs pending until start() is called.
    private readonly queue: PQueue = new PQueue({ autoStart: false });

    public async start(): Promise<void> {
        this.queue.start();
    }

    public async stop(): Promise<void> {
        // Halt processing and drop any jobs that have not started yet.
        this.queue.pause();
        this.queue.clear();
    }

    public async pause(): Promise<void> {
        this.queue.pause();
    }

    public async resume(): Promise<void> {
        // p-queue has no resume(); a paused queue continues via start().
        this.queue.start();
    }

    public async clear(): Promise<void> {
        this.queue.clear();
    }

    public async push<T = any>(fn: () => PromiseLike<T>): Promise<void> {
        // Not awaited, so the caller is not blocked until the job finishes.
        this.queue.add(fn);
    }

    public async later<T>(delay: number, fn: () => PromiseLike<T>): Promise<void> {
        // The delay is passed to setTimeout, so it is in milliseconds.
        setTimeout(() => this.push(fn), delay);
    }

    public async bulk<T>(functions: (() => PromiseLike<T>)[]): Promise<void> {
        this.queue.addAll(functions);
    }

    public size(): number {
        return this.queue.size;
    }
}
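If you would rather not depend on a third-party package, the same idea can be sketched with a plain array and a single worker loop. Everything below is an assumption made for illustration: `SequentialQueue`, `AsyncJob`, and `onIdle` are hypothetical names, the class covers only part of the contract, and failed jobs are silently skipped where a real driver would log or report them.

```typescript
// Hypothetical, dependency-free sketch; it does not implement the full contract.
type AsyncJob = () => PromiseLike<unknown>;

class SequentialQueue {
    private readonly pending: AsyncJob[] = [];
    private running = false;
    private draining = false;
    private worker: Promise<void> = Promise.resolve();

    public async start(): Promise<void> {
        this.running = true;
        this.schedule();
    }

    public async pause(): Promise<void> {
        this.running = false;
    }

    public async clear(): Promise<void> {
        this.pending.length = 0;
    }

    public async push(job: AsyncJob): Promise<void> {
        this.pending.push(job);
        this.schedule();
    }

    public size(): number {
        return this.pending.length;
    }

    // Resolves once the current worker pass has finished.
    public async onIdle(): Promise<void> {
        await this.worker;
    }

    // Start a worker pass unless one is already active or the queue is paused.
    private schedule(): void {
        if (this.running && !this.draining) {
            this.worker = this.drain();
        }
    }

    // Process jobs one at a time, in insertion order.
    private async drain(): Promise<void> {
        this.draining = true;
        try {
            while (this.running && this.pending.length > 0) {
                const job = this.pending.shift() as AsyncJob;
                try {
                    await job();
                } catch {
                    // A failing job must not stop the remaining jobs.
                }
            }
        } finally {
            this.draining = false;
        }
    }
}

const order: number[] = [];
const sequential = new SequentialQueue();

async function demo(): Promise<void> {
    await sequential.push(async () => { order.push(1); });
    await sequential.push(async () => { throw new Error("boom"); });
    await sequential.push(async () => { order.push(2); });
    // Nothing has run yet because the queue has not been started.
    await sequential.start();
    await sequential.onIdle();
}

const finished = demo();
```

Processing one job at a time keeps ordering predictable; a library such as p-queue adds configurable concurrency on top of the same basic loop.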
Implementing the service provider
Now that we have implemented our memory driver for the queue service, we can create a service provider to register it.
import { Contracts, Identifiers } from "@mainsail/contracts";
import { Providers, Services } from "@mainsail/kernel";

import { MemoryQueue } from "./memory-queue";

export class ServiceProvider extends Providers.ServiceProvider {
    public async register(): Promise<void> {
        // Retrieve the manager that keeps track of the available queue drivers.
        const queueManager = this.app.get<Services.Queue.QueueManager>(
            Identifiers.Services.Queue.Manager,
        );

        // Register the driver under the name "memory".
        // make() is assumed to return the ready-to-use queue instance.
        await queueManager.extend("memory", async () =>
            this.app.resolve<Contracts.Kernel.Queue>(MemoryQueue).make()
        );
    }
}
- We retrieve an instance of the queue manager that is responsible for managing queue drivers.
- We call the extend method with an asynchronous function which is responsible for creating the queue instance.
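The two steps above can be sketched without the kernel to show what a manager does with the function you pass to extend. `SketchManager`, `DriverFactory`, `SimpleQueue`, and the `driver` method are hypothetical names for this example only. This sketch creates the driver lazily on first use and caches it, which is a design assumption; the real queue manager may create or store drivers differently.

```typescript
// Hypothetical stand-ins; neither type is part of the real kernel API.
interface SimpleQueue {
    name: string;
}

type DriverFactory<T> = () => Promise<T>;

class SketchManager<T> {
    private readonly factories = new Map<string, DriverFactory<T>>();
    private readonly instances = new Map<string, T>();

    // Remember how to build the driver registered under the given name.
    public async extend(name: string, factory: DriverFactory<T>): Promise<void> {
        this.factories.set(name, factory);
    }

    // Build the driver on first use, then reuse the cached instance.
    public async driver(name: string): Promise<T> {
        const cached = this.instances.get(name);
        if (cached !== undefined) {
            return cached;
        }

        const factory = this.factories.get(name);
        if (factory === undefined) {
            throw new Error(`Driver "${name}" is not registered.`);
        }

        const instance = await factory();
        this.instances.set(name, instance);
        return instance;
    }
}

const manager = new SketchManager<SimpleQueue>();

const ready: Promise<SimpleQueue> = (async () => {
    await manager.extend("memory", async () => ({ name: "memory" }));
    return manager.driver("memory");
})();
```

Passing a function rather than an instance lets the manager decide when the driver is created, and the asynchronous signature leaves room for drivers that need to connect to an external service first.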