自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

IO流中「線程」模型總結(jié)

網(wǎng)絡(luò) 通信技術(shù)
客戶端與服務(wù)端進(jìn)行通信「交互」,可能是同步或者異步,服務(wù)端進(jìn)行「流」處理時(shí),可能是阻塞或者非阻塞模式,當(dāng)然也有自定義的業(yè)務(wù)流程需要執(zhí)行,從處理邏輯看就是「讀取數(shù)據(jù)-業(yè)務(wù)執(zhí)行-應(yīng)答寫(xiě)數(shù)據(jù)」的形式。

一、基礎(chǔ)簡(jiǎn)介

在IO流的網(wǎng)絡(luò)模型中,以常見(jiàn)的「客戶端-服務(wù)端」交互場(chǎng)景為例;

圖片

客戶端與服務(wù)端進(jìn)行通信「交互」,可能是同步或者異步,服務(wù)端進(jìn)行「流」處理時(shí),可能是阻塞或者非阻塞模式,當(dāng)然也有自定義的業(yè)務(wù)流程需要執(zhí)行,從處理邏輯看就是「讀取數(shù)據(jù)-業(yè)務(wù)執(zhí)行-應(yīng)答寫(xiě)數(shù)據(jù)」的形式;

Java提供「三種」IO網(wǎng)絡(luò)編程模型,即:「BIO同步阻塞」、「NIO同步非阻塞」、「AIO異步非阻塞」;

二、同步阻塞

1、模型圖解

BIO即同步阻塞,服務(wù)端收到客戶端的請(qǐng)求時(shí),會(huì)啟動(dòng)一個(gè)線程處理,「交互」會(huì)阻塞直到整個(gè)流程結(jié)束;

圖片

這種模式如果在高并發(fā)且流程復(fù)雜耗時(shí)的場(chǎng)景下,客戶端的請(qǐng)求響應(yīng)會(huì)存在嚴(yán)重的性能問(wèn)題,并且占用過(guò)多資源;

2、參考案例

【服務(wù)端】啟動(dòng)ServerSocket接收客戶端的請(qǐng)求,經(jīng)過(guò)一系列邏輯之后,向客戶端發(fā)送消息,注意這里線程的10秒休眠;

public class SocketServer01 {
public static void main(String[] args) throws Exception {
// 1、創(chuàng)建Socket服務(wù)端
ServerSocket serverSocket = new ServerSocket(8080);
// 2、方法阻塞等待,直到有客戶端連接
Socket socket = serverSocket.accept();
// 3、輸入流,輸出流
InputStream inStream = socket.getInputStream();
OutputStream outStream = socket.getOutputStream();
// 4、數(shù)據(jù)接收和響應(yīng)
int readLen = 0;
byte[] buf = new byte[1024];
if ((readLen=inStream.read(buf)) != -1){
// 接收數(shù)據(jù)
String readVar = new String(buf, 0, readLen) ;
System.out.println("readVar======="+readVar);
}
// 響應(yīng)數(shù)據(jù)
Thread.sleep(10000);
outStream.write("sever-8080-write;".getBytes());
// 5、資源關(guān)閉
IoClose.ioClose(outStream,inStream,socket,serverSocket);
}
}

【客戶端】Socket連接,先向ServerSocket發(fā)送請(qǐng)求,再接收其響應(yīng),由于Server端模擬耗時(shí),Client處于長(zhǎng)時(shí)間阻塞狀態(tài);

public class SocketClient01 {
public static void main(String[] args) throws Exception {
// 1、創(chuàng)建Socket客戶端
Socket socket = new Socket(InetAddress.getLocalHost(), 8080);
// 2、輸入流,輸出流
OutputStream outStream = socket.getOutputStream();
InputStream inStream = socket.getInputStream();
// 3、數(shù)據(jù)發(fā)送和響應(yīng)接收
// 發(fā)送數(shù)據(jù)
outStream.write("client-hello".getBytes());
// 接收數(shù)據(jù)
int readLen = 0;
byte[] buf = new byte[1024];
if ((readLen=inStream.read(buf)) != -1){
String readVar = new String(buf, 0, readLen) ;
System.out.println("readVar======="+readVar);
}
// 4、資源關(guān)閉
IoClose.ioClose(inStream,outStream,socket);
}
}

三、同步非阻塞

1、模型圖解

