Netflix网关zuul(1.x和2.x)全解析
zuul 是netflix開源的一個API Gateway 服務器, 本質上是一個web servlet應用。
Zuul可以通過加載動態過濾機制,從而實現以下各項功能:
- 驗證與安全保障: 識別面向各類資源的驗證要求并拒絕那些與要求不符的請求。
- 審查與監控: 在邊緣位置追蹤有意義數據及統計結果,從而為我們帶來準確的生產狀態結論。
- 動態路由: 以動態方式根據需要將請求路由至不同后端集群處。
- 壓力測試: 逐漸增加指向集群的負載流量,從而計算性能水平。
- 負載分配: 為每一種負載類型分配對應容量,并棄用超出限定值的請求。
- 靜態響應處理: 在邊緣位置直接建立部分響應,從而避免其流入內部集群。
- 多區域彈性: 跨越AWS區域進行請求路由,旨在實現ELB使用多樣化并保證邊緣位置與使用者盡可能接近。
網關zuul從1.0到2.0 經歷了較大的變化,先從架構上看看吧
?zuul 1.0的架構
?
從上圖看,
1.ZuulServlet負責接收請求,對filter進行處理
/*** Core Zuul servlet which intializes and orchestrates zuulFilter execution** @author Mikey Cohen* Date: 12/23/11* Time: 10:44 AM*/@Overridepublic void service(javax.servlet.ServletRequest servletRequest, javax.servlet.ServletResponse servletResponse) throws ServletException, IOException {try {init((HttpServletRequest) servletRequest, (HttpServletResponse) servletResponse);// Marks this request as having passed through the "Zuul engine", as opposed to servlets// explicitly bound in web.xml, for which requests will not have the same data attachedRequestContext context = RequestContext.getCurrentContext();context.setZuulEngineRan();try {preRoute();} catch (ZuulException e) {error(e);postRoute();return;}try {route();} catch (ZuulException e) {error(e);postRoute();return;}try {postRoute();} catch (ZuulException e) {error(e);return;}} catch (Throwable e) {error(new ZuulException(e, 500, "UNHANDLED_EXCEPTION_" + e.getClass().getName()));} finally {RequestContext.getCurrentContext().unset();}}其中
FilterProcessor處理核心類
前置filter
runFilters("pre"); //前置filter類型跳轉filter
runFilters("route");后置filter
runFilters("post");2. zuul的核心是一系列的filters, 其作用可以類比Servlet框架的Filter,或者AOP。工作原理如下圖所示
Zuul可以對Groovy過濾器進行動態的加載,編譯,運行。FilterFileManager.java
/*** This class manages the directory polling for changes and new Groovy filters.* Polling interval and directories are specified in the initialization of the class, and a poller will check* for changes and additions.** @author Mikey Cohen* Date: 12/7/11* Time: 12:09 PM*/void processGroovyFiles(List<File> aFiles) throws Exception {List<Callable<Boolean>> tasks = new ArrayList<>();for (File file : aFiles) {tasks.add(() -> {try {return filterLoader.putFilter(file);}catch(Exception e) {LOG.error("Error loading groovy filter from disk! file = " + String.valueOf(file), e);return false;}});}processFilesService.invokeAll(tasks, FILE_PROCESSOR_TASKS_TIMEOUT_SECS.get(), TimeUnit.SECONDS);}?3.對groovy文件的動態操作管理類FilterScriptManagerServlet
/*** Servlet for uploading/downloading/managing scripts.* <p/>* <ul>* <li>Upload scripts to the registry for a given endpoint.</li>* <li>Download scripts from the registry</li>* <li>List all revisions of scripts for a given endpoint.</li>* <li>Mark a particular script revision as active for production.</li>* </ul>*/@Overrideprotected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {if (!adminEnabled.get()) {response.sendError(HttpServletResponse.SC_FORBIDDEN, "Filter admin is disabled. See the zuul.filters.admin.enabled FastProperty.");return;}// retrieve arguments and validateString action = request.getParameter("action");/* validate the action and method */if (!isValidAction(request, response)) {return;}// perform actionif ("UPLOAD".equals(action)) {handleUploadAction(request, response);} else if ("ACTIVATE".equals(action)) {handleActivateAction(request, response);} else if ("CANARY".equals(action)) {handleCanaryAction(request, response);} else if ("DEACTIVATE".equals(action)) {handledeActivateAction(request, response);}}zuul 2.0架構
?
從上圖可以看到:
1.Zuul引入了Netty和RxJava,正如之前的?ZuulFilter?分為了?Pre,Post,Route,Error,Zuul2的Filter分為三種類型
- Inbound Filters: 在路由之前執行
- Endpoint Filters: 路由操作
- Outbound Filters: 得到相應數據之后執行
使用RxJava重寫了Pre,Post,Route ZuulFilter的結構如下
?
ZuulServerChannelInitializer.java
@Overrideprotected void initChannel(Channel ch) throws Exception{// Configure our pipeline of ChannelHandlerS.ChannelPipeline pipeline = ch.pipeline();storeChannel(ch);addTimeoutHandlers(pipeline);addPassportHandler(pipeline);addTcpRelatedHandlers(pipeline);addHttp1Handlers(pipeline);addHttpRelatedHandlers(pipeline);addZuulHandlers(pipeline);}其父類實現了addZuulHandlers方法
protected void addZuulHandlers(final ChannelPipeline pipeline){pipeline.addLast("logger", nettyLogger);pipeline.addLast(new ClientRequestReceiver(sessionContextDecorator));pipeline.addLast(passportLoggingHandler);addZuulFilterChainHandler(pipeline);pipeline.addLast(new ClientResponseWriter(requestCompleteHandler, registry));}protected void addZuulFilterChainHandler(final ChannelPipeline pipeline) {final ZuulFilter<HttpResponseMessage, HttpResponseMessage>[] responseFilters = getFilters( //1new OutboundPassportStampingFilter(FILTERS_OUTBOUND_START),new OutboundPassportStampingFilter(FILTERS_OUTBOUND_END));// response filter chainfinal ZuulFilterChainRunner<HttpResponseMessage> responseFilterChain = getFilterChainRunner(responseFilters,filterUsageNotifier);// endpoint | response filter chain final FilterRunner<HttpRequestMessage, HttpResponseMessage> endPoint = getEndpointRunner(responseFilterChain, //2filterUsageNotifier, filterLoader);final ZuulFilter<HttpRequestMessage, HttpRequestMessage>[] requestFilters = getFilters( //3new InboundPassportStampingFilter(FILTERS_INBOUND_START),new InboundPassportStampingFilter(FILTERS_INBOUND_END));// request filter chain | end point | response filter chainfinal ZuulFilterChainRunner<HttpRequestMessage> requestFilterChain = getFilterChainRunner(requestFilters,filterUsageNotifier, endPoint);pipeline.addLast(new ZuulFilterChainHandler(requestFilterChain, responseFilterChain));}調用Handler處理
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof HttpRequestMessage) {zuulRequest = (HttpRequestMessage)msg;//Replace NETTY_SERVER_CHANNEL_HANDLER_CONTEXT in SessionContextfinal SessionContext zuulCtx = zuulRequest.getContext();zuulCtx.put(NETTY_SERVER_CHANNEL_HANDLER_CONTEXT, ctx);zuulCtx.put(ZUUL_FILTER_CHAIN, requestFilterChain);requestFilterChain.filter(zuulRequest);}else if ((msg instanceof HttpContent)&&(zuulRequest != null)) {requestFilterChain.filter(zuulRequest, (HttpContent) msg);}else {LOG.debug("Received unrecognized message type. " + msg.getClass().getName());ReferenceCountUtil.release(msg);}}調用ZuulFilterChainRunner的filter方法
@Overridepublic void filter(T inMesg, HttpContent chunk) {String filterName = "-";try {Preconditions.checkNotNull(inMesg, "input message");final AtomicInteger runningFilterIdx = getRunningFilterIndex(inMesg);final int limit = runningFilterIdx.get();for (int i = 0; i < limit; i++) {final ZuulFilter<T, T> filter = filters[i];filterName = filter.filterName();if ((! filter.isDisabled()) && (! shouldSkipFilter(inMesg, filter))) {final HttpContent newChunk = filter.processContentChunk(inMesg, chunk);if (newChunk == null) {//Filter wants to break the chain and stop propagating this chunk any furtherreturn;}//deallocate original chunk if necessaryif ((newChunk != chunk) && (chunk.refCnt() > 0)) {chunk.release(chunk.refCnt());}chunk = newChunk;}}if (limit >= filters.length) {//Filter chain has run to end, pass down the channel pipeline invokeNextStage(inMesg, chunk);} else {inMesg.bufferBodyContents(chunk);boolean isAwaitingBody = isFilterAwaitingBody(inMesg);// Record passport states for start and end of buffering bodies.if (isAwaitingBody) {CurrentPassport passport = CurrentPassport.fromSessionContext(inMesg.getContext());if (inMesg.hasCompleteBody()) {if (inMesg instanceof HttpRequestMessage) {passport.addIfNotAlready(PassportState.FILTERS_INBOUND_BUF_END);} else if (inMesg instanceof HttpResponseMessage) {passport.addIfNotAlready(PassportState.FILTERS_OUTBOUND_BUF_END);}}else {if (inMesg instanceof HttpRequestMessage) {passport.addIfNotAlready(PassportState.FILTERS_INBOUND_BUF_START);} else if (inMesg instanceof HttpResponseMessage) {passport.addIfNotAlready(PassportState.FILTERS_OUTBOUND_BUF_START);}}}if (isAwaitingBody && inMesg.hasCompleteBody()) {//whole body has arrived, resume filter chain runFilters(inMesg, runningFilterIdx);}}}catch (Exception ex) {handleException(inMesg, filterName, ex);}}2.NettyClient?
if (filter.getSyncType() == FilterSyncType.SYNC) {final SyncZuulFilter<I, O> syncFilter = (SyncZuulFilter) filter;final O outMesg = syncFilter.apply(inMesg);recordFilterCompletion(SUCCESS, filter, startTime, inMesg, snapshot);return (outMesg != null) ? outMesg : filter.getDefaultOutput(inMesg);}// async filter filter.incrementConcurrency();resumer = new FilterChainResumer(inMesg, filter, snapshot, startTime);filter.applyAsync(inMesg).observeOn(Schedulers.from(getChannelHandlerContext(inMesg).executor())).doOnUnsubscribe(resumer::decrementConcurrency).subscribe(resumer);ProxyEndpoint.java
@Overridepublic HttpResponseMessage apply(final HttpRequestMessage input) {// If no Origin has been selected, then just return a 404 static response.// handle any exception heretry {if (origin == null) {handleNoOriginSelected();return null;}origin.getProxyTiming(zuulRequest).start();// To act the same as Ribbon, we must do this before starting execution (as well as before each attempt).IClientConfig requestConfig = origin.getExecutionContext(zuulRequest).getRequestConfig();originalReadTimeout = requestConfig.getProperty(ReadTimeout, null);setReadTimeoutOnContext(requestConfig, 1);origin.onRequestExecutionStart(zuulRequest); proxyRequestToOrigin();//Doesn't return origin response to caller, calls invokeNext() internally in response filter chainreturn null;} catch (Exception ex) {handleError(ex);return null;}}將請求轉發至遠端
private void proxyRequestToOrigin() {Promise<PooledConnection> promise = null;try {attemptNum += 1;requestStat = createRequestStat();origin.preRequestChecks(zuulRequest);concurrentReqCount++;// update RPS trackers updateOriginRpsTrackers(origin, attemptNum);// We pass this AtomicReference<Server> here and the origin impl will assign the chosen server to it.promise = origin.connectToOrigin(zuulRequest, channelCtx.channel().eventLoop(), attemptNum, passport, chosenServer, chosenHostAddr);storeAndLogOriginRequestInfo();currentRequestAttempt = origin.newRequestAttempt(chosenServer.get(), context, attemptNum);requestAttempts.add(currentRequestAttempt);passport.add(PassportState.ORIGIN_CONN_ACQUIRE_START);if (promise.isDone()) {operationComplete(promise);} else {promise.addListener(this);}}catch (Exception ex) {LOG.error("Error while connecting to origin, UUID {} " + context.getUUID(), ex);storeAndLogOriginRequestInfo();if (promise != null && ! promise.isDone()) {promise.setFailure(ex);} else {errorFromOrigin(ex);}}}調用BasicNettyOrigin
@Overridepublic Promise<PooledConnection> connectToOrigin(HttpRequestMessage zuulReq, EventLoop eventLoop, int attemptNumber,CurrentPassport passport, AtomicReference<Server> chosenServer,AtomicReference<String> chosenHostAddr) {return clientChannelManager.acquire(eventLoop, null, zuulReq.getMethod().toUpperCase(),zuulReq.getPath(), attemptNumber, passport, chosenServer, chosenHostAddr);}?
3.小結
? ?>> zuul2通過啟動BaseServerStartup的實現類,啟動一個netty server
? ?>> netty server將ZuulFilter (Inbound,?Outbound,?EndPoint)包裹成ChainRunner組合成netty的一個handler:ZuulFilterChainHandler
? ?>>?ZuulFilterChainHandler將請求包裝成SyncZuulFilter封裝成NettyClient
4.zuul1和zuul2的選擇
? 性能對比
Zuul 1 (阻塞)的應用場景
? cpu密集型任務
? 簡單操作的需求
? 開發簡單的需求
? 實時請求高的
zuul2(非阻塞)的應用場景
? io密集的任務
? 大請求或者大文件
? 隊列的流式數據
? 超大量的連接
參考文獻
【1】https://www.cnblogs.com/lexiaofei/p/7080257.html
【2】https://blog.csdn.net/lengyue309/article/details/82192118
【】https://github.com/strangeloop/StrangeLoop2017/blob/master/slides/ArthurGonigberg-ZuulsJourneyToNonBlocking.pdf
轉載于:https://www.cnblogs.com/davidwang456/p/10337441.html
總結
以上是生活随笔為你收集整理的Netflix网关zuul(1.x和2.x)全解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: https下 http的会被阻塞 Thi
- 下一篇: 开源任务调度平台elastic-job-