Server streaming 是當 client 發送一個 request , 伺服器在一個 RPC 的回應裡,回應零個或多個訊息。 最經典的例子就是 client 做查詢,伺服器回應多個結果回來。

這篇只會說明怎麼實作 Server 與 Client 程式碼,如果要了解怎麼建立 gRPC 服務,可以參考 用 Go 建立 gRPC 的 Server 與 Client

讓我們用下列的訊息當作範例,我們定義了一個 RPC 方法,會接收 SearchRequest 的訊息,並且回傳 stream SearchResponse 給 Client 端。

proto
  • proto
1
2
3
4
5
6
7
8
9
10
11
syntax = "proto3";
service Database {
rpc Search(SearchRequest) returns (stream SearchResponse);
}
message SearchRequest {
string term = 1;
}
message SearchResponse {
string matched_term = 1;
string content = 2;
}

有定義檔之後,使用 protoc 產生程式碼,使用方法可參考 用 Go 建立 gRPC 的 Server 與 Client。 以我這篇的範例是執行 $ protoc -I pb/ pb/*.proto --go_out=plugins=grpc:pb 產生程式碼。

Server Code

接下來就是來實作 Server Code 啦。 我建立一個 DatabaseService 實作 Search 的方法,裡面我預設就回傳四筆資料。

其中比較特別的是這行 time.Sleep(1 * time.Second) ,我在每一次送資料的時候暫停一秒,用來驗證說每一次送資料給 Client 的時候會直接送到,而不會等到全部執行完才傳給 Client,這等等會在 Client Code 那邊看到效果。

那怎樣才算 Server 回傳結束呢? 以 Golang 的範例來說,只要回傳 error 就好了。 如果你是回傳 nil 則 Client 的 error 物件會收到 io.EOF,代表資料已經傳輸完畢。

server
  • go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package service

import (
"log"
"time"
"trygrpc/pb"
)

type DatabaseService struct {}

func (d DatabaseService) Search(r *pb.SearchRequest, s pb.Database_SearchServer) error {

// 預設回傳 4 個 Responses
responses := []string{
"Highest ranked content",
"Some ranked content",
"Some ranked content",
"Lowest ranked content",
}

for _, resp := range responses {
result := &pb.SearchResponse{MatchedTerm: r.Term, Content: resp}
if err := s.Send(result); err != nil {
log.Printf("Error sending message to the client:%v", err)
return err
}

// 測試 stream 的效果
time.Sleep(1 * time.Second)
}

return nil
}

如果要回傳其他 error 的話建議使用 grpc package google.golang.org/grpc/codes, google.golang.org/grpc/status 裡面預設的 error code 。

使用方法 err := status.Error(codes.NotFound, "id was not found")

Client Code

Clinet code 接收資料的方式就是建立一個無窮迴圈,呼叫 Recv() 這個方法接收資料,當這個方法回傳的 error 是 io.EOF 的時候呢,代表資料傳輸結束,就可以中斷迴圈了。

client
  • go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"io"
"log"
"time"
"trygrpc/pb"
)

const (
address = "localhost:50051"
)

func main() {
// Create tls based credential.
creds, _ := credentials.NewClientTLSFromFile("../cert.pem", "")

// Set up a connection to the server.
conn, _ := grpc.Dial(address, grpc.WithTransportCredentials(creds), grpc.WithBlock())

defer conn.Close()

c := pb.NewDatabaseClient(conn)

result, _ := c.Search(context.Background(),
&pb.SearchRequest{
Term: "test term",
})

for {
res, err := result.Recv()

if err == io.EOF {
break
} else if err != nil {
log.Fatal(err)
}

fmt.Printf("Received time: %v:%v, data: %v \n", time.Now().Minute(), time.Now().Second() ,res)
}
}

這邊可以看到 Client 也是每隔一秒收到一個訊息。 根據這個驗證,可以得知我送一個 request 給 Server 後,Server 回傳多個 Responses 給我

這邊補充一個 Client error 處理的方式,如果 Server 是使用 grpc package 裡面預設的 error code ,在 client 就可以使用下方的方式比對 error。

s := status.Convert(err)
if s.Code() == codes.Aborted {
fmt.Println(“比對成功”)
}

延伸閱讀

[gRPC Basics - Go]
[Go by Example: Time Formatting / Parsing]
[Advanced gRPC Error Usage]