go使用grpc实现异步_(python、go)基于ETCD的gRPC分布式服务器实现详解
1-概述
gRPC框架是一個性能很好的rpc框架,但框架中沒有實現分布式服務器負載均衡的方法,只是給出實現方案,需要我們自己實現。官方推薦是客戶端負載均衡的方案,也就是由客戶端主動選擇路由,這樣的好處是不用給代理服務器造成壓力。
分布式場景下一般采用etcd、consul、zookeeper等分布式系統,這里采用etcd,etcd是go語言實現,在github可看到開源的全部實現代碼。
目前網上都是go語言寫的關于基于etcd的grpc分布式服務器的實現,幾乎沒有python版的grpc服務器實現,于是本人就寫了一個python版的服務器和不同客戶端(python、go)版的全套實現方案。
注:全部完整代碼已上傳至github:Zartenc/grpc_etcd_ms_py
2-實現思想
2-1-服務器端實現思想
服務器端思想主旨是:每個gRPC服務器啟動(上線)都會在etcd的key中注冊自己在本機中對外暴露的ip-port,一旦gRPC服務停止(下線、意外掛掉、租約到期等)會在etcd中注銷掉自己的信息。
2-2-客戶端實現思想
客戶端思想主旨是:只需連接到etcd服務器產生一個etcdClient對象,長期維護這個對象即可,通過這個對象的負載均衡策略可獲取眾多gRPC服務器中的其中一個進行連接。在這個etcdClient對象內部會對etcd的key添加一個監視(watch)并維護一個可用gRPC服務器信息的集合。
3-具體實現核心代碼
前提已部署并啟動etcd服務器,若沒部署請參考這里。
3-1-服務器端實現核心代碼
服務器端采用python編寫。
由于采用etcd分布式框架,這里首先實現一個etcd客戶端的類EtcdClient,類方法有:
- get_values_by_key():通過etcd的key獲取服務器信息。
- put_values_by_key():添加服務器信息到etcd的key中。
再實現一個此服務器對etcd操作的類EtcdHandleServ,類方法有:
register_service():注冊本機信息到etcd的key中。
logout_service:從etcd的key中注銷本機信息。
class EtcdHandleServ():def __init__(self, service_port, etcd_ip, etcd_port, etcd_prefix):self.etcd_ip = etcd_ipself.etcd_port = etcd_portself.etcd_prefix = etcd_prefix# service_ip = get_outside_ip()service_ip = '127.0.0.1' #在本機機器作實驗使用self.endpoint = f'{service_ip}:{service_port}'def register_service(self):etcd_client = EtcdClient(host=self.etcd_ip, port=self.etcd_port)key_name = f'{self.etcd_prefix}/grpc'with etcd_client.lock(key_name):value_list = etcd_client.get_values_by_key(key_name)if self.endpoint not in value_list:value_list.append(self.endpoint)etcd_client.put_values_by_key(key_name, value_list)def logout_service(self):etcd_client = EtcdClient(host=self.etcd_ip, port=self.etcd_port)key_name = f'{self.etcd_prefix}/grpc'with etcd_client.lock(key_name):value_list = etcd_client.get_values_by_key(key_name)if self.endpoint in value_list:value_list.remove(self.endpoint)etcd_client.put_values_by_key(key_name, value_list)最后在主函數中進行相關的注冊和注銷操作并監控程序停止信號。
*注意:在docker啟動的操作事項
在下面代碼中,若服務器要在docker中啟動需要考慮2個問題:
- 1.在docker run命令啟動時如何將參數從外部傳到容器內?
- 2.docker stop命令停止時如何程序內部如何接收停止信號?
上面2個問題共同之處是在Dockerfile文件中的ENTRYPOINT命令:
ENTRYPOINT ["python3", "-u", "main.py"]在Dockerfile文件中啟動命名使用ENTRYPOINT的exec模式,這樣程序在容器內為1號進程,可接收停止信號(若為shell模式,也可處理,但麻煩一些,后面給出解決方案)。
docker啟動時從外部傳參,只需在后面跟上需要傳入的參數即可:
docker run -d -p 65510:65510 zartenImage:v ----service_port 65510 --etcd_ip xxxx --etcd_prefix /zarten上面給出了2個問題的解決方案,具體更全面的方案,請參考本人之前寫的一篇docker使用詳解的文章,并在文章最后的常見問題中有提到解決方案。
def main(service_ip, service_port, etcd_ip, etcd_port, etcd_prefix):print('***service is starting...')grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=500))zarten_pb2_grpc.add_ZartenServicer_to_server(ZartenServ(), grpc_server)grpc_server.add_insecure_port(f'{service_ip}:{service_port}')grpc_server.start()etcd_handle_serv = EtcdHandleServ(service_port=service_port, etcd_ip=etcd_ip, etcd_port=etcd_port, etcd_prefix=etcd_prefix)etcd_handle_serv.register_service()event = threading.Event()def signal_handler(*args):etcd_handle_serv.logout_service()event.set()signal.signal(signal.SIGINT, signal_handler)signal.signal(signal.SIGTERM, signal_handler)print("***serveice started")try:while True:time.sleep(60 * 60 * 24)except KeyboardInterrupt:etcd_handle_serv.logout_service()grpc_server.stop(0)最后采用命令方式啟動。
# python main.py --service_port 65510 --etcd_ip xxxx --etcd_prefix /zarten if __name__ == '__main__':parser = get_arguments_parser()args = parser.parse_args()main(args.service_ip, args.service_port, args.etcd_ip, args.etcd_port, args.etcd_prefix)3-2-客戶端實現核心代碼
客戶端采用python和go編寫。也可采用其他語言實現,這里就不作展示了,知道實現思想即可自己實現。
gRPC客戶端中的負載均衡是以每次調用為基礎而不是以每個連接為基礎,即只需維護一個連接對象,每次調用都是連接不同gRPC的服務器。官方文檔是這樣描述的:
“It is worth noting that load-balancing within gRPC happens on a per-call basis, not a per-connection basis. In other words, even if all requests come from a single client, we still want them to be load-balanced across all servers.”
3-2-1-python客戶端
首先同樣是一個連接etcd的客戶端類,跟服務器端代碼差不多。
class EtcdClient(etcd3.Etcd3Client):def get_values_by_key(self, key, **kwargs):values, _ = self.get(key, **kwargs)values_list = []if values is not None:try:values_list = json.loads(values.decode('utf-8'))if not isinstance(values_list, list):raise TypeError()except:raise Exception()return values_listdef put_values_by_key(self, key, values):if not isinstance(values, list):raise Exception()self.put(key, json.dumps(values))其次是此客戶端對etcd的操作類,采用單例模式,主要方法就是監視etcd并進行回調處理。
class EtcdHandleClient(EtcdClient):_singleton = Nonedef __new__(cls, *args, **kwargs):if not cls._singleton:cls._singleton = super().__new__(cls)return cls._singletondef __init__(self, etcd_ip, etcd_port, etcd_prefix):self.etcd_ip = etcd_ipself.etcd_port = etcd_portself.etcd_prefix = etcd_prefixsuper().__init__(host=etcd_ip, port=etcd_port)self.endpoints_list = self.get_values_by_key(f'{self.etcd_prefix}/grpc')self.watched_id = self.add_watch_callback(key=f'{self.etcd_prefix}/grpc', callback=self._update_endpoints)def __del__(self):self.cancel_watch(self.watched_id)def get_grpc_serv_ip(self):endpoints_nums = len(self.endpoints_list)if endpoints_nums <= 0:raise RuntimeError('No grpc services are available.Please notify the administrator to start the grpc service')select_id = random.randint(0, len(self.endpoints_list)-1)return self.endpoints_list[select_id]def _update_endpoints(self, watched_response):watched_event = watched_response.events[0]try:update_endpoint_list = json.loads(watched_event.value)if not isinstance(update_endpoint_list, list):raise TypeErrorexcept Exception as e:print(e)returnself.endpoints_list = update_endpoint_list最后main函數中只需長期維護一個EtcdHandleClient對象即可。
def main():etcd_client = EtcdHandleClient(etcd_ip='xxxx', etcd_port=2379, etcd_prefix='/zarten')endpoint = etcd_client.get_grpc_serv_ip()print('endpoint:', endpoint)with grpc.insecure_channel(endpoint) as channel:stub = zarten_pb2_grpc.ZartenStub(channel)response = stub.GetInfo(zarten_pb2.ZartenRequest(zhihu_name='Zarten123'))print(f'receive response: {response}')3-2-2-go客戶端
go語言客戶思想跟python一樣,只是代碼不同而已。
首先定義一個GrpcClient結構體,包括一個etcd連接對象和一個可用gRPC服務器信息數組。
type GrpcClient struct {Etcd3Client *clientv3.ClientGrpcEndpoints []string }其次是初始化GrpcClient結構體的函數NewGrpcClient(),此函數中會調用一個協程來監視etcd的變動。
GrpcClient結構體的方法只有一個GetRrpcServIp()對外開放函數來獲取某個gRPC服務器的信息。
func NewGrpcClient(EtcdIp string, EtcdPort int, EtcdPrefix string) *GrpcClient{keyName := EtcdPrefix+"/grpc"grpcClient := new(GrpcClient)cli, err := clientv3.New(clientv3.Config{Endpoints: []string{EtcdIp + ":" + strconv.Itoa(EtcdPort)},DialTimeout: 10 * time.Second,})if err != nil {log.Fatal(err)}res, err := cli.Get(context.Background(), keyName)if err != nil{log.Fatal(err)}for _, ev := range res.Kvs {endPoints := ev.Valueerr := json.Unmarshal(endPoints, &grpcClient.GrpcEndpoints)if err != nil{log.Fatal(err)}break}if len(grpcClient.GrpcEndpoints) <= 0{log.Fatal("No grpc services are available.Please notify the administrator to start the grpc service")}grpcClient.Etcd3Client = clirch := cli.Watch(context.Background(), keyName)go func() {for wresp := range rch {for _, ev := range wresp.Events {fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)mu.Lock()err := json.Unmarshal(ev.Kv.Value, &grpcClient.GrpcEndpoints)mu.Unlock()if err != nil{log.Fatal(err)}fmt.Println(grpcClient.GrpcEndpoints)}}}()return grpcClient }func (g *GrpcClient) GetRrpcServIp() string{rand.Seed(time.Now().Unix())n := len(g.GrpcEndpoints)return g.GrpcEndpoints[rand.Intn(n)] }main函數中只需維護一個NewGrpcClient對象即可。
func main() {grpcClient := client_center.NewGrpcClient("xxxx", 2379, "/zarten")ip := grpcClient.GetRrpcServIp()fmt.Println(ip)conn, err := grpc.Dial(grpcClient.GetRrpcServIp(), grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()client := go_protoc.NewZartenClient(conn)res, _ := client.GetInfo(context.Background(), &go_protoc.ZartenRequest{ZhihuName:"zarten456"})fmt.Println(res.Name)fmt.Println(res.Homepage)}4-演示結果
在本地機器啟動3個服務器端,端口分別為65510、65511、65512.
多次調用python和go版的客戶端,都是使用不同gRPC服務器并成功返回信息。部分截圖如下所示:
5-總結
主旨思想是通過etcd來進行交互來共享所有的gRPC服務器信息。
在服務器端的實現為了能更加健全,還需考慮加入租約功能,此功能主要用于服務器端自身掛掉后無法及時通知etcd來注銷掉自己信息。此功能以后有時間會在github中更新,敬請期待!
總結
以上是生活随笔為你收集整理的go使用grpc实现异步_(python、go)基于ETCD的gRPC分布式服务器实现详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: copyof java_死磕 java集
- 下一篇: ecology9 后端开发环境搭建_利用