基于Java NIO的即時(shí)聊天服務(wù)器模型
前不久自己動(dòng)手寫了一個(gè)Android的聊天工具,跟服務(wù)器的交互還是基于HTTP方式的,在一般通訊上還算湊活,但是在即時(shí)聊天的時(shí)候就有點(diǎn)惡心了,客戶端開(kāi)啟Service每隔3秒去詢問(wèn)服務(wù)器是否有自己的新消息(當(dāng)然3秒有點(diǎn)太快了),在心疼性能和流量的前提下,只能自己動(dòng)手寫個(gè)服務(wù)器,傳統(tǒng)的Socket是阻塞的,這樣的話服務(wù)器對(duì)每個(gè)Socket都需要建立一個(gè)線程來(lái)操作,資源開(kāi)銷很大,而且線程多了直接會(huì)影響服務(wù)端的性能(曾經(jīng)測(cè)試開(kāi)了3000多個(gè)線程就不讓創(chuàng)建了,所以并發(fā)數(shù)目也是有限制的),聽(tīng)說(shuō)從JDK1.5就多了個(gè)New IO,灰常不錯(cuò)的樣子,找了找相關(guān)的資料,網(wǎng)上竟然全都是最最最簡(jiǎn)單的一個(gè)demo,然后去CSDN發(fā)帖,基本上都是建議直接使用MINA框架的,這樣一來(lái)根本達(dá)不到學(xué)習(xí)NIO的目的,而且現(xiàn)在的技術(shù)也太快餐了,只知道使用前輩留下的東西,知其然不知其所以然。
折騰了一個(gè)周,終于搞出來(lái)了一個(gè)雛形,相比于xmpp的xml,本人更喜歡json的簡(jiǎn)潔,為了防止客戶端異常斷開(kāi)等,準(zhǔn)備采用心跳檢測(cè)的機(jī)制來(lái)判斷用戶是否在線,另外還有一種方法是學(xué)習(xí)例如Tomcat等Servlet中間件的方式,設(shè)置Session周期,定時(shí)清除過(guò)期Session。本Demo暫時(shí)實(shí)現(xiàn)了Session過(guò)期檢測(cè),心跳檢測(cè)有空再搞,如果本例子在使用過(guò)程中有性能漏洞或者什么bug請(qǐng)及時(shí)通知我,謝謝。
廢話不多說(shuō),關(guān)于NIO的SelectionKey、Selector、Channel網(wǎng)上的介紹例子都很多,直接上代碼:
JsonParser
Json的解析類,隨便封裝了下,使用的最近比較火的fastjson
- public class JsonParser {
- private static JSONObject mJson;
- public synchronized static String get(String json,String key) {
- mJson = JSON.parseObject(json);
- return mJson.getString(key);
- }
- }
Main
入口,不解釋
- public class Main {
- public static void main(String... args) {
- new SeekServer().start();
- }
- }
Log
- public class Log {
- public static void i(Object obj) {
- System.out.println(obj);
- }
- public static void e(Object e) {
- System.err.println(e);
- }
- }
SeekServer:
服務(wù)器端的入口,請(qǐng)求的封裝和接收都在此類,端口暫時(shí)寫死在了代碼里,mSelector.select(TIME_OUT) > 0 目的是為了當(dāng)服務(wù)器空閑的時(shí)候(沒(méi)有任何讀寫甚至請(qǐng)求斷開(kāi)事件),循環(huán)時(shí)有個(gè)間隔時(shí)間,不然基本上相當(dāng)于while(true){//nothing}了,你懂的。
- public class SeekServer extends Thread{
- private final int ACCPET_PORT = 55555;
- private final int TIME_OUT = 1000;
- private Selector mSelector = null;
- private ServerSocketChannel mSocketChannel = null;
- private ServerSocket mServerSocket = null;
- private InetSocketAddress mAddress = null;
- public SeekServer() {
- long sign = System.currentTimeMillis();
- try {
- mSocketChannel = ServerSocketChannel.open();
- if(mSocketChannel == null) {
- System.out.println("can't open server socket channel");
- }
- mServerSocket = mSocketChannel.socket();
- mAddress = new InetSocketAddress(ACCPET_PORT);
- mServerSocket.bind(mAddress);
- Log.i("server bind port is " + ACCPET_PORT);
- mSelector = Selector.open();
- mSocketChannel.configureBlocking(false);
- SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
- key.attach(new Acceptor());
- //檢測(cè)Session狀態(tài)
- Looper.getInstance().loop();
- //開(kāi)始處理Session
- SessionProcessor.start();
- Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!");
- } catch (ClosedChannelException e) {
- Log.e(e.getMessage());
- } catch (IOException e) {
- Log.e(e.getMessage());
- }
- }
- public void run() {
- Log.i("server is listening...");
- while(!Thread.interrupted()) {
- try {
- if(mSelector.select(TIME_OUT) > 0) {
- Set<SelectionKey> keys = mSelector.selectedKeys();
- Iterator<SelectionKey> iterator = keys.iterator();
- SelectionKey key = null;
- while(iterator.hasNext()) {
- key = iterator.next();
- Handler at = (Handler) key.attachment();
- if(at != null) {
- at.exec();
- }
- iterator.remove();
- }
- }
- } catch (IOException e) {
- Log.e(e.getMessage());
- }
- }
- }
- class Acceptor extends Handler{
- public void exec(){
- try {
- SocketChannel sc = mSocketChannel.accept();
- new Session(sc, mSelector);
- } catch (ClosedChannelException e) {
- Log.e(e);
- } catch (IOException e) {
- Log.e(e);
- }
- }
- }
- }
Handler:
只有一個(gè)抽象方法exec,Session將會(huì)繼承它。
- public abstract class Handler {
- public abstract void exec();
- }
Session:
封裝了用戶的請(qǐng)求和SelectionKey和SocketChannel,每次接收到新的請(qǐng)求時(shí)都重置它的最后活動(dòng)時(shí)間,通過(guò)狀態(tài)mState=READING or SENDING 去執(zhí)行消息的接收與發(fā)送,當(dāng)客戶端異常斷開(kāi)時(shí)則從SessionManager清除該會(huì)話。
- public class Session extends Handler{
- private SocketChannel mChannel;
- private SelectionKey mKey;
- private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);
- private Charset charset = Charset.forName("UTF-8");
- private CharsetDecoder mDecoder = charset.newDecoder();
- private CharsetEncoder mEncoder = charset.newEncoder();
- private long lastPant;//最后活動(dòng)時(shí)間
- private final int TIME_OUT = 1000 * 60 * 5; //Session超時(shí)時(shí)間
- private String key;
- private String sendData = "";
- private String receiveData = null;
- public static final int READING = 0,SENDING = 1;
- int mState = READING;
- public Session(SocketChannel socket, Selector selector) throws IOException {
- this.mChannel = socket;
- mChannel = socket;
- mChannel.configureBlocking(false);
- mKey = mChannel.register(selector, 0);
- mKey.attach(this);
- mKey.interestOps(SelectionKey.OP_READ);
- selector.wakeup();
- lastPant = Calendar.getInstance().getTimeInMillis();
- }
- public String getReceiveData() {
- return receiveData;
- }
- public void clear() {
- receiveData = null;
- }
- public void setSendData(String sendData) {
- mState = SENDING;
- mKey.interestOps(SelectionKey.OP_WRITE);
- this.sendData = sendData + "\n";
- }
- public boolean isKeekAlive() {
- return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
- }
- public void setAlive() {
- lastPant = Calendar.getInstance().getTimeInMillis();
- }
- /**
- * 注銷當(dāng)前Session
- */
- public void distroy() {
- try {
- mChannel.close();
- mKey.cancel();
- } catch (IOException e) {}
- }
- @Override
- public synchronized void exec() {
- try {
- if(mState == READING) {
- read();
- }else if(mState == SENDING) {
- write();
- }
- } catch (IOException e) {
- SessionManager.remove(key);
- try {
- mChannel.close();
- } catch (IOException e1) {
- Log.e(e1);
- }
- mKey.cancel();
- }
- }
- public void read() throws IOException{
- mRreceiveBuffer.clear();
- int sign = mChannel.read(mRreceiveBuffer);
- if(sign == -1) { //客戶端連接關(guān)閉
- mChannel.close();
- mKey.cancel();
- }
- if(sign > 0) {
- mRreceiveBuffer.flip();
- receiveData = mDecoder.decode(mRreceiveBuffer).toString();
- setAlive();
- setSign();
- SessionManager.addSession(key, this);
- }
- }
- private void setSign() {
- //設(shè)置當(dāng)前Session的Key
- key = JsonParser.get(receiveData,"imei");
- //檢測(cè)消息類型是否為心跳包
- // String type = jo.getString("type");
- // if(type.equals("HEART_BEAT")) {
- // setAlive();
- // }
- }
- /**
- * 寫消息
- */
- public void write() {
- try {
- mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
- sendData = null;
- mState = READING;
- mKey.interestOps(SelectionKey.OP_READ);
- } catch (CharacterCodingException e) {
- e.printStackTrace();
- } catch (IOException e) {
- try {
- mChannel.close();
- } catch (IOException e1) {
- Log.e(e1);
- }
- }
- }
- }
SessionManager:
將所有Session存放到ConcurrentHashMap,這里使用手機(jī)用戶的imei做key,ConcurrentHashMap因?yàn)槭蔷€程安全的,所以能很大程度上避免自己去實(shí)現(xiàn)同步的過(guò)程,
封裝了一些操作Session的方法例如get,remove等。
- public class SessionManager {
- private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
- public static void addSession(String key,Session session) {
- sessions.put(key, session);
- }
- public static Session getSession(String key) {
- return sessions.get(key);
- }
- public static Set<String> getSessionKeys() {
- return sessions.keySet();
- }
- public static int getSessionCount() {
- return sessions.size();
- }
- public static void remove(String[] keys) {
- for(String key:keys) {
- if(sessions.containsKey(key)) {
- sessions.get(key).distroy();
- sessions.remove(key);
- }
- }
- }
- public static void remove(String key) {
- if(sessions.containsKey(key)) {
- sessions.get(key).distroy();
- sessions.remove(key);
- }
- }
- }
SessionProcessor
里面使用了JDK自帶的線程池,用來(lái)分發(fā)處理所有Session中當(dāng)前需要處理的請(qǐng)求(線程池的初始化參數(shù)不是太熟,望有了解的童鞋能告訴我),內(nèi)部類Process則是將Session再次封裝成SocketRequest和SocketResponse(看到這里是不是有點(diǎn)熟悉的感覺(jué),對(duì)沒(méi)錯(cuò),JavaWeb里到處都是request和response)。
- public class SessionProcessor implements Runnable{
- private static Runnable processor = new SessionProcessor();
- private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
- public static void start() {
- new Thread(processor).start();
- }
- @Override
- public void run() {
- while(true) {
- Session tmp = null;
- for(String key:SessionManager.getSessionKeys()) {
- tmp = SessionManager.getSession(key);
- //處理Session未處理的請(qǐng)求
- if(tmp.getReceiveData() != null) {
- pool.execute(new Process(tmp));
- }
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- Log.e(e);
- }
- }
- }
- class Process implements Runnable {
- private SocketRequest request;
- private SocketResponse response;
- public Process(Session session) {
- //將Session封裝成Request和Response
- request = new SocketRequest(session);
- response = new SocketResponse(session);
- }
- @Override
- public void run() {
- new RequestTransform().transfer(request, response);
- }
- }
- }
RequestTransform里的transfer方法利用反射對(duì)請(qǐng)求參數(shù)中的請(qǐng)求類別和請(qǐng)求動(dòng)作來(lái)調(diào)用不同類的不同方法(UserHandler和MessageHandler)
- public class RequestTransform {
- public void transfer(SocketRequest request,SocketResponse response) {
- String action = request.getValue("action");
- String handlerName = request.getValue("handler");
- //根據(jù)Session的請(qǐng)求類型,讓不同的類方法去處理
- try {
- Class<?> c= Class.forName("com.seek.server.handler." + handlerName);
- Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};
- Method method=c.getMethod(action,arg);
- method.invoke(c.newInstance(), new Object[]{request,response});
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
SocketRequest和SocketResponse
- public class SocketRequest {
- private Session mSession;
- private String mReceive;
- public SocketRequest(Session session) {
- mSession = session;
- mReceive = session.getReceiveData();
- mSession.clear();
- }
- public String getValue(String key) {
- return JsonParser.get(mReceive, key);
- }
- public String getQueryString() {
- return mReceive;
- }
- }
- public class SocketResponse {
- private Session mSession;
- public SocketResponse(Session session) {
- mSession = session;
- }
- public void write(String msg) {
- mSession.setSendData(msg);
- }
- }
最后則是兩個(gè)處理請(qǐng)求的Handler
- public class UserHandler {
- public void login(SocketRequest request,SocketResponse response) {
- System.out.println(request.getQueryString());
- //TODO: 處理用戶登錄
- response.write("你肯定收到消息了");
- }
- }
- public class MessageHandler {
- public void send(SocketRequest request,SocketResponse response) {
- System.out.println(request.getQueryString());
- //消息發(fā)送
- String key = request.getValue("imei");
- Session session = SessionManager.getSession(key);
- new SocketResponse(session).write(request.getValue("sms"));
- }
- }
還有個(gè)監(jiān)測(cè)是否超時(shí)的類Looper,定期去刪除Session
- public class Looper extends Thread{
- private static Looper looper = new Looper();
- private static boolean isStart = false;
- private final int INTERVAL = 1000 * 60 * 5;
- private Looper(){}
- public static Looper getInstance() {
- return looper;
- }
- public void loop() {
- if(!isStart) {
- isStart = true;
- this.start();
- }
- }
- public void run() {
- Task task = new Task();
- while(true) {
- //Session過(guò)期檢測(cè)
- task.checkState();
- //心跳包檢測(cè)
- //task.sendAck();
- try {
- Thread.sleep(INTERVAL);
- } catch (InterruptedException e) {
- Log.e(e);
- }
- }
- }
- }
- public class Task {
- public void checkState() {
- Set<String> keys = SessionManager.getSessionKeys();
- if(keys.size() == 0) {
- return;
- }
- List<String> removes = new ArrayList<String>();
- Iterator<String> iterator = keys.iterator();
- String key = null;
- while(iterator.hasNext()) {
- key = iterator.next();
- if(!SessionManager.getSession(key).isKeekAlive()) {
- removes.add(key);
- }
- }
- if(removes.size() > 0) {
- Log.i("sessions is time out,remove " + removes.size() + "session");
- }
- SessionManager.remove(removes.toArray(new String[removes.size()]));
- }
- public void sendAck() {
- Set<String> keys = SessionManager.getSessionKeys();
- if(keys.size() == 0) {
- return;
- }
- Iterator<String> iterator = keys.iterator();
- while(iterator.hasNext()) {
- iterator.next();
- //TODO 發(fā)送心跳包
- }
- }
- }
注意,在Task和SessionProcessor類里都有對(duì)SessionManager的sessions做遍歷,文中使用的方法并不是很好,主要是效率問(wèn)題,推薦使用遍歷Entry的方式來(lái)獲取Key和Value,因?yàn)橐恢痹贘avaWeb上折騰,所以會(huì)的童鞋看到Request和Response會(huì)挺親切,這個(gè)例子沒(méi)有經(jīng)過(guò)任何安全和性能測(cè)試,如果需要放到生產(chǎn)環(huán)境上得話請(qǐng)先自行做測(cè)試- -!
客戶端請(qǐng)求時(shí)的數(shù)據(jù)內(nèi)容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},這些約定就自己來(lái)定了。
原文鏈接:http://www.cnblogs.com/freedom-elf/archive/2011/08/11/2135015.html
【編輯推薦】