自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

宜人貸蜂巢API網(wǎng)關技術解密之Netty使用實踐

企業(yè)動態(tài)
本文先簡要地介紹API網(wǎng)關的項目框架,其次對比BIO和NIO的特點,引入Netty作為項目的基礎框架,然后介紹Netty線程池的原理,最后深入Netty線程池的初始化、ServerBootstrap的初始化與啟動及channel與線程池的綁定過程,讓讀者了解Netty在承載高并發(fā)訪問的設計路思。

宜人貸蜂巢團隊,由Michael創(chuàng)立于2013年,通過使用互聯(lián)網(wǎng)科技手段助力金融生態(tài)和諧健康發(fā)展。自成立起一直致力于多維度數(shù)據(jù)閉環(huán)平臺建設。目前團隊規(guī)模超過百人,涵蓋征信、電商、金融、社交、五險一金和保險等用戶授信數(shù)據(jù)的抓取解析業(yè)務,輔以先進的數(shù)據(jù)分析、挖掘和機器學習等技術對用戶信用級別、欺詐風險進行預測評定,全面對外輸出金融反欺詐、社交圖譜、自動化模型定制等服務或產(chǎn)品。

目前宜人貸蜂巢基于用戶授權數(shù)據(jù)實時抓取解析技術,并結合***大數(shù)據(jù)技術,快速迭代和自主的創(chuàng)新,已形成了強大而領先的聚合和輸出能力。

為了適應完成宜人貸蜂巢強大的服務輸出能力,蜂巢設計開發(fā)了自己的API網(wǎng)關系統(tǒng),集中實現(xiàn)了鑒權、加解密、路由、限流等功能,使各業(yè)務抓取團隊關注其核心抓取和分析工作,而API網(wǎng)關系統(tǒng)更專注于安全、流量、路由等問題,從而更好的保障蜂巢服務系統(tǒng)的質(zhì)量。今天帶著大家解密API網(wǎng)關的Netty線程池技術實踐細節(jié)。

API網(wǎng)關作為宜人貸蜂巢數(shù)據(jù)開放平臺的統(tǒng)一入口,所有的客戶端及消費端通過統(tǒng)一的API來使用各類抓取服務。從面向?qū)ο笤O計的角度看,它與外觀模式類似,包裝各類不同的實現(xiàn)細節(jié),對外表現(xiàn)出統(tǒng)一的調(diào)用形式。

本文首先,簡要地介紹API網(wǎng)關的項目框架,其次對比BIO和NIO的特點,再引入Netty作為項目的基礎框架,然后介紹Netty線程池的原理,***深入Netty線程池的初始化、ServerBootstrap的初始化與啟動及channel與線程池的綁定過程,讓讀者了解Netty在承載高并發(fā)訪問的設計思路。

一、項目框架

API網(wǎng)關項目框架

圖1 - API網(wǎng)關項目框架

圖中描繪了API網(wǎng)關系統(tǒng)的處理流程,以及與服務注冊發(fā)現(xiàn)、日志分析、報警系統(tǒng)、各類爬蟲的關系。其中API網(wǎng)關系統(tǒng)接收請求,對請求進行編解碼、鑒權、限流、加解密,再基于Eureka服務注冊發(fā)現(xiàn)模塊,將請求發(fā)送到有效的服務節(jié)點上;網(wǎng)關及抓取系統(tǒng)的日志,會被收集到elk平臺中,做業(yè)務分析及報警處理。

二、BIO vs NIO

API網(wǎng)關承載數(shù)倍于爬蟲的流量,提升服務器的并發(fā)處理能力、縮短系統(tǒng)的響應時間,通信模型的選擇是至關重要的,是選擇BIO,還是NIO?

1. Streamvs Buffer & 阻塞 vs 非阻塞

BIO是面向流的,io的讀寫,每次只能處理一個或者多個bytes,如果數(shù)據(jù)沒有讀寫完成,線程將一直等待于此,而不能暫時跳過io或者等待io讀寫完成異步通知,線程滯留在io讀寫上,不能充分利用機器有限的線程資源,造成server的吞吐量較低,見圖2。而NIO與此不同,面向Buffer,線程不需要滯留在io讀寫上,采用操作系統(tǒng)的epoll模式,在io數(shù)據(jù)準備好了,才由線程來處理,見圖3。

 BIO 從流中讀取數(shù)據(jù)