NIO即同步非阻塞,服務(wù)端可以實(shí)現(xiàn)一個(gè)線程,處理多個(gè)客戶端請(qǐng)求連接,服務(wù)端的并發(fā)能力得到極大的提升;

圖片

這種模式下客戶端的請(qǐng)求連接都會(huì)注冊(cè)到Selector多路復(fù)用器上,多路復(fù)用器會(huì)進(jìn)行輪詢,對(duì)請(qǐng)求連接的IO流進(jìn)行處理;

2、參考案例

【服務(wù)端】單線程可以處理多個(gè)客戶端請(qǐng)求,通過(guò)輪詢多路復(fù)用器查看是否有IO請(qǐng)求;

public class SocketServer01 {
public static void main(String[] args) throws Exception {
try {
//啟動(dòng)服務(wù)開(kāi)啟監(jiān)聽(tīng)
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989));
// 設(shè)置非阻塞,接受客戶端
socketChannel.configureBlocking(false);
// 打開(kāi)多路復(fù)用器
Selector selector = Selector.open();
// 服務(wù)端Socket注冊(cè)到多路復(fù)用器,指定興趣事件
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 多路復(fù)用器輪詢
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while (selector.select() > 0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIter = selectionKeys.iterator();
while (selectionKeyIter.hasNext()){
SelectionKey selectionKey = selectionKeyIter.next() ;
selectionKeyIter.remove();
if(selectionKey.isAcceptable()) {
// 接受新的連接
SocketChannel client = socketChannel.accept();
// 設(shè)置讀非阻塞
client.configureBlocking(false);
// 注冊(cè)到多路復(fù)用器
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 通道可讀
SocketChannel client = (SocketChannel) selectionKey.channel();
int len = client.read(buffer);
if (len > 0){
buffer.flip();
byte[] readArr = new byte[buffer.limit()];
buffer.get(readArr);
System.out.println(client.socket().getPort() + "端口數(shù)據(jù):" + new String(readArr));
buffer.clear();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

【客戶端】每隔3秒持續(xù)的向通道內(nèi)寫(xiě)數(shù)據(jù),服務(wù)端通過(guò)輪詢多路復(fù)用器,持續(xù)的讀取數(shù)據(jù);

public class SocketClient01 {
public static void main(String[] args) throws Exception {
try {
// 連接服務(wù)端
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
String conVar = "client-hello";
writeBuffer.put(conVar.getBytes());
writeBuffer.flip();
// 每隔3S發(fā)送一次數(shù)據(jù)
while (true) {
Thread.sleep(3000);
writeBuffer.rewind();
socketChannel.write(writeBuffer);
writeBuffer.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

四、異步非阻塞

1、模型圖解

AIO即異步非阻塞,對(duì)于通道內(nèi)數(shù)據(jù)的「讀」和「寫(xiě)」動(dòng)作,都是采用異步的模式,對(duì)于性能的提升是巨大的;

圖片

這與常規(guī)的第三方對(duì)接模式很相似,本地服務(wù)在請(qǐng)求第三方服務(wù)時(shí),請(qǐng)求過(guò)程耗時(shí)很大,會(huì)異步執(zhí)行,第三方第一次回調(diào),確認(rèn)請(qǐng)求可以被執(zhí)行;第二次回調(diào)則是推送處理結(jié)果,這種思想在處理復(fù)雜問(wèn)題時(shí),可以很大程度的提高性能,節(jié)省資源:

2、參考案例

【服務(wù)端】各種「accept」、「read」、「write」動(dòng)作是異步,通過(guò)Future來(lái)獲取計(jì)算的結(jié)果;

public class SocketServer01 {
public static void main(String[] args) throws Exception {
// 啟動(dòng)服務(wù)開(kāi)啟監(jiān)聽(tīng)
AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ;
socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989));
// 指定30秒內(nèi)獲取客戶端連接,否則超時(shí)
Future<AsynchronousSocketChannel> acceptFuture = socketChannel.accept();
AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS);

if (asyChannel != null && asyChannel.isOpen()){
// 讀數(shù)據(jù)
ByteBuffer inBuffer = ByteBuffer.allocate(1024);
Future<Integer> readResult = asyChannel.read(inBuffer);
readResult.get();
System.out.println("read:"+new String(inBuffer.array()));

// 寫(xiě)數(shù)據(jù)
inBuffer.flip();
Future<Integer> writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes()));
writeResult.get();
}

// 關(guān)閉資源
asyChannel.close();
}
}

【客戶端】相關(guān)「connect」、「read」、「write」方法調(diào)用是異步的,通過(guò)Future來(lái)獲取計(jì)算的結(jié)果;

public class SocketClient01 {
public static void main(String[] args) throws Exception {
// 連接服務(wù)端
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
Future<Void> result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));
result.get();

// 寫(xiě)數(shù)據(jù)
String conVar = "client-hello";
ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes());
Future<Integer> writeFuture = socketChannel.write(reqBuffer);
writeFuture.get();

// 讀數(shù)據(jù)
ByteBuffer inBuffer = ByteBuffer.allocate(1024);
Future<Integer> readFuture = socketChannel.read(inBuffer);
readFuture.get();
System.out.println("read:"+new String(inBuffer.array()));

// 關(guān)閉資源
socketChannel.close();
}
}

五、Reactor模型

1、模型圖解

這部分內(nèi)容,可以參考「Doug Lea的《IO》」文檔,查看更多細(xì)節(jié);

1.1 Reactor設(shè)計(jì)原理

Reactor模式基于事件驅(qū)動(dòng)設(shè)計(jì),也稱為「反應(yīng)器」模式或者「分發(fā)者」模式;服務(wù)端收到多個(gè)客戶端請(qǐng)求后,會(huì)將請(qǐng)求分派給對(duì)應(yīng)的線程處理;

圖片

Reactor:負(fù)責(zé)事件的監(jiān)聽(tīng)和分發(fā);Handler:負(fù)責(zé)處理事件,核心邏輯「read讀」、「decode解碼」、「compute業(yè)務(wù)計(jì)算」、「encode編碼」、「send應(yīng)答數(shù)據(jù)」;

1.2 單Reactor單線程

圖片

【1】Reactor線程通過(guò)select監(jiān)聽(tīng)客戶端的請(qǐng)求事件,收到事件后通過(guò)Dispatch進(jìn)行分發(fā);

【2】如果是建立連接請(qǐng)求事件,Acceptor通過(guò)「accept」方法獲取連接,并創(chuàng)建一個(gè)Handler對(duì)象來(lái)處理后續(xù)業(yè)務(wù);

【3】如果不是連接請(qǐng)求事件,則Reactor會(huì)將該事件交由當(dāng)前連接的Handler來(lái)處理;

【4】在Handler中,會(huì)完成相應(yīng)的業(yè)務(wù)流程;

這種模式將所有邏輯「連接、讀寫(xiě)、業(yè)務(wù)」放在一個(gè)線程中處理,避免多線程的通信,資源競(jìng)爭(zhēng)等問(wèn)題,但是存在明顯的并發(fā)和性能問(wèn)題;

1.3 單Reactor多線程

圖片

【1】Reactor線程通過(guò)select監(jiān)聽(tīng)客戶端的請(qǐng)求事件,收到事件后通過(guò)Dispatch進(jìn)行分發(fā);

【2】如果是建立連接請(qǐng)求事件,Acceptor通過(guò)「accept」方法獲取連接,并創(chuàng)建一個(gè)Handler對(duì)象來(lái)處理后續(xù)業(yè)務(wù);

【3】如果不是連接請(qǐng)求事件,則Reactor會(huì)將該事件交由當(dāng)前連接的Handler來(lái)處理;

【4】在Handler中,只負(fù)責(zé)事件響應(yīng)不處理具體業(yè)務(wù),將數(shù)據(jù)發(fā)送給Worker線程池來(lái)處理;

【5】Worker線程池會(huì)分配具體的線程來(lái)處理業(yè)務(wù),最后把結(jié)果返回給Handler做響應(yīng);

這種模式將業(yè)務(wù)從Reactor單線程分離處理,可以讓其更專注于事件的分發(fā)和調(diào)度,Handler使用多線程也充分的利用cpu的處理能力,導(dǎo)致邏輯變的更加復(fù)雜,Reactor單線程依舊存在高并發(fā)的性能問(wèn)題;

1.4 主從Reactor多線程

圖片

【1】 MainReactor主線程通過(guò)select監(jiān)聽(tīng)客戶端的請(qǐng)求事件,收到事件后通過(guò)Dispatch進(jìn)行分發(fā);

【2】如果是建立連接請(qǐng)求事件,Acceptor通過(guò)「accept」方法獲取連接,之后MainReactor將連接分配給SubReactor;

【3】如果不是連接請(qǐng)求事件,則MainReactor將連接分配給SubReactor,SubReactor調(diào)用當(dāng)前連接的Handler來(lái)處理;

【4】在Handler中,只負(fù)責(zé)事件響應(yīng)不處理具體業(yè)務(wù),將數(shù)據(jù)發(fā)送給Worker線程池來(lái)處理;

【5】Worker線程池會(huì)分配具體的線程來(lái)處理業(yè)務(wù),最后把結(jié)果返回給Handler做響應(yīng);

這種模式Reactor線程分工明確,MainReactor負(fù)責(zé)接收新的請(qǐng)求連接,SubReactor負(fù)責(zé)后續(xù)的交互業(yè)務(wù),適應(yīng)于高并發(fā)的處理場(chǎng)景,是Netty組件通信框架的所采用的模式;

2、參考案例

【服務(wù)端】提供兩個(gè)EventLoopGroup,「ParentGroup」主要是用來(lái)接收客戶端的請(qǐng)求連接,真正的處理是轉(zhuǎn)交給「ChildGroup」執(zhí)行,即Reactor多線程模型;

@Slf4j
public class NettyServer {
public static void main(String[] args) {
// EventLoop組,處理事件和IO
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
// 服務(wù)端啟動(dòng)引導(dǎo)類
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit());

// 異步IO的結(jié)果
ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}

class ServerChannelInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 獲取管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 編碼、解碼器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 添加自定義的handler
pipeline.addLast("serverHandler", new ServerHandler());
}
}

