应用监控CAT之cat-client源码阅读(一)
CAT 是由大眾點評開發的、基于 Java 的實時應用監控平臺,提供實時應用監控和業務監控等功能,對于及時發現線上問題非常有用。(不知道大家有沒有在用)
應用自然是最初級的,用完之后,還想了解下其背后的原理,所以有了源碼閱讀一說。
今天來看看 cat-client 模塊,重在調用方。
打開文件,首先看一下使用說明,背景,資料。ok,進入正題。
先大致看一下目錄結構:
接下來,從樣例開始著手,在這里從單元測試開始干活。
public class CatTest {@Testpublic void test() {Transaction trans = Cat.newTransaction("logTransaction", "logTransaction");Cat.newEvent("logEvent", "logEvent");Cat.newTrace("logTrace", "logTrace");Cat.newHeartbeat("logHeartbeat", "logHeartbeat");Throwable cause = new Throwable();Cat.logError(cause);Cat.logError("message", cause);Cat.logTrace("logTrace", "<trace>");Cat.logTrace("logTrace", "<trace>", Trace.SUCCESS, "data");Cat.logMetric("logMetric", "test", "test");Cat.logMetricForCount("logMetricForCount");Cat.logMetricForCount("logMetricForCount", 4);Cat.logMetricForDuration("logMetricForDuration", 100);Cat.logMetricForSum("logMetricForSum", 100);Cat.logMetricForSum("logMetricForSum", 100, 100);Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");Cat.logEvent("EventType", "EventName");Cat.logHeartbeat("logHeartbeat", "logHeartbeat", Message.SUCCESS, null);trans.setStatus(Transaction.SUCCESS); // trans.setStatus(cause); trans.complete();Assert.assertEquals(true, Cat.isInitialized());} }
看得出來,cat 把其主要功能都列舉在了這個單元測試里。大致功能就是記錄 transaction、event、trace、heartbeat、error 和 metric。
不過,咱們只挑其中個別類型的處理過程討論一下就可以了。
先來看第一個創建事務的方法:
    // Open a root transaction whose type and name are both "logTransaction".
    Cat.newTransaction("logTransaction", "logTransaction");
// 進入方法查看,1. 先獲取生產者; 2. 創建一個事務public static Transaction newTransaction(String type, String name) {return Cat.getProducer().newTransaction(type, name);}
// 查看獲取生產者的方法,檢查是否已初始化,如果沒有初始化則進行初始化,深度咱們就先到這里public static MessageProducer getProducer() {checkAndInitialize();return s_instance.m_producer;}
// 2. 創建一個事務:1. 先檢查上下文,如果沒有則新建; 2. 如果可以記錄消息,則立即創建一個默認事務 DefaultTransaction,否則返回 NullMessage.TRANSACTION; 3. 開啟事務并返回事務實例,供后續調用。
    @Overridepublic Transaction newTransaction(String type, String name) {// this enable CAT client logging cat message without explicit setupif (!m_manager.hasContext()) {m_manager.setup();}if (m_manager.isMessageEnabled()) {DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);m_manager.start(transaction, false);return transaction;} else {return NullMessage.TRANSACTION;}}
// 2.1. 為當前線程初始化上下文(setup):
    /**
     * Creates a new {@code Context} and stores it in the thread-local {@code m_context}.
     *
     * <p>The configured domain's id and IP are used when a domain is known. Otherwise the
     * domain falls back to "Unknown" and the IP to an empty string.
     */
    @Override
    public void setup() {
        Context fresh = (m_domain == null)
                ? new Context("Unknown", m_hostName, "")
                : new Context(m_domain.getId(), m_hostName, m_domain.getIp());

        m_context.set(fresh);
    }
// 2.2. 檢查是否已初始化上下文
    /** Reports whether a context is already stored in the thread-local {@code m_context}. */
    @Override
    public boolean hasContext() {
        Context current = m_context.get();
        return current != null;
    }
// 2.3. 上下文怎么保證線程安全,使用 ThreadLocal 線程變量private ThreadLocal<Context> m_context = new ThreadLocal<Context>(); 
// 2.4. 開啟一個事務:1. 獲取上下文; 2. 開啟上下文事務; 3. 如果是 tag 類型的事務,則將其放入 m_taggedTransactions; 若 CAT 尚未初始化(取不到上下文),只在第一次提示警告。
    /**
     * Starts {@code transaction} in the current thread's context.
     *
     * <p>Tagged transactions are also registered in {@code m_taggedTransactions} under their
     * tag. When no context is available (getContext() returns null while CAT is not
     * initialized), a warning is logged for the first such call only.
     */
    @Override
    public void start(Transaction transaction, boolean forked) {
        Context ctx = getContext();

        if (ctx == null) {
            if (m_firstMessage) {
                m_firstMessage = false;
                m_logger.warn("CAT client is not enabled because it's not initialized yet");
            }
            return;
        }

        ctx.start(transaction, forked);

        if (transaction instanceof TaggedTransaction) {
            TaggedTransaction tagged = (TaggedTransaction) transaction;
            m_taggedTransactions.put(tagged.getTag(), tagged);
        }
    }
// 2.4.1. 獲取上下文private Context getContext() {if (Cat.isInitialized()) {Context ctx = m_context.get();if (ctx != null) {return ctx;} else {if (m_domain != null) {ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());} else {ctx = new Context("Unknown", m_hostName, "");}m_context.set(ctx);return ctx;}}return null;}
// 2.4.2. 開啟事務,1. 如果stack為空就把事務設置到m_tree上,否則處理子節點; 2. 把事務壓入棧中;public void start(Transaction transaction, boolean forked) {if (!m_stack.isEmpty()) {// Do NOT make strong reference from parent transaction to forked transaction.// Instead, we create a "soft" reference to forked transaction later, via linkAsRunAway()// By doing so, there is no need for synchronization between parent and child threads.// Both threads can complete() anytime despite the other thread.if (!(transaction instanceof ForkedTransaction)) {Transaction parent = m_stack.peek();addTransactionChild(transaction, parent);}} else {m_tree.setMessage(transaction);}if (!forked) {m_stack.push(transaction);}}
/**
 * 2.4.3. Context constructor: creates an empty message tree and transaction stack.
 *
 * <p>The tree is stamped with the constructing thread's group name, id and name, plus the
 * given domain, host name and IP address.
 */
public Context(String domain, String hostName, String ipAddress) {
    m_tree = new DefaultMessageTree();  // message tree for this context
    m_stack = new Stack<Transaction>(); // stack of open transactions

    Thread current = Thread.currentThread();
    m_tree.setThreadGroupName(current.getThreadGroup().getName());
    m_tree.setThreadId(String.valueOf(current.getId()));
    m_tree.setThreadName(current.getName());

    m_tree.setDomain(domain);
    m_tree.setHostName(hostName);
    m_tree.setIpAddress(ipAddress);

    m_length = 1;
    m_knownExceptions = new HashSet<Throwable>();
}

// Next excerpt: DefaultModuleInitializer
/**
 * DefaultModuleInitializer.execute: initializes the given modules.
 *
 * <p>It logs the top-level modules, expands them via expandAll() into an insertion-ordered
 * set, and runs executeModule() for every module not yet initialized, numbering them with
 * m_index. Any exception is rethrown wrapped in a RuntimeException.
 */
@Override
public void execute(ModuleContext ctx, Module... modules) {
    info(ctx, "Initializing top level modules:");
    for (Module topLevel : modules) {
        info(ctx, "   " + topLevel.getClass().getName());
    }

    Set<Module> expanded = new LinkedHashSet<Module>();
    try {
        expandAll(ctx, modules, expanded);

        for (Module candidate : expanded) {
            if (candidate.isInitialized()) {
                continue;
            }
            executeModule(ctx, candidate, m_index++);
        }
    } catch (Exception e) {
        throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
    }
}
// 調用executeModule方法,初始化數據private synchronized void executeModule(ModuleContext ctx, Module module, int index) throws Exception {long start = System.currentTimeMillis();// set flat to avoid re-entrancemodule.setInitialized(true);info(ctx, index + " ------ " + module.getClass().getName());// execute itself after its dependencies
      module.initialize(ctx);long end = System.currentTimeMillis();info(ctx, index + " ------ " + module.getClass().getName() + " DONE in " + (end - start) + " ms.");}// cat初始化// this should be called during application initialization timepublic static void initialize(File configFile) {PlexusContainer container = ContainerLoader.getDefaultContainer();initialize(container, configFile);}public static void initialize(PlexusContainer container, File configFile) {ModuleContext ctx = new DefaultModuleContext(container);// 該方法會去 components.xml中查找 org.unidal.initialization.Module 的實現類,Module module = ctx.lookup(Module.class, CatClientModule.ID);if (!module.isInitialized()) {ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);ctx.setAttribute("cat-client-config-file", configFile);initializer.execute(ctx, module);}}  // components.xml 中配置的 Module, 加載入 CatClientModule?
<!-- Registers com.dianping.cat.CatClientModule as an org.unidal.initialization.Module with role-hint "cat-client". -->
<component><role>org.unidal.initialization.Module</role><role-hint>cat-client</role-hint><implementation>com.dianping.cat.CatClientModule</implementation></component>
// plexus.xml 中 配置日志輸出
<!-- Logger setup: TimedConsoleLoggerManager with MM-dd HH:mm:ss.SSS timestamps and class names shown; log files follow the pattern cat_{0,date,yyyyMMdd}.log, with base directory referenced via CAT_HOME (presumably a property/env lookup; verify) and /data/applogs/cat as the default. -->
<plexus><components><component><role>org.codehaus.plexus.logging.LoggerManager</role><implementation>org.unidal.lookup.logger.TimedConsoleLoggerManager</implementation><configuration><dateFormat>MM-dd HH:mm:ss.SSS</dateFormat><showClass>true</showClass><logFilePattern>cat_{0,date,yyyyMMdd}.log</logFilePattern><baseDirRef>CAT_HOME</baseDirRef><defaultBaseDir>/data/applogs/cat</defaultBaseDir></configuration></component></components> </plexus>
// logEvent 舉個例子,event處理過程
// Log an event with status SUCCESS and a name/value payload; it is completed immediately.
Cat.logEvent("RemoteLink", "Call", Message.SUCCESS, "Cat-0a010680-384736-2061");
// 進入方法public static void logEvent(String type, String name, String status, String nameValuePairs) {Cat.getProducer().logEvent(type, name, status, nameValuePairs);}// DefaultMessageProducer, logEvent
    /**
     * DefaultMessageProducer.logEvent: creates an event via newEvent() and attaches the
     * payload when it is non-empty. It then sets the status and completes the event
     * immediately.
     */
    @Override
    public void logEvent(String type, String name, String status, String nameValuePairs) {
        Event event = newEvent(type, name);

        boolean hasPayload = nameValuePairs != null && nameValuePairs.length() > 0;
        if (hasPayload) {
            event.addData(nameValuePairs);
        }

        event.setStatus(status);
        event.complete();
    }