圖2 – BIO 從流中讀取數(shù)據(jù)

圖3 – NIO 從Buffer中讀取數(shù)據(jù)

2. Selectors

NIO的selector使一個線程可以監(jiān)控多個channel的讀寫,多個channel注冊到一個selector上,這個selector可以監(jiān)測到各個channel的數(shù)據(jù)準備情況,從而使用有限的線程資源處理更多的連接,見圖4。所以可以這樣說,NIO極大的提升了服務器接受并發(fā)請求的能力,而服務器性能還是要取決于業(yè)務處理時間和業(yè)務線程池模型。

NIO 單一線程管理多個連接

圖4 – NIO 單一線程管理多個連接

而BIO采用的是request-per-thread模式,用一個線程負責接收TCP連接請求,并建立鏈路,然后將請求dispatch給負責業(yè)務邏輯處理的線程,見圖5。一旦訪問量過多,就會造成機器的線程資源緊張,造成請求延遲,甚至服務宕機。

圖5 – BIO 一連接一線程

對比JDK NIO與諸多NIO框架后,鑒于Netty優(yōu)雅的設計、易用的API、優(yōu)越的性能、安全性支持、API網(wǎng)關使用Netty作為通信模型,實現(xiàn)了基礎框架的搭建。

三、線程池

考慮到API網(wǎng)關的高并發(fā)訪問需求,線程池設計,見圖6。

圖6 – API網(wǎng)關線程池設計

Netty的線程池理念有點像ForkJoinPool,不是一個線程大池子并發(fā)等待一條任務隊列,而是每條線程都有一個任務隊列。而且Netty的線程,并不只是簡單的阻塞地拉取任務,而是在每個循環(huán)中做三件事情:

  • 先SelectKeys()處理NIO的事件
  • 然后獲取本線程的定時任務,放到本線程的任務隊列里
  • ***執(zhí)行其他線程提交給本線程的任務

每個循環(huán)里處理NIO事件與其他任務的時間消耗比例,還能通過ioRatio變量來控制,默認是各占50%??梢?,Netty的線程根本沒有阻塞等待任務的清閑日子,所以也不使用有鎖的BlockingQueue來做任務隊列了,而是使用無鎖的MpscLinkedQueue(Mpsc 是Multiple Producer, Single Consumer的縮寫)。

四、NioEventLoopGroup初始化

下面分析下Netty線程池NioEventLoopGroup的設計與實現(xiàn)細節(jié),NioEventLoopGroup的類層次關系見圖7:

NioEvenrLoopGroup類層次關系

圖7 –NioEvenrLoopGroup類層次關系

其創(chuàng)建過程——方法調(diào)用,見下圖:

NioEvenrLoopGroup創(chuàng)建調(diào)用關系

圖8 –NioEvenrLoopGroup創(chuàng)建調(diào)用關系

NioEvenrLoopGroup的創(chuàng)建,具體執(zhí)行過程是執(zhí)行類MultithreadEventExecutorGroup的構造方法:

  1. /**  
  2.  * Create a new instance.  
  3.  *  
  4.  * @param nThreads          the number of threads that will be used by this instance.  
  5.  * @param executor          the Executor to use, or {@code null} if the default should be used.  
  6.  * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.  
  7.  * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call  
  8.  */  
  9. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,  
  10.                                         EventExecutorChooserFactory chooserFactory, Object... args) {  
  11.     if (nThreads <= 0) {  
  12.         throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));  
  13.     }  
  14.     if (executor == null) {  
  15.         executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());  
  16.     }  
  17.     children = new EventExecutor[nThreads];  
  18.     for (int i = 0; i < nThreads; i ++) {  
  19.         boolean success = false;  
  20.         try {  
  21.             children[i] = newChild(executor, args);  
  22.             success = true;  
  23.         } catch (Exception e) {   
  24.             throw new IllegalStateException("failed to create a child event loop", e);  
  25.         } finally {  
  26.             if (!success) {  
  27.                 for (int j = 0; j < i; j ++) {  
  28.                     children[j].shutdownGracefully();  
  29.                 }  
  30.                 for (int j = 0; j < i; j ++) { 
  31.                      EventExecutor e = children[j]; 
  32.                      try { 
  33.                          while (!e.isTerminated()) {  
  34.                             e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);  
  35.                         }  
  36.                     } catch (InterruptedException interrupted) {  
  37.                         // Let the caller handle the interruption.  
  38.                         Thread.currentThread().interrupt();  
  39.                         break;  
  40.                     }  
  41.                 }  
  42.             }  
  43.         }  
  44.     }  
  45.     chooser = chooserFactory.newChooser(children);  
  46.     final FutureListener<Object> terminationListener = new FutureListener<Object>() {  
  47.         @Override  
  48.         public void operationComplete(Future<Object> future) throws Exception {  
  49.             if (terminatedChildren.incrementAndGet() == children.length) {  
  50.                 terminationFuture.setSuccess(null);  
  51.             }  
  52.         }  
  53.     };  
  54.     for (EventExecutor e: children) {  
  55.         e.terminationFuture().addListener(terminationListener);  
  56.     }  
  57.     Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);  
  58.     Collections.addAll(childrenSet, children);  
  59.     readonlyChildren = Collections.unmodifiableSet(childrenSet);  

