Queue
Queues are an extremely powerful tool when you have thousands of tasks that would otherwise block the main thread, since they can simply be processed in the background.
The default driver ships with an in-memory queue capable of processing thousands of jobs, and using your own queue driver is just as easy.
Prerequisites
Before we start, we need to establish what a few recurring variables and imports in this document refer to; a short sketch after the list shows them used together.
import { app, Container, Services } from "@arkecosystem/core-kernel";
- The app import refers to the application instance, which grants access to the container, configuration, system information and more.
- The Container import refers to a namespace that contains all of the container-specific entities like binding symbols and interfaces.
- The Services import refers to a namespace that contains all of the Core services. This will generally only be needed for type hints, as Core is responsible for service creation and maintenance.
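To make these concrete, here is a minimal sketch of the three imports working together; the QueueManager binding used here is only an illustration and is the same one that appears in the service provider example further down.

// app resolves services out of the container, Container supplies the binding symbols,
// and Services supplies the types for what comes back.
const queueManager: Services.Queue.QueueManager = app.get<Services.Queue.QueueManager>(
    Container.Identifiers.QueueManager,
);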
Queue Usage
Create an instance
const queue: Queue = app.get<QueueFactory>(Container.Identifiers.QueueFactory)();
Start the queue
queue.start();
Stop the queue
queue.stop();
Pause the queue
queue.pause();
Resume the queue
queue.resume();
Clear the queue
queue.clear();
Push a new job onto the default queue
queue.push(() => console.log("Hello World"));
Push a new job onto the default queue after a delay
queue.later(60, () => console.log("Hello World"));
Push an array of jobs onto the default queue
queue.bulk([
    () => console.log("Hello World"),
    () => console.log("Hello World"),
]);
Get the size of the given queue
queue.size();
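Putting the calls above together, here is a small end-to-end sketch; sendWelcomeEmail and cleanupTempFiles are hypothetical jobs used purely for illustration.

const queue: Queue = app.get<QueueFactory>(Container.Identifiers.QueueFactory)();

// Jobs are queued up front and only start running once the queue is started.
await queue.push(() => sendWelcomeEmail());      // hypothetical job
await queue.later(60, () => cleanupTempFiles()); // hypothetical job, scheduled after a delay
console.log(queue.size());                       // number of jobs currently waiting

await queue.start();

// ...once all work has been dispatched
await queue.stop();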
Extending
As explained in a previous article, it is possible to extend Core services because they are built on the Manager pattern. Let's go over a quick example of how you could implement your own queue.
Implementing the Driver
Implementing a new driver is as simple as importing the queue contract that needs to be satisfied and implementing the methods it specifies.
Information
In this example we will use p-queue, which is a promise-based queue with concurrency control.
import { Contracts } from "@arkecosystem/core-kernel";
import PQueue from "p-queue";

export class MemoryQueue implements Contracts.Queue.Queue {
    // autoStart is false, so queued jobs only run once start() is called.
    private readonly queue: PQueue = new PQueue({ autoStart: false });

    public async start(): Promise<void> {
        await this.queue.start();
    }

    public async stop(): Promise<void> {
        // Stop processing and drop any jobs that are still waiting.
        this.queue.pause();
        this.queue.clear();
    }

    public async pause(): Promise<void> {
        await this.queue.pause();
    }

    public async resume(): Promise<void> {
        // p-queue has no resume() method; start() resumes a paused queue.
        await this.queue.start();
    }

    public async clear(): Promise<void> {
        await this.queue.clear();
    }

    public async push<T = any>(fn: () => PromiseLike<T>): Promise<void> {
        this.queue.add(fn);
    }

    public async later<T>(delay: number, fn: () => PromiseLike<T>): Promise<void> {
        setTimeout(() => this.push(fn), delay);
    }

    public async bulk<T>(functions: (() => PromiseLike<T>)[]): Promise<void> {
        this.queue.addAll(functions);
    }

    public size(): number {
        return this.queue.size;
    }
}
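As a quick sanity check, the driver above can also be exercised on its own outside of Core; the snippet below is purely illustrative and is not part of the registration flow described next.

const memoryQueue = new MemoryQueue();

// autoStart is false, so nothing runs until start() is called.
await memoryQueue.bulk([
    async () => console.log("job 1"),
    async () => console.log("job 2"),
]);

console.log(memoryQueue.size()); // 2 jobs waiting

await memoryQueue.start(); // both jobs run once the underlying p-queue starts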
Implementing the service provider
Now that we have implemented our memory driver for the queue service, we can create a service provider to register it.
import { Container, Providers, Services } from "@arkecosystem/core-kernel";

import { MemoryQueue } from "./memory-queue"; // wherever the driver above is defined

export class ServiceProvider extends Providers.ServiceProvider {
    public async register(): Promise<void> {
        const queueManager: Services.Queue.QueueManager = this.app.get<Services.Queue.QueueManager>(
            Container.Identifiers.QueueManager,
        );

        await queueManager.extend("memory", MemoryQueue);
    }
}
- We retrieve an instance of the queue manager that is responsible for managing queue drivers.
- We call the extend method to register the MemoryQueue driver under the memory name, so that Core knows how to create instances of it.