gRPC入門指南之 雙向流式RPC
本文轉(zhuǎn)載自微信公眾號(hào)「Golang來(lái)啦」,作者Seekload。轉(zhuǎn)載本文請(qǐng)聯(lián)系Golang來(lái)啦公眾號(hào)。
你好,我是 Seekload!
前言
前一篇文章我們學(xué)習(xí)了客戶端流式 RPC,客戶端多次向服務(wù)端發(fā)送數(shù)據(jù),發(fā)送結(jié)束之后,由服務(wù)端返回一個(gè)響應(yīng)。與服務(wù)端流式 RPC類似,都只支持單項(xiàng)連續(xù)發(fā)送數(shù)據(jù),今天我們要來(lái)學(xué)習(xí)雙向流式 RPC 支持通信雙方同時(shí)多次發(fā)送或接收數(shù)據(jù)。如下如所示:
新建并編譯proto文件
新建 bidirectional_stream.proto 文件:
- syntax = "proto3";
- package proto;
- // 定義流式請(qǐng)求信息
- message StreamRequest{
- // 參數(shù)類型 參數(shù)名稱 標(biāo)識(shí)號(hào)
- string data = 1;
- }
- // 定義流響應(yīng)信息
- message StreamResponse{
- int32 code = 1;
- string value = 2;
- }
- // 定義我們的服務(wù)(可以定義多個(gè)服務(wù),每個(gè)服務(wù)可以定義多個(gè)接口)
- service StreamService{
- // 雙向流RPC,需要在請(qǐng)求、響應(yīng)數(shù)據(jù)前加stream
- rpc Record(stream StreamRequest) returns (stream StreamResponse){};
- }
雙向流式 RPC,定義方法時(shí)需要在請(qǐng)求值和返回值之前加上 stream。
進(jìn)入 bidirectional_stream.proto 所在的目錄,使用如下命令編譯文件
- protoc --go_out=plugins=grpc:. bidirectional_stream.proto
執(zhí)行完成之后會(huì)生成 bidirectional_stream.pb.go 文件。
創(chuàng)建server端
- package main
- import (
- pb "go-grpc-example/4-bidirectional_stream_rpc/proto"
- "google.golang.org/grpc"
- "io"
- "log"
- "net"
- "strconv"
- "time"
- )
- const (
- Address string = ":8000"
- Network string = "tcp"
- )
- // 定義我們的服務(wù)
- type StreamService struct{}
- // 實(shí)現(xiàn) Record() 方法
- func (s *StreamService) Record(srv pb.StreamService_RecordServer) error {
- n := 1
- for {
- // 接收數(shù)據(jù)
- req, err := srv.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- log.Fatalf("stream get from client err: %v", err)
- return err
- }
- // 發(fā)送數(shù)據(jù)
- err = srv.Send(&pb.StreamResponse{
- Code: int32(n),
- Value: "This is the " + strconv.Itoa(n) + " message",
- })
- if err != nil {
- log.Fatalf("stream send to client err: %v", err)
- return err
- }
- n++
- log.Println("stream get from client: ", req.Data)
- time.Sleep(1 * time.Second)
- }
- return nil
- }
- func main() {
- // 1.監(jiān)聽端口
- listener, err := net.Listen(Network, Address)
- if err != nil {
- log.Fatalf("listener err: %v", err)
- }
- log.Println(Address + " net.Listing...")
- // 2.實(shí)例化gRPC實(shí)例
- grpcServer := grpc.NewServer()
- // 3.注冊(cè)我們的服務(wù)
- pb.RegisterStreamServiceServer(grpcServer, &StreamService{})
- // 4.啟動(dòng)gRPC服務(wù)端
- err = grpcServer.Serve(listener)
- if err != nil {
- log.Fatalf("grpc server err: %v", err)
- }
- }
在實(shí)現(xiàn)的 Record() 方法中,for() 循環(huán)里面讀取客戶端發(fā)送的消息并返回一個(gè)響應(yīng)數(shù)據(jù)。
運(yùn)行服務(wù)端:
- go run server.go
- 輸出:
- :8000 net listening...
創(chuàng)建client端
- package main
- import (
- "context"
- pb "go-grpc-example/4-bidirectional_stream_rpc/proto"
- "google.golang.org/grpc"
- "io"
- "log"
- "strconv"
- "time"
- )
- const Address = ":8000"
- func main() {
- // 1.連接服務(wù)端
- conn, err := grpc.Dial(Address, grpc.WithInsecure())
- if err != nil {
- log.Fatalf("grpc conn err: %v", err)
- }
- defer conn.Close()
- // 2.創(chuàng)建gRPC客戶端
- grpcClient := pb.NewStreamServiceClient(conn)
- // 3.調(diào)用 Record() 方法獲取流
- stream, err := grpcClient.Record(context.Background())
- if err != nil {
- log.Fatalf("call record err: %v", err)
- }
- for i := 0; i < 5; i++ {
- // 4.發(fā)送數(shù)據(jù)
- err := stream.Send(&pb.StreamRequest{
- Data: strconv.Itoa(i),
- })
- if err != nil {
- log.Fatalf("stream send to server err: %v", err)
- }
- // 5.接收服務(wù)端發(fā)送過(guò)來(lái)的數(shù)據(jù)
- resp, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Fatalf("stream get from server err: %v", err)
- }
- log.Printf("stream get from server,code:%v,value:%v", resp.GetCode(), resp.Value)
- time.Sleep(1 * time.Second)
- }
- // 6.關(guān)閉流
- err = stream.CloseSend()
- if err != nil {
- log.Fatalf("close stream err:%v", err)
- }
- }
客戶端代碼,在 for() 循環(huán)里面向服務(wù)端發(fā)送了 5 次消息,并接收服務(wù)端返回的數(shù)據(jù),5次數(shù)據(jù)交互之后調(diào)用 CloseSend() 關(guān)閉流。
運(yùn)行客戶端:
- go run client.go
客戶端輸出:
- stream get from server,code:1,value:This is the 1 message
- stream get from server,code:2,value:This is the 2 message
- stream get from server,code:3,value:This is the 3 message
- stream get from server,code:4,value:This is the 4 message
- stream get from server,code:5,value:This is the 5 message
服務(wù)端輸出:
- stream get from client: 0
- stream get from client: 1
- stream get from client: 2
- stream get from client: 3
- stream get from client: 4
觀察仔細(xì)的同學(xué)會(huì)注意到,客戶端和服務(wù)端是交替輸出的。
總結(jié)
這篇文章我們簡(jiǎn)單介紹了 gRPC 的雙向流式 RPC,支持通信雙方同時(shí)多次發(fā)送或接收數(shù)據(jù)。