其中,創(chuàng)建細節(jié)見下:

  • 線程池中的線程數(shù)nThreads必須大于0;
  • 如果executor為null,創(chuàng)建默認executor,executor用于創(chuàng)建線程(newChild方法使用executor對象);
  • 依次創(chuàng)建線程池中的每一個線程即NioEventLoop,如果其中有一個創(chuàng)建失敗,將關閉之前創(chuàng)建的所有線程;
  • chooser為線程池選擇器,用來選擇下一個EventExecutor,可以理解為,用來選擇一個線程來執(zhí)行task。

chooser的創(chuàng)建細節(jié),見下:

DefaultEventExecutorChooserFactory根據(jù)線程數(shù)創(chuàng)建具體的EventExecutorChooser,線程數(shù)如果等于2^n,可使用按位與替代取模運算,節(jié)省cpu的計算資源,見源碼:

  1. @SuppressWarnings("unchecked")  
  2. @Override  
  3. public EventExecutorChooser newChooser(EventExecutor[] executors) {  
  4.     if (isPowerOfTwo(executors.length)) {  
  5.         return new PowerOfTowEventExecutorChooser(executors);  
  6.     } else {  
  7.         return new GenericEventExecutorChooser(executors);  
  8.     }  
  9. }   
  10.     private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {  
  11.         private final AtomicInteger idx = new AtomicInteger();  
  12.         private final EventExecutor[] executors;   
  13.  
  14.         PowerOfTowEventExecutorChooser(EventExecutor[] executors) {  
  15.             this.executors = executors;  
  16.         }   
  17.  
  18.         @Override  
  19.         public EventExecutor next() {  
  20.             return executors[idx.getAndIncrement() & executors.length - 1];  
  21.         }  
  22.     }   
  23.  
  24.     private static final class GenericEventExecutorChooser implements EventExecutorChooser {  
  25.         private final AtomicInteger idx = new AtomicInteger();  
  26.         private final EventExecutor[] executors;   
  27.  
  28.         GenericEventExecutorChooser(EventExecutor[] executors) {  
  29.             this.executors = executors;  
  30.         }   
  31.  
  32.         @Override  
  33.         public EventExecutor next() {  
  34.             return executors[Math.abs(idx.getAndIncrement() % executors.length)];  
  35.         }  
  36.     } 

newChild(executor, args)的創(chuàng)建細節(jié),見下:

MultithreadEventExecutorGroup的newChild方法是一個抽象方法,故使用NioEventLoopGroup的newChild方法,即調(diào)用NioEventLoop的構造函數(shù):

  1. @Override  
  2.     protected EventLoop newChild(Executor executor, Object... args) throws Exception {  
  3.         return new NioEventLoop(this, executor, (SelectorProvider) args[0], 
  4.             ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);  
  5.     } 

在這里先看下NioEventLoop的類層次關系:

