转 kubernetes集群中利用etcd和grpc实现golang服务间通信
 4137 |  0 |  0
kubernetes集群中利用etcd和grpc实现golang服务间通信
注:文中涉及工作环境相关的网址和IP已经被替换
 
 
1. 项目背景
服务运行于docker容器中
使用kubernetes管理容器
服务有多个节点作为一个集群
使用rest接口设置服务缓存中的信息
需要将信息同步到集群中其他节点
2. 项目方案
使用grpc做服务间通信
从etcd中读取服务所有状态为running的节点信息,包括:podIp、status、hostIp、startedAt(启动时间)
服务启动时选取运行时间最长的节点,调用grpc接口请求缓存的信息同步到本容器的服务中
使用rest接口设置缓存的时候,遍历所有节点(不包括自身),调用grpc接口将信息同步到其他节点
方案特点:
- 不需要借助额外的配置管理工具(如:zookeeper)
- 不需要自行管理节点的配置信息(因为kubernetes的etcd中已经有完整的节点信息)
- grpc开发、传输效率高,扩展性好
- grpc使用http2.0方便后续提供rest接口
 
 
1. etcd简介
etcd 是用 golang 实现的一种 K-V 分布式存储系统,内部用raft协议做一致性校验,对外提供http的访问接口,最新版中提供了grpc的访问接口。
etcd主要用于:
- 配置管理
- 服务注册于发现
- 选主
- 应用调度
- 分布式队列
- 分布式锁
 
 
与etcd类似的还有zookeeper
这里 有一篇文章简单介绍了etcd和zookeeper的优缺点以及etcd的工作原理
 
 
2. kubernetes与etcd
前面介绍了etcd特别适合用于做集群服务的配置管理,kubernets 是用于docker容器编排的,也是用golang实现的,所以自然而然就采用etcd作为服务配置的存储方式了。这里 有一篇kubernets的架构介绍。
 
 
etcd在kubernetes中的最大作用是保存容器节点(pod)信息,包括:容器的服务名、状态、IP、版本以及其他信息
 
 
通过类似如下的命令可以获取到pod的信息
curl http://10.20.30.40:2379/v2/keys/registry/pods/default
etcd中保存的容器节点信息格式如下:
{
    "action": "get",
    "node": {
      "key": "/registry/pods/default",
      "dir": true,
      "nodes": [
        {
          "key": "/registry/pods/default/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",
          "value": "{\"kind\":\"Pod\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh\",\"generateName\":\"hello-web-29a74e26ea3c2138e1727f35a111f4c6-\",\"namespace\":\"default\",\"selfLink\":\"/api/v1/namespaces/default/pods/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh\",\"uid\":\"09c45029-3fa0-11e7-a46c-00163e327954\",\"creationTimestamp\":\"2017-05-23T10:10:24Z\",\"labels\":{\"app\":\"hello\",\"deployment\":\"bb6de7bfc7f357818a8c07faf3987d40\",\"tier\":\"frontend\"},\"annotations\":{\"kubernetes.io/created-by\":\"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"ReplicationController\\\",\\\"namespace\\\":\\\"default\\\",\\\"name\\\":\\\"hello-web-29a74e26ea3c2138e1727f35a111f4c6\\\",\\\"uid\\\":\\\"e42ce61a-3f9f-11e7-a46c-00163e327954\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"resourceVersion\\\":\\\"4361319\\\"}}\\n\"},\"ownerReferences\":[{\"apiVersion\":\"v1\",\"kind\":\"ReplicationController\",\"name\":\"hello-web\",\"uid\":\"32559b88-3fa0-11e7-a46c-00163e327954\",\"controller\":true}]},\"spec\":{\"containers\":[{\"name\":\"hello-web\",\"image\":\"docker.helloword.com/hello-web:f022d25\",\"ports\":[{\"containerPort\":8087,\"protocol\":\"TCP\"}],\"env\":[{\"name\":\"SERVER\",\"valueFrom\":{\"configMapKeyRef\":{\"name\":\"cluster-config\",\"key\":\"external.ip\"}}},{\"name\":\"SERVER_PORT\",\"valueFrom\":{\"configMapKeyRef\":{\"name\":\"hello-config\",\"key\":\"hello.api.port\"}}}],\"resources\":{\"limits\":{\"cpu\":\"1\",\"memory\":\"1Gi\"},\"requests\":{\"cpu\":\"100m\",\"memory\":\"512Mi\"}},\"terminationMessagePath\":\"/dev/termination-log\",\"imagePullPolicy\":\"IfNotPresent\"}],\"restartPolicy\":\"Always\",\"terminationGracePeriodSeconds\":30,\"dnsPolicy\":\"ClusterFirst\",\"nodeName\":\"10.30.58.179\",\"securityContext\":{},\"imagePullSecrets\":[{\"name\":\"cn-registry\"}]},\"status\":{\"phase\":\"Running\",\"conditions\":[{\"type\":\"Initialized\",\"status\":\"True\",\"lastProbeTime\":null,\"lastTransitionTime\":\"2017-05-23T10:10:24Z\"},{\"type\":\"Ready\",\"status\":\"True\",\"lastProbeTime\":null,\"lastTransitionTime\":\"2017-05-23T10:10:29Z\"},{\"type\":\"PodScheduled\",\"status\":\"True\",\"lastProbeTime\":null,\"lastTransitionTime\":\"2017-05-23T10:10:24Z\"}],\"hostIP\":\"10.30.58.179\",\"podIP\":\"172.80.13.4\",\"startTime\":\"2017-05-23T10:10:24Z\",\"containerStatuses\":[{\"name\":\"hello-web\",\"state\":{\"running\":{\"startedAt\":\"2017-05-23T10:10:29Z\"}},\"lastState\":{},\"ready\":true,\"restartCount\":0,\"image\":\"docker.helloword.com/hello-web:f022d25\",\"imageID\":\"docker-pullable://docker.helloword.com/hello-web@sha256:f8e0460983b0d3f87733453b588469d8e225afbfc764da2ae55238cd524ef70a\",\"containerID\":\"docker://78cd912de942f744a36bd51907562c5e670fb300ddc85267e3ec72572fdb5617\"}]}}\n",
          "modifiedIndex": 4361528,
          "createdIndex": 4361320
        }
      ]
    }
} 
其中value部分的json数据格式化后如下:
{
  "kind": "Pod",
  "apiVersion": "v1",
  "metadata": {
    "name": "hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",
    "generateName": "hello-web-29a74e26ea3c2138e1727f35a111f4c6-",
    "namespace": "default",
    "selfLink": "/api/v1/namespaces/default/pods/hello-web-29a74e26ea3c2138e1727f35a111f4c6-dknwh",
    "uid": "09c45029-3fa0-11e7-a46c-00163e327954",
    "creationTimestamp": "2017-05-23T10:10:24Z",
    "labels": {
      "app": "hello",
      "deployment": "bb6de7bfc7f357818a8c07faf3987d40",
      "tier": "frontend"
    },
    "annotations": {
      "kubernetes.io/created-by": "{\"kind\":\"SerializedReference\",\"apiVersion\":\"v1\",\"reference\":{\"kind\":\"ReplicationController\",\"namespace\":\"default\",\"name\":\"hello-web-29a74e26ea3c2138e1727f35a111f4c6\",\"uid\":\"e42ce61a-3f9f-11e7-a46c-00163e327954\",\"apiVersion\":\"v1\",\"resourceVersion\":\"4361319\"}}\n"
    },
    "ownerReferences": [
      {
        "apiVersion": "v1",
        "kind": "ReplicationController",
        "name": "hello-web",
        "uid": "32559b88-3fa0-11e7-a46c-00163e327954",
        "controller": true
      }
    ]
  },
  "spec": {
    "containers": [
      {
        "name": "hello-web",
        "image": "docker.helloword.com/hello-web:f022d25",
        "ports": [
          {
            "containerPort": 8087,
            "protocol": "TCP"
          }
        ],
        "env": [
          {
            "name": "SERVER",
            "valueFrom": {
              "configMapKeyRef": {
                "name": "cluster-config",
                "key": "external.ip"
              }
            }
          },
          {
            "name": "SERVER_PORT",
            "valueFrom": {
              "configMapKeyRef": {
                "name": "hello-config",
                "key": "hello.api.port"
              }
            }
          }
        ],
        "resources": {
          "limits": {
            "cpu": "1",
            "memory": "1Gi"
          },
          "requests": {
            "cpu": "100m",
            "memory": "512Mi"
          }
        },
        "terminationMessagePath": "/dev/termination-log",
        "imagePullPolicy": "IfNotPresent"
      }
    ],
    "restartPolicy": "Always",
    "terminationGracePeriodSeconds": 30,
    "dnsPolicy": "ClusterFirst",
    "nodeName": "10.30.58.179",
    "securityContext": {},
    "imagePullSecrets": [
      {
        "name": "cn-registry"
      }
    ]
  },
  "status": {
    "phase": "Running",
    "conditions": [
      {
        "type": "Initialized",
        "status": "True",
        "lastProbeTime": null,
        "lastTransitionTime": "2017-05-23T10:10:24Z"
      },
      {
        "type": "Ready",
        "status": "True",
        "lastProbeTime": null,
        "lastTransitionTime": "2017-05-23T10:10:29Z"
      },
      {
        "type": "PodScheduled",
        "status": "True",
        "lastProbeTime": null,
        "lastTransitionTime": "2017-05-23T10:10:24Z"
      }
    ],
    "hostIP": "10.30.58.179",
    "podIP": "172.80.13.4",
    "startTime": "2017-05-23T10:10:24Z",
    "containerStatuses": [
      {
        "name": "hello-web",
        "state": {
          "running": {
            "startedAt": "2017-05-23T10:10:29Z"
          }
        },
        "lastState": {},
        "ready": true,
        "restartCount": 0,
        "image": "docker.helloword.com/hello-web:f022d25",
        "imageID": "docker-pullable://docker.helloword.com/hello-web@sha256:f8e0460983b0d3f87733453b588469d8e225afbfc764da2ae55238cd524ef70a",
        "containerID": "docker://78cd912de942f744a36bd51907562c5e670fb300ddc85267e3ec72572fdb5617"
      }
    ]
  }
} 
3. grpc简介
grpc是google实现的一种基于protobuf的远程服务调用框架,数据采用二进制传输,其传输协议是基于http2.0。
 
 
相比于其他各种rpc框架,grpc由于基于protobuf和http2.0,具有以下优点:
- 通用性好,支持各种语言
- 二进制传输,效率高
- 扩展性好,只需要修改protobuf文件并重新生成代码
 
 
4. grpc开发环境搭建
4.1 protobuf环境
首先,去https://github.com/google/protobuf/releases/tag/v3.3.0 这个页面下载对应的protobuf编译器安装文件并安装好protoc
go get -u github.com/golang/protobuf cd $GOPATH/src/github.com/golang/protobuf # 如果有安装makefile,直接执行make install,如果没有则执行以下命令 go install ./proto ./jsonpb ./ptypes go install ./protoc-gen-go
4.2 grpc环境
#安装grpc依赖库 go get -u google.golang.org/grpc #安装grpc-go插件,用于将proto文件编译成grpc的golang代码 go get -u github.com/grpc/grpc-go cd $GOPATH/src mv github.com/grpc/grpc-go google.golang.org/grpc/grpc-go
遇到go get无法下载的包,也可以通过 http://gopm.io/ 或者 http://golangtc.com/download/package 进行下载
 
 
5. 定义proto文件
syntax = "proto3"; //使用proto3版本
//用于java等语言的package配置
option java_multiple_files = true;
option java_package = "io.grpc.examples.hellorpc";
option java_outer_classname = "hellorpcProto";
//用于golang等语言的package配置
package hellorpc;
//定义服务接口,其中rpc关键字表示 rpc 接口,用于生成grpc接口代码
service Sync {
    rpc Get (SyncRequest) returns(SyncResponse) {}
    rpc Set (SyncRequest) returns(SyncResponse) {}
    rpc GetAll(SyncRequest)returns(SyncResponse) {}
}
//定义请求数据类型, repeated最终会转换成golang中的数组/切片
message SyncRequest {
    repeated SyncData data= 1;
}
//定义返回的数据类型
message SyncResponse {
    repeated SyncData data= 1;
}
//定义实体数据类型,用type字段表示请求的数据类型,用data字段保存请求的数据或者返回的数据
//map最终会转换成golang中的map[string]string类型
message SyncData {
    int32 type = 1;
    map data = 2;
} 
编译proto文件
protoc --go_out=plugins=grpc:./hellorpc hellorpc.proto
其中–go_out用于指定go的proto编译插件以及插件参数
编译成功后,会在 hellorpc目录中生成 hellorpc.pb.go 文件,可以在其他go文件中通过 import “hello-api/hellorpc” 来使用文件中定义的接口
 
 
6. hellorpc.pb.go 文件分析
前面提到的 service Sync 部分会编译成如下两部分
type SyncClient interface {
    Get(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)
    Set(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)
    GetAll(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (*SyncResponse, error)
}
type SyncServer interface {
    Get(context.Context, *SyncRequest) (*SyncResponse, error)
    Set(context.Context, *SyncRequest) (*SyncResponse, error)
    GetAll(context.Context, *SyncRequest) (*SyncResponse, error)
} 
其中 SyncClient 的接口 在 hellorpc.pb.go 里面已经实现好了接口,直接调用即可,但SyncServer定义的接口是需要我们自己实现
 
 
7. 服务端代码实现(rtc_server.go)
//先定义server类型,并实现好SyncServer定义的接口
type server struct {}
const (
    HELLO_SYNC_REST_CLUSTER_INFO = iota
)
func (s *server)Get(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){
    var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)}
    for i := 0; i < len(in.Data); i++{ request := in.Data[i] switch request.Type { case hello_SYNC_REST_CLUSTER_INFO: // get something from local cache and set to response break } } return &response, nil } func (s *server)Set(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){ var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)} for i := 0; i < len(in.Data); i++{ request := in.Data[i] switch request.Type { case HELLO_SYNC_REST_CLUSTER_INFO: // set something to local cache, and set the result to response break } } return &response, nil } func (s *server)GetAll(ctx context.Context, in *hellorpc.SyncRequest) (*hellorpc.SyncResponse, error){ var response = hellorpc.SyncResponse{Data: make([]*hellorpc.SyncData, 0, 10)} for i := 0; i < len(in.Data); i++{ request := in.Data[i] switch request.Type { case HELLO_SYNC_REST_CLUSTER_INFO: // get all data from local cache, and set the result to response break } } return &response, nil } 
实现好接口后,我们需要将服务注册到grpc,这里我们实现一个名为StartSyncServer的函数来做这些事情
func StartSyncServer(address string) error{
    lis, err := net.Listen("tcp", address)
    if err != nil {
        beego.Debug("start sync server error: %v", err)
        return err
    }
    s := grpc.NewServer()
    hellorpc.RegisterSyncServer(s, &server{})
    //由于s.Serve方法是会一直阻塞住,所以我们需要起一个go routine来执行,在其停止后输出错误信息
    go func(){
        err := s.Serve(lis)
        beego.Debug("sync server stopped with error: %v", err)
    }()
    return nil
} 
将StartSyncServer函数添加到模块的 init 函数中执行,我们服务端的代码就基本完成了
 
 
8. 客户端代码实现(rtc_client.go)
//先定义好客户端类型syncClient,这里我们利用继承的方式将hellorpc.SyncClient实现的方法继承过来
type syncClient struct{
    hellorpc.SyncClient
    conn *grpc.ClientConn
    address string
}
func OpenSyncClient(address string)(syncClient, error) {
    s := syncClient{}
    //grpc.WithInsecure用于关闭安全验证,因为我们是在docker内部环境里使用,不暴露在外网,就没有加安全认证了
    conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithTimeout(5*time.Second))
    if err != nil {
        fmt.Println("----open client error %v, conn: %v", err, conn)
        return s, err
    }
    s.conn = conn
    s.address = address
    s.client = hellorpc.NewSyncClient(conn)
    return s, nil
}
func CloseSyncClient(s *syncClient) {
    if s.conn != nil {
        s.conn.Close()
        s.conn = nil
        s.client = nil
    }
} 
这样我们只需要编写 c, err := OpenSyncClient(address),既可通过 response, err := c.Get(context.Background(), request) 的方式调用hellorpc.SyncClient定义的方法
 
 
9. etcd客户端代码实现(部分功能)
根据etcd的返回值数据结构,我们定义一下两种类型的数据
//用于保存etcd的返回的数据
type EtcdData struct{
    Key             string
    Dir             bool
    Value           interface{}
    CreatedIndex    int32
    ModifiedIndex   int32
    Nodes           []EtcdData
}
//用于保存pod相关的数据
type PodData struct {
    Name            string
    PodIP           string
    HostIP          string
    Status          string
    UpdateTime      string
    Timestamp       int64
}
func newEtcdData() EtcdData{
    return EtcdData{Dir: false, Value: "", Key: "", Nodes: make([]EtcdData, 0, 100)}
} 
接下来我们实现EtcdClient
//先定义好EtcdClient的数据结构
type EtcdClient struct{}
//用于解析etcd返回的数据
func parseEtcdData(dataIn map[string]interface{}, dataOut *EtcdData) error {
    if key, ok := dataIn["key"]; ok {
        dataOut.Key = key.(string)
    }
    if isDir, ok := dataIn["dir"]; ok {
        dataOut.Dir = isDir.(bool)
    }
    if value, ok := dataIn["value"]; ok {
        dataOut.Value = value
    }
    if createdIndex, ok := dataIn["createdIndex"]; ok {
        dataOut.CreatedIndex = int32(createdIndex.(float64))
    }
    if modifiedIndex, ok := dataIn["modifiedIndex"]; ok {
        dataOut.ModifiedIndex = int32(modifiedIndex.(float64))
    }
    if nodes, ok := dataIn["nodes"]; ok {
        var subnodes = nodes.([]interface{})
        for i := 0; i < len(subnodes); i++{ node := subnodes[i].(map[string]interface{}) var nodeData = newEtcdData() parseEtcdData(node, &nodeData) dataOut.Nodes = append(dataOut.Nodes,nodeData) } } return nil } //实现Get方法用于获取某个key的值 func (c *EtcdClient)Get(baseUrl, key string)(EtcdData, error){ var url = baseUrl + key var res = newEtcdData() var result = make(map[string]interface{}) resp, err := http.Get(url) if err == nil{ out, err1 := ioutil.ReadAll(resp.Body) if err1 == nil{ err2 := json.Unmarshal([]byte(out), &result) if err2 != nil{ return res, err2 } node := result["node"].(map[string]interface{}) err = parseEtcdData(node, &res) }else{ return res, err1 } } return res, err } 
由于我们的服务是跑在docker里,由kubernetes进行服务编排,所以我们需要解析kubernetes在etcd中保存的数据
//用于解析pod的状态信息func parsePodStatus(podStatus interface{}, podData *PodData){
    pod_status := podStatus.(map[string]interface{})    if podIP, ok := pod_status["podIP"]; ok {
        podData.PodIP = podIP.(string)
    }    if hostIP, ok := pod_status["hostIP"]; ok {
        podData.HostIP = hostIP.(string)
    }    if status, ok := pod_status["phase"]; ok{
        podData.Status = strings.ToLower(status.(string))        if containerStatuses, ok := pod_status["containerStatuses"]; ok{            for i := 0; i
	10. EtcdClient结合SyncClient
	拿到了服务所有容器的IP
	遍历所有pod的IP
	使用IP+端口建立连接(OpenSyncClient)
	执行grpc server端提供的服务接口,如:c.Get …
	校验/处理返回值(同步本地信息)
	断开连接(CloseSyncClient)
	11. 总结
	直接使用kubernetes的etcd,主要是因为kubernetes的etcd已经有所有节点的信息,不需要另外再维护节点信息
	protobuf文件中的request、response数据结构中使用repeate以及SyncData采用map,主要用于批量请求、返回结果以及方便扩展
	自定义数据结构保存etcd返回的数据,而不是直接使用json处理后的数据,主要是因为各接口之间使用方便,更易于维护。
	使用继承的方式来扩展的接口,可以有效减少代码量
	
	文章来源:http://lib.csdn.net/article/microservice/66976?knId=1888
0

一默
3人已关注
 领课教育 32532
领课教育 32532
 10328
 update 47766
update 47766
 5156
 领课教育 18475
领课教育 18475
 husheng 21156
husheng 21156
 请更新代码 41839
请更新代码 41839
 凯哥Java 2425
凯哥Java 2425
 凯哥Java 2861
凯哥Java 2861
 凯哥Java 2152
凯哥Java 2152