Server streaming 是當 client 發送一個 request , 伺服器在一個 RPC 的回應裡,回應零個或多個訊息。 最經典的例子就是 client 做查詢,伺服器回應多個結果回來。
這篇只會說明怎麼實作 Server 與 Client 程式碼,如果要了解怎麼建立 gRPC 服務,可以參考 用 Go 建立 gRPC 的 Server 與 Client
讓我們用下列的訊息當作範例,我們定義了一個 RPC 方法,會接收 SearchRequest 的訊息,並且回傳 stream SearchResponse 給 Client 端。
proto1 2 3 4 5 6 7 8 9 10 11
| syntax = "proto3"; service Database { rpc Search(SearchRequest) returns (stream SearchResponse); } message SearchRequest { string term = 1; } message SearchResponse { string matched_term = 1; string content = 2; }
|
有定義檔之後,使用 protoc
產生程式碼,使用方法可參考 用 Go 建立 gRPC 的 Server 與 Client。 以我這篇的範例是執行 $ protoc -I pb/ pb/*.proto --go_out=plugins=grpc:pb
產生程式碼。
Server Code
接下來就是來實作 Server Code 啦。 我建立一個 DatabaseService 實作 Search
的方法,裡面我預設就回傳四筆資料。
其中比較特別的是這行 time.Sleep(1 * time.Second) ,我在每一次送資料的時候暫停一秒,用來驗證說每一次送資料給 Client 的時候會直接送到,而不會等到全部執行完才傳給 Client,這等等會在 Client Code 那邊看到效果。
那怎樣才算 Server 回傳結束呢? 以 Golang 的範例來說,只要回傳 error 就好了。 如果你是回傳 nil 則 Client 的 error 物件會收到 io.EOF,代表資料已經傳輸完畢。
server1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package service
import ( "log" "time" "trygrpc/pb" )
type DatabaseService struct {}
func (d DatabaseService) Search(r *pb.SearchRequest, s pb.Database_SearchServer) error {
responses := []string{ "Highest ranked content", "Some ranked content", "Some ranked content", "Lowest ranked content", }
for _, resp := range responses { result := &pb.SearchResponse{MatchedTerm: r.Term, Content: resp} if err := s.Send(result); err != nil { log.Printf("Error sending message to the client:%v", err) return err }
time.Sleep(1 * time.Second) }
return nil }
|
如果要回傳其他 error 的話建議使用 grpc package google.golang.org/grpc/codes
, google.golang.org/grpc/status
裡面預設的 error code 。
使用方法 err := status.Error(codes.NotFound, "id was not found")
。
Client Code
Clinet code 接收資料的方式就是建立一個無窮迴圈,呼叫 Recv() 這個方法接收資料,當這個方法回傳的 error 是 io.EOF
的時候呢,代表資料傳輸結束,就可以中斷迴圈了。
client1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package main
import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "io" "log" "time" "trygrpc/pb" )
const ( address = "localhost:50051" )
func main() { creds, _ := credentials.NewClientTLSFromFile("../cert.pem", "")
conn, _ := grpc.Dial(address, grpc.WithTransportCredentials(creds), grpc.WithBlock())
defer conn.Close()
c := pb.NewDatabaseClient(conn)
result, _ := c.Search(context.Background(), &pb.SearchRequest{ Term: "test term", })
for { res, err := result.Recv()
if err == io.EOF { break } else if err != nil { log.Fatal(err) }
fmt.Printf("Received time: %v:%v, data: %v \n", time.Now().Minute(), time.Now().Second() ,res) } }
|
這邊可以看到 Client 也是每隔一秒收到一個訊息。 根據這個驗證,可以得知我送一個 request 給 Server 後,Server 回傳多個 Responses 給我
這邊補充一個 Client error 處理的方式,如果 Server 是使用 grpc package 裡面預設的 error code ,在 client 就可以使用下方的方式比對 error。
s := status.Convert(err)
if s.Code() == codes.Aborted {
fmt.Println(“比對成功”)
}
延伸閱讀
[gRPC Basics - Go]
[Go by Example: Time Formatting / Parsing]
[Advanced gRPC Error Usage]