首页前端开发JavaScriptJS 实现请求调度器

JS 实现请求调度器

时间2024-02-01 02:01:03发布访客分类JavaScript浏览388
导读:收集整理的这篇文章主要介绍了JS 实现请求调度器,觉得挺不错的,现在分享给大家,也给大家做个参考。 目录抽象和复用串行分段串行,段中并行总结前言:JS 天然支持并行请求,但与此同时会带来...
收集整理的这篇文章主要介绍了JS 实现请求调度器,觉得挺不错的,现在分享给大家,也给大家做个参考。
目录
  • 抽象和复用
    • 串行
    • 分段串行,段中并行
  • 总结

    前言:JS 天然支持并行请求,但与此同时会带来一些问题,比如会造成目标服务器压力过大,所以本文引入“请求调度器”来节制并发度。

    TLDR; 直接跳转『抽象和复用』章节。

    为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPRomises)来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 pv,我们通常会这么写:

    const ids = [1001, 1002, 1003, 1004, 1005];
        const urlPrefix = 'http://oPEnseArch.example.COM/api/apps';
        // fetch 函数发送 HTTP 请求,返回 Promiseconst appPromises = ids.map(id =>
     `${
    urlPrefix}
    /${
    id}
        `).map(fetch);
        Promise.all(appPromises) // 通过 reduce 做累加 .then(apps =>
         apps.reduce((inITial, current) =>
         initial + current.pv, 0)) .catch((error) =>
         console.LOG(error));
        

    上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:

    htML>
        head>
        title>
        502 Bad Gateway/title>
        /head>
        body bgcolor="white">
        center>
        h1>
        502 Bad Gateway/h1>
        /center>
        hr>
        center>
        nginx/1.10.1/center>
        /body>
        /html>
        

    如何解决呢?

    一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunkchunk 内部的请求依然并发,但块的大小(chunkSize)限制在系统支持的最大并发数以内。前一个 chunk 结束后一个 chunk 才能继续执行,也就是说 chunk 内部的请求是并发的,但 chunk 之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合

    难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。

    // task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求const task1 = () =>
         new Promise((resolve) =>
     {
         setTimeout(() =>
     {
         resolve(1);
         console.log('task1 executed');
     }
        , 1000);
    }
        );
        const task2 = () =>
         new Promise((resolve) =>
     {
         setTimeout(() =>
     {
         resolve(2);
         console.log('task2 executed');
     }
        , 1000);
    }
        );
        const task3 = () =>
         new Promise((resolve) =>
     {
         setTimeout(() =>
     {
         resolve(3);
         console.log('task3 executed');
     }
        , 1000);
    }
        );
        // 聚合结果let result = 0;
        const resultPromise = [task1, task2, task3].reduce((current, next) =>
         	  current.then((number) =>
     {
         console.log('resolved with number', number);
         // task2, task3 的 Promise 将在这里被 resolve result += number;
         return next();
     }
    ),  Promise.resolve(0)) // 聚合初始值 .then(function(last) {
         console.log('The last promise resolved with number', last);
         // task3 的 Promise 在这里被 resolve result += last;
         console.log('all executed with result', result);
         return Promise.resolve(result);
     }
        );
        

    运行结果如图 1:

    代码解析:我们想要的效果,直观展示其实是 fn1().then(() => fn2()).then(() => fn3())。上面代码能让一组 Promise 按顺序执行的关键之处就在 reduce 这个“引擎”在一步步推动 Promise 工厂函数的执行。

    难点解决了,我们看看最终代码:

    /** * 模拟 HTTP 请求 * @param {
    String}
     url  * @return {
    Promise}
     */function fetch(url) {
     console.log(`Fetching ${
    url}
        `);
         return new Promise((resolve) =>
     {
         setTimeout(() =>
     resolve({
     pv: Number(url.match(/\d+$/)) }
        ), 2000);
     }
        );
    }
        const urlPrefix = 'http://opensearch.example.com/api/apps';
    const aggregator = {
     /** * 入口方法,开启定时任务 *  * @return {
    Promise}
     */ start() {
         return this.fetchAppIds() .then(ids =>
         this.fetchAppsSerially(ids, 2)) .then(apps =>
         this.sumPv(apps)) .catch(error =>
         console.error(error));
     }
    ,  /** * 获取所有应用的 ID * * @private *  * @return {
    Promise}
     */ fetchAppIds() {
         return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
     }
    , promiseFactory(ids) {
         return () =>
         Promise.all(ids.map(id =>
     `${
    urlPrefix}
    /${
    id}
        `).map(fetch));
     }
    ,  /** * 获取所有应用的详情 *  * 一次并发请求 `concurrency` 个应用,称为一个 chunk * 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕 * * @private * * @param {
    [Number]}
     ids * @param {
    Number}
     concurrency 一次并发的请求数量 * @return {
    [Object]}
      所有应用的信息 */ fetchAppsSerially(ids, concurrency = 100) {
         // 分块 let chunkOfIds = ids.splice(0, concurrency);
         const tasks = [];
      while (chunkOfIds.length !== 0) {
         tasks.push(this.promiseFactory(chunkOfIds));
         chunkOfIds = ids.splice(0, concurrency);
     }
          // 按块顺序执行 const result = [];
         return tasks.reduce((current, next) =>
         current.then((chunkOfApps) =>
     {
         console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
         result.push(...chunkOfApps);
         // 拍扁数组 return next();
     }
        ), Promise.resolve([])) .then((lastchunkOfApps) =>
     {
         console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');
         result.push(...lastchunkOfApps);
         // 再次拍扁它 console.info('All chunks has been executed with result', result);
         return result;
     }
        );
     }
    ,  /** * 聚合所有应用的 PV *  * @private *  * @param {
    []}
     apps  * @return {
    [type]}
     [description] */ sumPv(apps) {
     const initial = {
     pv: 0 }
        ;
         return apps.reduce((accumulator, app) =>
     ({
     pv: accumulator.pv + app.pv }
        ), initial);
     }
    }
        ;
        // 开始运行aggregator.start().then(console.log);
        

    运行结果如图 2:

    抽象和复用

    目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。

    串行

    先模拟一个 http get 请求。

    /** * mocked http get. * @param {
    string}
     url * @returns {
    {
         url: string;
         delay: number;
     }
    }
     */function httpGet(url) {
         const delay = Math.random() * 1000;
         console.info('GET', url);
         return new Promise((resolve) =>
     {
         setTimeout(() =>
     {
     resolve({
     url, delay, at: Date.now() }
    ) }
        , delay);
     }
    )}
        

    串行执行一批请求。

    const ids = [1, 2, 3, 4, 5, 6, 7];
        // 批量请求函数,注意是 delay 执行的『函数』对了,否则会立即将请求发送出去,达不到串行的目的const httpGetters = ids.map(id =>
          () =>
     httpGet(`https://jsonplaceholder.typicode.com/posts/${
    id}
        `));
        // 串行执行之const tasks = await httpGetters.reduce((acc, cur) =>
     {
         return acc.then(cur);
          // 简写,等价于 // return acc.then(() =>
         cur());
    }
        , Promise.resolve());
        tasks.then(() =>
     {
         console.log('done');
    }
        );
        

    注意观察控制台输出,应该串行输出以下内容:

    GET https://jsonplaceholder.typicode.com/posts/1GET https://jsonplaceholder.typicode.com/posts/2GET https://jsonplaceholder.typicode.com/posts/3GET https://jsonplaceholder.typicode.com/posts/4GET https://jsonplaceholder.typicode.com/posts/5GET https://jsonplaceholder.typicode.com/posts/6GET https://jsonplaceholder.typicode.com/posts/7

    分段串行,段中并行

    重点来了。本文的请求调度器实现

    /** * Schedule promises. * @param {
        Array(...arg: any[]) =>
         Promiseany>
        >
    }
     factories  * @param {
    number}
     concurrency  */function schedulePromises(factories, concurrency) {
     /** * chunk * @param {
    any[]}
     arr  * @param {
    number}
     size  * @returns {
        Arrayany[]>
    }
         */ const chunk = (arr, size = 1) =>
     {
         return arr.reduce((acc, cur, idx) =>
     {
         const modulo = idx % size;
     if (modulo === 0) {
         acc[acc.length] = [cur];
     }
     else {
         acc[acc.length - 1].push(cur);
     }
         return acc;
     }
    , []) }
        ;
         const chunks = chunk(factories, concurrency);
         let resps = [];
         return chunks.reduce( (acc, cur) =>
     {
         return acc .then(() =>
     {
          console.log('---');
          return Promise.all(cur.map(f =>
         f()));
     }
        ) .then((intermediateResponses) =>
     {
          resps.push(...intermediateResponses);
          return resps;
     }
    ) }
        , Promise.resolve() );
    }
        

    测试下,执行调度器:

    // 分段串行,段中并行schedulePromises(httpGetters, 3).then((resps) =>
     {
         console.log('resps:', resps);
    }
        );
        

    控制台输出:

    ---GET https://jsonplaceholder.typicode.com/posts/1GET https://jsonplaceholder.typicode.com/posts/2GET https://jsonplaceholder.typicode.com/posts/3---GET https://jsonplaceholder.typicode.com/posts/4GET https://jsonplaceholder.typicode.com/posts/5GET https://jsonplaceholder.typicode.com/posts/6---GET https://jsonplaceholder.typicode.com/posts/7resps: [ {
     "url": "https://jsonplaceholder.typicode.com/posts/1", "delay": 733.010980640727, "at": 1615131322163 }
    , {
     "url": "https://jsonplaceholder.typicode.com/posts/2", "delay": 594.5056229848931, "at": 1615131322024 }
    , {
     "url": "https://jsonplaceholder.typicode.com/posts/3", "delay": 738.8230109146299, "at": 1615131322168 }
    , {
     "url": "https://jsonplaceholder.typicode.com/posts/4", "delay": 525.4604386109747, "at": 1615131322698 }
    , {
     "url": "https://jsonplaceholder.typicode.com/posts/5", "delay": 29.086379722201183, "at": 1615131322201 }
    , {
     "url": "https://jsonplaceholder.typicode.com/posts/6", "delay": 592.2345027398272, "at": 1615131322765 }
    , {
     "url": "https://jsonplaceholder.typicode.com/posts/7", "delay": 513.0684467560949, "at": 1615131323284 }
        ]

    总结

    1. 如果并发请求的数量太大,可以考虑分块串行,块中请求并发。
    2. 问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。
    3. 本文的精髓在于使用 reduce 作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce 精通见上篇 你终于用 Reduce 了 🎉。

    以上就是JS 实现请求调度器的详细内容,更多关于JS 请求调度器的资料请关注其它相关文章!

    您可能感兴趣的文章:
    • js实现axios限制请求队列
    • JavaScript如何利用Promise控制并发请求个数
    • 利用js实现Ajax并发请求限制请求数量的示例代码
    • gin 获取post请求的json body操作
    • PHP实现chrome表单请求数据转换为接口使用的json数据
    • JavaScript 中断请求几种方案详解

    声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!

    js

    若转载请注明出处: JS 实现请求调度器
    本文地址: https://pptw.com/jishu/594773.html
    jQuery中ajax的相关知识点汇总 c语言源文件经过编译后生成文件的后缀是什么?

    游客 回复需填写必要信息