Java NIO TCP編程
在Java1.4以前,Java的網(wǎng)絡(luò)編程是只有阻塞方式的,在Java1.4以及之后,Java提供了非阻塞的網(wǎng)絡(luò)編程API.從Java的發(fā)展來看,由于Java的快速發(fā)展,JVM性能的提升,涉足到服務(wù)端應(yīng)用程序開發(fā)也越來越多,要求高性能的網(wǎng)絡(luò)應(yīng)用越來越多,這是Java推出非阻塞網(wǎng)絡(luò)編程的最主要原因吧。
對我而言,以前的大部分服務(wù)端應(yīng)用主要是搭建在應(yīng)用服務(wù)器之上,所以通訊這部分工作都是有應(yīng)用服務(wù)器來實(shí)現(xiàn)和管理的。這次由于通訊和協(xié)議,我們必須自己實(shí)現(xiàn)一個(gè)能處理大量并發(fā)客戶端的高性能并行處理的Java服務(wù)端程序。因此,選擇非阻塞的處理方式也是必然的。我們首先來看看阻塞的處理方式:
在阻塞的網(wǎng)絡(luò)編程方式中,針對于每一個(gè)單獨(dú)的網(wǎng)絡(luò)連接,都必須有一個(gè)線程對應(yīng)的綁定該網(wǎng)絡(luò)連接,進(jìn)行網(wǎng)絡(luò)字節(jié)流的處理。下面是一段代碼:
- public static void main(String[] args) {
- try {
- ServerSocket ssc = new ServerSocket(23456);
- while (true) {
- System.out.println("Enter Accept:");
- Socket s = ssc.accept();
- try {
- (new Thread(new Worker(s))).start();
- } catch (Exception e) {
- // TODO
- e.printStackTrace();
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- public static class Worker implements Runnable {
- private Socket s;
- private boolean running = true;;
- public Worker(Socket s) {
- this.s = s;
- }
- public void run() {
- try {
- InputStream is = s.getInputStream();
- OutputStream os = s.getOutputStream();
- while (running) {
- byte[] b = this.readByLength(is, 1024);
- this.process(b);
- }
- } catch (Throwable t) {
- // TODO
- t.printStackTrace();
- }
- }
- private byte[] readByLength(InputStream is, int contLen) throws IOException {
- byte[] b = new byte[contLen];
- int off = 0;
- int length = 0;
- while ((length = is.read(b, off, contLen - off)) >= 0) {
- off = +length;
- if (off >= contLen) {
- break;
- }
- }
- return b;
- }
- private void process(byte[] b) {
- }
- }
在這段代碼中,我們看到有兩個(gè)阻塞的方法,是ServerSocket的accept()方法;和InputStream的read()方式。因此我們需要兩類型的線程分別進(jìn)行處理。而且每一個(gè)阻塞方法所綁定的線程的生命周期和網(wǎng)絡(luò)連接的生命周期是一致的?;谝陨系脑颍琋IO應(yīng)運(yùn)而生,一方面,為每一個(gè)網(wǎng)絡(luò)連接建立一個(gè)線程對應(yīng),同時(shí)每一個(gè)線程有大量的線程處于讀寫以外的空閑狀態(tài),因此希望降低線程的數(shù)量,降低每個(gè)空閑狀態(tài),提高單個(gè)線程的運(yùn)行執(zhí)行效率,實(shí)際上是在更加充分運(yùn)用CPU的計(jì)算、運(yùn)行能力(因?yàn)椋绻写罅康逆溌反嬖?,就存在大量的線程,而大量的線程都阻塞在read()或者write()方法,同時(shí)CPU又需要來回頻繁的在這些線程中間調(diào)度和切換,必然帶來大量的系統(tǒng)調(diào)用和資源競爭.);另外一方面希望提高網(wǎng)絡(luò)IO和硬盤IO操作的性能。在NIO主要出現(xiàn)了三個(gè)新特性:
1.數(shù)據(jù)緩沖處理(ByteBuffer):由于操作系統(tǒng)和應(yīng)用程序數(shù)據(jù)通信的原始類型是byte,也是IO數(shù)據(jù)操作的基本單元,在NIO中,每一個(gè)基本的原生類型(boolean除外)都有Buffer的實(shí)現(xiàn):CharBuffer、IntBuffer、DoubleBuffer、ShortBuffer、LongBuffer、FloatBuffer和ByteBuffer,數(shù)據(jù)緩沖使得在IO操作中能夠連續(xù)的處理數(shù)據(jù)流。當(dāng)前有兩種ByteBuffer,一種是Direct ByteBuffer,另外一種是NonDirect ByteBuffer;ByteBuffer是普通的Java對象,遵循Java堆中對象存在的規(guī)則;而Direct ByteBuffer是native代碼,它內(nèi)存的分配不在Java的堆棧中,不受Java內(nèi)存回收的影響,每一個(gè)Direct ByteBuffer都是直接分配的一塊連續(xù)的內(nèi)存空間,也是NIO提高性能的重要辦法之一。另外數(shù)據(jù)緩沖有一個(gè)很重要的特點(diǎn)是,基于一個(gè)數(shù)據(jù)緩沖可以建立一個(gè)或者多個(gè)邏輯的視圖緩沖(View Buffer).比方說,通過View Buffer,可以將一個(gè)Byte類型的Buffer換作Int類型的緩沖;或者一個(gè)大的緩沖轉(zhuǎn)作很多小的Buffer。之所以稱為View Buffer是因?yàn)檫@個(gè)轉(zhuǎn)換僅僅是邏輯上,在物理上并沒有創(chuàng)建新的Buffer。這為我們操作Buffer帶來諸多方便。
2.異步通道(Channel):Channel是一個(gè)與操作系統(tǒng)緊密結(jié)合的本地代碼較多的對象。通過Channel來實(shí)現(xiàn)網(wǎng)絡(luò)編程的非阻塞操作,同時(shí)也是其與ByteBuffer、Socket有效結(jié)合充分利用非阻塞、ByteBuffer的特性的。在后面我們會看到具體的SocketChannel的用法。
3.有條件的選擇(Readiness Selection):大多數(shù)操作系統(tǒng)都有支持有條件選擇準(zhǔn)備就緒IO通道的API,即能夠保證一個(gè)線程同時(shí)有效管理多個(gè)IO通道。在NIO中,由Selector(維護(hù)注冊進(jìn)來的Channel和這些Channel的狀態(tài))、SelectableChannel(能被Selector管理的Channel)和SelectionKey(SelectionKey標(biāo)識Selector和SelectableChannel之間的映射關(guān)系,一旦一個(gè)Channel注冊到Selector中,就會返回一個(gè)SelectionKey對象。SelectionKey保存了兩類狀態(tài):對應(yīng)的Channel注冊了哪些操作;對應(yīng)的Channel的那些操作已經(jīng)準(zhǔn)備好了,可以進(jìn)行相應(yīng)的數(shù)據(jù)操作了)結(jié)合來實(shí)現(xiàn)這個(gè)功能的。
NIO的包中主要包含了這樣幾種抽象數(shù)據(jù)類型:
◆ Buffer:包含數(shù)據(jù)且用于讀寫的線形表結(jié)構(gòu)。其中還提供了一個(gè)特殊類用于內(nèi)存映射文件的I/O操作。
◆ Charset:它提供Unicode字符串影射到字節(jié)序列以及逆映射的操作。
◆ Channels:包含socket,file和pipe三種管道,都是全雙工的通道。
◆ Selector:多個(gè)異步I/O操作集中到一個(gè)或多個(gè)線程中(可以被看成是Unix中select()函數(shù)的面向?qū)ο蟀姹荆?/p>
NIO非阻塞的典型編程模型如下:
- private Selector selector = null;
- private static final int BUF_LENGTH = 1024;
- public void start() throws IOException {
- if (selector != null) {
- selector = Selector.open();
- }
- ServerSocketChannel ssc = ServerSocketChannel.open();
- ServerSocket serverSocket = ssc.socket();
- serverSocket.bind(new InetSocketAddress(80));
- ssc.configureBlocking(false);
- ssc.register(selector, SelectionKey.OP_ACCEPT);
- try {
- while (true) {
- int nKeys = UnblockServer.this.selector.select();
- if (nKeys > 0) {
- Iterator it = selector.selectedKeys().iterator();
- while (it.hasNext()) {
- SelectionKey key = (SelectionKey) it.next();
- if (key.isAcceptable()) {
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel channel = server.accept();
- if (channel == null) {
- continue;
- }
- channel.configureBlocking(false);
- channel.register(selector, SelectionKey.OP_READ);
- }
- if (key.isReadable()) {
- readDataFromSocket(key);
- }
- it.remove();
- }
- }
- }
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
- }
- /**
- * @param key
- * @throws IOException
- */
- private void readDataFromSocket(SelectionKey key) throws IOException {
- ByteBuffer buf = ByteBuffer.allocate(BUF_LENGTH);
- SocketChannel sc = (SocketChannel) key.channel();
- int readBytes = 0;
- int ret;
- try {
- while ((ret = sc.read(buf.buf())) > 0) {
- readBytes += ret;
- }
- } finally {
- buf.flip();
- }
- // process buffer
- // buf.clear();
- }
從這段程序,我們基本可以了解到NIO網(wǎng)絡(luò)編程的一些特點(diǎn),創(chuàng)建一個(gè)SocketServer的方式已經(jīng)發(fā)生了變化,需要指定非阻塞模式,需要創(chuàng)建一個(gè)Channel然后注冊到Selector中去,同樣,建立一個(gè)網(wǎng)絡(luò)連接過程也是一樣的模式,然后就是有條件的選擇(Readiness Selection).這樣,我們的每一個(gè)線程只需要處理一類型的網(wǎng)絡(luò)選擇。在代碼上,我們發(fā)現(xiàn)處理的方式和阻塞完全不一樣了,我們需要完全重新考慮如何編寫網(wǎng)絡(luò)通信的模塊了:
1.持久連接的超時(shí)問題(Timeout),因?yàn)锳PI沒有直接的支持timeout的參數(shù)設(shè)置功能,因此需要我們自己實(shí)現(xiàn)一個(gè)這樣功能。
2.如何使用Selector,由于每一個(gè)Selector的處理能力是有限的,因此在大量鏈接和消息處理過程中,需要考慮如何使用多個(gè)Selector.
3.在非阻塞情況下,read和write都不在是阻塞的,因此需要考慮如何完整的讀取到確定的消息;如何在確保在網(wǎng)絡(luò)環(huán)境不是很好的情況下,一定將數(shù)據(jù)寫進(jìn)IO中。
4.如何應(yīng)用ByteBuffer,本身大量創(chuàng)建ByteBuffer就是很耗資源的;如何有效的使用ByteBuffer?同時(shí)ByteBuffer的操作需要仔細(xì)考慮,因?yàn)橛衟osition()、mark()、limit()、capacity等方法。
5.由于每一個(gè)線程在處理網(wǎng)絡(luò)連接的時(shí)候,面對的都是一系列的網(wǎng)絡(luò)連接,需要考慮如何更好的使用、調(diào)度多線程。在對消息的處理上,也需要保證一定的順序,比方說,登錄消息***到達(dá),只有登錄消息處理之后,才有可能去處理同一個(gè)鏈路上的其他類型的消息。
6.在網(wǎng)絡(luò)編程中可能出現(xiàn)的內(nèi)存泄漏問題。
在NIO的接入處理框架上,大約有兩種并發(fā)線程:
1.Selector線程,每一個(gè)Selector單獨(dú)占用一個(gè)線程,由于每一個(gè)Selector的處理能力是有限的,因此需要多個(gè)Selector并行工作。
2.對于每一條處于Ready狀態(tài)的鏈路,需要線程對于相應(yīng)的消息進(jìn)行處理;對于這一類型的消息,需要并發(fā)線程共同工作進(jìn)行處理。在這個(gè)過程中,不斷可能需要消息的完整性;還要涉及到,每個(gè)鏈路上的消息可能有時(shí)序,因此在處理上,也可能要求相應(yīng)的時(shí)序性。
當(dāng)前社區(qū)的開源NIO框架實(shí)現(xiàn)有MINA、Grizzly、NIO framework、QuickServer、xSocket等,其中MINA和Grizzly最為活躍,而且代碼的質(zhì)量也很高。他們倆在實(shí)現(xiàn)的方法上也完全大不一樣。(大部分Java的開源服務(wù)器都已經(jīng)用NIO重寫了網(wǎng)絡(luò)部分。 )
不管是我們自己實(shí)現(xiàn)NIO的網(wǎng)絡(luò)編程框架,還是基于MINA、Grizzly等這樣的開源框架進(jìn)行開發(fā),都需要理解確定的了解NIO帶來的益處和NIO編程需要解決的眾多類型的問題。充足、有效的單元測試,是我們寫好NIO代碼的好助手:)
原文鏈接:http://www.cnblogs.com/zhuowei/archive/2009/03/19/1416421.html