首页前端开发JavaScript浅谈Node.js多进程模型中如何实现共享内存(代码详解)

浅谈Node.js多进程模型中如何实现共享内存(代码详解)

时间2024-01-30 03:46:02发布访客分类JavaScript浏览1047
导读:收集整理的这篇文章主要介绍了浅谈Node.js多进程模型中如何实现共享内存(代码详解),觉得挺不错的,现在分享给大家,也给大家做个参考。本篇文章和大家探讨一下Node.js利用多个核心的方法--worker_threads模块提供的多线程模...
收集整理的这篇文章主要介绍了浅谈Node.js多进程模型中如何实现共享内存(代码详解),觉得挺不错的,现在分享给大家,也给大家做个参考。本篇文章和大家探讨一下Node.js利用多个核心的方法--worker_threads模块提供的多线程模型,介绍一下Node.js多进程模型中实现共享内存的方法。

Node.js 由于其单线程模型的设计,导致一个Node进程(的主线程)只能利用一个CPU核心,然而现在的机器基本上都是多核的,这造成了严重的性能浪费。通常来说,想要利用到多个核心一般有以下的方法:

  • 编写Node的C++插件扩充线程池,并在JS代码中将CPU耗时任务委托给其它线程处理。

  • 使用worker_threads模块提供的多线程模型(尚在实验阶段)。

  • 使用child_PRocess 或者 cluster模块提供的多进程模型,每个进程都是一个独立的Node.js进程。

从易用、代码入侵性、稳定性的角度来说,多进程模型通常是首要的选择。【推荐学习:《nodejs 教程》】

Node.js cluster 多进程模型存在的问题

在cluster模块提供的多进程模型中,每个Node进程都是一个独立且完整的应用进程,有自己的内存空间,其它进程无法访问。因此虽然在项目启动时,所有Worker进程具有一致的状态和行为,但在之后的运行中无法保证其状态维持一致

例如,项目启动时有两个Worker进程,进程A和进程B,两个进程都声明了变量a=1。但之后项目接收到一个请求,Master进程将其分派给进程A来处理,这个请求将a的值变更为了2,那么此时进程A的内存空间中a=2,但是进程B的内存空间中a依旧是1。此时如果有个请求读取a的值,Master进程将这个请求分派给进程A和进程B时读取到的结果是不一致的,这就出现了一致性问题。

cluster模块在设计时并没有给出解决方案,而是要求Worker进程是无状态的,即程序员在写代码时不应该允许在处理请求时修改内存中的值,以此来保障所有Worker进程的一致性。然而在实践中总会有各种各样的情况需要写内存,比如记录用户的登录状态等,在许多企业的实践中,通常会把这些状态数据记录在外部,例如数据库、redis、消息队列、文件系统等,每次处理有状态请求时会读写外部存储空间。

这不失为一种有效的做法,然而这需要额外引入一个外部存储空间,同时还要自行处理多进程并发访问下的一致性问题,自行维护数据的生命周期(因为Node进程和维护在外部的数据并不是同步创建和销毁的),以及在高并发访问情况下的IO性能瓶颈(如果是存储在数据库等非内存环境中)。其实本质上来说,我们只是需要一个可供多个进程共享访问的空间罢了,并不需要持久化存储,这段空间的生命周期最好与Node进程强绑定,这样在使用时能省去不少麻烦。因此跨进程的共享内存就成了最适合在这种场景使用的方式。

Node.js 的共享内存

很遗憾Node本身并未提供共享内存的实现,因此我们可以看看npm仓库中第三方库的实现。这些库有些是通过C++插件扩充Node的函数实现的,有些是通过Node提供的IPC机制实现的,但很遗憾它们的实现都很简单,并未提供互斥访问、对象监听等功能,这使得使用者必须自己小心维护这段共享内存,否则就会导致时序问题。

转了一圈下来没找到我想要的。。。那就算了,我自己写一个。

共享内存的设计

首先我们必须理清楚到底需要个什么样的共享内存,我是根据我自身的需求出发(为了在项目中用它来存储跨进程访问的状态数据),同时兼顾通用性,因此会首先考虑以下几点:

  • 以JS对象为基本单位进行读写访问。

  • 能够进程间互斥访问,一个进程访问时,其它进程被阻塞。

  • 能够监听共享内存中的对象,当对象发生变化的时候监听的进程能被通知到。

  • 在满足上述条件的前提下,实现方式尽可能简单。

可以发现,其实我们并不需要操作系统层面的共享内存,只需要能够多个Node进程能访问同一个对象就行了,那么就可以在Node本身提供的机制上实现。可以使用Master进程的一段内存空间作为共享内存空间,Worker进程通过IPC将读写请求委托给Master进程,由Master进程进行读写,然后再通过IPC将结果返回给Worker进程。

为了让共享内存的使用方式在Master进程和Worker进程中一致,我们可以将对共享内存的操作抽离成一个接口,在Master进程和Worker进程中各自实现这个接口。类图如下图所示,用一个SharedMemory类作为抽象接口,在server.js入口文件中声明该对象。其在Master进程中实例化为Manager对象,在Worker进程中实例化为Worker对象。Manager对象来维护共享内存,并处理对共享内存的读写请求,而Worker对象则将读写请求发送到Master进程。

可以使用Manager类中的一个属性作为共享内存对象,访问该对象的方式与访问普通JS对象的方式一致,然后再做一层封装,只暴露getsetremove等基本操作,避免该属性直接被修改。

由于Master进程会优先于所有Worker进程创建,因此,可以在Master进程中声明共享内存空间之后再创建Worker进程,以此来保证每个Worker进程创建后都可以立即访问共享内存。

为了使用简单,我们可以将SharedMemory设计成单例,这样每个进程中就只有一个实例,并可以在importSharedMemory之后直接使用。

代码实现

读写控制与IPC通信

首先实现对外接口SharedMemory类,这里没有使用让ManagerWorker继承SharedMemory的方式,而是让SharedMemory在实例化的时候返回一个ManagerWorker的实例,从而实现自动选择子类。

在Node 16中isPrimary替代了isMaster,这里为了兼容使用了两种写法。

// shared-memory.jsclass SharedMemory {
  constructor() {
    if (cluster.isMaster || cluster.isPrimary) {
          return new Manager();
    }
 else {
          return new Worker();
    }
  }
}
    

Manager负责管理共享内存空间,我们直接在Manager对象中增加__sharedMemory__属性,由于其本身也是JS对象,会被纳入JS的垃圾回收管理中,因此我们不需要进行内存清理、数据迁移等操作,使得实现上非常简洁。之后在__sharedMemory__之中定义setgetremove等标准操作来提供访问方式。

我们通过cluster.on('online', callback)来监听worker进程的创建事件,并在创建后立即用worker.on('message', callback)来监听来自worker进程的IPC通信,并把通信消息交给handle函数处理。

handle函数的职责是区分worker进程是想进行哪种操作,并取出操作的参数委托给对应的setgetremove函数(注意不是__sharedMemory__中的setgetremove)进行处理,并将处理后的结果返还给worker进程。

// manager.jsconst cluster = require('cluster');
class Manager {
  constructor() {
    this.__sharedMemory__ = {
      set(key, value) {
            this.memory[key] = value;
      }
,      get(key) {
            return this.memory[key];
      }
,      remove(key) {
            delete this.memory[key];
      }
,      memory: {
}
,    }
    ;
        // Listen the messages From worker processes.    cluster.on('online', (worker) =>
 {
          worker.on('message', (data) =>
 {
            this.handle(data, worker);
            return false;
      }
    );
    }
    );
  }
  handle(data, target) {
        const args = data.value ? [data.key, data.value] : [data.key];
        this[data.method](...args).then((value) =>
 {
      const msg = {
        id: data.id, // workerId        uuid: data.uuid, // communicationID        value,      }
    ;
          target.send(msg);
    }
    );
  }
  set(key, value) {
        return new Promise((resolve) =>
 {
          this.__sharedMemory__.set(key, value);
          resolve('OK');
    }
    );
  }
  get(key) {
        return new Promise((resolve) =>
 {
          resolve(this.__sharedMemory__.get(key));
    }
    );
  }
  remove(key) {
        return new Promise((resolve) =>
 {
          this.__sharedMemory__.remove(key);
          resolve('OK');
    }
    );
  }
}
    

Worker自对象创建开始就使用process.on监听来自Master进程的返回消息(毕竟不能等消息发送出去以后再监听吧,那就来不及了)。至于__getCallbacks__对象的作用一会儿再说。此时Worker对象便创建完成。

之后项目运行到某个地方的时候,如果要访问共享内存,就会调用Workersetgetremove函数,它们又会调用handle函数将消息通过process.send发送到master进程,同时,将得到返回结果时要进行的操作记录在__getCallbacks__中。当结果返回时,会被之前在process.on中的函数监听到,并从__getCallbacks__中取出对应的回调函数,并执行。

因为访问共享内存的过程中会经过IPC,所以必定是异步操作,所以需要记录回调函数,不能实现成同步的方式,不然会阻塞原本的任务。

// worker.jsconst cluster = require('cluster');
const {
 v4: uuid4 }
     = require('uuid');
class Worker {
  constructor() {
    this.__getCallbacks__ = {
}
    ;
        process.on('message', (data) =>
 {
          const callback = this.__getCallbacks__[data.uuid];
          if (callback &
    &
 tyPEof callback === 'function') {
            callback(data.value);
      }
          delete this.__getCallbacks__[data.uuid];
    }
    );
  }
  set(key, value) {
        return new Promise((resolve) =>
 {
          this.handle('set', key, value, () =>
 {
            resolve();
      }
    );
    }
    );
  }
  get(key) {
        return new Promise((resolve) =>
 {
          this.handle('get', key, null, (value) =>
 {
            resolve(value);
      }
    );
    }
    );
  }
  remove(key) {
        return new Promise((resolve) =>
 {
          this.handle('remove', key, null, () =>
 {
            resolve();
      }
    );
    }
    );
  }
  handle(method, key, value, callback) {
        const uuid = uuid4();
 // 每次通信的uuid    process.send({
      id: cluster.worker.id,      method,      uuid,      key,      value,    }
    );
        this.__getCallbacks__[uuid] = callback;
  }
}
    

一次共享内存访问的完整流程是:调用Workerset/get/remove函数 -> 调用Workerhandle函数,向master进程通信并将回调函数记录在__getCallbacks__ -> master进程监听到来自worker进程的消息 -> 调用Managerhandle函数 -> 调用Managerset/get/remove函数 -> 调用__sharedMemory__set/get/remove函数 -> 操作完成返回Managerset/get/remove函数 -> 操作完成返回handle函数 -> 向worker进程发送通信消息 -> worker进程监听到来自master进程的消息 -> 从__getCallbacks__中取出回调函数并执行。

互斥访问

到目前为止,我们已经实现了读写共享内存,但还没有结束,目前的共享内存是存在严重安全问题的。因为这个共享内存是可以所有进程同时访问的,然而我们并没有考虑并发访问时的时序问题。我们来看下面这个例子:

时间进程A进程B共享内存中变量x的值
t0

0
t1读取x(x=0)
0
t2x1=x+1(x1=1)读取x(x=0)0
t3将x1的值写回xx2=x+1(x2=1)1
t4
将x2的值写回x1

进程A和进程B的目的都是将x的值加1,理想情况下最后x的值应该是2,可是最后的结果却是1。这是因为进程B在t3时刻给x的值加1的时候,使用的是t2时刻读取出来的x的值,但此时从全局角度来看,这个值已经过期了,因为t3时刻x最新的值已经被进程A写为了1,可是进程B无法知道进程外部的变化,所以导致了t4时刻最后写回的值又覆盖掉了进程A写回的值,等于是进程A的行为被覆盖掉了。

在多线程、多进程和分布式中并发情况下的数据一致性问题是老大难问题了,这里不再展开讨论。

为了解决上述问题,我们必须实现进程间互斥访问某个对象,来避免同时操作一个对象,从而使进程可以进行原子操作,所谓原子操作就是不可被打断的一小段连续操作,为此需要引入锁的概念。由于读写均以对象为基本单位,因此锁的粒度设置为对象级别。在某一个进程(的某一任务)获取了某个对象的锁之后,其它要获取锁的进程(的任务)会被阻塞,直到锁被归还。而要进行写操作,则必须要先获取对象的锁。这样在获取到锁直到锁被释放的这段时间里,该对象在共享内存中的值不会被其它进程修改,从而导致错误。

Manager__sharedMemory__中加入locks属性,用来记录哪个对象的锁被拿走了,lockRequestQueues属性用来记录被阻塞的任务(正在等待锁的任务)。并增加getLock函数和releaseLock函数,用来申请和归还锁,以及handleLockRequest函数,用来使被阻塞的任务获得锁。在申请锁时,会先将回调函数记录到lockRequestQueues队尾(因为此时该对象的锁可能已被拿走),然后再调用handleLockRequest检查当前锁是否被拿走,若锁还在,则让队首的任务获得锁。归还锁时,先将__sharedMemory__.locks中对应的记录删掉,然后再调用handleLockRequest让队首的任务获得锁。

// manager.jsconst {
 v4: uuid4 }
     = require('uuid');
class Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...      locks: {
}
,      lockRequestQueues: {
}
,    }
    ;
  }
  getLock(key) {
        return new Promise((resolve) =>
 {
          this.__sharedMemory__.lockRequestQueues[key] =        this.__sharedMemory__.lockRequestQueues[key] ?? [];
          this.__sharedMemory__.lockRequestQueues[key].push(resolve);
          this.handleLockRequest(key);
    }
    );
  }
  releaseLock(key, lockId) {
        return new Promise((resolve) =>
 {
      if (lockId === this.__sharedMemory__.locks[key]) {
            delete this.__sharedMemory__.locks[key];
            this.handleLockRequest(key);
      }
          resolve('OK');
    }
    );
  }
  handleLockRequest(key) {
        return new Promise((resolve) =>
 {
          if (        !this.__sharedMemory__.locks[key] &
    &
            this.__sharedMemory__.lockRequestQueues[key]?.length >
 0      ) {
            const callback = this.__sharedMemory__.lockRequestQueues[key].shift();
            const lockId = uuid4();
            this.__sharedMemory__.locks[key] = lockId;
            callback(lockId);
      }
          resolve();
    }
    );
  }
  ...}
    

Worker中,则是增加getLockreleaseLock两个函数,行为与getset类似,都是调用handle函数。

// worker.jsclass Worker {
  getLock(key) {
        return new Promise((resolve) =>
 {
          this.handle('getLock', key, null, (value) =>
 {
            resolve(value);
      }
    );
    }
    );
  }
  releaseLock(key, lockId) {
        return new Promise((resolve) =>
 {
          this.handle('releaseLock', key, lockId, (value) =>
 {
            resolve(value);
      }
    );
    }
    );
  }
  ...}
    

监听对象

有时候我们需要监听某个对象值的变化,在单进程Node应用中这很容易做到,只需要重写对象的set属性就可以了,然而在多进程共享内存中,对象和监听者都不在一个进程中,这只能依赖Manager的实现。这里,我们选择了经典的观察者模式来实现监听共享内存中的对象。

为此,我们先在__sharedMemory__中加入listeners属性,用来记录在对象值发生变化时监听者注册的回调函数。然后增加listen函数,其将监听回调函数记录到__sharedMemory__.listeners中,这个监听回调函数会将变化的值发送给对应的worker进程。最后,在setremove函数返回前调用notifyListener,将所有记录在__sharedMemory__.listeners中监听该对象的所有函数取出并调用。

// manager.jsclass Manager {
  constructor() {
    this.__sharedMemory__ = {
      ...      listeners: {
}
,    }
    ;
  }
  handle(data, target) {
    if (data.method === 'listen') {
          this.listen(data.key, (value) =>
 {
        const msg = {
          isNotified: true,          id: data.id,          uuid: data.uuid,          value,        }
    ;
            target.send(msg);
      }
    );
    }
 else {
      ...    }
  }
  notifyListener(key) {
        const listeners = this.__sharedMemory__.listeners[key];
        if (listeners?.length >
 0) {
          Promise.all(        listeners.map(          (callback) =>
                new Promise((resolve) =>
 {
                  callback(this.__sharedMemory__.get(key));
                  resolve();
            }
    )        )      );
    }
  }
  set(key, value) {
        return new Promise((resolve) =>
 {
          this.__sharedMemory__.set(key, value);
          this.notifyListener(key);
          resolve('OK');
    }
    );
  }
  remove(key) {
        return new Promise((resolve) =>
 {
          this.__sharedMemory__.remove(key);
          this.notifyListener(key);
          resolve('OK');
    }
    );
  }
  listen(key, callback) {
    if (typeof callback === 'function') {
          this.__sharedMemory__.listeners[key] =        this.__sharedMemory__.listeners[key] ?? [];
          this.__sharedMemory__.listeners[key].push(callback);
    }
 else {
          throw new Error('a listener must have a callback.');
    }
  }
  ...}
    

Worker中由于监听操作与其它操作不一样,它是一次注册监听回调函数之后对象的值每次变化都会被通知,因此需要在增加一个__getListenerCallbacks__属性用来记录监听操作的回调函数,与__getCallbacks__不同,它里面的函数在收到master的回信之后不会删除。

// worker.jsclass Worker {
  constructor() {
    ...    this.__getListenerCallbacks__ = {
}
    ;
        process.on('message', (data) =>
 {
      if (data.isNotified) {
            const callback = this.__getListenerCallbacks__[data.uuid];
            if (callback &
    &
 typeof callback === 'function') {
              callback(data.value);
        }
      }
 else {
        ...      }
    }
    );
  }
  handle(method, key, value, callback) {
    ...    if (method === 'listen') {
          this.__getListenerCallbacks__[uuid] = callback;
    }
 else {
          this.__getCallbacks__[uuid] = callback;
    }
  }
  listen(key, callback) {
    if (typeof callback === 'function') {
          this.handle('listen', key, null, callback);
    }
 else {
          throw new Error('a listener must have a callback.');
    }
  }
  ...}
    

LRU缓存

有时候我们需要用用内存作为缓存,但多进程中各进程的内存空间独立,不能共享,因此也需要用到共享内存。但是如果用共享内存中的一个对象作为缓存的话,由于每次IPC都需要传输整个缓存对象,会导致缓存对象不能太大(否则序列化和反序列化耗时太长),而且由于写缓存对象的操作需要加锁,进一步影响了性能,而原本我们使用缓存就是为了加快访问速度。其实在使用缓存的时候通常不会做复杂操作,大多数时候也不需要保障一致性,因此我们可以在Manager再增加一个共享内存__sharedLRUMemory__,其为一个lru-cache实例,并增加getLRUsetLRUremoveLRU函数,与setgetremove函数类似。

// manager.jsconst LRU = require('lru-cache');
class Manager {
  constructor() {
    ...    this.defaultLRUOptions = {
 max: 10000, maxAge: 1000 * 60 * 5 }
    ;
        this.__sharedLRUMemory__ = new LRU(this.defaultLRUOptions);
  }
  getLRU(key) {
        return new Promise((resolve) =>
 {
          resolve(this.__sharedLRUMemory__.get(key));
    }
    );
  }
  setLRU(key, value) {
        return new Promise((resolve) =>
 {
          this.__sharedLRUMemory__.set(key, value);
          resolve('OK');
    }
    );
  }
  removeLRU(key) {
        return new Promise((resolve) =>
 {
          this.__sharedLRUMemory__.del(key);
          resolve('OK');
    }
    );
  }
  ...}
    

Worker中也增加getLRUsetLRUremoveLRU函数。

// worker.jsclass Worker {
  getLRU(key) {
        return new Promise((resolve) =>
 {
          this.handle('getLRU', key, null, (value) =>
 {
            resolve(value);
      }
    );
    }
    );
  }
  setLRU(key, value) {
        return new Promise((resolve) =>
 {
          this.handle('setLRU', key, value, () =>
 {
            resolve();
      }
    );
    }
    );
  }
  removeLRU(key) {
        return new Promise((resolve) =>
 {
          this.handle('removeLRU', key, null, () =>
 {
            resolve();
      }
    );
    }
    );
  }
  ...}
    

共享内存的使用方式

目前共享内存的实现已发到npm仓库(文档和源代码在GIThub仓库,欢迎pull request和报bug),可以直接通过npm安装:

npm i cluster-shared-memory

下面的示例包含了基本使用方法:

const cluster = require('cluster');
    // 引入模块时会根据当前进程 master 进程还是 worker 进程自动创建对应的 SharedMemory 对象require('cluster-shared-memory');
if (cluster.isMaster) {
      // 在 master 进程中 fork 子进程  for (let i = 0;
     i  2;
 i++) {
        cluster.fork();
  }
}
 else {
      const sharedMemoryController = require('./src/shared-memory');
  const obj = {
    name: 'Tom',    age: 10,  }
    ;
        // 写对象  await sharedMemoryController.set('myObj', obj);
        // 读对象  const myObj = await sharedMemoryController.get('myObj');
        // 互斥访问对象,首先获得对象的锁  const lockId = await sharedMemoryController.getLock('myObj');
      const newObj = await sharedMemoryController.get('myObj');
      newObj.age = newObj.age + 1;
      await sharedMemoryController.set('myObj', newObj);
      // 操作完之后释放锁  await sharedMemoryController.releaseLock('requestTimes', lockId);
        // 或者使用 mutex 函数自动获取和释放锁  await sharedMemoryController.mutex('myObj', async () =>
 {
        const newObjM = await sharedMemoryController.get('myObj');
        newObjM.age = newObjM.age + 1;
        await sharedMemoryController.set('myObj', newObjM);
  }
    );
        // 监听对象  sharedMemoryController.listen('myObj', (value) =>
 {
    console.LOG(`myObj: ${
value}
    `);
  }
    );
    //写LRU缓存  await sharedMemoryController.setLRU('cacheitem', {
user: 'Tom'}
    );
        // 读对象  const cacheItem = await sharedMemoryController.getLRU('cacheItem');
}
    

缺点

这种实现目前尚有几个缺点:

  • 不能使用PM2的自动创建worker进程的功能。

由于PM2会使用自己的cluster模块的master进程的实现,而我们的共享内存模块需要在master进程维护一个内存空间,则不能使用PM2的实现,因此不能使用PM2的自动创建worker进程的功能。

  • 传输的对象必须可序列化,且不能太大。

  • 如果使用者在获取锁之后忘记释放,会导致其它进程一直被阻塞,这要求程序员有良好的代码习惯。

原文地址:https://juejin.cn/post/6992091006220894215

作者:FinalZJY

更多编程相关知识,请访问:编程视频!!

以上就是浅谈Node.js多进程模型中如何实现共享内存(代码详解)的详细内容,更多请关注其它相关文章!

声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!

上一篇: 浅谈Angular模板指令:ng-templa...下一篇:Angular中什么是Ivy编译?如何开...猜你在找的JavaScript相关文章 html font标签如何设置字体大小?html font标签属性用法介绍2022-05-16vue3+TypeScript+vue-router的使用方法2022-04-16vue3获取当前路由地址2022-04-16如何利用React实现图片识别App2022-04-16JavaScript展开运算符和剩余运算符的区别详解2022-04-16微信小程序中使用vant框架的具体步骤2022-04-16Vue elementUI表单嵌套表格并对每行进行校验详解2022-04-16如何利用Typescript封装本地存储2022-04-16微信小程序中wxs文件的一些妙用分享2022-04-16JavaScript的Set数据结构详解2022-04-16 其他相关热搜词更多phpjavapython程序员loadpost-format-gallery

若转载请注明出处: 浅谈Node.js多进程模型中如何实现共享内存(代码详解)
本文地址: https://pptw.com/jishu/591998.html
Node.js如何使用文件系统模块?常用fs模块方法介绍 浅谈Node.js中ES6导入语法的使用方法

游客 回复需填写必要信息