Java Socket通信技術(shù)收發(fā)線程互斥的解決方法
作者:佚名
Java Socket通信技術(shù)一直活躍在編程世界中,有不少的程序員都在使用這個(gè)技術(shù),下面我們就來看看在Java Socket通信技術(shù)中收發(fā)線程互斥的代碼。
Java Socket通信技術(shù)在很長的時(shí)間里都在使用,在不少的程序員眼中都有很多高的評價(jià)。那么下面我們就看看如何才能掌握這門復(fù)雜的編程語言,希望大家在今后的Java Socket通信技術(shù)使用中有所收獲。
下面就是Java Socket通信技術(shù)在解決收發(fā)線程互斥的代碼介紹。
- package com.bill99.svr;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.InetSocketAddress;
- import java.net.Socket;
- import java.net.SocketException;
- import java.net.SocketTimeoutException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.Properties;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- import org.apache.log4j.Logger;
- /**
- *<p>title: socket通信包裝類</p>
- *<p>Description: </p>
- *<p>CopyRight: CopyRight (c) 2009</p>
- *<p>Company: 99bill.com</p>
- *<p>Create date: 2009-10-14</P>
- *author sunnylocus<A href="mailto:sunnylocus@163.com">
- </A> * v0.10 2009-10-14 初類
- * v0.11 2009-11-12 對命令收發(fā)邏輯及收發(fā)線程互斥機(jī)制進(jìn)行了優(yōu)化,
處理命令速度由原來8~16個(gè)/秒提高到25~32個(gè)/秒- */ public class SocketConnection {
- private volatile Socket socket;
- private int timeout = 1000*10; //超時(shí)時(shí)間,初始值10秒
- private boolean isLaunchHeartcheck = false;//是否已啟動(dòng)心跳檢測
- private boolean isNetworkConnect = false; //網(wǎng)絡(luò)是否已連接
- private static String host = "";
- private static int port;
- static InputStream inStream = null;
- static OutputStream outStream = null;
- private static Logger log =Logger.getLogger
(SocketConnection.class);- private static SocketConnection socketConnection = null;
- private static java.util.Timer heartTimer=null;
- //private final Map<String, Object> recMsgMap= Collections.
synchronizedMap(new HashMap<String, Object>());- private final ConcurrentHashMap<String, Object> recMsgMap
= new ConcurrentHashMap<String, Object>();- private static Thread receiveThread = null;
- private final ReentrantLock lock = new ReentrantLock();
- private SocketConnection(){
- Properties conf = new Properties();
- try {
- conf.load(SocketConnection.class.getResourceAsStream
("test.conf"));- this.timeout = Integer.valueOf(conf.getProperty("timeout"));
- init(conf.getProperty("ip"),Integer.valueOf
(conf.getProperty("port")));- } catch(IOException e) {
- log.fatal("socket初始化異常!",e);
- throw new RuntimeException("socket初始化異常,請檢查配置參數(shù)");
- }
- }
- /**
- * 單態(tài)模式
- */
- public static SocketConnection getInstance() {
- if(socketConnection==null) {
- synchronized(SocketConnection.class) {
- if(socketConnection==null) {
- socketConnection = new SocketConnection();
- return socketConnection;
- }
- }
- }
- return socketConnection;
- }
- private void init(String host,int port) throws IOException {
- InetSocketAddress addr = new InetSocketAddress(host,port);
- socket = new Socket();
- synchronized (this) {
- log.info("【準(zhǔn)備與"+addr+"建立連接】");
- socket.connect(addr, timeout);
- log.info("【與"+addr+"連接已建立】");
- inStream = socket.getInputStream();
- outStream = socket.getOutputStream();
- socket.setTcpNoDelay(true);//數(shù)據(jù)不作緩沖,立即發(fā)送
- socket.setSoLinger(true, 0);//socket關(guān)閉時(shí),立即釋放資源
- socket.setKeepAlive(true);
- socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸
- isNetworkConnect=true;
- receiveThread = new Thread(new ReceiveWorker());
- receiveThread.start();
- SocketConnection.host=host;
- SocketConnection.port=port;
- if(!isLaunchHeartcheck)
- launchHeartcheck();
- }
- }
- /**
- * 心跳包檢測
- */
- private void launchHeartcheck() {
- if(socket == null)
- throw new IllegalStateException("socket is not
established!");- heartTimer = new Timer();
- isLaunchHeartcheck = true;
- heartTimer.schedule(new TimerTask() {
- public void run() {
- String msgStreamNo = StreamNoGenerator.getStreamNo("kq");
- int mstType =9999;//999-心跳包請求
- SimpleDateFormat dateformate = new SimpleDateFormat
("yyyyMMddHHmmss");- String msgDateTime = dateformate.format(new Date());
- int msgLength =38;//消息頭長度
- String commandstr = "00" +msgLength + mstType + msgStreamNo;
- log.info("心跳檢測包 -> IVR "+commandstr);
- int reconnCounter = 1;
- while(true) {
- String responseMsg =null;
- try {
- responseMsg = readReqMsg(commandstr);
- } catch (IOException e) {
- log.error("IO流異常",e);
- reconnCounter ++;
- }
- if(responseMsg!=null) {
- log.info("心跳響應(yīng)包 <- IVR "+responseMsg);
- reconnCounter = 1;
- break;
- } else {
- reconnCounter ++;
- }
- if(reconnCounter >3) {//重連次數(shù)已達(dá)三次,判定網(wǎng)絡(luò)連接中斷,
重新建立連接。連接未被建立時(shí)不釋放鎖- reConnectToCTCC(); break;
- }
- }
- }
- },1000 * 60*1,1000*60*2);
- }
- /**
- * 重連與目標(biāo)IP建立重連
- */
- private void reConnectToCTCC() {
- new Thread(new Runnable(){
- public void run(){
- log.info("重新建立與"+host+":"+port+"的連接");
- //清理工作,中斷計(jì)時(shí)器,中斷接收線程,恢復(fù)初始變量
- heartTimer.cancel();
- isLaunchHeartcheck=false;
- isNetworkConnect = false;
- receiveThread.interrupt();
- try {
- socket.close();
- } catch (IOException e1) {log.error("重連時(shí),關(guān)閉socket連
接發(fā)生IO流異常",e1);}- //----------------
- synchronized(this){
- for(; ;){
- try {
- Thread.currentThread();
- Thread.sleep(1000 * 1);
- init(host,port);
- this.notifyAll();
- break ;
- } catch (IOException e) {
- log.error("重新建立連接未成功",e);
- } catch (InterruptedException e){
- log.error("重連線程中斷",e);
- }
- }
- }
- }
- }).start();
- }
- /**
- * 發(fā)送命令并接受響應(yīng)
- * @param requestMsg
- * @return
- * @throws SocketTimeoutException
- * @throws IOException
- */
- public String readReqMsg(String requestMsg) throws IOException {
- if(requestMsg ==null) {
- return null;
- }
- if(!isNetworkConnect) {
- synchronized(this){
- try {
- this.wait(1000*5); //等待5秒,如果網(wǎng)絡(luò)還沒有恢復(fù),拋出IO流異常
- if(!isNetworkConnect) {
- throw new IOException("網(wǎng)絡(luò)連接中斷!");
- }
- } catch (InterruptedException e) {
- log.error("發(fā)送線程中斷",e);
- }
- }
- }
- String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號(hào)
- outStream = socket.getOutputStream();
- outStream.write(requestMsg.getBytes());
- outStream.flush();
- Condition msglock = lock.newCondition(); //消息鎖
- //注冊等待接收消息
- recMsgMap.put(msgNo, msglock);
- try {
- lock.lock();
- msglock.await(timeout,TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.error("發(fā)送線程中斷",e);
- } finally {
- lock.unlock();
- }
- Object respMsg = recMsgMap.remove(msgNo); //響應(yīng)信息
- if(respMsg!=null &&(respMsg != msglock)) {
- //已經(jīng)接收到消息,注銷等待,成功返回消息
- return (String) respMsg;
- } else {
- log.error(msgNo+" 超時(shí),未收到響應(yīng)消息");
- throw new SocketTimeoutException(msgNo+" 超時(shí),未收到響應(yīng)消息");
- }
- }
- public void finalize() {
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- //消息接收線程
- private class ReceiveWorker implements Runnable {
- String intStr= null;
- public void run() {
- while(!Thread.interrupted()){
- try {
- byte[] headBytes = new byte[4];
- if(inStream.read(headBytes)==-1){
- log.warn("讀到流未尾,對方已關(guān)閉流!");
- reConnectToCTCC();//讀到流未尾,對方已關(guān)閉流
- return;
- }
- byte[] tmp =new byte[4];
- tmp = headBytes;
- String tempStr = new String(tmp).trim();
- if(tempStr==null || tempStr.equals("")) {
- log.error("received message is null");
- continue;
- }
- intStr = new String(tmp);
- int totalLength =Integer.parseInt(intStr);
- //----------------
- byte[] msgBytes = new byte[totalLength-4];
- inStream.read(msgBytes);
- String resultMsg = new String(headBytes)+ new
String(msgBytes);- //抽出消息ID
- String msgNo = resultMsg.substring(8, 8 + 24);
- Condition msglock =(Condition) recMsgMap.get(msgNo);
- if(msglock ==null) {
- log.warn(msgNo+"序號(hào)可能已被注銷!響應(yīng)消息丟棄");
- recMsgMap.remove(msgNo);
- continue;
- }
- recMsgMap.put(msgNo, resultMsg);
- try{
- lock.lock();
- msglock.signalAll();
- }finally {
- lock.unlock();
- }
- }catch(SocketException e){
- log.error("服務(wù)端關(guān)閉socket",e);
- reConnectToCTCC();
- } catch(IOException e) {
- log.error("接收線程讀取響應(yīng)數(shù)據(jù)時(shí)發(fā)生IO流異常",e);
- } catch(NumberFormatException e){
- log.error("收到?jīng)]良心包,String轉(zhuǎn)int異常,異常字符:"+intStr);
- }
- }
- }
- }
- }
以上就是對Java Socket通信技術(shù)中收發(fā)線程互斥的詳細(xì)解決方法。希望大家有所領(lǐng)悟。
【編輯推薦】
責(zé)任編輯:張浩
來源:
博客園