go-kit 整体学习,微服务?
2012 年 11 月 16 日
首先,玩go-kit,他不是一个mvc框架,他的架构层次比较多一些。
大致上分为 transport
, endpoint
, service
http-demo
import ( "context" "encoding/json" "fmt" transport "github.com/go-kit/kit/transport/http" "net/http" ) type InfoDto struct { Version string } func main() { var endpoint = func(ctx context.Context, request interface{}) (response interface{}, err error) { dto := request.(*InfoDto) fmt.Println("version :", dto.Version) response = map[string]interface{}{ "data": "ok", } err = nil return } // transport 需要将 service+编解码 柔和起来 hand := transport.NewServer(endpoint, func(i context.Context, req *http.Request) (request interface{}, err error) { request = &InfoDto{} err = json.NewDecoder(req.Body).Decode(&request) return }, func(i context.Context, writer http.ResponseWriter, response interface{}) (err error) { err = json.NewEncoder(writer).Encode(response) return }) http.Handle("/", hand) http.ListenAndServe(":8888", nil) } 复制代码
简单的来看,其实他就是将 transport 抽象了一下,其实做的事controller的事情,endpoint可以理解为是service。
定义middlerware
中间件其实是修饰的 endpoint ,类似于Spring的interceptor
// 限流 type Limier interface { Allow() bool } type defaultLimiter struct { } func (*defaultLimiter) Allow() bool { return rand.Intn(3) == 1 } // 添加限流 func addLimier(limier Limier) endpoint.Middleware { return func(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { if limier.Allow() { return next(ctx, request) } return endpoint.Nop(ctx, request) } } } // 使用 end = addLimier(&defaultLimiter{})(end) 复制代码
定义前置、后置器等
类似于Java的Servlet的Filter,但是不具备拦截功能
// option 的方法,很好的解决go的重载 option := transport.ServerBefore(func(ctx context.Context, request *http.Request) context.Context { fmt.Println("http before") return ctx }) 复制代码
然后看看zipkin的组合实现 ,其实就是在前后结束实现了一个拦截,但是真的符合我们的要求吗。显然不符合。
func HTTPServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ServerOption { serverBefore := kithttp.ServerBefore( ) serverAfter := kithttp.ServerAfter( ) serverFinalizer := kithttp.ServerFinalizer( ) return func(s *kithttp.Server) { serverBefore(s) serverAfter(s) serverFinalizer(s) } } 复制代码
go-kit http整体设计
结构
type Server struct { e endpoint.Endpoint //service dec DecodeRequestFunc // 编解码 enc EncodeResponseFunc before []RequestFunc //前置处理器 after []ServerResponseFunc //后置处理器 errorEncoder ErrorEncoder// error处理器 finalizer []ServerFinalizerFunc errorHandler transport.ErrorHandler// error处理器 } 复制代码
处理逻辑
// ServeHTTP implements http.Handler. func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // defer if len(s.finalizer) > 0 { iw := &interceptingWriter{w, http.StatusOK, 0} defer func() { ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header()) ctx = context.WithValue(ctx, ContextKeyResponseSize, iw.written) for _, f := range s.finalizer { f(ctx, iw.code, r) } }() w = iw } // before for _, f := range s.before { ctx = f(ctx, r) } // 解码 request, err := s.dec(ctx, r) if err != nil { // 异常处理器 s.errorHandler.Handle(ctx, err) s.errorEncoder(ctx, err, w) return } // 处理器,异常处理器 response, err := s.e(ctx, request) if err != nil { s.errorHandler.Handle(ctx, err) s.errorEncoder(ctx, err, w) return } // 后置处理器 for _, f := range s.after { ctx = f(ctx, w) } // 解码 if err := s.enc(ctx, w, response); err != nil { s.errorHandler.Handle(ctx, err) s.errorEncoder(ctx, err, w) return } } 复制代码
基本流程
go-kit 整合grpc
首先整合过程很麻烦,它基本是基于service实现的,
syntax = "proto3"; package grpc_demo; service Add { rpc Sum (SumRequest) returns (SumReply) { } } message SumRequest { int64 a = 1; int64 b = 2; } message SumReply { int64 v = 1; string err = 2; } 复制代码
脚本
#!/usr/bin/env sh # Install proto3 from source # brew install autoconf automake libtool # git clone https://github.com/google/protobuf # ./autogen.sh ; ./configure ; make ; make install # # Update protoc Go bindings via # go get -u github.com/golang/protobuf/{proto,protoc-gen-go} # # See also # https://github.com/grpc/grpc-go/tree/master/examples protoc addsvc.proto --go_out=plugins=grpc:. 复制代码
程序
package grpc_demo import ( "context" "fmt" grpctransport "github.com/go-kit/kit/transport/grpc" ) // 实现sum方法 type grpcServer struct { sum grpctransport.Handler concat grpctransport.Handler } //生成器 func NewGRPCServer(service endpoint.Endpoint) AddServer { return &grpcServer{ sum: grpctransport.NewServer( service, decodeGRPCSumRequest, encodeGRPCSumResponse, ), } } //代码生成器生成 func (s *grpcServer) Sum(ctx context.Context, req *SumRequest) (*SumReply, error) { _, rep, err := s.sum.ServeGRPC(ctx, req) if err != nil { return nil, err } return rep.(*SumReply), nil } // 代码生成器生成 func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { req, _ := grpcReq.(*SumRequest) return &SumRequest{A: int64(req.A), B: int64(req.B)}, nil } // 代码生成器 func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) { resp, _ := response.(*SumReply) return resp, nil } 复制代码
以上代码均可以使用代码生成器生成
一下,是主方法
func Main() { grpcListener, err := net.Listen("tcp", ":8888") if err != nil { panic(err) } g := grpc.NewServer() RegisterAddServer(g, NewGRPCServer(func(ctx context.Context, request interface{}) (response interface{}, err error) { req := request.(*SumRequest) return &SumReply{V: req.B + req.A, Err: ""}, nil })) err = g.Serve(grpcListener) if err != nil { panic(err) } } 复制代码
客户端方法
func NewClient() { // 1. 创建一个连接 conn, err := grpc.Dial(":8888", grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() // 2. 然后创建客户端 client := NewAddClient(conn) // 3. rpc调用 ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() response, err := client.Sum(ctx, &SumRequest{A: 1, B: 2}) if err != nil { fmt.Printf("超时 , 原因 : %s\n", err) } else { fmt.Printf("结果 : %v\n", response.V) } } 复制代码
这里我并没有get 到 go-kit的好处,发现很多没有用,也就是编解码器,根本没有用处,我觉得他的源码中,是不是可以加入一个如果他为空,可以跳过呢。
其实性能压测来说,grpc本身的效率只和 http快2-3倍,并没有想象中那么快。
grpc真的那么快吗
我自己测试,本地测试,grpc并没有http快,可能是go的http的transport比较好,复用连接,不需要重复建立,但是可以看到http在大多数场景下还是可以起到很大作用的。
我感觉dubbo会很快。
import ( "bytes" "encoding/json" "fmt" "go-kit/demo" "io" "io/ioutil" "net/http" "runtime" "time" ) func main() { server() runtime.Gosched() client() } func server() { http.HandleFunc("/add", func(writer http.ResponseWriter, request *http.Request) { req := demo.AddReqeust{} err := json.NewDecoder(request.Body).Decode(&req) if err != nil { panic(err) } defer request.Body.Close() _ = json.NewEncoder(writer).Encode(map[string]interface{}{ "result": req.A + req.B, }) }) go http.ListenAndServe(":8888", nil) } func client() { now := time.Now() count := 10000 for x := 0; x < count; x++ { request() } fmt.Println(time.Now().Sub(now).Milliseconds()) } func request() { reader, err := addJsonRequestParams(&demo.AddReqeust{ A: 1, B: 2, }) if err != nil { return } resp, err := http.Post("http://127.0.0.1:8888/add", "application/json", reader) if err != nil { return } defer resp.Body.Close() all, err := ioutil.ReadAll(resp.Body) fmt.Printf("%s", all) } func addJsonRequestParams(params interface{}) (io.Reader, error) { var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(params); err != nil { return nil, err } return ioutil.NopCloser(&buf), nil } 复制代码
grpc测试
import ( "context" "fmt" "go-kit/grep_demo" "google.golang.org/grpc" "log" "net" "runtime" "time" ) type demos struct { } func (*demos) Sum(ctx context.Context, req *grep_demo.SumRequest) (*grep_demo.SumReply, error) { return &grep_demo.SumReply{V: req.A + req.B, Err: ""}, nil } func main() { rpcserver() runtime.Gosched() rpcclient() } func rpcserver() { listener, err := net.Listen("tcp", ":8888") if err != nil { return } server := grpc.NewServer() dd := demos{} grep_demo.RegisterAddServer(server, ⅆ) go server.Serve(listener) } func rpcclient() { // 1. 创建一个连接 conn, err := grpc.Dial(":8888", grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) return } defer conn.Close() // 2. 然后创建客户端 client := grep_demo.NewAddClient(conn) now := time.Now() for x := 0; x < 10000; x++ { func() { // 3. rpc调用 ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() response, err := client.Sum(ctx, &grep_demo.SumRequest{A: 1, B: 2}) if err != nil { fmt.Printf("err=%s\n", err) } else { fmt.Printf("{\"result\":%d}\n", response.V) } }() } fmt.Println(time.Now().Sub(now).Milliseconds()) } 复制代码
欢迎关注我们的微信公众号,每天学习Go知识