基于管道的即時(shí)通信項(xiàng)目實(shí)現(xiàn)原理分析
項(xiàng)目實(shí)現(xiàn)原理
sevice只需往管道中(數(shù)據(jù)池)中發(fā)送數(shù)據(jù),等到池中有數(shù)據(jù)了,它自動(dòng)會(huì)找你。你不必要關(guān)心數(shù)據(jù)怎么發(fā)送與接收,只需要關(guān)注你業(yè)務(wù)的處理。
如下圖
優(yōu)點(diǎn):
基于管道的實(shí)現(xiàn)是消息的發(fā)送或接受只需要發(fā)送到管道或者從管道讀取,而不用關(guān)注如何通過(guò)Channer發(fā)送,這樣則實(shí)現(xiàn)了service層與socket的解耦。
依賴于廣播而不依賴于回調(diào)函數(shù),與nio的異步非阻塞,真正實(shí)現(xiàn)線程的零等待。
缺點(diǎn):
發(fā)送的數(shù)據(jù)很難通過(guò)會(huì)掉函數(shù)實(shí)現(xiàn)(或者根本不能),只能通過(guò)廣播實(shí)現(xiàn)。
相關(guān)類(lèi)介紹
● ClientMessagePool,ServiceMessagePool管道(數(shù)據(jù)池)
內(nèi)部實(shí)現(xiàn)原理是一個(gè)鏈表隊(duì)列,數(shù)據(jù)的增加讀取對(duì)應(yīng)隊(duì)列中的壓入隊(duì)列,讀取隊(duì)列頭元素
● Sevice
業(yè)務(wù)邏輯處理類(lèi),必須實(shí)現(xiàn)IMessageSevice接口,并向MessageObserver注冊(cè)
● MessageObserver
內(nèi)部有一個(gè)IMessageSevice的鏈表,保存各個(gè)實(shí)現(xiàn)IMessageSevice接口的Service,與Sevice 構(gòu)成觀察者模式,
會(huì)有一個(gè)線程專門(mén)監(jiān)測(cè)MessagePool,一旦有數(shù)據(jù),就交給MessageObserver。MessageObserver根據(jù)特定消息類(lèi)推送給制定的service.
● SocketChannel
實(shí)現(xiàn)了一個(gè)SockenChannel的類(lèi),相當(dāng)于一個(gè)客戶端。從管道中(ClientMessagePool)中讀取數(shù)據(jù),一旦有數(shù)據(jù),則將數(shù)據(jù)寫(xiě)入管道
● Selector
接收管道的注冊(cè),并根紛發(fā)條件,向指定的SocketChannel推動(dòng)數(shù)據(jù)。也會(huì)根據(jù)過(guò)濾條件過(guò)濾數(shù)據(jù)。
代碼實(shí)現(xiàn)
管道代碼實(shí)現(xiàn)
- package com.pool;
- import java.util.Queue;
- import java.util.concurrent.LinkedBlockingQueue;
- public class MessagePool {
- public static Queue<String> clintmessageQueue = new LinkedBlockingQueue<String>();
- public static Queue<String> serverMessageQueue = new LinkedBlockingQueue<String>();
- }
接口
- package com.pool;
- public interface IMessagePool {
- public void addMessage(String message);
- public String pollMessage();
- public boolean isEmpty();
- }
實(shí)現(xiàn)類(lèi)
- package com.pool.impl;
- import com.pool.IMessagePool;
- import com.pool.MessagePool;
- public class ClientMessagePool implements IMessagePool {
- @Override
- public void addMessage(String message) {
- MessagePool.clintmessageQueue.add(message);
- }
- @Override
- public String pollMessage() {
- return MessagePool.clintmessageQueue.poll();
- }
- @Override
- public boolean isEmpty() {
- if(MessagePool.clintmessageQueue.size()>0)
- return true;
- else
- return false;
- }
- }
#p#客戶端
- package com.socket;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- import org.apache.commons.lang.ArrayUtils;
- import com.pool.IMessagePool;
- import com.pool.impl.ClientMessagePool;
- import com.util.PackageUtil;
- public class MySocket {
- private SocketChannel mSocketChannel;
- private SelectionKey key;
- public static String CHARSET = "utf-8";
- public static String ADDRESS = "127.0.0.1";
- public static int HOST = 34521;
- protected Selector mSelector;
- protected IMessagePool messagePool = new ClientMessagePool();;
- ByteBuffer buffer;
- public MySocket() {
- try {
- mSelector = Selector.open();
- initSocketChannel();
- initBassiness();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- key.channel().close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 業(yè)務(wù)邏輯
- *
- * @throws Exception
- */
- private void initBassiness() throws Exception {
- while (true) {
- checkWriteable();
- // 瞬時(shí)檢測(cè)
- if (mSelector.select(100) > 0) {
- Iterator<SelectionKey> keys = mSelector.selectedKeys()
- .iterator();
- while (keys.hasNext()) {
- SelectionKey key = keys.next();
- if (key.isReadable()) {
- dispose4Readable(key);
- }
- if (key.isValid() && key.isWritable()) {
- dispose4Writable(key);
- }
- keys.remove();
- }
- }
- }
- }
- /**
- * 可讀請(qǐng)求
- *
- * @param key
- * @throws Exception
- */
- protected void dispose4Readable(SelectionKey key) throws Exception {
- SocketChannel mSocketChannel = ((SocketChannel) key.channel());
- buffer = ByteBuffer.allocate(1024);
- mSocketChannel.read(buffer);
- buffer.flip();
- this.unPacket(buffer.array(), key);
- }
- /**
- * 可寫(xiě)請(qǐng)求
- *
- * @param key
- * @throws Exception
- */
- protected void dispose4Writable(SelectionKey key) throws Exception {
- SocketChannel mSocketChannel = ((SocketChannel) key.channel());
- int value = 0;
- do{
- value = mSocketChannel.write(buffer);
- }while(value!=0);
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- key.interestOps(SelectionKey.OP_READ);
- }
- /**
- * 解包
- *
- * @param buf
- * @return
- */
- public byte[] unPacket(byte[] buf, SelectionKey key) {
- int len = buf.length;// 37
- int i;
- for (i = 0; i < len; i++) {
- if (len < i + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH) {
- break;
- }
- String tmp = new String(ArrayUtils.subarray(buf, i, i
- + PackageUtil.PACKAGEHEADERLENGTH));
- if (tmp.equals(PackageUtil.PACKAGEHEADER)) {
- int messageLength = PackageUtil.byte2Int(ArrayUtils.subarray(
- buf, i + PackageUtil.PACKAGEHEADERLENGTH, i
- + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH));
- if (len < i + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH + messageLength) {
- break;
- }
- byte[] data = ArrayUtils.subarray(buf, i
- + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH, i
- + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH + messageLength);
- String message = new String(data);
- System.out.println(message);
- // Filter.filterRead(message, key, messagePool);
- i += PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH + messageLength - 1;
- }
- }
- if (i == len) {
- return new byte[0];
- }
- return ArrayUtils.subarray(buf, i, buf.length);
- }
- void initSocketChannel() throws Exception {
- mSocketChannel = SocketChannel.open();
- mSocketChannel.connect(new InetSocketAddress(ADDRESS, HOST));
- mSocketChannel.configureBlocking(false);
- key = mSocketChannel.register(mSelector, SelectionKey.OP_CONNECT|SelectionKey.OP_READ);
- }
- void checkWriteable() {
- if (messagePool.isEmpty()) {
- String values = messagePool.pollMessage();
- System.out.println(" "+values);
- buffer = ByteBuffer.wrap(PackageUtil.packet(values.getBytes()));
- key.interestOps(SelectionKey.OP_WRITE);
- }
- }
- }
#p#服務(wù)器
- package com.socket;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- import java.util.Set;
- import org.apache.commons.lang.ArrayUtils;
- import com.filter.Filter;
- import com.pool.IMessagePool;
- import com.pool.impl.ServerMessagePoll;
- import com.util.PackageUtil;
- public class MyServerSocket {
- private ServerSocketChannel mServerSocketChannel;
- private static MyServerSocket serverSocket;
- public static String CHARSET = "utf-8";
- public static String ADDRESS = "127.0.0.1";
- public static int HOST = 34521;
- protected Selector mSelector;
- protected IMessagePool messagePool = new ServerMessagePoll();;
- ByteBuffer buffer;
- private MyServerSocket() throws Exception {
- try {
- mSelector = Selector.open();
- initSocketChannel();
- initBassiness();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- Set<SelectionKey> keys = mSelector.keys();
- {
- for (SelectionKey key : keys) {
- try {
- key.channel().close();
- } catch (IOException e) {
- e.printStackTrace();
- continue;
- }
- }
- }
- }
- }
- /**
- * 業(yè)務(wù)邏輯
- *
- * @throws Exception
- */
- private void initBassiness() throws Exception {
- while (true) {
- checkWriteable();
- // 瞬時(shí)檢測(cè)
- if (mSelector.select() > 0) {
- Iterator<SelectionKey> keys = mSelector.selectedKeys()
- .iterator();
- while (keys.hasNext()) {
- SelectionKey key = keys.next();
- if (key.isAcceptable()) {
- dispose4Acceptable(key);
- }
- if (key.isReadable()) {
- dispose4Readable(key);
- }
- if (key.isValid() && key.isWritable()) {
- dispose4Writable(key);
- }
- keys.remove();
- }
- }
- }
- }
- /**
- * 響應(yīng)讀
- * @param key
- * @throws Exception
- */
- protected void dispose4Readable(SelectionKey key) throws Exception {
- SocketChannel mSocketChannel = ((SocketChannel) key.channel());
- buffer = ByteBuffer.allocate(1024);
- mSocketChannel.read(buffer);
- buffer.flip();
- this.unPacket(buffer.array(), key);
- }
- /**
- * 可寫(xiě)請(qǐng)求
- *
- * @param key
- * @throws Exception
- */
- protected void dispose4Writable(SelectionKey key) throws Exception {
- SocketChannel mSocketChannel = ((SocketChannel) key.channel());
- if(mSocketChannel.write(buffer)!=-1){
- buffer.clear();
- }
- key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
- // key.interestOps(SelectionKey.OP_READ);
- }
- /**
- * 解包
- *
- * @param buf
- * @return
- */
- private byte[] unPacket(byte[] buf, SelectionKey key) {
- int len = buf.length;// 37
- int i;
- for (i = 0; i < len; i++) {
- if (len < i + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH) {
- break;
- }
- String tmp = new String(ArrayUtils.subarray(buf, i, i
- + PackageUtil.PACKAGEHEADERLENGTH));
- if (tmp.equals(PackageUtil.PACKAGEHEADER)) {
- int messageLength = PackageUtil.byte2Int(ArrayUtils.subarray(
- buf, i + PackageUtil.PACKAGEHEADERLENGTH, i
- + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH));
- if (len < i + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH + messageLength) {
- break;
- }
- byte[] data = ArrayUtils.subarray(buf, i
- + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH, i
- + PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH + messageLength);
- String message = new String(data);
- System.out.println("server read message" + message);
- Filter.filterRead(message, key, messagePool);
- i += PackageUtil.PACKAGEHEADERLENGTH
- + PackageUtil.PACKAGESAVEDATALENGTH + messageLength - 1;
- }
- }
- if (i == len) {
- return new byte[0];
- }
- return ArrayUtils.subarray(buf, i, buf.length);
- }
- public static MyServerSocket newInstence() throws Exception {
- if (serverSocket == null) {
- return new MyServerSocket();
- }
- return serverSocket;
- }
- /**
- * SocketChannel初始化
- * @throws Exception
- */
- void initSocketChannel() throws Exception {
- mServerSocketChannel = ServerSocketChannel.open();
- mServerSocketChannel.configureBlocking(false);
- mServerSocketChannel.bind(new InetSocketAddress(ADDRESS, HOST));
- mServerSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
- }
- void dispose4Acceptable(SelectionKey key) throws Exception {
- SocketChannel mSocketChannel = ((ServerSocketChannel) key.channel())
- .accept();
- mSocketChannel.configureBlocking(false);
- mSocketChannel.register(mSelector, SelectionKey.OP_READ);
- }
- void checkWriteable() {
- if (messagePool.isEmpty()) {
- String value = messagePool.pollMessage();
- String result = Filter.filterWrite(value, mSelector);
- if (result != null) {
- System.out.println("server:" + result);
- buffer = ByteBuffer.wrap(PackageUtil.packet(result.getBytes()));
- }
- }
- }
- }
#p#過(guò)濾器
- package com.filter;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.util.Set;
- import com.model.BaseModule;
- import com.model.Chat;
- import com.model.User;
- import com.pool.IMessagePool;
- import com.util.StringUtil;
- public class Filter {
- private static final String LOGIN = "login";
- private static BaseModule modul = new BaseModule();
- private static SelectionKey selectionKey=null;
- private static Selector selector = null;
- /**
- * TODO 線程啟動(dòng)
- *
- * @param message
- * @return
- */
- public static void filterRead(String message, SelectionKey key,
- IMessagePool messagePool) {
- selectionKey = key;
- try {
- BaseModule filterModul = (BaseModule) StringUtil.string2Bean(modul,
- message);
- if(filterType(filterModul.getType())){
- if(filterValue(filterModul.getMessage())){
- // messagePool.addMessage(message);
- }else{
- }
- }else{
- messagePool.addMessage(message);
- }
- } catch (Exception e) {
- return;
- }
- }
- public static String filterWrite(String message,Selector mSelector){
- selector = mSelector;
- return filter(message);
- }
- private static String filter(String message){
- BaseModule filterModul = (BaseModule) StringUtil.string2Bean(modul,
- message);
- Chat chat = (Chat) StringUtil.string2Bean(new Chat(),
- filterModul.getMessage());
- Set<SelectionKey> keys=selector.keys();
- for(SelectionKey key:keys){
- String markString=key.attachment()!=null?key.attachment().toString():null;
- if(markString!=null && markString.equals(chat.getTo())){
- key.interestOps(SelectionKey.OP_WRITE);
- return chat.getMessage();
- }
- }
- return null;
- }
- /**
- * 過(guò)濾類(lèi)型
- * @param value
- * @return
- */
- private static boolean filterType(String value) {
- if (LOGIN.equals(value)) {
- return true;
- }
- return false;
- }
- /**
- * 過(guò)濾內(nèi)容
- * @param value
- * @return
- */
- private static boolean filterValue(String value) {
- return filterLogin(value);
- }
- private static boolean filterLogin(String value) {
- User user = (User) StringUtil.string2Bean(new User(), value);
- if (user.getUserName() != null) {
- selectionKey.attach(user.getUserName());
- return true;
- }
- return false;
- }
- }
service接口
- package com.service;
- public interface IMessageService {
- void doMessage(String message);
- }
#p#util
- package com.util;
- import java.io.UnsupportedEncodingException;
- import org.apache.commons.lang.ArrayUtils;
- public class PackageUtil {
- public static final String PACKAGEHEADER = "↨-↨";//消息長(zhǎng)度
- public static final int PACKAGEHEADERLENGTH = 7; //數(shù)據(jù)頭長(zhǎng)�?
- public static final int PACKAGESAVEDATALENGTH = 4; //數(shù)據(jù)長(zhǎng)度站的位數(shù)
- /**
- * 打包
- * @param pkg 要打包的字節(jié)數(shù)組
- * @return
- */
- public static byte[] packet(byte[] pkg) {
- int intValue = pkg.length;
- byte[] b = new byte[4];
- for (int i = 0; i < 4; i++) {
- b[i] = (byte) (intValue >> 8 * (3 - i) & 0xFF);
- // System.out.print(Integer.toBinaryString(b[i])+" ");
- //System.out.println((b[i] & 0xFF) + " ");
- }
- try {
- byte[] newPkg = ArrayUtils.addAll(PackageUtil.PACKAGEHEADER.getBytes("utf-8"), b);
- newPkg = ArrayUtils.addAll(newPkg, pkg);
- return newPkg;
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- return null;
- }
- /**
- * 字節(jié)數(shù)組轉(zhuǎn)整形
- * @param b
- * @return
- */
- public static int byte2Int(byte[] b) {
- int intValue = 0;
- for (int i = 0; i < b.length; i++) {
- intValue += (b[i] & 0xFF) << (8 * (3 - i));
- // System.out.print(Integer.toBinaryString(intValue)+" ");
- }
- return intValue;
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- }
- }
StringUtil
- package com.util;
- import com.google.gson.Gson;
- public class StringUtil {
- private static Gson json =new Gson();
- /**
- * 將字符串專為json
- * @param clazz
- * @param message
- * @return
- */
- public static Object string2Bean(Object clazz,String message){
- return json.fromJson(message, clazz.getClass());
- }
- /**
- * 將json專為字符串
- * @param clazz
- * @return
- */
- public static String bean2Json(Object clazz){
- return json.toJson(clazz);
- }
- }
module
- package com.model;
- /**
- * 默認(rèn)包裝
- * @author Administrator
- *
- */
- public class BaseModule {
- String type ;
- String message;
- public String getType() {
- return type;
- }
- public void setType(String type) {
- this.type = type;
- }
- public String getMessage() {
- return message;
- }
- public void setMessage(String message) {
- this.message = message;
- }
- }
這個(gè)為原型。后面會(huì)對(duì)具體細(xì)節(jié)進(jìn)行實(shí)現(xiàn)以及原理進(jìn)行講解