基于okhttp和RxJava封裝的自動重連的WebSocket
51CTO和華為官方合作共建的鴻蒙技術(shù)社區(qū)
一. 概述
RxWebSocket是一個(gè)基于okhttp和RxJava封裝的WebSocket客戶端,此庫的核心特點(diǎn)是 除了手動關(guān)閉WebSocket(就是RxJava取消訂閱),WebSocket在異常關(guān)閉的時(shí)候(onFailure,發(fā)生異常,如WebSocketException等等),會自動重連,永不斷連.其次,對WebSocket做的緩存處理,同一個(gè)URL,共享一個(gè)WebSocket.
由于是基于RxJava封裝,所以帶來了無限可能,可以和RxBinding,Rxlifecycle一起使用,方便對WebSocket的管理.
效果圖

項(xiàng)目已經(jīng)上傳Jcenter,依賴方法:
- //本項(xiàng)目
- compile 'io.github.dzsf:RxWebSocket:1.0.0'
二. 使用方法
0.初始化,可以也忽略直接使用.
如果你想使用自己的okhttpClient:
- OkHttpClient yourClient = new OkHttpClient();
- RxWebSocketUtil.getInstance().setClient(yourClient);
是否打印日志:
- RxWebSocketUtil.getInstance().setShowLog(BuildConfig.DEBUG);
1.獲取一個(gè)WebSocket,接收消息,多種方式:
- RxWebSocketUtil.getInstance().getWebSocketInfo(url)
- .subscribe(new Action1<WebSocketInfo>() {
- @Override
- public void call(WebSocketInfo webSocketInfo) {
- mWebSocket = webSocketInfo.getWebSocket();
- Log.d("MainActivity", webSocketInfo.getString());
- Log.d("MainActivity", "ByteString:" + webSocketInfo.getByteString());
- }
- });
- mWebSocket.send("hello word");
- //get StringMsg
- RxWebSocketUtil.getInstance().getWebSocketString(url)
- .subscribe(new Action1<String>() {
- @Override
- public void call(String s) {
- }
- });
- // get ByteString
- RxWebSocketUtil.getInstance().getWebSocketByteString(url)
- .subscribe(new Action1<ByteString>() {
- @Override
- public void call(ByteString byteString) {
- }
- });
- //get WebSocket
- RxWebSocketUtil.getInstance().getWebSocket(url)
- .subscribe(new Action1<WebSocket>() {
- @Override
- public void call(WebSocket webSocket) {
- }
- });
- // 帶timeout的WebSocket,當(dāng)在指定時(shí)間內(nèi)沒有收到消息,就重連WebSocket.為了適配小米平板.
- RxWebSocketUtil.getInstance().getWebSocketInfo(url,10, TimeUnit.SECONDS)
- .subscribe(new Action1<WebSocketInfo>() {
- @Override
- public void call(WebSocketInfo webSocketInfo) {
- }
- });
2.發(fā)送消息:
- //用WebSocket的引用直接發(fā)
- mWebSocket.send("hello word");
- //url 對應(yīng)的WebSocket 必須打開,否則報(bào)錯(cuò)
- RxWebSocket.send(sendUrl, "hello");
- RxWebSocket.send(sendUrl, ByteString.EMPTY);
- //異步發(fā)送,若WebSocket已經(jīng)打開,直接發(fā)送,若沒有打開,打開一個(gè)WebSocket發(fā)送完數(shù)據(jù),直接關(guān)閉.
- RxWebSocket.asyncSend(sendUrl, "hello");
- RxWebSocket.asyncSend(sendUrl, ByteString.EMPTY);
3.關(guān)閉WebSocket:
項(xiàng)目是依托RxJava實(shí)現(xiàn)的,所以關(guān)閉WebSocket的方法也就是在適當(dāng)?shù)臅r(shí)候注銷 Observable,項(xiàng)目里的demo里,寫了一個(gè)簡單的lifecycle,將Observable生命綁定到Activity的onDestroy,自動注銷.代碼細(xì)節(jié)請看demo,因?yàn)閮?nèi)部實(shí)現(xiàn)了同一個(gè)URL的WebSocket共享機(jī)制,所以當(dāng)外部所有持有這個(gè)URL的Observable都注銷后,這個(gè)WebSocket連接就會自動關(guān)閉.請看原理解析部分
- //注意取消訂閱,有多種方式,比如 rxlifecycle
- mSubscription = RxWebSocketUtil.getInstance().getWebSocketInfo(url)
- .subscribe(new Action1<WebSocketInfo>() {
- @Override
- public void call(WebSocketInfo webSocketInfo) {
- mWebSocket = webSocketInfo.getWebSocket();
- if (webSocketInfo.isOnOpen()) {
- Log.d("MainActivity", " on WebSocket open");
- } else {
- String string = webSocketInfo.getString();
- if (string != null) {
- Log.d("MainActivity", string);
- textview.setText(Html.fromHtml(string));
- }
- ByteString byteString = webSocketInfo.getByteString();
- if (byteString != null) {
- Log.d("MainActivity",
- "webSocketInfo.getByteString():" +
- byteString);
- }
- }
- }
- });
- //注銷
- if (mSubscription != null) {
- mSubscription.unsubscribe();
- }
- //lifecycle注銷,詳情看demo
- RxWebSocketUtil.getInstance().getWebSocketString(url)
- .compose(this.<String>bindOnActivityEvent(ActivityEvent.onDestory))
- .subscribe(new Action1<String>() {
- @Override
- public void call(String s) {
- }
- });
三. 原理解析
1. 首先需要將okhttp的WebSocket包裝成Observable,由于需要將WebSocket,Stringmsg,ByteString等信息一同發(fā)送給觀察者所以先構(gòu)建一個(gè)WebSocketInfo類,將信息封裝:
- public class WebSocketInfo {
- private WebSocket mWebSocket;
- private String mString;
- private ByteString mByteString;
- private boolean onOpen;
- //其他省略
- }
onOpen字段主要用來判斷當(dāng)前的這個(gè)WebSocketInfo是否是當(dāng)WebSocket打開時(shí)發(fā)送的消息(onOpen),這時(shí),Stringmsg和ByteString都是null.
2. 將WebSocketInfo包裝成Observable發(fā)出:
- private final class WebSocketOnSubscribe implements Observable.OnSubscribe<WebSocketInfo> {
- private String url;
- private WebSocket webSocket;
- private WebSocketInfo startInfo;
- private WebSocketInfo stringInfo;
- private WebSocketInfo byteStringInfo;
- public WebSocketOnSubscribe(String url) {
- this.url = url;
- startInfo = new WebSocketInfo(true);
- stringInfo = new WebSocketInfo();
- byteStringInfo = new WebSocketInfo();
- }
- @Override
- public void call(final Subscriber<?super WebSocketInfo> subscriber) {
- if (webSocket != null) {
- //降低重連頻率
- if (!"main".equals(Thread.currentThread().getName())) {
- SystemClock.sleep(2000);
- }
- }
- initWebSocket(subscriber);
- }
- private void initWebSocket(
- final Subscriber<?super WebSocketInfo> subscriber) {
- webSocket = client.newWebSocket(getRequest(url),
- new WebSocketListener() {
- @Override
- public void onOpen(final WebSocket webSocket,
- Response response) {
- if (showLog) {
- Log.d("RxWebSocketUtil", url + " --> onOpen");
- }
- webSocketMap.put(url, webSocket);
- AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {
- @Override
- public void call() {
- if (!subscriber.isUnsubscribed()) {
- subscriber.onStart();
- startInfo.setWebSocket(webSocket);
- subscriber.onNext(startInfo);
- }
- }
- });
- }
- @Override
- public void onMessage(WebSocket webSocket, String text) {
- if (!subscriber.isUnsubscribed()) {
- stringInfo.setWebSocket(webSocket);
- stringInfo.setString(text);
- subscriber.onNext(stringInfo);
- }
- }
- @Override
- public void onMessage(WebSocket webSocket, ByteString bytes) {
- if (!subscriber.isUnsubscribed()) {
- byteStringInfo.setWebSocket(webSocket);
- byteStringInfo.setByteString(bytes);
- subscriber.onNext(byteStringInfo);
- }
- }
- @Override
- public void onFailure(WebSocket webSocket, Throwable t,
- Response response) {
- if (showLog) {
- Log.e("RxWebSocketUtil",
- t.toString() +
- webSocket.request().url().uri().getPath());
- }
- if (!subscriber.isUnsubscribed()) {
- subscriber.onError(t);
- }
- }
- @Override
- public void onClosing(WebSocket webSocket, int code,
- String reason) {
- webSocket.close(1000, null);
- }
- @Override
- public void onClosed(WebSocket webSocket, int code,
- String reason) {
- if (showLog) {
- Log.d("RxWebSocketUtil",
- url + " --> onClosed:code= " + code);
- }
- }
- });
- subscriber.add(new MainThreadSubscription() {
- @Override
- protected void onUnsubscribe() {
- webSocket.close(3000, "手動關(guān)閉");
- }
- });
- }
- }
實(shí)現(xiàn)一個(gè)WebSocketOnSubscribe 將WebSocket的回調(diào)轉(zhuǎn)化成subscriber調(diào)用.發(fā)送給Observable下游.在onOpen時(shí)調(diào)用 subscriber.onStart(),并且發(fā)送一個(gè)onOpen的WebSocketInfo.在subscriber注銷的時(shí)候關(guān)閉WebSocket.在call方法最上面有個(gè)SystemClock.sleep(2000),這個(gè)主要是為了降低在斷連的時(shí)候的重連頻率,將在下面講到.
包裝成Observable:
- Observable.create(new WebSocketOnSubscribe(url))
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread());
3. 實(shí)現(xiàn)自動重連:
- Observable.create(new WebSocketOnSubscribe(url))
- //自動重連
- .timeout(timeout, timeUnit).retry()
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread());
RxJava retry操作符,很完美的實(shí)現(xiàn)了這個(gè)功能,當(dāng)上游發(fā)出Throwable的時(shí)候,retry將錯(cuò)誤吃掉,并重新調(diào)用 onSubscribe的call方法,也就是WebSocketOnSubscribe的call,就會重新初始化一個(gè)WebSocket連接,達(dá)到重連的目的,如果一直沒有網(wǎng)絡(luò),這個(gè)retry的調(diào)用頻率非常高,所以在call方法里面,當(dāng)是重連的時(shí)候,就SystemClock.sleep(2000),休眠2秒,這樣重連的頻率就是2秒重連一次. 當(dāng)然在retry上面還有一個(gè)timeout操作符.當(dāng)subscriber.onNext()在指定時(shí)間間隔里沒有調(diào)用,就發(fā)出一個(gè)timeoutException,讓retry重連WebSocket.這個(gè)主要是為了適配部分國產(chǎn)機(jī)型,當(dāng)WebSocket發(fā)生連接異常時(shí),不會及時(shí)發(fā)出錯(cuò)誤,如小米平板.在每次重連都會把原來的WebSocket關(guān)閉.
4. 實(shí)現(xiàn)同一個(gè)URL的WebSocket共享
- Observable.create(new WebSocketOnSubscribe(url))
- //自動重連
- .timeout(timeout, timeUnit)
- .retry()
- //共享
- .doOnUnsubscribe(new Action0() {
- @Override
- public void call() {
- observableMap.remove(url);
- webSocketMap.remove(url);
- if (showLog) {
- Log.d("RxWebSocketUtil", "注銷");
- }
- }
- })
- .doOnNext(new Action1<WebSocketInfo>() {
- @Override
- public void call(WebSocketInfo webSocketInfo) {
- if (webSocketInfo.isOnOpen()) {
- webSocketMap.put(url, webSocketInfo.getWebSocket());
- }
- }
- })
- .share()
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread());
實(shí)現(xiàn)共享功能,主要是為了防止一個(gè)URL的WebSocket,建立多個(gè)連接,這個(gè)主要是由RxJava的share操作符實(shí)現(xiàn),share操作符,使得一個(gè)Observable可以有多個(gè)subscriber,當(dāng)有多個(gè)subscriber時(shí),當(dāng)所有的subscriber都取消訂閱,這個(gè)Observable才會取消訂閱.
getWebSocketInfo()方法完整代碼:
- public Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
- Observable<WebSocketInfo> observable = observableMap.get(url);
- if (observable == null) {
- observable = Observable.create(new WebSocketOnSubscribe(url))
- //自動重連
- .timeout(timeout, timeUnit)
- .retry()
- //共享
- .doOnUnsubscribe(new Action0() {
- @Override
- public void call() {
- observableMap.remove(url);
- webSocketMap.remove(url);
- if (showLog) {
- Log.d("RxWebSocketUtil", "注銷");
- }
- }
- })
- .doOnNext(new Action1<WebSocketInfo>() {
- @Override
- public void call(WebSocketInfo webSocketInfo) {
- if (webSocketInfo.isOnOpen()) {
- webSocketMap.put(url, webSocketInfo.getWebSocket());
- }
- }
- })
- .share()
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread());
- observableMap.put(url, observable);
- } else {
- observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable);
- }
- return observable;
- }
doOnUnsubscribe作用:在Observable注銷,即 WebSocket關(guān)閉時(shí),移除map中的緩存的Observable和WebSocket.
doOnNext作用: 判斷接收到的WebSocketInfo是否是WebSocket在onOpen的時(shí)候發(fā)的,然后將其緩存起來.作用就是:如果有一個(gè)相同的URL訂閱Observable,就從緩存中取,這個(gè)時(shí)候我們應(yīng)該把一個(gè)WebSocket的onOpen事件也發(fā)給這個(gè)訂閱者:
- //使用merge操作符,將onOpen事件發(fā)給訂閱者
- observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable);
這樣的話,同一個(gè)URL的WebSocket,不管在什么地方什么時(shí)間訂閱,都能收到一個(gè)onOpen事件,外部表現(xiàn)的就像一個(gè)新的WebSocket.
getWebSocketInfo方法的幾種變體:
- /**
- * default timeout: 30 days
- * <p>
- * 若忽略小米平板,請調(diào)用這個(gè)方法
- * </p>
- */
- public Observable<WebSocketInfo> getWebSocketInfo(String url) {
- return getWebSocketInfo(url, 30, TimeUnit.DAYS);
- }
- public Observable<String> getWebSocketString(String url) {
- return getWebSocketInfo(url)
- .map(new Func1<WebSocketInfo, String>() {
- @Override
- public String call(WebSocketInfo webSocketInfo) {
- return webSocketInfo.getString();
- }
- })
- .filter(new Func1<String, Boolean>() {
- @Override
- public Boolean call(String s) {
- return s != null;
- }
- });
- }
- public Observable<ByteString> getWebSocketByteString(String url) {
- return getWebSocketInfo(url)
- .map(new Func1<WebSocketInfo, ByteString>() {
- @Override
- public ByteString call(WebSocketInfo webSocketInfo) {
- return webSocketInfo.getByteString();
- }
- })
- .filter(new Func1<ByteString, Boolean>() {
- @Override
- public Boolean call(ByteString byteString) {
- return byteString != null;
- }
- });
- }
- public Observable<WebSocket> getWebSocket(String url) {
- return getWebSocketInfo(url)
- .map(new Func1<WebSocketInfo, WebSocket>() {
- @Override
- public WebSocket call(WebSocketInfo webSocketInfo) {
- return webSocketInfo.getWebSocket();
- }
- });
- }
5 . send信息到服務(wù)端
上面已經(jīng)講到WebSocketInfo包含了WebSocket,所以在訂閱后,就可以拿到這個(gè)WebSocket引用就可以WebSocket.send發(fā)送消息到服務(wù)端.當(dāng)然我們的RxWebSocketUtil已經(jīng)將開啟的WebSocket已經(jīng)緩存.所以我們也可以這樣發(fā)消息:
- /**
- * 如果url的WebSocket已經(jīng)打開,可以直接調(diào)用這個(gè)發(fā)送消息.
- *
- * @param url
- * @param msg
- */
- public void send(String url, String msg) {
- WebSocket webSocket = webSocketMap.get(url);
- if (webSocket != null) {
- webSocket.send(msg);
- } else {
- throw new IllegalStateException("The WebSokcet not open");
- }
- }
- /**
- * 如果url的WebSocket已經(jīng)打開,可以直接調(diào)用這個(gè)發(fā)送消息.
- *
- * @param url
- * @param byteString
- */
- public void send(String url, ByteString byteString) {
- WebSocket webSocket = webSocketMap.get(url);
- if (webSocket != null) {
- webSocket.send(byteString);
- } else {
- throw new IllegalStateException("The WebSokcet not open");
- }
- }
當(dāng)指定的URL的WebSocket沒有打開會直接報(bào)錯(cuò).
異步發(fā)送消息到服務(wù)端
- /**
- * 不用關(guān)心url 的WebSocket是否打開,可以直接發(fā)送
- *
- * @param url
- * @param msg
- */
- public void asyncSend(String url, final String msg) {
- getWebSocket(url)
- .first()
- .subscribe(new Action1<WebSocket>() {
- @Override
- public void call(WebSocket webSocket) {
- webSocket.send(msg);
- }
- });
- }
- /**
- * 不用關(guān)心url 的WebSocket是否打開,可以直接發(fā)送
- *
- * @param url
- * @param byteString
- */
- public void asyncSend(String url, final ByteString byteString) {
- getWebSocket(url)
- .first()
- .subscribe(new Action1<WebSocket>() {
- @Override
- public void call(WebSocket webSocket) {
- webSocket.send(byteString);
- }
- });
- }
這兩種發(fā)送方式,你不用關(guān)心URL的WebSocket是否打開,可以直接發(fā)送.實(shí)現(xiàn)思路也很簡單,getWebSocket(url)會獲取到Observable,或者是從緩存中取,或者是重新開啟一個(gè)WebSocket,但你都不需要關(guān)心,經(jīng)過first操作符后,如果是從緩存取的Observable,就注銷的當(dāng)前的Observable,當(dāng)是新開的WebSocket,注銷掉當(dāng)前的subscriber后,就沒有其他subscriber了,這個(gè)新開的WebSocket就會關(guān)閉(share操作符作用).
51CTO和華為官方合作共建的鴻蒙技術(shù)社區(qū)