    // Next: DefaultEvent.complete
    /**
     * DefaultEvent.complete: marks the event completed and, when a manager is attached,
     * hands the event to it via add().
     */
    @Override
    public void complete() {
        setCompleted(true);

        if (m_manager == null) {
            return;
        }
        m_manager.add(this);
    }

    // Next: DefaultMessageManager.add, which adds the message to the current context
    /** DefaultMessageManager.add: forwards the message to the current thread's context, if any. */
    @Override
    public void add(Message message) {
        Context ctx = getContext();
        if (ctx == null) {
            return;
        }
        ctx.add(message);
    }

    /**
     * Context.add, the final step of adding a message.
     *
     * <p>With no open transaction, the message is set on a copy of the tree and that copy is
     * flushed. Otherwise it becomes a child of the transaction on top of the stack.
     */
    public void add(Message message) {
        if (!m_stack.isEmpty()) {
            addTransactionChild(message, m_stack.peek());
            return;
        }

        MessageTree standalone = m_tree.copy();
        standalone.setMessage(message);
        flush(standalone);
    }

    /**
     * DefaultMessageManager.flush: assigns a message id if missing, then sends the tree.
     *
     * <p>If a sender is available and messages are enabled, the tree is sent and the thread
     * context reset. Otherwise the tree is not sent; the throttle counter is incremented and
     * logged on the 1st and every 10000th occurrence.
     */
    public void flush(MessageTree tree) {
        if (tree.getMessageId() == null) {
            tree.setMessageId(nextMessageId());
        }

        MessageSender sender = m_transportManager.getSender();
        boolean canSend = sender != null && isMessageEnabled();
        if (canSend) {
            sender.send(tree);
            reset();
            return;
        }

        m_throttleTimes++;
        if (m_throttleTimes == 1 || m_throttleTimes % 10000 == 0) {
            m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
        }
    }

