etcd 笔记(09)— 基于 etcd 实现微服务的注册与发现
生活随笔
收集整理的這篇文章主要介紹了
etcd 笔记(09)— 基于 etcd 实现微服务的注册与发现
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1. 服務注冊與發現基本概念
在單體應用向微服務架構演進的過程中,原本的巨石型應用會按照業務需求被拆分成多個微服務,每個服務提供特定的功能,也可能依賴于其他的微服務。此時,每個微服務實例都可以動態部署,服務實例之間的調用通過輕量級的遠程調用方式(HTTP、消息隊列等)實現,它們之間通過預先定義好的接口進行訪問。
在微服務架構中,多個微服務間的通信需要依賴服務注冊與發現組件獲取指定服務實例的地址信息,才能正確地發起 RPC 調用,保證分布式系統的高可用、高并發。
服務注冊與發現主要包含兩部分:服務注冊的功能與服務發現的功能。
- 服務注冊是指服務實例啟動時將自身信息注冊到服務注冊與發現中心,并在運行時通過心跳等方式向其匯報自身服務狀態;
- 服務發現是指服務實例向服務注冊與發現中心獲取其他服務實例信息,用于遠程調用;
2. 服務注冊與發現中心功能
服務注冊與發現中心主要有以下職責:
-
管理當前注冊到服務注冊與發現中心的微服務實例元數據信息,包括服務實例的服務名、
IP地址、端口號、服務描述和服務狀態等; -
與注冊到服務發現與注冊中心的微服務實例維持心跳,定期檢查注冊表中的服務實例是否在線,并剔除無效服務實例信息;
-
提供服務發現能力,為服務調用方提供服務提供方的服務實例元數據;
通過服務發現與注冊中心,我們可以很方便地管理系統中動態變化的服務實例信息。但是與此同時,它也可能成為系統的瓶頸和故障點。因為服務之間的調用信息來自服務注冊與發現中心,當它不可用時,服務之間的調用也就無法正常進行。因此服務發現與注冊中心一般會集群化部署,提供高可用性和高穩定性。
3. 服務端代碼
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)//服務注冊對象
type ServiceRegister struct {client *clientv3.Clientkv clientv3.KVlease clientv3.Leasecanclefunc func()key stringleaseResp *clientv3.LeaseGrantResponsekeepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
}// 初始化注冊服務
func InitService(host []string, timeSeconds int64) (*ServiceRegister, error) {config := clientv3.Config{Endpoints: host,DialTimeout: 5 * time.Second,}client, err := clientv3.New(config)if err != nil {fmt.Printf("create connection etcd failed %s\n", err)return nil, err}// 得到KV和Lease的API子集kv := clientv3.NewKV(client)lease := clientv3.NewLease(client)service := &ServiceRegister{client: client,kv: kv,lease: lease,}return service, nil
}// 設置租約
func (s *ServiceRegister) setLease(timeSeconds int64) error {leaseResp, err := s.lease.Grant(context.TODO(), timeSeconds)if err != nil {fmt.Printf("create lease failed %s\n", err)return err}// 設置續租ctx, cancelFunc := context.WithCancel(context.TODO())leaseRespChan, err := s.lease.KeepAlive(ctx, leaseResp.ID)if err != nil {fmt.Printf("KeepAlive failed %s\n", err)return err}s.leaseResp = leaseResps.canclefunc = cancelFuncs.keepAliveChan = leaseRespChanreturn nil
}// 監聽續租情況
func (s *ServiceRegister) ListenLeaseRespChan() {for {select {case leaseKeepResp := <-s.keepAliveChan:if leaseKeepResp == nil {fmt.Println("續租功能已經關閉")return} else {fmt.Println("續租成功")}}}
}// 通過租約注冊服務
func (s *ServiceRegister) PutService(key, val string) error {fmt.Printf("PutService key <%s> val <%s>\n", key, val)_, err := s.kv.Put(context.TODO(), key, val, clientv3.WithLease(s.leaseResp.ID))return err
}// 撤銷租約
func (s *ServiceRegister) RevokeLease() error {s.canclefunc()time.Sleep(2 * time.Second)_, err := s.lease.Revoke(context.TODO(), s.leaseResp.ID)return err}func main() {service, _ := InitService([]string{"192.168.0.129:2379"}, 5)service.setLease(10)defer service.RevokeLease()go service.ListenLeaseRespChan()err := service.PutService("/wohu", "http://localhost:8080")if err != nil {fmt.Printf("PutService failed %s\n", err)}// 使得程序阻塞運行,便于觀察輸出結果select {}
}
4. 客戶端代碼
package mainimport ("context""fmt""sync""time""github.com/coreos/etcd/clientv3""github.com/coreos/etcd/mvcc/mvccpb"
)// 客戶端對象
type Client struct {client *clientv3.Clientkv clientv3.KVlease clientv3.Leasewatch clientv3.WatcherserverList map[string]stringlock sync.Mutex
}// 初始化客戶端對象
func InitClient(addr []string) (*Client, error) {conf := clientv3.Config{Endpoints: addr,DialTimeout: 5 * time.Second,}client, err := clientv3.New(conf)if err != nil {fmt.Printf("create connection etcd failed %s\n", err)return nil, err}// 得到 KV 、Lease、 Watcher 的API子集kv := clientv3.NewKV(client)lease := clientv3.NewLease(client)watch := clientv3.NewWatcher(client)// 給客戶端對象賦值c := &Client{client: client,kv: kv,lease: lease,watch: watch,serverList: make(map[string]string),}return c, nil
}// 根據注冊的服務名,獲取服務實例的信息
func (c *Client) getServiceByName(prefix string) ([]string, error) {// 讀取的時候帶有 WithPrefix 選項,所以會讀取該前綴所有的字段值resp, err := c.kv.Get(context.Background(), prefix, clientv3.WithPrefix())if err != nil {fmt.Printf("getServiceByName failed %s\n", err)return nil, err}// 返回的 resp 是多個字段值。需要遍歷提取對應的 key valueaddrs := c.extractAddrs(resp)return addrs, nil}// 根據 etcd 的響應,提取服務實例的數組
func (c *Client) extractAddrs(resp *clientv3.GetResponse) []string {addrs := make([]string, 0)if resp == nil || resp.Kvs == nil {return addrs}for i := range resp.Kvs {if v := resp.Kvs[i].Value; v != nil {// 將 key value 值保存在 ServiceList 表中c.SetServiceList(string(resp.Kvs[i].Key), string(resp.Kvs[i].Value))addrs = append(addrs, string(v))}}return addrs
}// 設置 serverList
func (c *Client) SetServiceList(key, val string) {c.lock.Lock()defer c.lock.Unlock()// serverList 為初始化設置的本地 map 對象,由于考慮到多個 client 運行,所以需要加鎖控制c.serverList[key] = string(val)fmt.Println("set data key :", key, "val:", val)
}// 刪除本地緩存的服務實例信息
func (c *Client) DelServiceList(key string) {c.lock.Lock()defer c.lock.Unlock()delete(c.serverList, key)fmt.Println("del data key:", key)newRes, err := c.getServiceByName(key)if err != nil {fmt.Printf("getServiceByName failed %s\n", err)} else {fmt.Printf("get key %s", key, " current val is: %v\n", newRes)}}// 獲取服務實例信息
func (c *Client) GetService(prefix string) ([]string, error) {if addrs, err := c.getServiceByName(prefix); err != nil {panic(err)} else {fmt.Println("get service ", prefix, " for instance list: ", addrs)go c.watcher(prefix)return addrs, nil}
}// 監控指定鍵值對的變更
func (c *Client) watcher(prefix string) {watchRespChan := c.watch.Watch(context.Background(), prefix, clientv3.WithPrefix())for watchResp := range watchRespChan {for _, event := range watchResp.Events {switch event.Type {case mvccpb.PUT: // 寫入的事件c.SetServiceList(string(event.Kv.Key), string(event.Kv.Value))case mvccpb.DELETE: // 刪除的事件c.DelServiceList(string(event.Kv.Key))}}}
}func main() {/*先創建 etcd 連接,構建 Client 對象,隨后獲取指定的服務 /wohu 實例信息;最后監測 wohu 服務實例的變更事件,根據不同的事件產生不同的行為。*/c, _ := InitClient([]string{"192.168.0.129:2379"})c.GetService("/wohu")// 使得程序阻塞運行,模擬服務的持續運行select {}
}
總結
以上是生活随笔為你收集整理的etcd 笔记(09)— 基于 etcd 实现微服务的注册与发现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2022-2028年中国服装行业分析报告
- 下一篇: 2022-2028年中国文化创意产业园区