Parcourir la source

grpc_msg_size设置接收缓冲区尺寸

hong il y a 5 ans
Parent
commit
7863887726

+ 2 - 0
.gitignore

@@ -48,3 +48,5 @@ cscope.po.out
 #*.gz
 #*.bz2
 
+bin/
+

+ 12 - 0
go.mod

@@ -0,0 +1,12 @@
+module research
+
+go 1.12
+
+require (
+	github.com/golang/protobuf v1.3.2
+	golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
+	golang.org/x/net v0.0.0-20190311183353-d8887717615a
+	golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect
+	google.golang.org/grpc v1.25.1
+	honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
+)

+ 46 - 0
go.sum

@@ -0,0 +1,46 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

+ 60 - 0
grpc_msg_size/client/main_cli.go

@@ -0,0 +1,60 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"log"
+	"math"
+	"research/grpc_msg_size/pb"
+	"time"
+
+	"google.golang.org/grpc"
+)
+
+func init() {
+	log.SetFlags(log.Lmicroseconds | log.LstdFlags | log.Lshortfile)
+}
+
+var msg string
+
+func init() {
+	var buf []byte
+	for i := 0; i < 1024*1024*1024; i++ {
+		buf = append(buf, '0')
+	}
+	msg = string(buf)
+}
+
+func main() {
+	addr := fmt.Sprintf("localhost:%v", 55555)
+	conn, err := grpc.Dial(addr, grpc.WithInsecure())
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer conn.Close()
+
+	client := pb.NewTestClient(conn)
+	stream, err := client.Test(context.Background(), grpc.MaxCallRecvMsgSize(math.MaxInt32))
+
+	for {
+		err := stream.Send(&pb.Request{
+			Msg: msg,
+		})
+		if err != nil {
+			log.Fatal(err)
+		}
+		log.Println("send len", len(msg))
+		ret, err := stream.Recv()
+		if err == io.EOF {
+			log.Println("recv eof")
+			break
+		}
+		if err != nil {
+			log.Println("recv error:", err)
+			break
+		}
+		log.Println("recv len", len(ret.Msg))
+		time.Sleep(1 * time.Second)
+	}
+}

+ 189 - 0
grpc_msg_size/pb/test.pb.go

