Extract-Transform-Load with RxJS: save time and memory with backpressure

Let's say that you have to extract 100M objects from a database, make some transformations on them and then load them into another storage system.

Problems will arise as soon as writing into the second database becomes slower than reading from the first. Depending on the implementation, you could face one of these issues:

  • extracted data stacks up in your memory, and your program crashes because of the memory usage;
  • you send too many requests in parallel to your target database;
  • your program is slow because you process each page of data in sequence (a naive sketch of this follows the list).
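
To make that last point concrete, here is a minimal sketch of the sequential approach, reusing the fake extract, transform and load helpers defined later in this article (the naiveEtl name is just for illustration):

// A naive, fully sequential ETL loop: each page is read, transformed
// and completely written before the next one is extracted, so the two
// databases are never busy at the same time.
async function naiveEtl() {
  const PAGE_SIZE = 1000;

  for (let page = 1; ; page += 1) {
    const results = await extract(PAGE_SIZE, page);
    if (!results.length) break; // empty page = we are done
    await load(transform(results));
  }
}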

At Forest Admin, we recently faced this issue when moving data from a PostgreSQL database to Elasticsearch.

These problems can be addressed by processing data in streams that support backpressure. Backpressure allows the stream to process data at the pace of the slowest asynchronous step in the chain.

RxJS is a great streaming library, but it does not natively support backpressure, and it's not easy to find examples. So, I decided to share one.

Let's illustrate with an example

Let's fake the extract method just for the purpose of this article:

async function extract(pageSize, page) {
  // Just fake an async network access that
  // resolves after at most 100ms
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));

  if (pageSize * (page - 1) >= 100_000_000) {
    return [];
  }

  return new Array(pageSize)
    .fill()
    .map((_, index) => ({
      id: pageSize * (page - 1) + index + 1,
      label: `Random label ${Math.random()}`,
      title: `Random title ${Math.random()}`,
      value: Math.random(),
      createdAt: new Date()
    }));
}

The transform method could be asynchronous, but that's not needed for this example.

function transform(i) { return i; }

And now, let's fake the load method:

async function load(items) {
  // Let's fake an async network access that takes
  // at most 150ms to write all the items
  await new Promise((resolve) =>
    setTimeout(resolve, Math.random() * 150)
  );
}

Example of backpressure in RxJS

Backpressure is ensured by the BehaviorSubject named drain in the example below. You'll see that the code pushes data to the target database concurrently, with a limit of 5 requests in parallel.

Input data is also extracted concurrently, but this time the pace is regulated by the drain subject: every time a page is sent to the target database, we allow another one to be extracted.

const { BehaviorSubject } = require('rxjs');
const { mergeMap, map, tap, filter } = require('rxjs/operators');

async function extractTransformLoad() {
  const CONCURRENCY = 5;
  const PAGE_SIZE = 1000;

  // This allows us to load a fixed number
  // of pages from the beginning
  const drain = new BehaviorSubject(
    new Array(CONCURRENCY * 2).fill()
  );

  return drain
    // This is necessary because the observable
    // streams arrays. This allows us to push
    // a fixed number of pages to load from
    // the beginning
    .pipe(mergeMap(v => v))
    // Values inside the arrays don't really matter,
    // we only use their indices to generate page
    // numbers
    .pipe(map((_, index) => index + 1))
    // EXTRACT
    .pipe(mergeMap((page) => extract(PAGE_SIZE, page)))
    // Terminate if it was an empty page = the last page
    .pipe(tap((results) => {
      if (!results.length) drain.complete();
    }))
    .pipe(filter(results => results.length))
    // TRANSFORM and LOAD
    .pipe(map(transform))
    .pipe(mergeMap(load, CONCURRENCY))
    // Just make sure to not keep results in memory
    .pipe(map(() => undefined))
    // When a page has been processed, allow to extract
    // a new one
    .pipe(tap(() => {
      drain.next([undefined]);
    }))
    .toPromise();
}

In the example above, we initialized the concurrency to 5, meaning that 5 requests can be sent to the target database at the same time. In order to reduce the time spent waiting for new data, the BehaviorSubject named drain keeps twice as many pages (10) in flight on the extraction side.
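
As a quick sanity check, and reusing the fake helpers above, the whole pipeline can be run with a small wrapper like this one (the main function and timing log are purely illustrative):

// Run the pipeline end to end and log how long it took.
async function main() {
  const start = Date.now();
  await extractTransformLoad();
  console.log(`ETL finished in ${((Date.now() - start) / 1000).toFixed(1)}s`);
}

main().catch(console.error);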

In this example,

  • memory will contain 10 pages of data at the maximum;
  • the processing will be as fast as possible with the maximum concurrency that we defined;
  • only 5 queries can be made in parallel to the target database.

Original Link: https://dev.to/ghusse_16/extract-transform-load-with-rxjs-save-time-and-memory-with-backpressure-jaa