class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* 通道讀和寫(xiě)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server-Msg【"+msg+"】");
TimeUnit.MILLISECONDS.sleep(2000);
String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
ctx.channel().writeAndFlush("hello-client;time:" + nowTime);
ctx.fireChannelActive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

【客戶端】通過(guò)Bootstrap類,與服務(wù)器建立連接,服務(wù)端通過(guò)ServerBootstrap啟動(dòng)服務(wù),綁定在8989端口,然后服務(wù)端和客戶端進(jìn)行通信;

public class NettyClient {
public static void main(String[] args) {
// EventLoop處理事件和IO
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
// 客戶端通道引導(dǎo)
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class).handler(new ClientChannelInit());

// 異步IO的結(jié)果
ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}

class ClientChannelInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
// 獲取管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 編碼、解碼器
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// 添加自定義的handler
pipeline.addLast("clientHandler", new ClientHandler());
}
}

class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道讀和寫(xiě)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Client-Msg【"+msg+"】");
TimeUnit.MILLISECONDS.sleep(2000);
String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;
ctx.channel().writeAndFlush("hello-server;time:" + nowTime);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().writeAndFlush("channel...active");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

六、參考源碼

編程文檔:
https://gitee.com/cicadasmile/butte-java-note

應(yīng)用倉(cāng)庫(kù):
https://gitee.com/cicadasmile/butte-flyer-parent
責(zé)任編輯:武曉燕 來(lái)源: 知了一笑
相關(guān)推薦