NioEventLoop的繼承關系比較復雜,在AbstractScheduledEventExecutor 中,Netty 實現(xiàn)了 NioEventLoop 的 schedule 功能,即我們可以通過調(diào)用一個 NioEventLoop 實例的 schedule 方法來運行一些定時任務。而在 SingleThreadEventLoop 中,又實現(xiàn)了任務隊列的功能,通過它,我們可以調(diào)用一個NioEventLoop 實例的 execute 方法來向任務隊列中添加一個 task, 并由 NioEventLoop 進行調(diào)度執(zhí)行。

通常來說,NioEventLoop 肩負著兩種任務,***個是作為 IO 線程,執(zhí)行與 Channel 相關的 IO 操作,包括調(diào)用 select 等待就緒的 IO 事件、讀寫數(shù)據(jù)與數(shù)據(jù)的處理等;而第二個任務是作為任務隊列,執(zhí)行 taskQueue 中的任務,例如用戶調(diào)用 eventLoop.schedule 提交的定時任務也是這個線程執(zhí)行的。

具體的構造過程,見下:

創(chuàng)建任務隊列tailTasks(內(nèi)部為有界的LinkedBlockingQueue):

創(chuàng)建線程的任務隊列taskQueue(內(nèi)部為有界的LinkedBlockingQueue),以及任務過多防止系統(tǒng)宕機的拒絕策略rejectedHandler。

其中tailTasks和taskQueue均是任務隊列,而優(yōu)先級不同,taskQueue的優(yōu)先級高于tailTasks,定時任務的優(yōu)先級高于taskQueue。

五、ServerBootstrap初始化及啟動

了解了Netty線程池NioEvenrLoopGroup的創(chuàng)建過程后,下面看下API網(wǎng)關服務ServerBootstrap的是如何使用線程池引入服務中,為高并發(fā)訪問服務的。

API網(wǎng)關ServerBootstrap初始化及啟動代碼,見下:

  1. serverBootstrap = new ServerBootstrap();  
  2. bossGroup = new NioEventLoopGroup(config.getBossGroupThreads());  
  3. workerGroup = new NioEventLoopGroup(config.getWorkerGroupThreads());   
  4.  
  5. serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  
  6.         .option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())  
  7.         .option(ChannelOption.SO_BACKLOG, config.getBacklogSize())  
  8.         .option(ChannelOption.SO_KEEPALIVE, config.isSoKeepAlive())  
  9.         // Memory pooled  
  10.         .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
  11.         .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
  12.         .childHandler(channelInitializer);    
  13.  
  14. ChannelFuture future = serverBootstrap.bind(config.getPort()).sync();  
  15. log.info("API-gateway started on port: {}", config.getPort());  
  16. future.channel().closeFuture().sync(); 

API網(wǎng)關系統(tǒng)使用netty自帶的線程池,共有三組線程池,分別為bossGroup、workerGroup和executorGroup(使用在channelInitializer中,本文暫不作介紹)。其中,bossGroup用于接收客戶端的TCP連接,workerGroup用于處理I/O、執(zhí)行系統(tǒng)task和定時任務,executorGroup用于處理網(wǎng)關業(yè)務加解密、限流、路由,及將請求轉發(fā)給后端的抓取服務等業(yè)務操作。

六、Channel與線程池的綁定

ServerBootstrap初始化后,通過調(diào)用bind(port)方法啟動Server,bind的調(diào)用鏈如下:

  1. AbstractBootstrap.bind ->AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister 

其中,ChannelFuture regFuture = config().group().register(channel);中的group()方法返回bossGroup,而channel在serverBootstrap的初始化過程指定channel為NioServerSocketChannel.class,至此將NioServerSocketChannel與bossGroup綁定到一起,bossGroup負責客戶端連接的建立。那么NioSocketChannel是如何與workerGroup綁定到一起的?