    // Next: TcpSocketSender.send — the tree is first offered to the BlockingQueue<MessageTree>
    // m_queue; if the offer fails, the overflow is reported via logQueueFullInfo.
    /**
     * TcpSocketSender.send: enqueues a tree for the sender thread.
     *
     * <p>Trees for which isAtomicMessage() is true go to m_atomicTrees; all others go to
     * m_queue. Both offers are given m_manager.getSample(). A rejected offer is reported via
     * logQueueFullInfo().
     */
    @Override
    public void send(MessageTree tree) {
        if (isAtomicMessage(tree)) {
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());
            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());
            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }

    /**
     * Reports a tree that could not be enqueued.
     *
     * <p>m_statistics is notified when present. An error is logged on the first overflow and
     * on every 1000th after that; the AtomicInteger-style counter m_errors keeps the count
     * consistent across threads.
     */
    private void logQueueFullInfo(MessageTree tree) {
        if (m_statistics != null) {
            m_statistics.onOverflowed(tree);
        }

        int count = m_errors.incrementAndGet();
        if (count % 1000 == 0 || count == 1) {
            m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
        }
    }

    /**
     * Context.addTransactionChild: appends {@code message} as a child of {@code transaction}
     * and increments m_length.
     *
     * <p>Before appending, the tree is truncated and flushed via m_validator.truncateAndFlush()
     * when either condition holds:
     * <ul>
     *   <li>the message (minus 10 seconds of slack) falls into a later period than the tree's
     *       root message; trimToHour presumably truncates to the hour — verify;</li>
     *   <li>m_length has reached the configured maximum message length.</li>
     * </ul>
     */
    private void addTransactionChild(Message message, Transaction transaction) {
        long treePeriod = trimToHour(m_tree.getMessage().getTimestamp());
        long messagePeriod = trimToHour(message.getTimestamp() - 10 * 1000L); // 10 seconds extra time allowed

        if (treePeriod < messagePeriod || m_length >= m_configManager.getMaxMessageLength()) {
            m_validator.truncateAndFlush(this, message.getTimestamp());
        }

        transaction.addChild(message);
        m_length++;
    }

