基于事件的NIO多線程服務器
JDK1.4的NIO有效解決了原有流式IO存在的線程開銷的問題,在NIO中使用多線程,主要目的已不是為了應對每個客戶端請求而分配獨立的服務線程,而是通過多線程充分使用用多個CPU的處理能力和處理中的等待時間,達到提高服務能力的目的。
線程模型
NIO的選擇器采用了多路復用(Multiplexing)技術,可在一個選擇器上處理多個套接字,通過獲取讀寫通道來進行IO操作。由于網(wǎng)絡帶寬等原因,在通道的讀、寫操作中是容易出現(xiàn)等待的,所以在讀、寫操作中引入多線程,對性能提高明顯,而且可以提高客戶端的感知服務質量。所以本文的模型將主要通過使用讀、寫線程池來提高與客戶端的數(shù)據(jù)交換能力。
同時整個服務端的流程處理,建立于事件機制上。在 [接受連接->讀->業(yè)務處理->寫 ->關閉連接 ]這個過程中,觸發(fā)器將觸發(fā)相應事件,由事件處理器對相應事件分別響應,完成服務器端的業(yè)務處理。
下面我們就來詳細看一下這個模型的各個組成部分。
相關事件定義 在這個模型中,我們定義了一些基本的事件:
(1)onAccept:
當服務端收到客戶端連接請求時,觸發(fā)該事件。通過該事件我們可以知道有新的客戶端呼入。該事件可用來控制服務端的負載。例如,服務器可設定同時只為一定數(shù)量客戶端提供服務,當同時請求數(shù)超出數(shù)量時,可在響應該事件時直接拋出異常,以拒絕新的連接。
(2)onAccepted:
當客戶端請求被服務器接受后觸發(fā)該事件。該事件表明一個新的客戶端與服務器正式建立連接。
(3)onRead:
當客戶端發(fā)來數(shù)據(jù),并已被服務器控制線程正確讀取時,觸發(fā)該事件。該事件通知各事件處理器可以對客戶端發(fā)來的數(shù)據(jù)進行實際處理了。需要注意的是,在本模型中,客戶端的數(shù)據(jù)讀取是由控制線程交由讀線程完成的,事件處理器不需要在該事件中進行專門的讀操作,而只需將控制線程傳來的數(shù)據(jù)進行直接處理即可。
(4)onWrite:
當客戶端可以開始接受服務端發(fā)送數(shù)據(jù)時觸發(fā)該事件,通過該事件,我們可以向客戶端發(fā)送回應數(shù)據(jù)。在本模型中,事件處理器只需要在該事件中設置 。
(5)onClosed:
當客戶端與服務器斷開連接時觸發(fā)該事件。
(6)onError:
當客戶端與服務器從連接開始到***斷開連接期間發(fā)生錯誤時觸發(fā)該事件。通過該事件我們可以知道有什么錯誤發(fā)生。
事件回調機制的實現(xiàn)
在這個模型中,事件采用廣播方式,也就是所有在冊的事件處理器都能獲得事件通知。這樣可以將不同性質的業(yè)務處理,分別用不同的處理器實現(xiàn),使每個處理器的業(yè)務功能盡可能單一。
如下圖:整個事件模型由監(jiān)聽器、事件適配器、事件觸發(fā)器、事件處理器組成。
(事件模型)
1.監(jiān)聽器(Serverlistener):
這是一個事件接口,定義需監(jiān)聽的服務器事件,如果您需要定義更多的事件,可在這里進行擴展。
- public interface Serverlistener
- {
- public void onError(String error);
- public void onAccept() throws Exception;
- public void onAccepted(Request request) throws Exception;
- public void onRead(Request request) throws Exception;
- public void onWrite(Request request, Response response) throws Exception;
- public void onClosed(Request request) throws Exception;
- }
2. 事件適配器(EventAdapter):
對Serverlistener接口實現(xiàn)一個適配器(EventAdapter),這樣的好處是最終的事件處理器可以只處理所關心的事件。
- public abstract class EventAdapter
- implements Serverlistener
- {
- public EventAdapter() {}
- public void onError(String error) {}
- public void onAccept() throws Exception {}
- public void onAccepted(Request request) throws Exception {}
- public void onRead(Request request) throws Exception {}
- public void onWrite(Request request, Response response) throws Exception {}
- public void onClosed(Request request) throws Exception {}
- }
3. 事件觸發(fā)器(Notifier):
用于在適當?shù)臅r候通過觸發(fā)服務器事件,通知在冊的事件處理器對事件做出響應。觸發(fā)器以Singleton模式實現(xiàn),統(tǒng)一控制整個服務器端的事件,避免造成混亂。
- public class Notifier
- {
- private static Arraylist listeners = null;
- private static Notifier instance = null;
- private Notifier()
- {
- listeners = new Arraylist();
- }
- /**
- * 獲取事件觸發(fā)器
- * @return 返回事件觸發(fā)器
- */
- public static synchronized Notifier
- getNotifier()
- {
- if (instance == null)
- {
- instance = new Notifier();
- return instance;
- }
- else
- {
- return instance;
- }
- }
- /**
- * 添加事件監(jiān)聽器
- * @param l 監(jiān)聽器
- */
- public void addlistener(Serverlistener l)
- {
- synchronized (listeners)
- {
- if (!listeners.contains(l))
- {
- listeners.add(l);
- }
- }
- }
- public void fireOnAccept()
- throws Exception
- {
- for (int i = listeners.size() - 1;
- i >= 0; i--)
- {
- ( (Serverlistener) listeners.
- get(i)).onAccept();
- }
- }
- // other fire method
- }
4. 事件處理器(Handler):
繼承事件適配器,對感興趣的事件進行響應處理,實現(xiàn)業(yè)務處理。以下是一個簡單的事件處理器實現(xiàn),它響應onRead事件,在終端打印出從客戶端讀取的數(shù)據(jù)。
- public class ServerHandler
- extends EventAdapter
- {
- public ServerHandler() {}
- public void onRead(Request request)
- throws Exception
- {
- System.out.println("Received: " +
- new String(data));
- }
- }
5. 事件處理器的注冊。
為了能讓事件處理器獲得服務線程的事件通知,事件處理器需在觸發(fā)器中注冊。
- ServerHandler handler = new ServerHandler();
- Notifier.addlistener(handler);
實現(xiàn)NIO多線程服務器
NIO多線程服務器主要由主控服務線程、讀線程和寫線程組成。
1. 主控服務線程(Server):
主控線程將創(chuàng)建讀、寫線程池,實現(xiàn)監(jiān)聽、接受客戶端請求,同時將讀、寫通道提交由相應的讀線程(Reader)和寫服務線程(Writer),由讀寫線程分別完成對客戶端數(shù)據(jù)的讀取和對客戶端的回應操作。
- public class Server implements Runnable
- {
- private static List wpool = new LinkedList();
- private static Selector selector;
- private ServerSocketChannel sschannel;
- private InetSocketAddress address;
- protected Notifier notifier;
- private int port;
- private static int MAX_THREADS = 4;
- /**
- * Creat the main thread
- * @param port server port
- * @throws java.lang.Exception
- */
- public Server(int port) throws Exception
- {
- this.port = port;
- // event dispatcher
- notifier = Notifier.getNotifier();
- // create the thread pool for reading and writing
- for (int i = 0; i < MAX_THREADS; i++)
- {
- Thread r = new Reader();
- Thread w = new Writer();
- r.start();
- w.start();
- }
- // create nonblocking socket
- selector = Selector.open();
- sschannel = ServerSocketChannel.open();
- sschannel.configureBlocking(false);
- address = new InetSocketAddress(port);
- ServerSocket ss = sschannel.socket();
- ss.bind(address);
- sschannel.register(selector, SelectionKey.OP_ACCEPT);
- }
- public void run()
- {
- System.out.println("Server started ");
- System.out.println("Server listening on port: " + port);
- while (true)
- {
- try
- {
- int num = 0;
- num = selector.select();
- if (num > 0)
- {
- Set selectedKeys = selector.selectedKeys();
- Iterator it = selectedKeys.iterator();
- while (it.hasNext())
- {
- SelectionKey key = (SelectionKey) it.next();
- it.remove();
- if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT)
- {
- // Accept the new connection
- ServerSocketChannel ssc =
- (ServerSocketChannel) key.channel();
- notifier.fireOnAccept();
- SocketChannel sc = ssc.accept();
- sc.configureBlocking(false);
- Request request = new Request(sc);
- notifier.fireOnAccepted(request);
- sc.register(selector, SelectionKey.OP_READ,request);
- }
- else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ)
- {
- Reader.processRequest(key);
- key.cancel();
- }
- else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
- {
- Writer.processRequest(key);
- key.cancel();
- }
- }
- }
- //this selector's wakeup method is invoked
- else
- {
- //register new channel for writing to selector
- addRegister();
- }
- }
- catch (Exception e)
- {
- notifier.fireOnError("Error occured in Server: "
- + e.getMessage());
- continue;
- }
- }
- }
- private void addRegister()
- {
- synchronized (wpool)
- {
- while (!wpool.isEmpty())
- {
- SelectionKey key = (SelectionKey) wpool.remove(0);
- SocketChannel schannel = (SocketChannel) key.channel();
- try
- {
- schannel.register(selector, SelectionKey.OP_WRITE, key
- .attachment());
- }
- catch (Exception e)
- {
- try
- {
- schannel.finishConnect();
- schannel.close();
- schannel.socket().close();
- notifier.fireOnClosed((Request) key.attachment());
- }
- catch (Exception e1)
- {
- }
- notifier.fireOnError("Error occured in addRegister: "
- + e.getMessage());
- }
- }
- }
- }
- public static void processWriteRequest(SelectionKey key)
- {
- synchronized (wpool)
- {
- wpool.add(wpool.size(), key);
- wpool.notifyAll();
- }
- selector.wakeup();
- }
- }
2. 讀線程(Reader):
使用線程池技術,通過多個線程讀取客戶端數(shù)據(jù),以充分利用網(wǎng)絡數(shù)據(jù)傳輸?shù)臅r間,提高讀取效率。
- public class Reader extends Thread
- {
- public void run()
- {
- while (true)
- {
- try
- {
- SelectionKey key;
- synchronized (pool)
- {
- while (pool.isEmpty())
- {
- pool.wait();
- }
- key = (SelectionKey) pool.remove(0);
- }
- // 讀取客戶端數(shù)據(jù),并觸發(fā)onRead事件
- read(key);
- }
- catch (Exception e)
- {
- continue;
- }
- }
- }
- }
3. 寫線程(Writer):
和讀操作一樣,使用線程池,負責將服務器端的數(shù)據(jù)發(fā)送回客戶端。
- public final class Writer extends Thread
- {
- public void run()
- {
- while (true)
- {
- try
- {
- SelectionKey key;
- synchronized (pool)
- {
- while (pool.isEmpty())
- {
- pool.wait();
- }
- key = (SelectionKey) pool.remove(0);
- }
- write(key);
- }
- catch (Exception e)
- {
- continue;
- }
- }
- }
- }
具體應用
NIO多線程模型的實現(xiàn)告一段落,現(xiàn)在我們可以暫且將NIO的各個API和煩瑣的調用方法拋于腦后,專心于我們的實際應用中。
我們用一個簡單的TimeServer(時間查詢服務器)來看看該模型能帶來多么簡潔的開發(fā)方式。
在這個TimeServer中,將提供兩種語言(中文、英文)的時間查詢服務。我們將讀取客戶端的查詢命令(GB/EN),并回應相應語言格式的當前時間。在應答客戶的請求的同時,服務器將進行日志記錄。做為示例,對日志記錄,我們只是簡單地將客戶端的訪問時間和IP地址輸出到服務器的終端上。
1. 實現(xiàn)時間查詢服務的事件處理器(TimeHandler):
- public class TimeHandler extends EventAdapter
- {
- public TimeHandler() {}
- public void onWrite(Request request, Response response) throws Exception
- {
- String command = new String(request.getDataInput());
- String time = null;
- Date date = new Date();
- if (command.equals("GB"))
- {
- DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
- DateFormat.FulL, Locale.CHINA);
- time = cnDate.format(date);
- }
- else
- {
- DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
- DateFormat.FulL, Locale.US);
- time = enDate.format(date);
- }
- response.send(time.getBytes());
- }
- }
2. 實現(xiàn)日志記錄服務的事件處理器(LogHandler):
- public class LogHandler extends EventAdapter
- {
- public LogHandler() {}
- public void onClosed(Request request)
- throws Exception
- {
- String log = new Date().toString() + " from " + request.getAddress().toString();
- System.out.println(log);
- }
- public void onError(String error)
- {
- System.out.println("Error: " + error);
- }
- }
3. 啟動程序:
- public class Start
- {
- public static void main(String[] args)
- {
- try
- {
- LogHandler loger = new LogHandler();
- TimeHandler timer = new TimeHandler();
- Notifier notifier = Notifier.getNotifier();
- notifier.addlistener(loger);
- notifier.addlistener(timer);
- System.out.println("Server starting ");
- Server server = new Server(5100);
- Thread tServer = new Thread(server);
- tServer.start();
- }
- catch (Exception e)
- {
- System.out.println("Server error: " + e.getMessage());
- System.exit(-1);
- }
- }
- }
小 結
通過例子我們可以看到,基于事件回調的NIO多線程服務器模型,提供了清晰直觀的實現(xiàn)方式,可讓開發(fā)者從NIO及多線程的技術細節(jié)中擺脫出來,集中精力關注具體的業(yè)務實現(xiàn)。
原文鏈接:http://www.cnblogs.com/longb/archive/2006/04/04/366800.html
【編輯推薦】