自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Netty與WebSocket:輕松搞定消息推送

開發(fā) 前端
學(xué)過 Netty 的都知道,Netty 對 NIO 進行了很好的封裝,簡單的 API,龐大的開源社區(qū)。深受廣大程序員喜愛?;诖吮疚姆窒硪幌禄A(chǔ)的 netty 使用。實戰(zhàn)制作一個 Netty + websocket 的消息推送小栗子。

學(xué)過 Netty 的都知道,Netty 對 NIO 進行了很好的封裝,簡單的 API,龐大的開源社區(qū)。深受廣大程序員喜愛?;诖吮疚姆窒硪幌禄A(chǔ)的 netty 使用。實戰(zhàn)制作一個 Netty + websocket 的消息推送小栗子。

netty服務(wù)器

@Component
public class NettyServer {

    static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 端口號
     */
    @Value("${webSocket.netty.port:8888}")
    int port;

    EventLoopGroup bossGroup;
    EventLoopGroup workGroup;

    @Autowired
    ProjectInitializer nettyInitializer;

    @PostConstruct
    public void start() throws InterruptedException {
        new Thread(() -> {
            bossGroup = new NioEventLoopGroup();
            workGroup = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            // bossGroup輔助客戶端的tcp連接請求, workGroup負責與客戶端之前的讀寫操作
            bootstrap.group(bossGroup, workGroup);
            // 設(shè)置NIO類型的channel
            bootstrap.channel(NioServerSocketChannel.class);
            // 設(shè)置監(jiān)聽端口
            bootstrap.localAddress(new InetSocketAddress(port));
            // 設(shè)置管道
            bootstrap.childHandler(nettyInitializer);

            // 配置完成,開始綁定server,通過調(diào)用sync同步方法阻塞直到綁定成功
            ChannelFuture channelFuture = null;
            try {
                channelFuture = bootstrap.bind().sync();
                log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                // 對關(guān)閉通道進行監(jiān)聽
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    /**
     * 釋放資源
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
    }
}

Netty配置

管理全局Channel以及用戶對應(yīng)的channel(推送消息)

public class NettyConfig {

    /**
     * 定義全局單利channel組 管理所有channel
     */
    private static volatile ChannelGroup channelGroup = null;

    /**
     * 存放請求ID與channel的對應(yīng)關(guān)系
     */
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    /**
     * 定義兩把鎖
     */
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();


    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

管道配置

@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * webSocket協(xié)議名
     */
    static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * webSocket路徑
     */
    @Value("${webSocket.netty.path:/webSocket}")
    String webSocketPath;
    @Autowired
    WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 設(shè)置管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 流水線管理通道中的處理程序(Handler),用來處理業(yè)務(wù)
        // webSocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http編解碼器
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ObjectEncoder());
        // 以塊的方式來寫的處理器
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(8192));
        pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
        // 自定義的handler,處理業(yè)務(wù)邏輯
        pipeline.addLast(webSocketHandler);
    }
}

自定義handler

@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

    /**
     * 一旦連接,第一個被執(zhí)行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客戶端鏈接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道組
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * 讀取數(shù)據(jù)
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服務(wù)器收到消息:{}", msg.text());

        // 獲取用戶ID,關(guān)聯(lián)channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        NettyConfig.getChannelMap().put(uid, ctx.channel());

        // 將用戶ID作為自定義屬性加入到channel中,方便隨時channel中獲取用戶ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);

        // 回復(fù)消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器收到消息啦"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用戶下線了:{}", ctx.channel().id().asLongText());
        // 刪除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("異常:{}", cause.getMessage());
        // 刪除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 刪除用戶與channel的對應(yīng)關(guān)系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getChannelMap().remove(userId);
    }
}

推送消息接口及實現(xiàn)類

public interface PushMsgService {

    /**
     * 推送給指定用戶
     */
    void pushMsgToOne(String userId, String msg);

    /**
     * 推送給所有用戶
     */
    void pushMsgToAll(String msg);

}
@Service
public class PushMsgServiceImpl implements PushMsgService {

    @Override
    public void pushMsgToOne(String userId, String msg) {
        Channel channel = NettyConfig.getChannel(userId);
        if (Objects.isNull(channel)) {
            throw new RuntimeException("未連接socket服務(wù)器");
        }

        channel.writeAndFlush(new TextWebSocketFrame(msg));
    }

    @Override
    public void pushMsgToAll(String msg) {
        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
    }
}

測試

圖片圖片

鏈接服務(wù)器

圖片圖片

發(fā)送消息

圖片圖片

調(diào)用接口,往前端推送消息!

圖片圖片


圖片圖片

OK!

一個簡單的 netty 小栗子就完成了。


責任編輯:武曉燕 來源: 一安未來
相關(guān)推薦

2021-02-05 07:28:11

SpringbootNettyWebsocke

2024-09-12 14:50:08

2023-07-26 07:28:55

WebSocket服務(wù)器方案

2025-02-07 08:39:32

Shell部署測試

2024-09-02 09:31:19

2018-04-20 09:36:23

NettyWebSocket京東

2021-03-26 08:16:32

SpringbootWebsocket前端

2022-10-26 09:27:59

Python編程迭代器協(xié)議

2022-09-16 08:04:25

阿里云權(quán)限網(wǎng)絡(luò)

2024-10-11 11:32:22

Spring6RSocket服務(wù)

2009-12-11 15:37:58

Linux日志處理

2017-05-11 15:01:43

Androidweb布局

2009-10-23 17:51:51

Oracle用戶密碼

2010-09-17 14:04:14

JVM內(nèi)存設(shè)置

2022-12-25 10:47:52

2024-09-11 08:35:54

2020-05-11 10:59:02

PythonWord工具

2024-08-09 08:52:26

2024-11-18 17:04:03

Vue3C#

2010-06-04 09:08:56

點贊
收藏

51CTO技術(shù)棧公眾號