Client streaming 就是 gRCP Server streaming 的相反行為,client 會發送多個訊息給伺服器,伺服器會回應一個 Response 給 Client。

範例說明

今天的範例會模擬使用者上傳一個圖片給伺服器,最主要使用者要提供兩個訊息,一個是要告訴伺服器是哪一個 EmployeeId 上傳的,一個是將 圖片轉成 bytes 格式 送出給伺服器。

這是這篇的 message type 範例,有一個 Employee Service ,且有 AddPhoto 方法提供上傳圖片。 這邊可以看到 message AddPhotoRequest 只有一個 data 的欄位,為什麼沒有 EmployeeId 的欄位呢? 因為如果放在 message 裡面的話,我們會多傳送好多次 EmployeeId 給 Server ,造成多餘的流量傳送,所以 EmployeeId 我們會採用 metadata 方式來傳送。

proto
  • proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
syntax = "proto3";

package pb;

message AddPhotoRequest {
bytes data = 1;
}

message AddPhotoResponse {
int32 employeeId = 1;
bool isOk = 2;
}

service Employee {
rpc AddPhoto (stream AddPhotoRequest) returns (AddPhotoResponse);
}

Client Code

我們的 Client 的範例會上傳一個 58k 左右的圖片檔案,並且一次傳 10k 給伺服器,總共會送 6 次訊息給伺服器。 實作此範例 code 的重點如下

  1. 在傳送資料給伺服器的時候,使用 google.golang.org/grpc/metadata 建立自訂的 metadata
  2. 讀取圖片,每次讀取 10k 的大小,然後傳送給伺服器
  3. 最後圖片傳送完之後,呼叫 CloseAndRecv() 跟伺服器說資料傳送完畢。
client
  • go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"io"
"os"
"log"
"trygrpc/pb"
)

const (
address = "localhost:50051"
)

func main() {
// Create tls based credential.
creds, _ := credentials.NewClientTLSFromFile("../cert.pem", "")

// Set up a connection to the server.
conn, _ := grpc.Dial(address, grpc.WithTransportCredentials(creds), grpc.WithBlock())

defer conn.Close()

c := pb.NewEmployeeClient(conn)
// metadata 加入 employeeid 的資料
md := metadata.New(map[string]string{"employeeid":"9527"})
ctx := metadata.NewOutgoingContext(context.Background(), md)
stream, _ := c.AddPhoto(ctx)

// 打開圖片
f, _ := os.Open("goteam.jpg")
for {

// 一次讀取 10k
chunk := make([]byte, 10 * 1024)
n, err := f.Read(chunk)

if err == io.EOF {
break
}

if err != nil {
log.Fatal(err)
}

// 最後一個 chunk 不一定剛好 10k ,所以就把不滿10k的 array 去除,防止送出多餘的 byte。
if n < len(chunk) {
chunk = chunk[:n]
}

stream.Send(&pb.AddPhotoRequest{Data: chunk})
}

res, _ := stream.CloseAndRecv()

fmt.Println(res)
}

這裡要特別說的是,我在建立 metadata 的時候 key 是用全小寫的英文 md := metadata.New(map[string]string{"employeeid":"9527"})。 因為當 metadata 在送的時候會全部轉成小寫,這是為了有效率的壓縮 header。 所以就算你是設定大寫,在伺服器端也是要用全小寫的方式取值。

出處: Practical gRPC - CHAPTER 4: gRPC basics

Server Code

Server code 就是用 buufer 儲存 client 傳過來的 bytes,等傳送完之後,將 buffer 的資料轉換成圖片檔案。 實作重點如下

  1. 從 metadata 讀取 client 傳過的 employeeid
  2. 建立 buffer 接收 client 傳過來的圖片
  3. 等 client 結束傳送後,將 buffer 的資料編碼成圖片
server
  • go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package service

import (
"bytes"
"fmt"
"google.golang.org/grpc/metadata"
"image"
"image/jpeg"
"io"
"os"
"strconv"
"trygrpc/pb"
)

type EmployeeService struct {}

func (e EmployeeService) AddPhoto(r pb.Employee_AddPhotoServer) error {

// 從 Context 讀取 employeeId
md, _ := metadata.FromIncomingContext(r.Context())
employeeId, _ := strconv.Atoi(md["employeeid"][0])

// 建立 buufer 讀取接收的 bytes
buf := new(bytes.Buffer)

for {
res, err := r.Recv()

if err == io.EOF {

// 將 buffer 的 bytes 寫入檔案
out, _ := os.Create("./testread.jpeg")
img, _ , _ := image.Decode(buf)

// 使用 jpeg Encode
jpeg.Encode(out, img, &jpeg.Options{Quality:80})

out.Close()
return r.SendAndClose(&pb.AddPhotoResponse{IsOk:true, EmployeeId: int32(employeeId)})
}

if err != nil {
return err
}

// 將傳送的資料寫入 buffer
buf.Write(res.Data)

// 印出 buffer 的資料長度
fmt.Printf("buffer len: %d \n", len(buf.Bytes()))
}
}

執行結果

可以看到執行結果,Server 每一次接收到資料的時候, buffer 就增加 10k 的大小。 而 Client 最後會得到一個 Server 的回應。

resultresult

延伸閱讀

[Golang : Convert []byte to image]
[gRPC Basics - Go]