JS 实现请求调度器
目录
- 抽象和复用
- 串行
- 分段串行,段中并行
- 总结
前言:JS 天然支持并行请求,但与此同时会带来一些问题,比如会造成目标服务器压力过大,所以本文引入“请求调度器”来节制并发度。
TLDR; 直接跳转『抽象和复用』章节。
为了获取一批互不依赖的资源,通常从性能考虑可以用 Promise.all(arrayOfPRomises)来并发执行。比如我们已有 100 个应用的 id,需求是聚合所有应用的 pv,我们通常会这么写:
const ids = [1001, 1002, 1003, 1004, 1005];
const urlPrefix = 'http://oPEnseArch.example.COM/api/apps';
// fetch 函数发送 HTTP 请求,返回 Promiseconst appPromises = ids.map(id =>
`${
urlPrefix}
/${
id}
`).map(fetch);
Promise.all(appPromises) // 通过 reduce 做累加 .then(apps =>
apps.reduce((inITial, current) =>
initial + current.pv, 0)) .catch((error) =>
console.LOG(error));
上面的代码在应用个数不多的情况下,可以运行正常。当应用个数达到成千上万时,对支持并发数不是很好的系统,你的「压测」会把第三放服务器搞挂,暂时无法响应请求:
htML>
head>
title>
502 Bad Gateway/title>
/head>
body bgcolor="white">
center>
h1>
502 Bad Gateway/h1>
/center>
hr>
center>
nginx/1.10.1/center>
/body>
/html>
如何解决呢?
一个很自然的想法是,既然不支持这么多的并发请求,那就分割成几大块,每块为一个 chunk,chunk 内部的请求依然并发,但块的大小(chunkSize)限制在系统支持的最大并发数以内。前一个 chunk 结束后一个 chunk 才能继续执行,也就是说 chunk 内部的请求是并发的,但 chunk 之间是串行的。思路其实很简单,写起来却有一定难度。总结起来三个操作:分块、串行、聚合
难点在如何串行执行 Promise,Promise 仅提供了并行(Promise.all)功能,并没有提供串行功能。我们从简单的三个请求开始,看如何实现,启发式解决问题(heuristic)。
// task1, task2, task3 是三个返回 Promise 的工厂函数,模拟我们的异步请求const task1 = () =>
new Promise((resolve) =>
{
setTimeout(() =>
{
resolve(1);
console.log('task1 executed');
}
, 1000);
}
);
const task2 = () =>
new Promise((resolve) =>
{
setTimeout(() =>
{
resolve(2);
console.log('task2 executed');
}
, 1000);
}
);
const task3 = () =>
new Promise((resolve) =>
{
setTimeout(() =>
{
resolve(3);
console.log('task3 executed');
}
, 1000);
}
);
// 聚合结果let result = 0;
const resultPromise = [task1, task2, task3].reduce((current, next) =>
current.then((number) =>
{
console.log('resolved with number', number);
// task2, task3 的 Promise 将在这里被 resolve result += number;
return next();
}
), Promise.resolve(0)) // 聚合初始值 .then(function(last) {
console.log('The last promise resolved with number', last);
// task3 的 Promise 在这里被 resolve result += last;
console.log('all executed with result', result);
return Promise.resolve(result);
}
);
运行结果如图 1:
代码解析:我们想要的效果,直观展示其实是 fn1().then(() =>
fn2()).then(() =>
fn3())。上面代码能让一组 Promise 按顺序执行的关键之处就在 reduce 这个“引擎”在一步步推动 Promise 工厂函数的执行。
难点解决了,我们看看最终代码:
/** * 模拟 HTTP 请求 * @param {
String}
url * @return {
Promise}
*/function fetch(url) {
console.log(`Fetching ${
url}
`);
return new Promise((resolve) =>
{
setTimeout(() =>
resolve({
pv: Number(url.match(/\d+$/)) }
), 2000);
}
);
}
const urlPrefix = 'http://opensearch.example.com/api/apps';
const aggregator = {
/** * 入口方法,开启定时任务 * * @return {
Promise}
*/ start() {
return this.fetchAppIds() .then(ids =>
this.fetchAppsSerially(ids, 2)) .then(apps =>
this.sumPv(apps)) .catch(error =>
console.error(error));
}
, /** * 获取所有应用的 ID * * @private * * @return {
Promise}
*/ fetchAppIds() {
return Promise.resolve([1001, 1002, 1003, 1004, 1005]);
}
, promiseFactory(ids) {
return () =>
Promise.all(ids.map(id =>
`${
urlPrefix}
/${
id}
`).map(fetch));
}
, /** * 获取所有应用的详情 * * 一次并发请求 `concurrency` 个应用,称为一个 chunk * 前一个 `chunk` 并发完成后一个才继续,直至所有应用获取完毕 * * @private * * @param {
[Number]}
ids * @param {
Number}
concurrency 一次并发的请求数量 * @return {
[Object]}
所有应用的信息 */ fetchAppsSerially(ids, concurrency = 100) {
// 分块 let chunkOfIds = ids.splice(0, concurrency);
const tasks = [];
while (chunkOfIds.length !== 0) {
tasks.push(this.promiseFactory(chunkOfIds));
chunkOfIds = ids.splice(0, concurrency);
}
// 按块顺序执行 const result = [];
return tasks.reduce((current, next) =>
current.then((chunkOfApps) =>
{
console.info('Chunk of', chunkOfApps.length, 'concurrency requests has finished with result:', chunkOfApps, '\n\n');
result.push(...chunkOfApps);
// 拍扁数组 return next();
}
), Promise.resolve([])) .then((lastchunkOfApps) =>
{
console.info('Chunk of', lastchunkOfApps.length, 'concurrency requests has finished with result:', lastchunkOfApps, '\n\n');
result.push(...lastchunkOfApps);
// 再次拍扁它 console.info('All chunks has been executed with result', result);
return result;
}
);
}
, /** * 聚合所有应用的 PV * * @private * * @param {
[]}
apps * @return {
[type]}
[description] */ sumPv(apps) {
const initial = {
pv: 0 }
;
return apps.reduce((accumulator, app) =>
({
pv: accumulator.pv + app.pv }
), initial);
}
}
;
// 开始运行aggregator.start().then(console.log);
运行结果如图 2:
抽象和复用
目的达到了,因具备通用性,下面开始抽象成一个模式以便复用。
串行
先模拟一个 http get 请求。
/** * mocked http get. * @param {
string}
url * @returns {
{
url: string;
delay: number;
}
}
*/function httpGet(url) {
const delay = Math.random() * 1000;
console.info('GET', url);
return new Promise((resolve) =>
{
setTimeout(() =>
{
resolve({
url, delay, at: Date.now() }
) }
, delay);
}
)}
串行执行一批请求。
const ids = [1, 2, 3, 4, 5, 6, 7];
// 批量请求函数,注意是 delay 执行的『函数』对了,否则会立即将请求发送出去,达不到串行的目的const httpGetters = ids.map(id =>
() =>
httpGet(`https://jsonplaceholder.typicode.com/posts/${
id}
`));
// 串行执行之const tasks = await httpGetters.reduce((acc, cur) =>
{
return acc.then(cur);
// 简写,等价于 // return acc.then(() =>
cur());
}
, Promise.resolve());
tasks.then(() =>
{
console.log('done');
}
);
注意观察控制台输出,应该串行输出以下内容:
GET https://jsonplaceholder.typicode.com/posts/1GET https://jsonplaceholder.typicode.com/posts/2GET https://jsonplaceholder.typicode.com/posts/3GET https://jsonplaceholder.typicode.com/posts/4GET https://jsonplaceholder.typicode.com/posts/5GET https://jsonplaceholder.typicode.com/posts/6GET https://jsonplaceholder.typicode.com/posts/7
分段串行,段中并行
重点来了。本文的请求调度器实现
/** * Schedule promises. * @param {
Array(...arg: any[]) =>
Promiseany>
>
}
factories * @param {
number}
concurrency */function schedulePromises(factories, concurrency) {
/** * chunk * @param {
any[]}
arr * @param {
number}
size * @returns {
Arrayany[]>
}
*/ const chunk = (arr, size = 1) =>
{
return arr.reduce((acc, cur, idx) =>
{
const modulo = idx % size;
if (modulo === 0) {
acc[acc.length] = [cur];
}
else {
acc[acc.length - 1].push(cur);
}
return acc;
}
, []) }
;
const chunks = chunk(factories, concurrency);
let resps = [];
return chunks.reduce( (acc, cur) =>
{
return acc .then(() =>
{
console.log('---');
return Promise.all(cur.map(f =>
f()));
}
) .then((intermediateResponses) =>
{
resps.push(...intermediateResponses);
return resps;
}
) }
, Promise.resolve() );
}
测试下,执行调度器:
// 分段串行,段中并行schedulePromises(httpGetters, 3).then((resps) =>
{
console.log('resps:', resps);
}
);
控制台输出:
---GET https://jsonplaceholder.typicode.com/posts/1GET https://jsonplaceholder.typicode.com/posts/2GET https://jsonplaceholder.typicode.com/posts/3---GET https://jsonplaceholder.typicode.com/posts/4GET https://jsonplaceholder.typicode.com/posts/5GET https://jsonplaceholder.typicode.com/posts/6---GET https://jsonplaceholder.typicode.com/posts/7resps: [ {
"url": "https://jsonplaceholder.typicode.com/posts/1", "delay": 733.010980640727, "at": 1615131322163 }
, {
"url": "https://jsonplaceholder.typicode.com/posts/2", "delay": 594.5056229848931, "at": 1615131322024 }
, {
"url": "https://jsonplaceholder.typicode.com/posts/3", "delay": 738.8230109146299, "at": 1615131322168 }
, {
"url": "https://jsonplaceholder.typicode.com/posts/4", "delay": 525.4604386109747, "at": 1615131322698 }
, {
"url": "https://jsonplaceholder.typicode.com/posts/5", "delay": 29.086379722201183, "at": 1615131322201 }
, {
"url": "https://jsonplaceholder.typicode.com/posts/6", "delay": 592.2345027398272, "at": 1615131322765 }
, {
"url": "https://jsonplaceholder.typicode.com/posts/7", "delay": 513.0684467560949, "at": 1615131323284 }
]总结
- 如果并发请求的数量太大,可以考虑分块串行,块中请求并发。
- 问题看似复杂,不放先简化之,然后一步步推导出关键点,最后抽象,就能找到解决方案。
- 本文的精髓在于使用
reduce作为串行推动的引擎,故掌握其对我们日常开发遇到的迷局破解可提供新思路,reduce精通见上篇 你终于用 Reduce 了 🎉。
以上就是JS 实现请求调度器的详细内容,更多关于JS 请求调度器的资料请关注其它相关文章!
您可能感兴趣的文章:- js实现axios限制请求队列
- JavaScript如何利用Promise控制并发请求个数
- 利用js实现Ajax并发请求限制请求数量的示例代码
- gin 获取post请求的json body操作
- PHP实现chrome表单请求数据转换为接口使用的json数据
- JavaScript 中断请求几种方案详解
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: JS 实现请求调度器
本文地址: https://pptw.com/jishu/594773.html
