在Node.js中使用SO_RESUEPORT
前言:今天下載了Node.js最新版代碼,并為Node.js的TCP模塊增加了SO_RESUEPORT的能力,本文介紹一下具體的實(shí)現(xiàn),關(guān)于SO_RESUEPORT的知識(shí)可以參考之前的文章或者網(wǎng)上文章。
1 Libuv
SO_RESUEPORT是操作系統(tǒng)內(nèi)核提供的能力,所以第一步首先修改Libuv??紤]到操作系統(tǒng)兼容性的問題,目前只支持Linux系統(tǒng),舊版Mac OS也支持相關(guān)屬性但是效果不符合預(yù)期,新版Mac OS倒是支持,考慮到Node.js在幾乎都是部署到Linux,所以可以先關(guān)注Linux內(nèi)核。首先修改deps/uv/include/uv.h。
- enum uv_tcp_flags {
- UV_TCP_IPV6ONLY = 1,
- // 支持SO_RESUEPORT flags
- UV_TCP_REUSEPORT = 2
- };
接著修改deps/uv/src/unix/tcp.c。
- #if defined(SO_REUSEPORT) && defined(__linux__)
- on = 1;
- if ((flags & UV_TCP_REUSEPORT) && setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)))
- return UV__ERR(errno);
- #endif
這里判斷一下是否有兩個(gè)宏,有的話才能使用SO_RESUEPORT。如果支持則通過setsockopt設(shè)置socket的SO_REUSEPORT標(biāo)記,這是最核心的邏輯。
2 修改C++層
修改完底層的Libuv后,繼續(xù)修改C++層,因?yàn)檫@是一個(gè)可選的屬性,所以我們需要增加相關(guān)的邏輯。修改src/tcp_wrap.cc。首先導(dǎo)出一個(gè)新的常量
- #if defined(SO_REUSEPORT) && defined(__linux__)
- NODE_DEFINE_CONSTANT(constants, UV_TCP_REUSEPORT);
- #endif
在JS層可以通過判斷是否導(dǎo)出了這個(gè)常量來判斷系統(tǒng)是否支持SO_RESUEPORT。接著修改bind函數(shù),因?yàn)槲覀冊(cè)賐ind的時(shí)候可以設(shè)置SO_RESUEPORT。
- template <typename T>
- void TCPWrap::Bind(
- const FunctionCallbackInfo<Value>& args,
- int family,
- std::function<int(const char* ip_address, int port, T* addr)> uv_ip_addr) {
- TCPWrap* wrap;
- ASSIGN_OR_RETURN_UNWRAP(&wrap,
- args.Holder(),
- args.GetReturnValue().Set(UV_EBADF));
- Environment* env = wrap->env();
- node::Utf8Value ip_address(env->isolate(), args[0]);
- int port;
- unsigned int flags = 0;
- if (!args[1]->Int32Value(env->context()).To(&port)) return;
- // ipv6支持ipv6Only和SO_RESUEPORT
- if (family == AF_INET6 &&
- !args[2]->Uint32Value(env->context()).To(&flags)) {
- return;
- // ipv4之前是不支持任何標(biāo)記的,這里需要加上這個(gè)邏輯,因?yàn)槲覀冃枰С諷O_RESUEPORT
- } else if (family == AF_INET4 &&
- !args[2]->Uint32Value(env->context()).To(&flags)) {
- return;
- }
- T addr;
- int err = uv_ip_addr(*ip_address, port, &addr);
- if (err == 0) {
- err = uv_tcp_bind(&wrap->handle_,
- reinterpret_cast<const sockaddr*>(&addr),
- flags);
- }
- args.GetReturnValue().Set(err);
- }
C++主要是完成透?jìng)鱢lags的邏輯。
3 修改JS層
修改JS層是最復(fù)雜的地方,主要是為了應(yīng)用層的兼容性問題。也就是說如果Node.js真的支持了SO_RESUEPORT,在某些平臺(tái)不支持SO_RESUEPORT的情況下,我們?nèi)绾文鼙WC我們的代碼能在各個(gè)平臺(tái)上跑。簡(jiǎn)單來說,如果我們平臺(tái)支持SO_RESUEPORT,我們可以開啟多個(gè)子進(jìn)程,然后分別執(zhí)行以下代碼。
- const http = require('http');
- http.createServer((req, res) => {
- res.end('hello');
- })
- .listen({port: 8000, reuseport: true});
這時(shí)候,只需要修改一下Node.js的net.js,把reuseport標(biāo)記傳到C++層再傳到Libuv就行,但是問題是,如果我們這樣寫代碼,就無法在不支持SO_RESUEPORT的平臺(tái)跑了,因?yàn)闀?huì)導(dǎo)致重復(fù)監(jiān)聽端口的錯(cuò)誤。所以為了兼容性,我想的方案是利用Cluster模塊,目前Cluster模塊支持輪詢和共享兩種模式,那么我們?cè)偌右环Nreuseport模式就好了,這樣的好處是一旦我們平臺(tái)不支持SO_RESUEPORT,我們可以降級(jí)到Node.js現(xiàn)在到模式。我們知道Cluster模塊的原理有兩種,一種是主進(jìn)程監(jiān)聽,分發(fā)連接給子進(jìn)程,另一種是主進(jìn)程創(chuàng)建socket,通過文件描述符傳遞的方式傳給子進(jìn)程,所有的進(jìn)程都是共享一個(gè)socket的。下面我們看看怎么做。首先修改lib/internal/cluster/primary.js。
- // 增加這if的邏輯
- if ((message.addressType === 4 ||
- message.addressType === 6) &&
- (message.flags & TCPConstants.UV_TCP_REUSEPORT)) {
- handle = new ReusePort(key, address, message);
- } else if (schedulingPolicy !== SCHED_RR ||
- message.addressType === 'udp4' ||
- message.addressType === 'udp6') {
- handle = new SharedHandle(key, address, message);
- } else {
- handle = new RoundRobinHandle(key, address, message);
- }
我們?cè)趒ueryServer函數(shù)里增加了一個(gè)if的邏輯。如果addressType是4或6說明是TCP協(xié)議,并且設(shè)置了UV_TCP_REUSEPORT(listen的時(shí)候傳入),就會(huì)走到reuseport的邏輯,剩下的兩個(gè)else是目前Node.js的邏輯。我們看看ReusePort.js做了什么。
- 'use strict';
- const assert = require('internal/assert');
- const net = require('net');
- const { constants: TCPConstants } = internalBinding('tcp_wrap');
- module.exports = ReusePort;
- function ReusePort(key, address, {port, addressType, fd, flags}) {
- this.key = key;
- this.workers = [];
- this.handles = [];
- this.list = [address, port, addressType, fd, flags];
- }
- ReusePort.prototype.add = function(worker, send) {
- assert(!this.workers.includes(worker));
- const rval = net._createServerHandle(...this.list);
- let errno;
- let handle;
- if (typeof rval === 'number')
- errno = rval;
- else
- handle = rval;
- this.workers.push(worker);
- this.handles.push(handle);
- send(errno, null, handle);
- };
- ReusePort.prototype.remove = function(worker) {
- const index = this.workers.indexOf(worker);
- if (index === -1)
- return false; // The worker wasn't sharing this handle.
- this.workers.splice(index, 1);
- this.handles[index].close();
- this.handles.splice(index, 1);
- return true;
- };
上面的代碼我們只需要關(guān)注net._createServerHandle。在不能多個(gè)進(jìn)程同時(shí)監(jiān)聽同一個(gè)端口的情況下,Node.js只會(huì)調(diào)net._createServerHandle創(chuàng)建一個(gè)socket,然后多個(gè)進(jìn)程共享。而我們這里會(huì)給每個(gè)進(jìn)程創(chuàng)建一個(gè)socket。這個(gè)socket就是在子進(jìn)程調(diào)用queryServer的時(shí)候返回給子進(jìn)程的。剩下的邏輯我們暫時(shí)不用關(guān)注。最后看一下_createServerHandle的邏輯。
- const handle = new TCP(TCPConstants.SERVER);
- if (addressType === 6) {
- err = handle.bind6(address, port, flags);}
- else {
- err = handle.bind(address, port, flags || 0);
- }
_createServerHandle的邏輯是創(chuàng)建一個(gè)socket并且給socket綁定IP和端口,我們看到這里會(huì)給C++層傳入flags,C++層就會(huì)傳到LIbuv了,這樣我們就完成了整個(gè)過程,整體的流程如下。
1 子進(jìn)程執(zhí)行l(wèi)isten的時(shí)候,傳入reuseport為true
2 子進(jìn)程通過進(jìn)程間通信請(qǐng)求主進(jìn)程
3 主進(jìn)程返回一個(gè)新的socket并綁定到對(duì)應(yīng)的地址
4 子進(jìn)程執(zhí)行l(wèi)isten啟動(dòng)服務(wù)器。
4 使用
接下來我們看看如何使用,首先創(chuàng)建一個(gè)server.js。
- const cluster = require('cluster');
- const os = require('os');
- const http = require('http');
- const cpus = os.cpus().length;
- if (cluster.isPrimary) {
- const map = {};
- for (let i = 0; i < cpus; i++) {
- const worker = cluster.fork();
- map[worker.process.pid] = 0;
- worker.on('message', (pid) => {
- map[pid]++;
- });
- }
- process.on('SIGINT', () => {
- console.log(map);
- });
- } else {
- http.createServer((req, res) => {
- process.send(process.pid);
- res.end('hello');
- })
- .listen({reuseport: true, port: 8000});
- }
再創(chuàng)建一個(gè)客戶端client.js
- const http = require('http');
- function connect() {
- setTimeout(() => {
- http.get('http://localhost:8000/', (res) => {
- console.log(res.statusCode);
- connect();
- });
- }, 50);
- }
- connect();
客戶端串行訪問服務(wù)器,我們看到使用方式和目前Node.js的Cluster使用一樣。即使我們把reuseport改成false或者其他平臺(tái)跑也沒問題,效果如下

我們看到在reuseport的情況下,負(fù)載還是挺均衡的。
后記:目前是通過listen的時(shí)候傳入?yún)?shù)去控制是否開啟SO_RESUEPORT的,后續(xù)可以增加通過設(shè)置cluster.schedulingPolicy的方式,和目前共享、輪詢模式對(duì)齊,考慮到Cluster模塊不是必須,因?yàn)槲覀兛梢灾苯佑米舆M(jìn)程模塊監(jiān)聽同一個(gè)端口。所以通過listen函數(shù)去控制是非常必要的。目前通過修改Node.js內(nèi)核大概體驗(yàn)了一下SO_RESUEPORT,后續(xù)review和改進(jìn)一下代碼。