CAT的Server初始化
1.?Server初始化從web.xml文件開始,作為一個war包項目,首先需要初始化Servlet,首先是CatServlet專門初始化cat相關的server程序,比如接受客戶端傳過來的數據等等,另一個servlet為MVC專門提供數據查詢接口的普通的MVC功能,功能相當于縮小版的SpringMVC。過濾器有兩個CatFilter最主要是做一些過濾處理,每次請求經過ENVIRONMENT,ID_SETUP,LOG_CLIENT_PAYLOAD,LOG_SPAN的handle方法,打印一些日志,統計一些信息。DomainFilter主要是對domain進行處理。
2.?CatServlet初始化,首先執行父類AbstractContainerServlet的init方法,這是Servlet統一的初始化方法,初始化上下文IOC容器PlexusContainer,在CatServlet中開始準備初始化各模塊,在上下文中保存客戶端和服務端文件路徑,后文會用到。
protected void initComponents(ServletConfig servletConfig) throws ServletException {try {ModuleContext ctx = new DefaultModuleContext(getContainer());ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);File clientXmlFile = getConfigFile(servletConfig, "cat-client-xml", "client.xml");File serverXmlFile = getConfigFile(servletConfig, "cat-server-xml", "server.xml");ctx.setAttribute("cat-client-config-file", clientXmlFile);ctx.setAttribute("cat-server-config-file", serverXmlFile);initializer.execute(ctx);} catch (Exception e) {m_exception = e;System.err.println(e);throw new ServletException(e);}} 執行DefaultModuleInitializer#execute方法,先執行各模塊的setup方法,只有CatHomeModule實現了該方法 private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {if (modules != null) {for (Module module : modules) {if (module != null && !all.contains(module)) {if (module instanceof AbstractModule) {((AbstractModule) module).setup(ctx);}expandAll(ctx, module.getDependencies(ctx), all);all.add(module);}}}}判斷各模塊初始化狀態,未初始化的進行初始化處理
expandAll(ctx, modules, all);for (Module module : all) {if (!module.isInitialized()) {executeModule(ctx, module, m_index++);}}初始化各個模塊,設置初始化狀態,AbstractModule#=>AbstractModule#execute,只有CatClientModule和CatHomeModule有具體的執行內容,因為CatClientModule的執行內容前文已經介紹過,可以參考前文。
private synchronized void executeModule(ModuleContext ctx, Module module, int index) throws Exception {long start = System.currentTimeMillis();// set flag to avoid re-entrancemodule.setInitialized(true);info(ctx, index + " ------ " + module.getClass().getName());// execute itself after its dependenciesmodule.initialize(ctx);long end = System.currentTimeMillis();info(ctx, index + " ------ " + module.getClass().getName() + " DONE in " + (end - start) + " ms.");}3. 至此主要就是解析CatHomeModule類,首先是它的setup方法,根據之前設置到上下文的服務器文件來初始化ServerConfigManager,初始化TcpSocketReceiver主要用于和客戶端之前進行消息通訊,也就是充當netty的服務端,設置服務結束時的關閉鉤子,銷毀netty的線程池通道,關閉通道等等。
protected void setup(ModuleContext ctx) throws Exception {if (!isInitialized()) {File serverConfigFile = ctx.getAttribute("cat-server-config-file");ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);serverConfigManager.initialize(serverConfigFile);messageReceiver.init();Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {messageReceiver.destory();}});}}netty的服務端設置,端口號默認是2280,根據系統選擇不同的線程執行方式,設置主線程池m_bossGroup也就是NIO中TCP連接器Selector的作用,保持和客戶端之間的通道鏈接處理,m_workerGroup主要是處理具體的數據的線程池,具體可以參考之前關于Netty的相關文章。
public synchronized void startServer(int port) throws InterruptedException {boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");int threads = 24;ServerBootstrap bootstrap = new ServerBootstrap();m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);bootstrap.group(m_bossGroup, m_workerGroup);bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("decode", new MessageDecoder());}});bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);bootstrap.childOption(ChannelOption.TCP_NODELAY, true);bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);try {m_future = bootstrap.bind(port).sync();m_logger.info("start netty server!");} catch (Exception e) {m_logger.error("Started Netty Server Failed:" + port, e);}}4. 當消息從客戶端發送的時候需要對數據進行編碼處理,m_codec.encode(tree, buf);內存塊的前四個字節為消息的總長度,先獲取當前可寫的下標,然后寫入長度占位數字0,開始給消息頭編碼,統計長度
public void encode(MessageTree tree, ByteBuf buf) {int count = 0;int index = buf.writerIndex();buf.writeInt(0); // place-holdercount += encodeHeader(tree, buf);if (tree.getMessage() != null) {count += encodeMessage(tree.getMessage(), buf);}buf.setInt(index, count);}給消息體編碼,這里以主要的Transaction類型消息距離,當本次線程只發送一個消息時也就是子消息列表為空,會設置type為A,如果有兩個消息的話就會一次設置t,A,T三個type,不同的type也對應不同的encodeLine處理,增加消息時間等等。最后就是統計長度,最后統一覆蓋到之前的占位下標。(消息類型Transaction,Event,Trace,Metric,Heartbeat)
public int encodeMessage(Message message, ByteBuf buf) {if (message instanceof Transaction) {Transaction transaction = (Transaction) message;List<Message> children = transaction.getChildren();if (children.isEmpty()) {return encodeLine(transaction, buf, 'A', Policy.WITH_DURATION);} else {int count = 0;int len = children.size();count += encodeLine(transaction, buf, 't', Policy.WITHOUT_STATUS);for (int i = 0; i < len; i++) {Message child = children.get(i);if (child != null) {count += encodeMessage(child, buf);}}count += encodeLine(transaction, buf, 'T', Policy.WITH_DURATION);return count;}} else if (message instanceof Event) {return encodeLine(message, buf, 'E', Policy.DEFAULT);} else if (message instanceof Trace) {return encodeLine(message, buf, 'L', Policy.DEFAULT);} else if (message instanceof Metric) {return encodeLine(message, buf, 'M', Policy.DEFAULT);} else if (message instanceof Heartbeat) {return encodeLine(message, buf, 'H', Policy.DEFAULT);} else {throw new RuntimeException(String.format("Unsupported message type: %s.", message));}}同樣,數據流到達服務端的時候就需要進行解碼處理,pipeline.addLast("decode", new MessageDecoder());解碼操作在PlainTextMessageCodec#decode,先解碼消息頭,然后判斷是否有可讀字節,
public void decode(ByteBuf buf, MessageTree tree) {Context ctx = m_ctx.get().setBuffer(buf);decodeHeader(ctx, tree);if (buf.readableBytes() > 0) {decodeMessage(ctx, tree);}}先解析出父消息,這個時候前面設置的type就會起到分支的作用,一個消息直接走'A',兩個消息依次走t,A,T,剛好最開始放入棧中的消息為空case 't':stack.push(parent);這個剛好是退出while循環的條件,case 'A':parent.addChild(tran);case 'T':return stack.pop();同時把該設置的子消息設置到父消息中,解析m_durationInMicro時需要減2是因為編碼的時候添加了"us"字符串。具體方法在PlainTextMessageCodec#decodeLine中。完整的消息DefaultMessageTree就這樣解碼完成。
protected void decodeMessage(Context ctx, MessageTree tree) {Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>();Message parent = decodeLine(ctx, null, stack);tree.setMessage(parent);while (ctx.getBuffer().readableBytes() > 0) {Message message = decodeLine(ctx, (DefaultTransaction) parent, stack);if (message instanceof DefaultTransaction) {parent = message;} else {break;}}}當然在解碼之前,會對數據的完整性進行校驗,因為每條消息都會有個四字節的長度,所以可讀字節數不能小于4,mark暫存數據塊的讀下標,開始讀取消息長度,讀下標往后移動了四個位置,現在重置回讀取之前的位置reset也就是往前移動了四位,繼續校驗可讀字節數,最后讀取消息內容數據塊進行解碼。解碼完成后把數據塊保存在DefaultMessageTree中,記錄執行數量m_processCount以及其他的統計值,最后開始進行消息處理。
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {if (buffer.readableBytes() < 4) {return;}buffer.markReaderIndex();int length = buffer.readInt();buffer.resetReaderIndex();if (buffer.readableBytes() < length + 4) {return;}try {if (length > 0) {ByteBuf readBytes = buffer.readBytes(length + 4);readBytes.markReaderIndex();readBytes.readInt();DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);readBytes.resetReaderIndex();tree.setBuffer(readBytes);m_handler.handle(tree);m_processCount++;long flag = m_processCount % CatConstants.SUCCESS_COUNT;if (flag == 0) {m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);}} else {// client message is errorbuffer.readBytes(length);}} catch (Exception e) {m_serverStateManager.addMessageTotalLoss(1);m_logger.error(e.getMessage(), e);}}5. 執行CatHomeModule#execute方法,初始化消息處理類RealtimeConsumer,初始化配置更新守護線程ConfigReloadTask并且進行啟動,根據服務配置類ServerConfigManager的相關配置啟動其他的幾個守護線程。
public void run() {boolean active = true;while (active) {try {m_productLineConfigManager.refreshConfig();} catch (Exception e) {Cat.logError(e);}try {m_metricConfigManager.refreshConfig();} catch (Exception e) {Cat.logError(e);}Transaction t = Cat.newTransaction("ReloadConfig", "router");try {m_routerConfigManager.refreshConfig();t.setStatus(Transaction.SUCCESS);} catch (Exception e) {Cat.logError(e);t.setStatus(e);} finally {t.complete();}try {m_blackListManager.refreshConfig();} catch (Exception e) {Cat.logError(e);}try {m_allTransactionConfigManager.refreshConfig();} catch (Exception e) {Cat.logError(e);}try {Thread.sleep(TimeHelper.ONE_MINUTE);} catch (InterruptedException e) {active = false;}}}最后設置服務關閉鉤子MessageConsumer#doCheckpoint做一些消息處理的結尾工作,在文件中保存消息數據等等。
?
總結
以上是生活随笔為你收集整理的CAT的Server初始化的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解决:GET http://localh
- 下一篇: Javaweb 实现简单的用户注册登录(