2024-06-07 00:09:50

2017-07-07 16:36:28

BIOIO模型 NIO

2022-02-21 10:21:17

網(wǎng)絡(luò)IO模型

2020-10-23 07:56:04

Java中的IO流

2023-05-10 08:26:33

IO模型API

2025-01-14 08:42:34

IO流程序語(yǔ)句

2022-05-09 08:37:43

IO模型Java

2021-07-01 07:34:09

LinuxIO模型

2022-04-12 08:00:17

socket 編程網(wǎng)絡(luò)編程網(wǎng)絡(luò) IO 模型

2017-01-17 14:21:27

LinuxIO模型Unix

2023-01-09 10:04:47

IO多路復(fù)用模型

2023-06-26 00:26:40

I/OJava字節(jié)流

2021-08-27 07:06:10

IOJava抽象

2017-01-19 13:34:54

AndroidRxJava線程模型

2024-04-18 09:02:11

數(shù)據(jù)流Mixtral混合模型

2025-03-24 00:11:05

IO模型計(jì)算機(jī)

2025-01-07 00:07:17

2020-09-23 12:32:18

網(wǎng)絡(luò)IOMySQL

2023-02-27 07:22:53

RPC網(wǎng)絡(luò)IO

2011-07-22 14:14:23

java
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)