@@ -0,0 +1,189 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: test.proto
+
+/*
+Package pb is a generated protocol buffer package.
+
+It is generated from these files:
+	test.proto
+
+It has these top-level messages:
+	Request
+	Response
+*/
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+import (
+	context "golang.org/x/net/context"
+	grpc "google.golang.org/grpc"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Request struct {
+	Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"`
+}
+
+func (m *Request) Reset()                    { *m = Request{} }
+func (m *Request) String() string            { return proto.CompactTextString(m) }
+func (*Request) ProtoMessage()               {}
+func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *Request) GetMsg() string {
+	if m != nil {
+		return m.Msg
+	}
+	return ""
+}
+
+type Response struct {
+	Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"`
+}
+
+func (m *Response) Reset()                    { *m = Response{} }
+func (m *Response) String() string            { return proto.CompactTextString(m) }
+func (*Response) ProtoMessage()               {}
+func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func (m *Response) GetMsg() string {
+	if m != nil {
+		return m.Msg
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*Request)(nil), "pb.Request")
+	proto.RegisterType((*Response)(nil), "pb.Response")
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// Client API for Test service
+
+type TestClient interface {
+	Test(ctx context.Context, opts ...grpc.CallOption) (Test_TestClient, error)
+}
+
+type testClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewTestClient(cc *grpc.ClientConn) TestClient {
+	return &testClient{cc}
+}
+
+func (c *testClient) Test(ctx context.Context, opts ...grpc.CallOption) (Test_TestClient, error) {
+	stream, err := grpc.NewClientStream(ctx, &_Test_serviceDesc.Streams[0], c.cc, "/pb.Test/Test", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &testTestClient{stream}
+	return x, nil
+}
+
+type Test_TestClient interface {
+	Send(*Request) error
+	Recv() (*Response, error)
+	grpc.ClientStream
+}
+
+type testTestClient struct {
+	grpc.ClientStream
+}
+
+func (x *testTestClient) Send(m *Request) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *testTestClient) Recv() (*Response, error) {
+	m := new(Response)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// Server API for Test service
+
+type TestServer interface {
+	Test(Test_TestServer) error
+}
+
+func RegisterTestServer(s *grpc.Server, srv TestServer) {
+	s.RegisterService(&_Test_serviceDesc, srv)
+}
+
+func _Test_Test_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(TestServer).Test(&testTestServer{stream})
+}
+
+type Test_TestServer interface {
+	Send(*Response) error
+	Recv() (*Request, error)
+	grpc.ServerStream
+}
+
+type testTestServer struct {
+	grpc.ServerStream
+}
+
+func (x *testTestServer) Send(m *Response) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *testTestServer) Recv() (*Request, error) {
+	m := new(Request)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+var _Test_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "pb.Test",
+	HandlerType: (*TestServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "Test",
+			Handler:       _Test_Test_Handler,
+			ServerStreams: true,
+			ClientStreams: true,
+		},
+	},
+	Metadata: "test.proto",
+}
+
+func init() { proto.RegisterFile("test.proto", fileDescriptor0) }
+
+var fileDescriptor0 = []byte{
+	// 114 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0x49, 0x2d, 0x2e,
+	0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0x92, 0xe6, 0x62, 0x0f, 0x4a,
+	0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x12, 0xe0, 0x62, 0xce, 0x2d, 0x4e, 0x97, 0x60, 0x54, 0x60,
+	0xd4, 0xe0, 0x0c, 0x02, 0x31, 0x95, 0x64, 0xb8, 0x38, 0x82, 0x52, 0x8b, 0x0b, 0xf2, 0xf3, 0x8a,
+	0x53, 0x31, 0x65, 0x8d, 0xf4, 0xb9, 0x58, 0x42, 0x40, 0xfa, 0xd4, 0xa1, 0x34, 0xb7, 0x5e, 0x41,
+	0x92, 0x1e, 0xd4, 0x30, 0x29, 0x1e, 0x08, 0x07, 0xa2, 0x59, 0x89, 0x41, 0x83, 0xd1, 0x80, 0x31,
+	0x89, 0x0d, 0x6c, 0xad, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xb6, 0x25, 0xc3, 0x70, 0x84, 0x00,
+	0x00, 0x00,
+}

+ 14 - 0
grpc_msg_size/pb/test.proto

@@ -0,0 +1,14 @@
+syntax = "proto3";
+package pb;
+
+service Test {
+    rpc Test(stream Request) returns (stream Response){}
+}
+
+message Request {
+    string msg = 1;
+}
+
+message Response {
+    string msg = 1;
+}

+ 30 - 0
grpc_msg_size/server/main_svr.go

@@ -0,0 +1,30 @@
+package main
+
+import (
+	"fmt"
+	"log"
+	"math"
+	"net"
+	"research/grpc_msg_size/pb"
+
+	"google.golang.org/grpc"
+)
+
+func init() {
+	log.SetFlags(log.Lmicroseconds | log.LstdFlags | log.Lshortfile)
+}
+
+func main() {
+	addr := fmt.Sprintf(":%d", 55555)
+	lis, err := net.Listen("tcp", addr)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	s := grpc.NewServer(grpc.MaxRecvMsgSize(math.MaxInt32))
+	pb.RegisterTestServer(s, &server{})
+
+	log.Println("listen", addr)
+	err = s.Serve(lis)
+	log.Println(err)
+}

+ 40 - 0
grpc_msg_size/server/service.go

@@ -0,0 +1,40 @@
+package main
+
+import (
+	"io"
+	"log"
+	"research/grpc_msg_size/pb"
+)
+
+type server struct{}
+
+func (s server) Test(stream pb.Test_TestServer) error {
+	for {
+		req, err := stream.Recv()
+		if err == io.EOF {
+			log.Println("recv eof")
+			break
+		}
+		if err != nil {
+			log.Println("recv error:", err)
+			break
+		}
+		log.Println("recv len", len(req.Msg))
+		err = stream.Send(&pb.Response{Msg: msg})
+		if err != nil {
+			log.Println("send error:", err)
+		}
+		log.Println("send msg", len(msg))
+	}
+	return nil
+}
+
+var msg string
+
+func init() {
+	var buf []byte
+	for i := 0; i < 1024*1024*1024; i++ {
+		buf = append(buf, '1')
+	}
+	msg = string(buf)
+}