調(diào)用鏈AbstractBootstrap.initAndRegister -> AbstractBootstrap. init-> ServerBootstrap.init ->ServerBootstrapAcceptor.ServerBootstrapAcceptor ->ServerBootstrapAcceptor.channelRead:

  1. public void channelRead(ChannelHandlerContext ctx, Object msg) {  
  2.     final Channel child = (Channel) msg;  
  3.     child.pipeline().addLast(childHandler);  
  4.     for (Entry<ChannelOption<?>, Object> e: childOptions) {  
  5.         try {  
  6.             if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {  
  7.                 logger.warn("Unknown channel option: " + e);  
  8.             }  
  9.         } catch (Throwable t) {  
  10.             logger.warn("Failed to set a channel option: " + child, t); 
  11.         }  
  12.     }  
  13.     for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
  14.         child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
  15.     } 
  16.  
  17.     try {  
  18.         childGroup.register(child).addListener(new ChannelFutureListener() {  
  19.             @Override  
  20.             public void operationComplete(ChannelFuture future) throws Exception {  
  21.                 if (!future.isSuccess()) { 
  22.                      forceClose(child, future.cause());  
  23.                 }  
  24.             }  
  25.         });  
  26.     } catch (Throwable t) {  
  27.         forceClose(child, t);  
  28.     }  

其中,childGroup.register(child)就是將NioSocketChannel與workderGroup綁定到一起,那又是什么觸發(fā)了ServerBootstrapAcceptor的channelRead方法?

其實當一個 client 連接到 server 時,Java 底層的 NIO ServerSocketChannel 會有一個SelectionKey.OP_ACCEPT 就緒事件,接著就會調(diào)用到 NioServerSocketChannel.doReadMessages方法。

  1. @Override  
  2. protected int doReadMessages(List<Object> buf) throws Exception {  
  3.     SocketChannel ch = javaChannel().accept();  
  4.     try {  
  5.         if (ch != null) {  
  6.             buf.add(new NioSocketChannel(this, ch));  
  7.             return 1;  
  8.         }  
  9.     } catch (Throwable t) {          … 
  10.  
  11.     }  
  12.     return 0;  

javaChannel().accept() 會獲取到客戶端新連接的SocketChannel,實例化為一個 NioSocketChannel, 并且傳入 NioServerSocketChannel 對象(即 this),由此可知, 我們創(chuàng)建的這個NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 。

接下來就經(jīng)由 Netty 的 ChannelPipeline 機制,將讀取事件逐級發(fā)送到各個 handler 中,于是就會觸發(fā)前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦。

至此,分析了Netty線程池的初始化、ServerBootstrap的啟動及channel與線程池的綁定過程,能夠看出Netty中線程池的優(yōu)雅設計,使用不同的線程池負責連接的建立、IO讀寫等,為API網(wǎng)關項目的高并發(fā)訪問提供了技術基礎。

七、總結

至此,對API網(wǎng)關技術的Netty實踐分享就到這里,各位如果對中間的各個環(huán)節(jié)有什么疑問和建議,歡迎大家指正,我們一起討論,共同學習提高。

參考:

  • http://tutorials.jenkov.com/java-nio/nio-vs-io.html
  • http://netty.io/wiki/user-guide-for-4.x.html
  • http://netty.io/
  • http://www.tuicool.com/articles/mUFnqeM
  • https://segmentfault.com/a/1190000007403873
  • https://segmentfault.com/a/1190000007283053

 【本文是51CTO專欄機構宜信技術學院的原創(chuàng)文章,微信公眾號“宜信技術學院( id: CE_TECH)”】

戳這里,看該作者更多好文

責任編輯:趙寧寧 來源: 51CTO專欄
相關推薦

2019-04-18 22:40:42

蜂巢機器人金融機器人

2017-03-28 19:46:36

外設

2024-09-03 13:59:37

2017-06-19 11:05:44

互聯(lián)網(wǎng)

2023-08-09 20:43:32

2023-06-26 18:13:56

開源API

2022-08-22 08:40:42

API網(wǎng)關開發(fā)

2015-11-02 14:17:59

IoT蜂巢式

2017-03-13 14:09:19

RESTful API實踐

2017-04-19 14:20:09

宜人貸

2020-04-22 09:00:00

REST API參數(shù)化前端

2023-06-26 10:51:56

開源API

2017-04-25 10:57:16

宜人貸

2023-09-07 10:56:36

2022-12-26 00:38:00

外聯(lián)網(wǎng)關平臺

2019-06-13 13:51:10

白山API大數(shù)據(jù)

2014-04-18 10:58:44

AndroidAPI實踐

2014-08-05 15:10:05

Larbin搜索引擎

2021-11-24 08:55:38

代理網(wǎng)關Netty

2020-07-07 07:54:01

API網(wǎng)關微服務
點贊
收藏

51CTO技術棧公眾號