Pipeline
Pipelines are an implementation of the pipeline pattern. They let you compose sequential stages by chaining them. The implementation consists of two parts: the Pipeline and its Stage(s).
In this article we’ll look at what they do, how to use them, what their advantages and disadvantages are, and the recommended way to use them so that you get the most out of your pipeline and stages.
Information
All stage functions and stage handlers can be synchronous or asynchronous. By default, we recommend using the process method, which processes all stages sequentially with async/await. If you wish to process all stages synchronously, use the processSync method instead.
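To make the difference between the two methods concrete, here is a minimal, self-contained sketch of a sequential pipeline. `MiniPipeline`, `StageFn`, `trim` and `shout` are hypothetical names written for this illustration; this is not the core-kernel implementation, and only the method names `pipe`, `process` and `processSync` are taken from this article.

```typescript
// Hypothetical stand-in for illustration only; not the core-kernel Pipeline.
type StageFn<T> = (payload: T) => T | Promise<T>;

class MiniPipeline<T> {
    private readonly stages: StageFn<T>[];

    constructor(stages: StageFn<T>[] = []) {
        this.stages = stages;
    }

    // Return a new pipeline with the stage appended, so chaining never mutates.
    pipe(stage: StageFn<T>): MiniPipeline<T> {
        return new MiniPipeline<T>([...this.stages, stage]);
    }

    // Run the stages in order, awaiting each result before starting the next.
    async process(payload: T): Promise<T> {
        let result: T = payload;
        for (const stage of this.stages) {
            result = await stage(result);
        }
        return result;
    }

    // Run the stages in order without awaiting; every stage must be synchronous.
    processSync(payload: T): T {
        let result: T = payload;
        for (const stage of this.stages) {
            result = stage(result) as T;
        }
        return result;
    }
}

const trim = (payload: string): string => payload.trim();
const shout = async (payload: string): Promise<string> => payload.toUpperCase();

async function demo(): Promise<string> {
    // process() accepts a mix of synchronous and asynchronous stages.
    return new MiniPipeline<string>().pipe(trim).pipe(shout).process("  hello  ");
}

demo().then((result) => console.log(result)); // HELLO
```

In this sketch, passing an asynchronous stage such as `shout` to `processSync` would hand a pending promise to the next stage, which is why the synchronous method is only suitable when every stage is synchronous.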
Prerequisites
Before we start, we need to establish what a few recurring variables and imports in this document refer to.
import { app, Container, Services } from "@arkecosystem/core-kernel";
- The app import refers to the application instance, which grants access to the container, configurations, system information and more.
- The Container import refers to a namespace that contains all of the container-specific entities, like binding symbols and interfaces.
- The Services import refers to a namespace that contains all of the core services. This will generally only be needed for type hints, as Core is responsible for service creation and maintenance.
Creating a Pipeline
When you need a new pipeline, resolve it through the application to ensure that all of the dependencies it requires are injected into it.
const pipeline: Pipeline = app.get<PipelineFactory>(Container.Identifiers.PipelineFactory)();
Stage Functions
The first way is to use plain functions that serve as stages. This is the more idiomatic JavaScript approach, but it won’t give you full access to the application and its bindings.
import { Services } from "@arkecosystem/core-kernel";

// Removes the first underscore from the payload.
const removeUnderscore = async (payload: string) => payload.replace("_", "");
// Replaces the first dash in the payload with a space.
const removeDash = async (payload: string) => payload.replace("-", " ");

console.log(
    await pipeline
        .pipe(removeUnderscore)
        .pipe(removeDash)
        .process("_Hello-World")
); // Hello World
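Note that `String.prototype.replace` with a string pattern replaces only the first match, so the stages above leave any further underscores or dashes untouched. If a payload may contain several, a regular expression with the global flag covers them all. The sketch below calls the stage functions directly instead of going through the pipeline so that it runs standalone; the names `stripUnderscores`, `dashesToSpaces` and `runStages` are illustrative and not part of core-kernel.

```typescript
// Illustrative stage functions; the names are not part of core-kernel.
const stripUnderscores = async (payload: string): Promise<string> => payload.replace(/_/g, "");
const dashesToSpaces = async (payload: string): Promise<string> => payload.replace(/-/g, " ");

async function runStages(payload: string): Promise<string> {
    // Apply the stages in order, the same way a pipeline would.
    return dashesToSpaces(await stripUnderscores(payload));
}

runStages("__Hello-Big-World").then((result) => console.log(result)); // Hello Big World
```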
Stage Handlers
The second way is to use class-based handlers. The implementation of those handlers is up to you for the most part, but there are two ways to implement and use them.
Container Resolution
If your stage handler needs full access to the application and its bindings, resolve it through the container. This is the recommended approach for handlers that work with application internals, as you’ll be able to make use of dependency injection.
import { Container } from "@arkecosystem/core-kernel";

@Container.injectable()
class RemoveUnderscore implements Stage {
    // Removes the first underscore from the payload.
    async process(payload: string) {
        return payload.replace("_", "");
    }
}

@Container.injectable()
class RemoveDash implements Stage {
    // Replaces the first dash in the payload with a space.
    async process(payload: string) {
        return payload.replace("-", " ");
    }
}

console.log(
    await pipeline
        .pipe(app.resolve(RemoveUnderscore))
        .pipe(app.resolve(RemoveDash))
        .process("_Hello-World")
); // Hello World
Class Instantiation
If your stage handler is fairly simple and doesn’t need access to any application bindings, you can use a plain ES6 class, instantiate it yourself and pass the instance to the pipeline. The advantage of this approach is that you are in full control of class instantiation and instance management.
class RemoveUnderscore implements Stage {
    // Removes the first underscore from the payload.
    async process(payload: string) {
        return payload.replace("_", "");
    }
}

class RemoveDash implements Stage {
    // Replaces the first dash in the payload with a space.
    async process(payload: string) {
        return payload.replace("-", " ");
    }
}

console.log(
    await pipeline
        .pipe(new RemoveUnderscore())
        .pipe(new RemoveDash())
        .process("_Hello-World")
); // Hello World
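Because you construct the instance yourself, a handler can take constructor arguments, and one class can be reused with different configuration. The following is a minimal sketch under stated assumptions: the `Stage` interface is a local stand-in that assumes a single `process` method, as in the examples above, and `ReplaceCharacter` and `runHandlers` are hypothetical names. The handlers are called directly rather than through the pipeline so that the snippet runs standalone.

```typescript
// Local stand-in for the Stage contract (assumption: a single process method).
interface Stage {
    process(payload: string): Promise<string>;
}

// Hypothetical handler that is configured through its constructor.
class ReplaceCharacter implements Stage {
    private readonly search: string;
    private readonly replacement: string;

    constructor(search: string, replacement: string) {
        this.search = search;
        this.replacement = replacement;
    }

    async process(payload: string): Promise<string> {
        // split/join replaces every occurrence of the search string.
        return payload.split(this.search).join(this.replacement);
    }
}

// Two instances of the same class, each with its own configuration.
const handlers: Stage[] = [new ReplaceCharacter("_", ""), new ReplaceCharacter("-", " ")];

async function runHandlers(payload: string): Promise<string> {
    let result = payload;
    for (const handler of handlers) {
        result = await handler.process(result);
    }
    return result;
}

runHandlers("_Hello-World").then((result) => console.log(result)); // Hello World
```

With the pipeline from this article, the same instances would be passed to `pipe` in place of `new RemoveDash()` and `new RemoveUnderscore()`.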