python sanic加速_python微服务sanic 使用异步zipkin(2) - 一步步创建Sanic插件: sanic-zipin...
關鍵字:python sanic 微服務 異步 zipkin sanic-plugin 插件 Sanic-Plugins-Framewor Pypi發布
所需環境:python3.7, Docker, Linux or WSL
image.png
Sanic插件(Plugin/Extension) - sanic-zipkin已經ready,你可以輕松用pip安裝啦:
喜歡的話,github點個贊吧:https://github.com/kevinqqnj/sanic-zipkin
pip install sanic-zipkin
# app.py
from sanic_zipkin import SanicZipkin, logger, sz_rpc
sz = SanicZipkin(app)
上一篇已經學會了如何在Sanic app里引入aiozipkin,來做分布式系統追蹤。本篇,來討論下,如何創建一個完整的Sanic插件,以方便自己或者分享給他人。
先來看看插件是怎么用的:
功能
adding "Request span" by default
if Request is from another micro-service endpoint, span will be attached (Inject/Extract) to that endpoint
use "logger" decorator to create span for "methods" calls
use "sz_rpc" method to create sub-span for RPC calls, attaching to parent span
run examples/servic_a.py and examples/service_b.py
use Docker to run zipkin or jaeger:
docker run -d -p9411:9411 openzipkin/zipkin:latest
or
docker run -d -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 -p5775:5775/udp -p6831:6831/udp -p6832:6832/udp -p5778:5778 -p16686:16686 -p14268:14268 -p9411:9411 jaegertracing/all-in-one
access the endpoint:
最簡應用:
curl localhost:8000/ to see plugin's basic usage
from sanic_zipkin import SanicZipkin, logger, sz_rpc
app = Sanic(__name__)
# initilize plugin, default parameters:
# zipkin_address = 'http://127.0.0.1:9411/api/v2/spans'
# service = __name__
# host = '127.0.0.1'
# port = 8000
sz = SanicZipkin(app, service='service-a')
@app.route("/")
async def index(request):
return response.json({"hello": "from index"})
This "/" endpoint will add trace span to zipkin automatically
使用裝飾器裝飾方法,以及鏈式trace
curl localhost:8000/2 to see how to decorate methods and chain-calls
@logger()
async def db_access(context, data):
await asyncio.sleep(0.1)
print(f'db_access done. data: {data}')
return
@sz.route("/2")
async def method_call(request, context):
await db_access(context, 'this is method_call data')
return response.json({"hello": 'method_call'})
Use "@logger" decorator to generate span for methods.
Note: in this case, you need to use "@sz.route" decorator, and pass contextparameter to method calls.
微服務之間通過PRC訪問:
curl localhost:8000/3 to see how RPC calls working, both GET/POST is supported
@logger()
async def decorate_demo(context, data):
await db_access(context, data)
data = {'payload': 'rpc call data of decorate_demo'}
rsp = await sz_rpc(context, backend_service2, data, method='GET')
print(rsp.status, await rsp.text())
return
@sz.route("/3")
async def rpc_call(request, context):
await decorate_demo(context, 'this is index4 data')
data = {'payload': 'rpc call data of rpc_call'}
rsp = await sz_rpc(context, backend_service1, data) # default method='POST'
print(rsp.status, await rsp.text())
return response.json({"hello": 'rpc_call'})
method sz_rpc just wrapper span injection to RPC POST/GET calls. In peer server, span-context will be automatically extracted and generate a chain-view in zipkin.
在Zipkin/Jaeger UI里查看Trace:
image.png
Sanic插件開發過程
使用Sanic-Plugins-Framework開發,省時省力,而且充分利用Sanic異步框架的威力。
1. 插件初始化
用戶可自定義的初始化變量
繼承Sanic-Plugins-Framework(SPF) 的Contextualize類型,在on_before_registered方法引用時,加載用戶自定義的初始變量。
目前支持:
zipkin server地址
微服務名稱service
微服務IP, port
from spf.plugins.contextualize import Contextualize
class SanicZipkin(Contextualize):
def __init__(self, *args, **kwargs):
super(SanicZipkin, self).__init__(*args, **kwargs)
self.zipkin_address = None
self.service = None
self.host = None
self.port = None
def on_before_registered(self, context, *args, **kwargs):
self.zipkin_address = kwargs.get('zipkin_address', 'http://127.0.0.1:9411/api/v2/spans')
self.service = kwargs.get('service', __name__)
self.host = kwargs.get('host', '127.0.0.1')
self.port = kwargs.get('port', 8000)
_logger.info(f'SanicZipkin: before registered: service={self.service}')
創建aiozipkin服務
實例化sanic_zipkin,然后調用Sanic 'before_server_start'方法,初始化context.tracer、context.aio_session。
context是SPF全局可以訪問的上下文變量,可以存儲任何你想要共享的數據。
sanic_zipkin = instance = SanicZipkin()
@sanic_zipkin.listener('before_server_start')
async def setup_zipkin(app, loop, context):
endpoint = az.create_endpoint(sanic_zipkin.service, ipv4=sanic_zipkin.host,
port=sanic_zipkin.port)
context.tracer = await az.create(sanic_zipkin.zipkin_address, endpoint,
sample_rate=1.0)
trace_config = az.make_trace_config(context.tracer)
context.aio_session = aiohttp.ClientSession(trace_configs=[trace_config])
context.span = []
context.zipkin_headers = []
這里context.span設計成數組,模擬堆棧FILO(先進后出),是因為考慮到鏈式調用時,tracer需要以parent span為基礎,創建child span。同理Inject/Extract用到的context.zipkin_headers也設成數組。
2. 創建middleware,給Request GET/POST自動添加span
Requst: 先用middleware裝飾器來監聽request消息,然后調用自定義方法request_span(request, context)來創建span。
@sanic_zipkin.middleware(priority=2, with_context=True)
def mw1(request, context):
context.log(DEBUG, f'mw-request: add span and headers before request')
span = request_span(request, context)
context.span.append(span)
context.zipkin_headers.append(span.context.make_headers())
這里,要考慮到,當前span是new span,還是child span。
span.append()壓入堆棧。
自定義方法request_span(),通過讀取request里是否有zipkin_headers信息,來判斷當前是其它微服務的RPC call,還是用戶發起的http訪問。
def request_span(request, context):
context.log(DEBUG, f'REQUEST json: {request.json}, args: {request.args}')
headers = request.parsed_json.get('zipkin_headers', None) if request.json else \
request.args.get('zipkin_headers', None)
Response: 在一次http訪問結束,返回response時,此次訪問的context.span和context.zipkin_headers彈出堆棧,以免污染到其它http訪問的trace:span.pop()
@sanic_zipkin.middleware(priority=8, attach_to='response', relative='post',
with_context=True)
def mw2(request, response, context):
context.span.pop()
context.zipkin_headers.pop()
context.log(DEBUG, 'mw-response: clear span/zipkin_headers after Response')
3. 創建zipkin_headers,用于RPC Inject/Extract
如果當前堆棧里有zipkin_headers,則方法request_span()創建上下文:
def request_span(request, context):
context.log(DEBUG, f'REQUEST json: {request.json}, args: {request.args}')
headers = request.parsed_json.get('zipkin_headers', None) if request.json else \
request.args.get('zipkin_headers', None)
if headers:
span_context = az.make_context(headers)
with context.tracer.new_child(span_context) as span:
...
如果無,則創建新的span:
with context.tracer.new_trace() as span:
span.name(f'{request.method} {request.path}')
...
4. methods函數,添加@logger裝飾器
對于非http request的方法函數,因為沒有middleware可以監聽,則需要創建新的裝飾器了。
使用@logger時,默認context作為第一個參數:
def logger(type=None, category=None, detail=None, description=None,
tracing=True, level=logging.INFO, *args, **kwargs):
def decorator(fn=None):
@functools.wraps(fn)
async def _decorator(*args, **kwargs):
# print('_decorator args: ', args, kwargs)
context = args[0] if len(args) > 0 and isinstance(args[0], ContextDict) else None
...
然后創建新的new span 或 child span。
同時,添加(Inject)新的上下文zipkin_headers:
span = gen_span(fn.__name__, context)
context.zipkin_headers.append(span.context.make_headers())
執行裝飾器所裝飾的函數fn()。
之后,必須清除(彈出最上面)本次裝飾器新增的臨時span和zipkin_headers。因為當前函數外部的其它引用函數(如果有)所需要的數據,還在堆棧下面。
try:
exce = False
res = await fn(*args, **kwargs)
return res
except Exception as e:
exce = True
raise e
finally:
...
# clean up tmp vars for this wrapper
context.span.pop()
context.zipkin_headers.pop()
5. 創建幫助函數sz_rpc(),簡化RPC 訪問其它微服務時的Inject操作
這個很簡單的helper,負責把zipkin_headers,inject到POST/GET訪問的data里,這樣,對方收到http request時,就可以順利的Extract,得到span上下文了。
async def sz_rpc(context, url, data, method='POST'):
data.update({'zipkin_headers': json.dumps(context.zipkin_headers[-1])})
if method.upper() == 'POST':
return await context.aio_session.post(url, json=data)
else:
return await context.aio_session.get(url, params=data)
6. 發布插件到Pypi
插件寫好了,下面就是發布了。
修改當前目錄的結構,以及添加一些必要的標注文件:
kevinqq@CN-00009841:/c/Users/xxx/git/sanic-zipkin$ tree -L 2
├── CHANGES.txt # 版本信息,必須
├── LICENSE # 必須
├── MANIFEST.in
├── README.md
├── dist # 發布時自動打的包
│ └── sanic-zipkin-0.1.2.tar.gz
├── examples
│ ├── requirements.txt
│ ├── service_a.py
│ └── service_b.py
├── requirements-dev.txt
├── sanic_zipkin # 包的目錄
│ ├── __init__.py # 必須。含版本信息和可引用的對象
│ └── sanic_zipkin.py # 主文件
├── sanic_zipkin.egg-info # 發布時自動生成
└── setup.py # 發布用的程序
發布前檢查:
python3 setup.py check
發布到Pypi:
python3 setup.py sdist upload
此時,會讓你輸入Pypi的密碼。
如果收到200,則上傳成功。
檢查是否已經可用:
pip install sanic-zipkin
總結
以上是生活随笔為你收集整理的python sanic加速_python微服务sanic 使用异步zipkin(2) - 一步步创建Sanic插件: sanic-zipin...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python 视觉技术_python+o
- 下一篇: python 列表 换行_python基