Home | Core

Introduction

Getting Started

Architecture

Services

Testing

Command Line Interface (CLI)

Configuration

Transactions

Security

How-To Guides

Upgrade Guides

Release Guides

V3 Testing

Queue

Queues are an extremely powerful tool when you have thousands of tasks that could block the main thread for no reason, as they could be simply processed in the background.

The default driver that is shipped provides an in-memory queue that is capable of processing thousands of jobs but using your own cache driver is just as easy.

Prerequisites

Before we start, we need to establish what a few recurring variables and imports in this document refer to when they are used.

import { app, Container, Services } from "@arkecosystem/core-kernel";
  • The app import refers to the application instance which grants access to the container, configurations, system information and more.
  • The Container import refers to a namespace that contains all of the container specific entities like binding symbols and interfaces.
  • The Services import refers to a namespace that contains all of the core services. This generally will only be needed for type hints as Core is responsible for service creation and maintenance.

Queue Usage

Create an instance

const queue: Queue = app.get(Container.Identifiers.QueueFactory)();

Start the queue

queue.start();

Stop the queue

queue.stop();

Pause the queue

queue.pause();

Resume the queue

queue.resume();

Clear the queue

queue.clear();

Push a new job onto the default queue

queue.push(() => console.log("Hello World");

Push a new job onto the default queue after a delay

queue.later(60, () => console.log("Hello World"));

Push an array of jobs onto the default queue

queue.bulk([
    () => console.log("Hello World"),
    () => console.log("Hello World")
]);

Get the size of the given queue

queue.size();

Extending

As explained in a previous article, it is possible to extend Core services due to the fact that a Manager pattern is used. Lets go over a quick example of how you could implement your own queue.

Implementing the Driver

Implementing a new driver is as simple as importing the queue contract that needs to be satisfied and implement the methods specified in it.

In this example we will use p-queue which is a promise-based queue with concurrency control.

import { Contracts } from "@arkecosystem/core-kernel";

export class MemoryQueue implements Contracts.Queue.Queue {
    private readonly queue: PQueue = new PQueue({ autoStart: false });

    public async start(): Promise {
        await this.queue.start();
    }

    public async stop(): Promise {
        await this.queues.delete(queue);
    }

    public async pause(): Promise {
        await this.queue.pause();
    }

    public async resume(): Promise {
        await this.queue.resume();
    }

    public async clear(): Promise {
        await this.queue.clear();
    }

    public async push(fn: () => PromiseLike): Promise {
        this.queue.add(fn);
    }

    public async later(delay: number, fn: () => PromiseLike): Promise {
        setTimeout(() => this.push(fn), delay);
    }

    public async bulk(functions: (() => PromiseLike)[]): Promise {
        this.queue.addAll(functions);
    }

    public size(): number {
        return this.queue.size;
    }
}

Implementing the service provider

Now that we have implemented our memory driver for the queue service we can create a service provider to register it.

import { Container, Contracts, Providers, Services } from "@arkecosystem/core-kernel";

export class ServiceProvider extends Providers.ServiceProvider {
    public async register(): Promise {
        const cacheManager: Services.Queue.QueueManager = this.app.get(
            Container.Identifiers.QueueManager,
        );

        await cacheManager.extend("memory", MemoryQueue);
    }
}
  1. We retrieve an instance of the cache manager that is responsible for managing queue drivers.
  2. We call the extend method with an asynchronous function which is responsible for creating the queue instance.

2020 © ARK.io | All rights reserved | An ARK.io Product