Pipeline

Pipelines are an implementation of the pipeline pattern. It allows you to easily compose sequential stages by chaining stages. The implementation consists of 2 parts, the Pipeline and Stage(s).

In this article we’ll look into what they do, how to use them, what ups and downs there are and what the recommended way to use them is to get the most out of your pipeline and stages for your use-cases.

Information

All stage functions and stage handlers can be synchronous and asynchronous. By default, we recommend to use the process method to process all stages sequentially with async/await. If you wish to process all stages synchronously then you can use the processSync method.

Prerequisites

Before we start, we need to establish what a few recurring variables and imports in this document refer to when they are used.

1import { app, Container, Services } from "@arkecosystem/core-kernel";
  • The app import refers to the application instance which grants access to the container, configurations, system information and more.
  • The Container import refers to a namespace that contains all of the container specific entities like binding symbols and interfaces.
  • The Services import refers to a namespace that contains all of the core services. This generally will only be needed for type hints as Core is responsible for service creation and maintenance.

Creating a Pipeline

When you have the need for a new pipeline, you’ll need to resolve it through the application in order to ensure that it has all of the required dependencies for it to function injected into it.

1const pipeline: Pipeline = app.get<PipelineFactory>(Container.Identifiers.PipelineFactory)();

Stage Functions

The first way is to use functions that serve as stages. This is a more JavaScript way of doing things but won’t give you full access to the application and its bindings.

1import { Services } from "@arkecosystem/core-kernel";
2 
3const removeDash = async (payload: string) => payload.replace("_", "");
4const removeUnderscore = async (payload: string) => payload.replace("-", " ");
5 
6console.log(
7 await pipeline
8 .pipe(removeDash)
9 .pipe(removeUnderscore)
10 .process("_Hello-World")
11); // Hello World

Stage Handlers

The second way is to use class-based handlers. The implementation of those handlers is up to for the most part but there are 2 ways to implement and use them.

Container Resolution

If your stage handler needs full access to the application, you should resolve it through the container which will give you full access to the application and its bindings. This is the recommended approach for handlers that work with application internals as you’ll be able to make use of dependency injection.

1import { Container } from "@arkecosystem/core-kernel";
2 
3@Container.injectable()
4class RemoveDash implements Stage {
5 async process(payload: string) {
6 return payload.replace("_", "");
7 }
8}
9 
10@Container.injectable()
11class RemoveUnderscore implements Stage {
12 async process(payload: string) {
13 return payload.replace("-", " ");
14 }
15}
16 
17console.log(
18 await pipeline
19 .pipe(app.resolve(RemoveDash))
20 .pipe(app.resolve(RemoveUnderscore))
21 .process("_Hello-World")
22); // Hello World

Class Instantiation

If your stage handler is fairly simple and doesn’t need access to any application bindings, you can use a simple ES6 class, instantiate it and pass the instance to the pipeline. The advantage of this approach is that you are in full of control of the class instantiation and instance management.

1class RemoveDash implements Stage {
2 async process(payload: string) {
3 return payload.replace("_", "");
4 }
5}
6 
7class RemoveUnderscore implements Stage {
8 async process(payload: string) {
9 return payload.replace("-", " ");
10 }
11}
12 
13console.log(
14 await pipeline
15 .pipe(new RemoveDash())
16 .pipe(new RemoveUnderscore())
17 .process("_Hello-World")
18); // Hello World
Last updated 2 years ago
Edit Page
Share: