Arhn - архитектура программирования

NodeJS|Cluster: как отправить данные от мастера всем или одному дочернему/рабочему?

У меня есть рабочий (стандартный) скрипт с узла .

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd && msg.cmd == 'notifyRequest') {
        numReqs++;
      }
    });
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

В приведенном выше скрипте я могу легко отправлять данные от рабочего процесса к мастер-процессу. Но как отправить данные от мастера к воркеру/воркерам? С примерами, если можно.


Ответы:


1

Поскольку cluster.fork реализован поверх child_process.fork, вы можете отправлять сообщения от мастера к рабочему с помощью worker.send({ msg: 'test' }) и от рабочего к мастеру с помощью process.send({ msg: 'test' });. Вы получаете такие сообщения: worker.on('message', callback) (от работника к мастеру) и process.on('message', callback); (от мастера к работнику).

Вот мой полный пример, вы можете протестировать его, просмотрев http://localhost:8000/. Затем рабочий процесс отправит сообщение мастеру, а мастер ответит:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var worker;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    worker = cluster.fork();

    worker.on('message', function(msg) {
      // we only want to intercept messages that have a chat property
      if (msg.chat) {
        console.log('Worker to master: ', msg.chat);
        worker.send({ chat: 'Ok worker, Master got the message! Over and out!' });
      }
    });

  }
} else {
  process.on('message', function(msg) {
    // we only want to intercept messages that have a chat property
    if (msg.chat) {
      console.log('Master to worker: ', msg.chat);
    }
  });
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ chat: 'Hey master, I got a new request!' });
  }).listen(8000);
}
16.12.2011
  • На самом деле я хочу создать push-сервер для сокет-клиентов (web/flash). Текущая версия поддерживает 1000 одновременных подключений. Поэтому я решил создать несколько воркеров со слушателями socket.io. Это означает, что мне нужно передавать данные рабочим асинхронно. 16.12.2011
  • Звучит нормально, убедитесь, что вы используете Socket.IO с RedisStore. 16.12.2011
  • Это не сработает. var внутри for? worker будет удерживать последнего разветвленного рабочего, а не каждого (особенно внутри обратного вызова события). Либо вы не заботитесь обо всех, и вы просто заключаете свой обратный вызов, либо держите всех рабочих в массиве. 21.12.2011
  • Извините за вар, я скопировал часть его кода. Я не держу всех своих воркеров в массиве, потому что я просто хотел доказать функциональность. 21.12.2011
  • @alessioalex Будет ли отправка данных через воркеров медленнее или быстрее, чем при использовании Redis? 03.09.2015
  • @NiCkNewman, я не совсем уверен. С кластером вы используете child_process, который, я думаю, открывает сокет для обработки межпроцессного взаимодействия. Вы должны сделать несколько тестов с реальными данными и посмотреть, как они работают. 14.09.2015
  • Если мы хотим отправить объекты JavaScript, это будет toJSON и fromJSON с другой стороны? 15.06.2018

  • 2

    Я нашел этот поток, когда искал способ отправить сообщение всем дочерним процессам, и, к счастью, смог понять это благодаря комментариям о массивах. Просто хотел проиллюстрировать потенциальное решение для отправки сообщения всем дочерним процессам, использующим этот подход.

    var cluster = require('cluster');
    var http = require('http');
    var numReqs = 0;
    var workers = [];
    
    if (cluster.isMaster) {
      // Broadcast a message to all workers
      var broadcast = function() {
        for (var i in workers) {
          var worker = workers[i];
          worker.send({ cmd: 'broadcast', numReqs: numReqs });
        }
      }
    
      // Fork workers.
      for (var i = 0; i < 2; i++) {
        var worker = cluster.fork();
    
        worker.on('message', function(msg) {
          if (msg.cmd) {
            switch (msg.cmd) {
              case 'notifyRequest':
                numReqs++;
              break;
              case 'broadcast':
                broadcast();
              break;
            }
        });
    
        // Add the worker to an array of known workers
        workers.push(worker);
      }
    
      setInterval(function() {
        console.log("numReqs =", numReqs);
      }, 1000);
    } else {
      // React to messages received from master
      process.on('message', function(msg) {
        switch(msg.cmd) {
          case 'broadcast':
            if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
          break;
        }
      });
    
      // Worker processes have a http server.
      http.Server(function(req, res) {
        res.writeHead(200);
        res.end("hello world\n");
        // Send message to master process
        process.send({ cmd: 'notifyRequest' });
        process.send({ cmd: 'broadcast' });
      }).listen(8000);
    }
    
    12.06.2012

    3

    Вот как я реализовал решение аналогичной проблемы. Подключаясь к cluster.on('fork'), вы можете прикреплять обработчики сообщений к рабочим процессам по мере их разветвления (вместо того, чтобы хранить их в массиве), что дает дополнительное преимущество при работе со случаями, когда рабочие умирают или отключаются, а новый рабочий разветвляется.

    Этот фрагмент отправляет сообщение от мастера всем рабочим процессам.

    if (cluster.isMaster) {
        for (var i = 0; i < require('os').cpus.length; i++) {
            cluster.fork();
        }
    
        cluster.on('disconnect', function(worker) {
            cluster.fork();
        }
    
        // When a new worker process is forked, attach the handler
        // This handles cases where new worker processes are forked
        // on disconnect/exit, as above.
        cluster.on('fork', function(worker) {
            worker.on('message', messageRelay);
        }
    
        var messageRelay = function(msg) {
            Object.keys(cluster.workers).forEach(function(id) {
                cluster.workers[id].send(msg);
            });
        };
    }
    else {
        process.on('message', messageHandler);
    
        var messageHandler = function messageHandler(msg) {
            // Worker received message--do something
        };
    }
    
    02.07.2015

    4

    Я понимаю вашу цель трансляции всем рабочим процессам узла в кластере, хотя вы не можете отправлять компонент сокета как таковой, но есть обходной путь для этой цели. Попробую объяснить на примере:

    Шаг 1. Если для действия клиента требуется широковещательная рассылка:

    Child.js (Process that has been forked) :
    
    socket.on("BROADCAST_TO_ALL_WORKERS", function (data) 
    {
        process.send({cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message});
    }) 
    

    Шаг 2. На стороне создания кластера

    Server.js (Place where cluster forking happens):
    
    if (cluster.isMaster) {
    
      for (var i = 0; i < numCPUs; i++) {
    
        var worker = cluster.fork();
    
        worker.on('message', function (data) {
         if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
           console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id);
            Object.keys(cluster.workers).forEach(function(id) {
                cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
            });
          }
        });
      }
    
      cluster.on('exit', function (worker, code, signal) {
        var newWorker = cluster.fork();
        newWorker.on('message', function (data) {
          console.log(data);
          if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
            console.log(data.cmd,data);
            Object.keys(cluster.workers).forEach(function(id) {
                cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
            });
          }
        });
      });
    } 
    else {
      //Node Js App Entry
      require("./Child.js");
    }
    

    Шаг 3. Трансляция в дочерний процесс —

    -> Поместите это перед io.on("connection") в Child.js

    process.on("message", function(data){
        if(data.cmd === "BROADCAST_TO_WORKER"){
            io.sockets.emit("SERVER_MESSAGE", { message: data.message, reload: data.reload, player_id : data.player_id });
        }
    });
    

    Надеюсь, это поможет. Пожалуйста, дайте мне знать, если требуются дополнительные разъяснения.

    19.08.2017

    5

    Вы должны иметь возможность отправить сообщение от мастера к работнику следующим образом:

    worker.send({message:'hello'})
    

    потому что «cluster.fork реализован поверх child_process.fork» (cluster.fork реализован поверх child_process.fork)

    16.12.2011
  • Да работает, спасибо! Другими словами: при разветвлении рабочих я должен хранить их в массиве. И повторите этот массив, чтобы отправить данные каждому ребенку. Есть ли другой способ отправить данные всем воркерам без сохранения и итерации. 16.12.2011
  • Если вы не хотите хранить рабочих в массиве и перебирать их для отправки сообщений, вы можете использовать сокет домена unix для передачи сообщений от мастера к рабочим. 16.12.2011
  • Я полагаю, вы можете создать EventEmitter в мастере, чтобы они генерировали событие всякий раз, когда получено сообщение. После создания каждого воркера вам просто нужно добавить слушателя к EventEmitter, который будет отправлять сообщение воркеру. Конечно, это все еще реализовано, сохраняя ссылки слушателей (а значит, и воркеров) в EventEmitter, но, по крайней мере, вам не нужно на это смотреть 24.02.2012

  • 6

    Если вам нужно отправить только простые данные конфигурации для вашего дочернего процесса, вы можете отправить переменные среды с помощью cluster.fork(). Это полезно и имеет преимущества перед отправкой сообщения через методы кластера и процесса send.

    const cluster = require('cluster')
    
    if (cluster.isMaster) {
      cluster.fork({
        MY_DATA: 'something here'
      })
    } else {
      console.log(process.env.MY_DATA) // "something here"
    }
    
    08.11.2020
    Новые материалы

    Коллекции публикаций по глубокому обучению
    Последние пару месяцев я создавал коллекции последних академических публикаций по различным подполям глубокого обучения в моем блоге https://amundtveit.com - эта публикация дает обзор 25..

    Представляем: Pepita
    Фреймворк JavaScript с открытым исходным кодом Я знаю, что недостатка в фреймворках JavaScript нет. Но я просто не мог остановиться. Я хотел написать что-то сам, со своими собственными..

    Советы по коду Laravel #2
    1-) Найти // You can specify the columns you need // in when you use the find method on a model User::find(‘id’, [‘email’,’name’]); // You can increment or decrement // a field in..

    Работа с временными рядами спутниковых изображений, часть 3 (аналитика данных)
    Анализ временных рядов спутниковых изображений для данных наблюдений за большой Землей (arXiv) Автор: Рольф Симоэс , Жильберто Камара , Жильберто Кейрос , Фелипе Соуза , Педро Р. Андраде ,..

    3 способа решить квадратное уравнение (3-й мой любимый) -
    1. Методом факторизации — 2. Используя квадратичную формулу — 3. Заполнив квадрат — Давайте поймем это, решив это простое уравнение: Мы пытаемся сделать LHS,..

    Создание VR-миров с A-Frame
    Виртуальная реальность (и дополненная реальность) стали главными модными терминами в образовательных технологиях. С недорогими VR-гарнитурами, такими как Google Cardboard , и использованием..

    Демистификация рекурсии
    КОДЕКС Демистификация рекурсии Упрощенная концепция ошеломляющей О чем весь этот шум? Рекурсия, кажется, единственная тема, от которой у каждого начинающего студента-информатика..