    // Next: DefaultTransaction.addChild, which performs the actual append
    /**
     * DefaultTransaction.addChild: creates the child list on first use, then appends the
     * message.
     *
     * <p>A null message is not appended; an "null child message" error is logged via
     * Cat.logError instead. Returns this transaction for chaining.
     */
    @Override
    public DefaultTransaction addChild(Message message) {
        if (m_children == null) {
            m_children = new ArrayList<Message>();
        }

        if (message == null) {
            Cat.logError(new Exception("null child message"));
        } else {
            m_children.add(message);
        }
        return this;
    }

    // Next: Transaction.complete(), the final commit
trans.complete(); // 進入方法,如果已經結束,則認為是異常情況 @Overridepublic void complete() {try {if (isCompleted()) {// complete() was called more than onceDefaultEvent event = new DefaultEvent("cat", "BadInstrument");event.setStatus("TransactionAlreadyCompleted");event.complete();addChild(event);} else {m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;setCompleted(true); // 防止下次再進入if (m_manager != null) {m_manager.end(this);}}} catch (Exception e) {// ignore }}// DefaultMessageManager, end 方法 @Overridepublic void end(Transaction transaction) {Context ctx = getContext();if (ctx != null && transaction.isStandalone()) {if (ctx.end(this, transaction)) {m_context.remove();}}}// DefaultMessageManager, end transaction 進行校驗public boolean end(DefaultMessageManager manager, Transaction transaction) {if (!m_stack.isEmpty()) {Transaction current = m_stack.pop();if (transaction == current) {m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);} else {while (transaction != current && !m_stack.empty()) {m_validator.validate(m_stack.peek(), current);current = m_stack.pop();}}if (m_stack.isEmpty()) {MessageTree tree = m_tree.copy();m_tree.setMessageId(null);m_tree.setMessage(null);if (m_totalDurationInMicros > 0) {adjustForTruncatedTransaction((Transaction) tree.getMessage());}manager.flush(tree);return true;}}return false;}// 驗證事務的正確性,對嵌套的 transaction 進行驗證public void validate(Transaction parent, Transaction transaction) {if (transaction.isStandalone()) {List<Message> children = transaction.getChildren();int len = children.size();for (int i = 0; i < len; i++) {Message message = children.get(i);if (message instanceof Transaction) {validate(transaction, (Transaction) message);}}if (!transaction.isCompleted() && transaction instanceof DefaultTransaction) {// missing transaction end, log a BadInstrument event so that// developer can fix the code markAsNotCompleted((DefaultTransaction) transaction);}} else if (!transaction.isCompleted()) {if (transaction instanceof 
DefaultForkedTransaction) {// link it as run away message since the forked transaction is not completed yet linkAsRunAway((DefaultForkedTransaction) transaction);} else if (transaction instanceof DefaultTaggedTransaction) {// link it as run away message since the forked transaction is not completed yet markAsRunAway(parent, (DefaultTaggedTransaction) transaction);}}}// 適應事務時間段private void adjustForTruncatedTransaction(Transaction root) {DefaultEvent next = new DefaultEvent("TruncatedTransaction", "TotalDuration");long actualDurationInMicros = m_totalDurationInMicros + root.getDurationInMicros();next.addData(String.valueOf(actualDurationInMicros));next.setStatus(Message.SUCCESS);root.addChild(next);m_totalDurationInMicros = 0;} // 發送最后的數據public void flush(MessageTree tree) {if (tree.getMessageId() == null) {tree.setMessageId(nextMessageId());}MessageSender sender = m_transportManager.getSender();if (sender != null && isMessageEnabled()) {sender.send(tree);reset();} else {m_throttleTimes++;if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {m_logger.info("Cat Message is throttled! 
Times:" + m_throttleTimes);}}}// 可以記錄的前提是,所有條件均滿足 @Overridepublic boolean isMessageEnabled() {return m_domain != null && m_domain.isEnabled() && m_context.get() != null && m_configManager.isCatEnabled();} // 發送messageTree到 LinkedBlockingQueue<MessageTree> m_tree @Overridepublic void send(MessageTree tree) {if (isAtomicMessage(tree)) {boolean result = m_atomicTrees.offer(tree, m_manager.getSample());if (!result) {logQueueFullInfo(tree);}} else {boolean result = m_queue.offer(tree, m_manager.getSample());if (!result) {logQueueFullInfo(tree);}}} // 發送數據完成后,需要將原來的數據清空還原,以便下次可用 @Overridepublic void reset() {// destroy current thread local dataContext ctx = m_context.get();if (ctx != null) {if (ctx.m_totalDurationInMicros == 0) {ctx.m_stack.clear();ctx.m_knownExceptions.clear();m_context.remove();} else {ctx.m_knownExceptions.clear();}}}// 上下文的移除,其他鏈表結構各自移除 public void remove() {ThreadLocalMap m = getMap(Thread.currentThread());if (m != null)m.remove(this);}// 為保證上下文絕對移除,再次操作 @Overridepublic void end(Transaction transaction) {Context ctx = getContext();if (ctx != null && transaction.isStandalone()) {if (ctx.end(this, transaction)) {m_context.remove();}}}
// 寫入隊列后,由 TcpSocketSender 線程進行輪詢發送到cat后臺
@Overridepublic void run() {m_active = true;while (m_active) {ChannelFuture channel = m_manager.channel();if (channel != null && checkWritable(channel)) {try {MessageTree tree = m_queue.poll();if (tree != null) {sendInternal(tree);tree.setMessage(null);}} catch (Throwable t) {m_logger.error("Error when sending message over TCP socket!", t);}} else {long current = System.currentTimeMillis();long oldTimestamp = current - HOUR;while (true) {try {MessageTree tree = m_queue.peek();if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {MessageTree discradTree = m_queue.poll();if (discradTree != null) {m_statistics.onOverflowed(discradTree);}} else {break;}} catch (Exception e) {m_logger.error(e.getMessage(), e);break;}}try {Thread.sleep(5);} catch (Exception e) {// ignore itm_active = false;}}}}
(原文此處為圖片,已遺失)
如此,整個cat埋點的過程就搞定了。關鍵技術就是:
1. ThreadLocal 用于保存上下文埋點,保證線程安全。
2. LinkedBlockingQueue 用于保存消息樹,作為生產線程與消費線程之間的溝通橋梁!
3. AtomicInteger 用于計數,保證并發下計數的準確性。
4. 心跳線程用于將本機的狀態發送到 cat 后臺。
5. 懶加載,單例模式的使用。
等等。來張圖總結一下:
(原文此處為圖片,已遺失)
轉載于:https://www.cnblogs.com/yougewe/p/9479638.html
總結
以上是生活随笔為你收集整理的应用监控CAT之cat-client源码阅读(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 《感鹤》第九句是什么
 - 下一篇: 水箱多少钱一个啊?