skip to Main Content

I’m not able to achieve something like this:

  • Get data from mongodb, returns ARRAY
  • For each element of array go inside and get a field that is an array of arrays
  • For each array inside the array of arrays call an api
  • Delay the api request for 5 seconds (ex)

The structure is something like:

  1. ARRAY OF ELEMENTS [X,Y,Z,…]
  2. X is like : X = {FIELD1, FIELD2, FIELDTOUSE,…}
  3. FIELDTOUSE is like : FIELDTOUSE = [ EL1, EL2…]
  4. ELs are like : EL1 = [A,B,C,D,F……] (each EL has a length near 200)

So each EL will request 200 api calls, so a single FIELDTOUSE, that have something like 900 elements will request a total of 200*900 = 180k calls
Then consider the primary array that is long 160. 160*180k = 28.8M calls

So I need for each element inside EL to add a delay of 5 seconds, for example, to prevent the too Many Requests.

Here is some code. I’m working in nodejs, using mongodb of course, and I want to implement the solution via rxjs.

function getAPIdata(res) {
    //SOME LOGIC
    if(//CONDITIONS){
      return axios.post(
        urlOTP,
        stringLL,
        { headers: headers }
      ) 
    }else{
      return of(null).pipe(delay(1000))
    }
    
  }
//CALL MONGODB COLLECTION
XModel.find({}).lean().exec((err, ELEMENTS) => {
    //SOME VARIABLE DECLARATION
    // 160 elements
    from(ELEMENTS).pipe(concatMap(el => {
    //900 elements
      return from(el.x).pipe(concatMap(el_ => {
        //200 elements
        _id = el._id;
        //I WANT THIS FUNCTION TO DELAY EACH 10 ELEMENTS WITH 5 SECONDS
        return getAPIdata(el_) //<--------------------
      }))
    }),concatMap(g => g.data.hasOwnProperty("results") ? of(g.data.results).pipe(delay(1000)) : of(null).pipe(delay(1000)))).subscribe(r => {
      //SOME LOGIC FOR UPDATING DATA IN DB  
      XModel.updateOne({ _id: _id }, {$set:set}, (e, done) => {
        //SOME LOGIC HERE
      })
    });
  });

2

Answers


  1. You can use bufferCount() to batch requests and when they all complete add 5s delay:

    from(ELEMENTS)
      .pipe(
        bufferCount(10),
        concatMap(buffered => forkJoin(buffered.map(item => getAPIdata(item))).pipe(
          delay(5000),
        ),
      )
      .subscribe(...)
    
    Login or Signup to reply.
  2. function getAPIdata(res) {
      return from(axios.post(urlOTP, stringLL, { headers: headers }));
    }
    
    1. Send 5 APIs and wait 5 seconds
    from(ELEMENTS).pipe(
         concatMap((el) => from(el.x)),
         concatMap((el) => from(el)),
         bufferCount(5),
         concatMap((el) => from(el).pipe(delay(5000), concatMap(getAPIdata)))
    ).subscribe(...);
    
    1. Every 5 seconds, Call 5 APIs
    from(ELEMENTS).pipe(
        concatMap((el) => from(el.x)),
        concatMap((el) => from(el)),
        delayWhen((v, i) => timer(5000 * Math.floor(i / 5))),
        concatMap(getAPIdata)
    ).subscribe(...);
    

    https://stackblitz.com/edit/rxjs-swxtui

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search