Netty源碼之Reactor模式
學(xué)習(xí)目標(biāo)
- 什么是Reactor模式?
- Reactor模式由什么組成的?
- Reactor模式解決什么問(wèn)題?
- Reactor模式線程模型有哪些?演進(jìn)過(guò)程?
web處理請(qǐng)求架構(gòu)
大多數(shù)web請(qǐng)求處理流程可以抽象成這幾個(gè)步驟:讀取(read),解碼(decode),處理(process),編碼(encode),發(fā)送(send),如下圖所示:
同時(shí),處理web請(qǐng)求通常有兩種架構(gòu):傳統(tǒng)基于線程架構(gòu)和事件驅(qū)動(dòng)架構(gòu)。
傳統(tǒng)基于線程架構(gòu)
概念
每個(gè)新連接創(chuàng)建一個(gè)線程來(lái)處理。對(duì)于長(zhǎng)連接服務(wù),如果一個(gè)client和server保持一個(gè)連接的話,有多少個(gè)client接入,server就需要?jiǎng)?chuàng)建同等的線程來(lái)處理。線程上下文切換,數(shù)據(jù)同步和內(nèi)存消耗,對(duì)server來(lái)說(shuō),將是非常大的開銷。
代碼實(shí)現(xiàn)
傳統(tǒng)基于線程架構(gòu)通常采用BIO的方式來(lái)實(shí)現(xiàn),代碼如下:
- public class Server implements Runnable {
- int port;
- public Server(int port) {
- this.port = port;
- }
- @Override
- public void run() {
- try {
- ServerSocket serverSocket = new ServerSocket(port);
- while (true){
- System.out.println("等待新連接...");
- new Thread(new Handler(serverSocket.accept())).start();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- static class Handler implements Runnable{
- private Socket socket;
- public Handler(Socket socket){
- this.socket = socket;
- }
- @Override
- public void run() {
- try {
- byte[] input = new byte[1024];
- this.socket.getInputStream().read(input);
- byte[] output = process(input);
- this.socket.getOutputStream().write(output);
- this.socket.getOutputStream().flush();
- this.socket.close();
- System.out.println("響應(yīng)完成!");
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private byte[] process(byte[] input) {
- System.out.println("讀取內(nèi)容:" + new String(input));
- return input;
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread thread = new Thread(new Server(2021));
- thread.setDaemon(true);
- thread.start();
- synchronized (Server.class) {
- Server.class.wait();
- }
- }
- }
為了避免線程創(chuàng)建銷毀的開銷,我們通常會(huì)采用線程池,但是同樣也有很大的弊端:
- 同步阻塞IO,讀寫阻塞,線程等待時(shí)間過(guò)長(zhǎng)
- 在制定線程策略的時(shí)候,只能根據(jù)CPU的數(shù)目來(lái)限定可用線程資源,不能根據(jù)連接并發(fā)數(shù)目來(lái)制定,也就是連接有限制。否則很難保證對(duì)客戶端請(qǐng)求的高效和公平。
- 多線程之間的上下文切換,造成線程使用效率并不高,并且不易擴(kuò)展
- 狀態(tài)數(shù)據(jù)以及其他需要保持一致的數(shù)據(jù),需要采用并發(fā)同步控制
應(yīng)用場(chǎng)景
既然傳統(tǒng)基于線程架構(gòu)弊端這么多,它存在還有什么價(jià)值?它的應(yīng)用場(chǎng)景是什么?
傳統(tǒng)基于線程架構(gòu)適用于連接數(shù)目比較小且一次傳輸大量數(shù)據(jù)的場(chǎng)景,比如上傳,下載。
事件驅(qū)動(dòng)架構(gòu)
事件驅(qū)動(dòng)架構(gòu):可以把線程和連接解耦,線程只用于執(zhí)行事件注冊(cè)的回調(diào)函數(shù)。事件驅(qū)動(dòng)架構(gòu)由事件生產(chǎn)者和事件消費(fèi)者組成,前者是事件的來(lái)源,它只負(fù)責(zé)監(jiān)聽哪些事件發(fā)生;后者是直接處理事件或者事件發(fā)生時(shí),響應(yīng)事件的實(shí)體。
Reactor模式
什么是Reactor模式?
Reactor模式是事件驅(qū)動(dòng)架構(gòu)的一種具體實(shí)現(xiàn)方法,簡(jiǎn)而言之,就是一個(gè)單線程進(jìn)行循環(huán)監(jiān)聽就緒IO事件,并將就緒IO事件分發(fā)給對(duì)應(yīng)的回調(diào)函數(shù)。
Reactor模式由什么組成的?
Reactor模式分為兩個(gè)重要組成部分,Reactor和Handler。 Reactor(反應(yīng)器):循環(huán)監(jiān)聽就緒IO事件,并分發(fā)給回調(diào)函數(shù)。 Handler(回調(diào)函數(shù)):執(zhí)行對(duì)應(yīng)IO事件的實(shí)際業(yè)務(wù)邏輯。
Reactor模式解決什么問(wèn)題?
反應(yīng)器模式可以實(shí)現(xiàn)同步的多路復(fù)用,同步是指按照事件到達(dá)的順序分發(fā)處理。反應(yīng)器 接收來(lái)自不同的客戶端的消息、請(qǐng)求和連接,盡管客戶端是并發(fā)的,但是反應(yīng)器可以按照事件到達(dá)的順序觸發(fā)回調(diào)函數(shù)。因此,Reactor模式將連接和線程解耦,不需要為每個(gè)連接創(chuàng)建單獨(dú)線程。這個(gè)問(wèn)題和C10K問(wèn)題相同,提供了一個(gè)解決思路。
Reactor模式下的三種模型
單線程模型:IO事件輪詢,分發(fā)(accept)和IO事件執(zhí)行(read,decode,compute,encode,send)都在一個(gè)線程中完成,如下圖所示:
在單線程模型下,不僅IO操作在Reactor線程上,而非IO操作(handlder中process()方法)也在Reactor線程上執(zhí)行了,當(dāng)非IO操作執(zhí)行慢的話,這會(huì)大大延遲IO請(qǐng)求響應(yīng),所以應(yīng)該把非IO操作拆出來(lái),來(lái)加速Reactor線程對(duì)IO請(qǐng)求響應(yīng),就出現(xiàn)多線程模型。
單線程模型實(shí)現(xiàn):
- // reactor
- public class Reactor implements Runnable {
- int port;
- Selector selector;
- ServerSocketChannel serverSocket;
- public Reactor(int port) throws IOException {
- this.port = port;
- // 創(chuàng)建serverSocket對(duì)象
- serverSocket = ServerSocketChannel.open();
- // 綁定端口
- serverSocket.socket().bind(new InetSocketAddress(port));
- // 配置非阻塞
- serverSocket.configureBlocking(false);
- // 創(chuàng)建selector對(duì)象
- selector = Selector.open();
- // serversocket注冊(cè)到selector上,幫忙監(jiān)聽accpet事件
- serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocket,selector));
- /** 還可以使用 SPI provider,來(lái)創(chuàng)建selector和serversocket對(duì)象
- SelectorProvider p = SelectorProvider.provider();
- selector = p.openSelector();
- serverSocket = p.openServerSocketChannel();
- */
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- System.out.println("start select event...");
- selector.select();
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey)it.next());
- }
- selectedKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void dispatch(SelectionKey key) {
- Runnable r = (Runnable) key.attachment();
- if (r != null) {
- r.run();
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- Thread thread = new Thread(new Reactor(2021));
- thread.start();
- synchronized (Reactor.class) {
- Reactor.class.wait();
- }
- }
- }
- // acceptor調(diào)度器
- public class Acceptor implements Runnable {
- ServerSocketChannel serverSocket;
- Selector selector;
- public Acceptor(ServerSocketChannel serverSocket,Selector selector) {
- this.serverSocket = serverSocket;
- this.selector = selector;
- }
- @Override
- public void run() {
- try {
- SocketChannel socket = this.serverSocket.accept();
- if (socket != null) {
- new Handler(selector,socket);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- // 回調(diào)函數(shù)handler
- public class Handler implements Runnable {
- Selector selector;
- SocketChannel socket;
- SelectionKey sk;
- ByteBuffer input = ByteBuffer.allocate(1024);
- ByteBuffer output = ByteBuffer.allocate(1024);
- static final int READING = 0, SENDING = 1;
- int state = READING;
- public Handler(Selector selector, SocketChannel socket) throws IOException {
- this.selector = selector;
- this.socket = socket;
- this.socket.configureBlocking(false);
- sk = this.socket.register(selector,0);
- sk.attach(this);
- sk.interestOps(SelectionKey.OP_READ);
- selector.wakeup();
- }
- @Override
- public void run() {
- try{
- if (state == READING) {
- read();
- } else if (state == SENDING) {
- send();
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- private void read() throws IOException {
- socket.read(input);
- if (inputIsComplete()) {
- // 執(zhí)行業(yè)務(wù)邏輯代碼
- process();
- state = SENDING;
- // Normally also do first write now
- sk.interestOps(SelectionKey.OP_WRITE);
- }
- }
- private void send() throws IOException {
- socket.write(output);
- socket.close();
- if (outputIsComplete()) sk.cancel();
- }
- boolean inputIsComplete() { return true;}
- boolean outputIsComplete() {return true;}
- // 處理非IO操作(業(yè)務(wù)邏輯代碼)
- void process(){
- String msg = new String(input.array());
- System.out.println("讀取內(nèi)容:" + msg);
- output.put(msg.getBytes());
- output.flip();
- }
- }
- 多線程模型:與單線程模型不同的是添加一個(gè)業(yè)務(wù)線程池,將非IO操作(業(yè)務(wù)邏輯處理)交給業(yè)務(wù)線程池來(lái)處理,提高Reactor線程的IO響應(yīng),如圖所示:
在多線程模型下,雖然將非IO操作拆出去了,但是所有IO操作都在Reactor單線程中完成的。在高負(fù)載、高并發(fā)場(chǎng)景下,也會(huì)成為瓶頸,于是對(duì)Reactor單線程進(jìn)行了優(yōu)化,出現(xiàn)了主從線程模型。
多線程模型實(shí)現(xiàn):
- public class Reactor implements Runnable {
- int port;
- Selector selector;
- ServerSocketChannel serverSocket;
- public Reactor(int port) throws IOException {
- this.port = port;
- // 創(chuàng)建serverSocket對(duì)象
- serverSocket = ServerSocketChannel.open();
- // 綁定端口
- serverSocket.socket().bind(new InetSocketAddress(port));
- // 配置非阻塞
- serverSocket.configureBlocking(false);
- // 創(chuàng)建selector對(duì)象
- selector = Selector.open();
- // serversocket注冊(cè)到selector上,幫忙監(jiān)聽accpet事件
- serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,selector));
- /** 還可以使用 SPI provider,來(lái)創(chuàng)建selector和serversocket對(duì)象
- SelectorProvider p = SelectorProvider.provider();
- selector = p.openSelector();
- serverSocket = p.openServerSocketChannel();
- */
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- System.out.println("start select event...");
- selector.select();
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey)it.next());
- }
- selectedKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void dispatch(SelectionKey key) {
- SelfRunable r = (SelfRunable) key.attachment();
- if (r != null) {
- System.out.println("dispatch to " + r.getName() + "====");
- r.run();
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- Thread thread = new Thread(new Reactor(2021));
- thread.start();
- synchronized (Reactor.class) {
- Reactor.class.wait();
- }
- }
- }
- public class Acceptor implements SelfRunable {
- ServerSocketChannel serverSocket;
- Selector selector;
- String name;
- public Acceptor(String name, ServerSocketChannel serverSocket,Selector selector) {
- this.name = name;
- this.serverSocket = serverSocket;
- this.selector = selector;
- }
- @Override
- public void run() {
- try {
- SocketChannel socket = this.serverSocket.accept();
- if (socket != null) {
- new Handler("handler_" + ((InetSocketAddress)socket.getLocalAddress()).getPort(), selector,socket);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public String getName() {
- return this.name;
- }
- }
- public class Handler implements SelfRunable {
- String name;
- Selector selector;
- SocketChannel socket;
- SelectionKey sk;
- ByteBuffer input = ByteBuffer.allocate(1024);
- ByteBuffer output = ByteBuffer.allocate(1024);
- static final int READING = 0, SENDING = 1, PROCESSING = 3;
- volatile int state = READING;
- static ExecutorService poolExecutor = Executors.newFixedThreadPool(5);
- public Handler(String name, Selector selector, SocketChannel socket) throws IOException {
- this.selector = selector;
- this.socket = socket;
- this.name = name;
- this.socket.configureBlocking(false);
- sk = this.socket.register(selector,0);
- sk.attach(this);
- sk.interestOps(SelectionKey.OP_READ);
- selector.wakeup();
- }
- @Override
- public void run() {
- try{
- System.out.println("state:" + state);
- if (state == READING) {
- read();
- } else if (state == SENDING) {
- send();
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- synchronized void read() throws IOException {
- socket.read(input);
- if (inputIsComplete()) {
- state = PROCESSING;
- poolExecutor.execute(new Processer());
- }
- }
- synchronized void processAndHandOff() {
- System.out.println("processAndHandOff=========");
- process();
- state = SENDING; // or rebind attachment
- sk.interestOps(SelectionKey.OP_WRITE);
- selector.wakeup();
- System.out.println("processAndHandOff finish ! =========");
- }
- private void send() throws IOException {
- System.out.println("start send ...");
- socket.write(output);
- socket.close();
- System.out.println("start send finish!");
- if (outputIsComplete()) sk.cancel();
- }
- boolean inputIsComplete() { return true;}
- boolean outputIsComplete() {return true;}
- void process(){
- String msg = new String(input.array());
- System.out.println("讀取內(nèi)容:" + msg);
- output.put(msg.getBytes());
- output.flip();
- }
- @Override
- public String getName() {
- return this.name;
- }
- class Processer implements Runnable {
- public void run() { processAndHandOff(); }
- }
- }
- 主從線程模型: 相比多線程模型而言,對(duì)于多核cpu,為了充分利用資源,將Reactor拆分成了mainReactor 和 subReactor,但是,主從線程模型也有弊端,不適合大量數(shù)據(jù)傳輸。 mainReactor:負(fù)責(zé)監(jiān)聽接收(accpet)新連接,將新連接后續(xù)操作交給subReactor來(lái)處理,通常由一個(gè)線程處理。 subReactor: 負(fù)責(zé)處理IO的讀寫操作,通常由多個(gè)線程處理。 非IO操作依然由業(yè)務(wù)線程池來(lái)處理。
主從線程模型實(shí)現(xiàn):
- public class Reactor implements Runnable {
- int port;
- Selector selector;
- ServerSocketChannel serverSocket;
- int SUBREACTOR_SIZE = 1;
- SubReactor[] subReactorPool = new SubReactor[SUBREACTOR_SIZE];
- public Reactor(int port) throws IOException {
- this.port = port;
- // 創(chuàng)建serverSocket對(duì)象
- serverSocket = ServerSocketChannel.open();
- // 綁定端口
- serverSocket.socket().bind(new InetSocketAddress(port));
- // 配置非阻塞
- serverSocket.configureBlocking(false);
- // 創(chuàng)建selector對(duì)象
- selector = Selector.open();
- // serversocket注冊(cè)到selector上,幫忙監(jiān)聽accpet事件
- serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,subReactorPool));
- // 初始化subreactor pool
- initSubReactorPool();
- /** 還可以使用 SPI provider,來(lái)創(chuàng)建selector和serversocket對(duì)象
- SelectorProvider p = SelectorProvider.provider();
- selector = p.openSelector();
- serverSocket = p.openServerSocketChannel();
- */
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- System.out.println("mainReactor start select event...");
- selector.select();
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey)it.next());
- }
- selectedKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- void initSubReactorPool() {
- try {
- for (int i = 0; i < SUBREACTOR_SIZE; i++) {
- subReactorPool[i] = new SubReactor("SubReactor" + i);
- }
- } catch (IOException ex) { /* ... */ }
- }
- private void dispatch(SelectionKey key) {
- SelfRunable r = (SelfRunable) key.attachment();
- if (r != null) {
- System.out.println("mainReactor dispatch to " + r.getName() + "====");
- r.run();
- }
- }
- public static void main(String[] args) throws IOException, InterruptedException {
- Thread thread = new Thread(new Reactor(2021));
- thread.start();
- synchronized (Reactor.class) {
- Reactor.class.wait();
- }
- }
- }
- public class SubReactor implements SelfRunable {
- private Selector selector;
- private String name;
- private List<SelfRunable> task = new ArrayList<SelfRunable>();
- public SubReactor(String name) throws IOException {
- this.name = name;
- selector = Selector.open();
- new Thread(this).start();
- }
- @Override
- public String getName() {
- return this.name;
- }
- @Override
- public void run() {
- try {
- while (!Thread.interrupted()) {
- System.out.println("subReactor start select event...");
- selector.select(5000);
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext()) {
- dispatch((SelectionKey)it.next());
- }
- selectedKeys.clear();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private void dispatch(SelectionKey key) {
- SelfRunable r = (SelfRunable) key.attachment();
- if (r != null) {
- System.out.println("subReactor dispatch to " + r.getName() + "====");
- r.run();
- }
- }
- public Selector getSelector(){
- return this.selector;
- }
- public void submit(SelfRunable runnable) {
- task.add(runnable);
- }
- }
- public class Acceptor implements SelfRunable {
- int next = 0;
- String name;
- SubReactor[] subReactorPool;
- ServerSocketChannel serverSocket;
- public Acceptor(String name, ServerSocketChannel serverSocket,SubReactor[] subReactorPool) {
- this.name = name;
- this.serverSocket = serverSocket;
- this.subReactorPool = subReactorPool;
- }
- @Override
- public void run() {
- try {
- SocketChannel socket = this.serverSocket.accept();
- if (socket != null) {
- new Handler("handler", subReactorPool[next].getSelector(),socket);
- }
- if (++next == subReactorPool.length) {next=0;}
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- @Override
- public String getName() {
- return this.name;
- }
- }
- public class Handler implements SelfRunable {
- String name;
- Selector selector;
- SocketChannel socket;
- SelectionKey sk;
- ByteBuffer input = ByteBuffer.allocate(1024);
- ByteBuffer output = ByteBuffer.allocate(1024);
- static final int READING = 0, SENDING = 1, PROCESSING = 3;
- volatile int state = READING;
- static ExecutorService poolExecutor = Executors.newFixedThreadPool(5);
- public Handler(String name, Selector selector, SocketChannel socket) throws IOException {
- this.selector = selector;
- this.socket = socket;
- this.name = name;
- this.socket.configureBlocking(false);
- sk = this.socket.register(this.selector,0);
- sk.attach(this);
- sk.interestOps(SelectionKey.OP_READ);
- selector.wakeup();
- }
- @Override
- public void run() {
- try{
- System.out.println("state:" + state);
- if (state == READING) {
- read();
- } else if (state == SENDING) {
- send();
- }
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- synchronized void read() throws IOException {
- socket.read(input);
- if (inputIsComplete()) {
- state = PROCESSING;
- poolExecutor.execute(new Processer());
- }
- }
- synchronized void processAndHandOff() {
- System.out.println("processAndHandOff=========");
- process();
- state = SENDING; // or rebind attachment
- sk.interestOps(SelectionKey.OP_WRITE);
- selector.wakeup();
- System.out.println("processAndHandOff finish ! =========");
- }
- private void send() throws IOException {
- System.out.println("start send ...");
- socket.write(output);
- socket.close();
- System.out.println("start send finish!");
- if (outputIsComplete()) sk.cancel();
- }
- boolean inputIsComplete() { return true;}
- boolean outputIsComplete() {return true;}
- void process(){
- String msg = new String(input.array());
- System.out.println("讀取內(nèi)容:" + msg);
- output.put(msg.getBytes());
- output.flip();
- }
- @Override
- public String getName() {
- return this.name;
- }
- class Processer implements Runnable {
- public void run() { processAndHandOff(); }
- }
- }
Reactor線程模型演進(jìn)
模型 |
簡(jiǎn)介 |
弊端 |
單線程模型 |
IO/非IO操作都在Reactor單線程中完成 |
非IO操作執(zhí)行慢,影響IO操作響應(yīng)延遲 |
多線程模型 |
拆分非IO操作交給業(yè)務(wù)線程池執(zhí)行,IO操作由Reator單線程執(zhí)行 |
高并發(fā),高負(fù)載場(chǎng)景下,Reactor單線程會(huì)成為瓶頸 |
主從線程模型 |
Reactor單線程拆分為mainReactor和subReactor |
不適合大量數(shù)據(jù)傳輸 |
Netty線程模型
Reactor主從線程模型-抽象模型
- 創(chuàng)建ServerSocketChannel過(guò)程(創(chuàng)建channel,配置非阻塞)
- ServerSocketChannel注冊(cè)到mainReactor的selector對(duì)象上,監(jiān)聽accept事件
- mainReactor的selector監(jiān)聽到新連接SocketChannel,將SocketChannel注冊(cè)到subReactor的selector對(duì)象上,監(jiān)聽read/write事件
- subReactor的selector監(jiān)聽到read/write事件,移交給業(yè)務(wù)線程池(對(duì)應(yīng)netty的pipeline)
Netty線程模型
我們?cè)俸煤每纯磎ainReactor和subReactor,其實(shí)這兩個(gè)類功能非常相似,所以Netty將mainReactor和subReactor統(tǒng)一成了EventLoop。對(duì)于Netty零基礎(chǔ)的,請(qǐng)參考這個(gè)Reactor主從線程模型-抽象模型和下面這張圖來(lái)理解EventLoop。