Talking about Storm's LoggingMetricsConsumer
Preface
This article takes a look at Storm's LoggingMetricsConsumer.
LoggingMetricsConsumer
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/LoggingMetricsConsumer.java
```java
public class LoggingMetricsConsumer implements IMetricsConsumer {
    public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class);
    static private String padding = "                       ";

    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        StringBuilder sb = new StringBuilder();
        String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
                                      taskInfo.timestamp,
                                      taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                                      taskInfo.srcTaskId,
                                      taskInfo.srcComponentId);
        sb.append(header);
        for (DataPoint p : dataPoints) {
            sb.delete(header.length(), sb.length());
            sb.append(p.name)
              .append(padding).delete(header.length() + 23, sb.length()).append("\t")
              .append(p.value);
            LOG.info(sb.toString());
        }
    }

    @Override
    public void cleanup() {
    }
}
```
- LoggingMetricsConsumer implements the IMetricsConsumer interface; its handleDataPoints method writes the taskInfo and dataPoints to a log. Which log file that ends up in depends on Storm's log4j2 configuration
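Working backward from the format string, each data point is logged as one line: timestamp, worker host:port, taskId:componentId, the metric name padded to 23 characters, and the value. An illustrative line (all values below are made up):

```
1541070685	  host1.example.com:6700	 10:wordCounter	__ack-count            	{spout:default=20}
```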
log4j2/worker.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<configuration monitorInterval="60" shutdownHook="disable">
<properties>
    <property name="pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} %c{1.} %t [%p] %msg%n</property>
    <property name="patternNoTime">%msg%n</property>
    <property name="patternMetrics">%d %-8r %m%n</property>
</properties>
<appenders>
    <RollingFile name="A1"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.%i.gz">
        <PatternLayout>
            <pattern>${pattern}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <RollingFile name="STDOUT"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.out.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="STDERR"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.err.%i.gz">
        <PatternLayout>
            <pattern>${patternNoTime}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="100 MB"/> <!-- Or every 100 MB -->
        </Policies>
        <DefaultRolloverStrategy max="4"/>
    </RollingFile>
    <RollingFile name="METRICS"
                 fileName="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics"
                 filePattern="${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics.%i.gz">
        <PatternLayout>
            <pattern>${patternMetrics}</pattern>
        </PatternLayout>
        <Policies>
            <SizeBasedTriggeringPolicy size="2 MB"/>
        </Policies>
        <DefaultRolloverStrategy max="9"/>
    </RollingFile>
    <Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
            protocol="UDP" appName="[${sys:storm.id}:${sys:worker.port}]" mdcId="mdc" includeMDC="true"
            facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
            messageId="[${sys:user.name}:${sys:logging.sensitivity}]" id="storm" immediateFail="true" immediateFlush="true"/>
</appenders>
<loggers>
    <root level="info"> <!-- We log everything -->
        <appender-ref ref="A1"/>
        <appender-ref ref="syslog"/>
    </root>
    <Logger name="org.apache.storm.metric.LoggingMetricsConsumer" level="info" additivity="false">
        <appender-ref ref="METRICS"/>
    </Logger>
    <Logger name="STDERR" level="INFO">
        <appender-ref ref="STDERR"/>
        <appender-ref ref="syslog"/>
    </Logger>
    <Logger name="STDOUT" level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="syslog"/>
    </Logger>
</loggers>
</configuration>
```
- Taking worker.xml as an example: the logger named org.apache.storm.metric.LoggingMetricsConsumer is configured for info-level output with additivity set to false
- The METRICS appender writes to the file ${sys:workers.artifacts}/${sys:storm.id}/${sys:worker.port}/${sys:logfile.name}.metrics, for example workers-artifacts/tickDemo-1-1541070680/6700/worker.log.metrics
- METRICS is configured as a RollingFile appender with a SizeBasedTriggeringPolicy of 2 MB
Configuration
Topology configuration
```java
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
```
- A LoggingMetricsConsumer can be registered on the Config when the topology is submitted. This registration only takes effect for that topology's workers, i.e. whenever there is metric data, it is written to the topology's worker.log.metrics file
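For context, here is a minimal submission sketch; the SubmitWithMetrics class, the topology name, and the elided builder wiring are placeholders:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.topology.TopologyBuilder;

public class SubmitWithMetrics {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... set spouts and bolts on the builder ...

        Config conf = new Config();
        // register the consumer with parallelism hint 1;
        // metric data points will land in this topology's worker.log.metrics
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);

        StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
    }
}
```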
storm.yaml configuration
```yaml
topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    max.retain.metric.tuples: 100
    parallelism.hint: 1
  - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer"
    parallelism.hint: 1
    argument: "http://example.com:8080/metrics/my-topology/"
```
- The storm.yaml configuration applies to every topology. Note that the key here is topology.metrics.consumer.register, which is topology level; the data is written to the worker.log.metrics file
- For the cluster level, the key is storm.cluster.metrics.consumer.register, and it can only be configured via storm.yaml; when it is enabled, metric data is written to the nimbus.log.metrics and supervisor.log.metrics files (see the sketch after this list)
- nimbus and supervisor are started with the log4j parameter -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, while workers are started with -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml; the -Dlogfile.name parameter of each component is nimbus.log, supervisor.log, and worker.log respectively
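A cluster-level registration in storm.yaml might look like the following sketch; LoggingClusterMetricsConsumer ships with storm-server, but verify the class name against your Storm version:

```yaml
storm.cluster.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
```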
MetricsConsumerBolt
storm-2.0.0/storm-client/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
```java
public class MetricsConsumerBolt implements IBolt {
    public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class);
    private final int _maxRetainMetricTuples;
    private final Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
    private final DataPointExpander _expander;
    private final BlockingQueue<MetricsTask> _taskQueue;
    IMetricsConsumer _metricsConsumer;
    String _consumerClassName;
    OutputCollector _collector;
    Object _registrationArgument;
    private Thread _taskExecuteThread;
    private volatile boolean _running = true;

    public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples,
                               Predicate<IMetricsConsumer.DataPoint> filterPredicate, DataPointExpander expander) {
        _consumerClassName = consumerClassName;
        _registrationArgument = registrationArgument;
        _maxRetainMetricTuples = maxRetainMetricTuples;
        _filterPredicate = filterPredicate;
        _expander = expander;

        if (_maxRetainMetricTuples > 0) {
            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
        } else {
            _taskQueue = new LinkedBlockingDeque<>();
        }
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        try {
            _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance();
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate a class listed in config under section " +
                                       Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
        }
        _metricsConsumer.prepare(topoConf, _registrationArgument, context, collector);
        _collector = collector;
        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
        _taskExecuteThread.setDaemon(true);
        _taskExecuteThread.start();
    }

    @Override
    public void execute(Tuple input) {
        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0);
        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1);
        Collection<IMetricsConsumer.DataPoint> expandedDataPoints = _expander.expandDataPoints(dataPoints);
        List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(expandedDataPoints);
        MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints);

        while (!_taskQueue.offer(metricsTask)) {
            _taskQueue.poll();
        }

        _collector.ack(input);
    }

    private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
        return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate));
    }

    @Override
    public void cleanup() {
        _running = false;
        _metricsConsumer.cleanup();
        _taskExecuteThread.interrupt();
    }

    static class MetricsTask {
        private IMetricsConsumer.TaskInfo taskInfo;
        private Collection<IMetricsConsumer.DataPoint> dataPoints;

        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) {
            this.taskInfo = taskInfo;
            this.dataPoints = dataPoints;
        }

        public IMetricsConsumer.TaskInfo getTaskInfo() {
            return taskInfo;
        }

        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
            return dataPoints;
        }
    }

    class MetricsHandlerRunnable implements Runnable {
        @Override
        public void run() {
            while (_running) {
                try {
                    MetricsTask task = _taskQueue.take();
                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints());
                } catch (InterruptedException e) {
                    break;
                } catch (Throwable t) {
                    LOG.error("Exception occurred during handle metrics", t);
                }
            }
        }
    }
}
```
- MetricsConsumerBolt creates _taskQueue in its constructor: a bounded queue if _maxRetainMetricTuples is greater than 0, otherwise an unbounded one. The value is read from max.retain.metric.tuples under topology.metrics.consumer.register and defaults to 100 when absent
- In prepare, MetricsConsumerBolt starts the MetricsHandlerRunnable thread, which takes MetricsTask items off _taskQueue and calls _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints())
- In the execute method, each received tuple is offered to _taskQueue; if the offer fails because the queue is full, the oldest entry is polled off and the offer is retried (a custom consumer sketch follows this list)
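Since handleDataPoints is the only callback that does real work, writing your own consumer is straightforward. A minimal sketch; the StdoutMetricsConsumer class is hypothetical, modeled on LoggingMetricsConsumer above, and would be registered the same way, e.g. conf.registerMetricsConsumer(StdoutMetricsConsumer.class, 1):

```java
import java.util.Collection;
import java.util.Map;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;

public class StdoutMetricsConsumer implements IMetricsConsumer {
    @Override
    public void prepare(Map<String, Object> topoConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) {
        // registrationArgument carries the "argument" value from the registration, if any
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        // invoked on MetricsConsumerBolt's MetricsHandlerRunnable thread,
        // not on the executor thread that received the tuple
        for (DataPoint p : dataPoints) {
            System.out.printf("%d %s:%d %d:%s %s=%s%n",
                              taskInfo.timestamp, taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
                              taskInfo.srcTaskId, taskInfo.srcComponentId, p.name, p.value);
        }
    }

    @Override
    public void cleanup() {
    }
}
```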
StormCommon.systemTopologyImpl
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
```java
protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
    validateBasic(topology);

    StormTopology ret = topology.deepCopy();
    addAcker(topoConf, ret);
    if (hasEventLoggers(topoConf)) {
        addEventLogger(topoConf, ret);
    }
    addMetricComponents(topoConf, ret);
    addSystemComponents(topoConf, ret);
    addMetricStreams(ret);
    addSystemStreams(ret);

    validateStructure(ret);
    return ret;
}

public static void addMetricComponents(Map<String, Object> conf, StormTopology topology) {
    Map<String, Bolt> metricsConsumerBolts = metricsConsumerBoltSpecs(conf, topology);
    for (Map.Entry<String, Bolt> entry : metricsConsumerBolts.entrySet()) {
        topology.put_to_bolts(entry.getKey(), entry.getValue());
    }
}

public static void addMetricStreams(StormTopology topology) {
    for (Object component : allComponents(topology).values()) {
        ComponentCommon common = getComponentCommon(component);
        StreamInfo streamInfo = Thrift.outputFields(Arrays.asList("task-info", "data-points"));
        common.put_to_streams(Constants.METRICS_STREAM_ID, streamInfo);
    }
}

public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
    Map<String, Bolt> metricsConsumerBolts = new HashMap<>();

    Set<String> componentIdsEmitMetrics = new HashSet<>();
    componentIdsEmitMetrics.addAll(allComponents(topology).keySet());
    componentIdsEmitMetrics.add(Constants.SYSTEM_COMPONENT_ID);

    Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
    for (String componentId : componentIdsEmitMetrics) {
        inputs.put(Utils.getGlobalStreamId(componentId, Constants.METRICS_STREAM_ID), Thrift.prepareShuffleGrouping());
    }

    List<Map<String, Object>> registerInfo = (List<Map<String, Object>>) conf.get(Config.TOPOLOGY_METRICS_CONSUMER_REGISTER);
    if (registerInfo != null) {
        Map<String, Integer> classOccurrencesMap = new HashMap<String, Integer>();
        for (Map<String, Object> info : registerInfo) {
            String className = (String) info.get(TOPOLOGY_METRICS_CONSUMER_CLASS);
            Object argument = info.get(TOPOLOGY_METRICS_CONSUMER_ARGUMENT);
            Integer maxRetainMetricTuples = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_MAX_RETAIN_METRIC_TUPLES), 100);
            Integer phintNum = ObjectReader.getInt(info.get(TOPOLOGY_METRICS_CONSUMER_PARALLELISM_HINT), 1);
            Map<String, Object> metricsConsumerConf = new HashMap<String, Object>();
            metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
            List<String> whitelist = (List<String>) info.get(TOPOLOGY_METRICS_CONSUMER_WHITELIST);
            List<String> blacklist = (List<String>) info.get(TOPOLOGY_METRICS_CONSUMER_BLACKLIST);
            FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist);
            Boolean expandMapType = ObjectReader.getBoolean(info.get(TOPOLOGY_METRICS_CONSUMER_EXPAND_MAP_TYPE), false);
            String metricNameSeparator = ObjectReader.getString(info.get(TOPOLOGY_METRICS_CONSUMER_METRIC_NAME_SEPARATOR), ".");
            DataPointExpander expander = new DataPointExpander(expandMapType, metricNameSeparator);
            MetricsConsumerBolt boltInstance = new MetricsConsumerBolt(className, argument,
                                                                       maxRetainMetricTuples, filterPredicate, expander);
            Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs,
                                                                           boltInstance, null, phintNum, metricsConsumerConf);

            String id = className;
            if (classOccurrencesMap.containsKey(className)) {
                // e.g. ["a", "b", "a"]) => ["a", "b", "a#2"]
                int occurrenceNum = classOccurrencesMap.get(className);
                occurrenceNum++;
                classOccurrencesMap.put(className, occurrenceNum);
                id = Constants.METRICS_COMPONENT_ID_PREFIX + className + "#" + occurrenceNum;
            } else {
                id = Constants.METRICS_COMPONENT_ID_PREFIX + className;
                classOccurrencesMap.put(className, 1);
            }
            metricsConsumerBolts.put(id, metricsConsumerBolt);
        }
    }
    return metricsConsumerBolts;
}
```
- When StormCommon builds the system topology in systemTopologyImpl, it adds a number of system components; this is where addMetricComponents and addMetricStreams are called
- addMetricComponents creates the MetricsConsumerBolts from the conf and wires every component in as an input source, using shuffle grouping on Constants.METRICS_STREAM_ID
- addMetricStreams adds an output stream to Constants.METRICS_STREAM_ID for every component, with output fields Arrays.asList("task-info", "data-points") (see the id example after this list)
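As an illustration of the id logic above, and assuming Constants.METRICS_COMPONENT_ID_PREFIX is "__metrics", registering the same consumer class twice would produce bolt ids like:

```
__metricsorg.apache.storm.metric.LoggingMetricsConsumer
__metricsorg.apache.storm.metric.LoggingMetricsConsumer#2
```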
Executor.setupMetrics
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
```java
protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry;

protected void setupMetrics() {
    for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
        StormTimer timerTask = workerData.getUserTimer();
        timerTask.scheduleRecurring(interval, interval,
            () -> {
                TupleImpl tuple =
                    new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
                                  (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
                AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
                try {
                    receiveQueue.publish(metricsTickTuple);
                    receiveQueue.flush(); // avoid buffering
                } catch (InterruptedException e) {
                    LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
                    Thread.currentThread().interrupt();
                    return;
                }
            });
    }
}

public Map<Integer, Map<Integer, Map<String, IMetric>>> getIntervalToTaskToMetricToRegistry() {
    return intervalToTaskToMetricToRegistry;
}
```
- In the setupMetrics method, the Executor schedules recurring tasks that periodically emit a metricsTickTuple to METRICS_TICK_STREAM_ID using BROADCAST_DEST addressing
- The schedule is driven by intervalToTaskToMetricToRegistry, whose keys are the intervals
- intervalToTaskToMetricToRegistry is initialized in the Executor constructor: intervalToTaskToMetricToRegistry = new HashMap<>()
Task.mkTopologyContext
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java
```java
private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
    Map<String, Object> conf = workerData.getConf();
    return new TopologyContext(topology,
                               workerData.getTopologyConf(),
                               workerData.getTaskToComponent(),
                               workerData.getComponentToSortedTasks(),
                               workerData.getComponentToStreamToFields(),
                               // This is updated by the Worker and the topology has shared access to it
                               workerData.getBlobToLastKnownVersion(),
                               workerData.getTopologyId(),
                               ConfigUtils.supervisorStormResourcesPath(
                                   ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
                               ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
                               taskId,
                               workerData.getPort(), workerData.getLocalTaskIds(),
                               workerData.getDefaultSharedResources(),
                               workerData.getUserSharedResources(),
                               executor.getSharedExecutorData(),
                               executor.getIntervalToTaskToMetricToRegistry(),
                               executor.getOpenOrPrepareWasCalled());
}
```
- When mkTopologyContext creates the TopologyContext, it passes in executor.getIntervalToTaskToMetricToRegistry()
TopologyContext
storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/TopologyContext.java
```java
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<>();
    private List<ITaskHook> _hooks = new ArrayList<>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;

    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        if (_openOrPrepareWasCalled.get()) {
            throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
                                       "IBolt::prepare() or ISpout::open() method.");
        }

        if (metric == null) {
            throw new IllegalArgumentException("Cannot register a null metric");
        }

        if (timeBucketSizeInSecs <= 0) {
            throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
                                               "greater than or equal to 1 second.");
        }

        if (getRegisteredMetricByName(name) != null) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        }

        Map<Integer, Map<Integer, Map<String, IMetric>>> m1 = _registeredMetrics;
        if (!m1.containsKey(timeBucketSizeInSecs)) {
            m1.put(timeBucketSizeInSecs, new HashMap<Integer, Map<String, IMetric>>());
        }

        Map<Integer, Map<String, IMetric>> m2 = m1.get(timeBucketSizeInSecs);
        if (!m2.containsKey(_taskId)) {
            m2.put(_taskId, new HashMap<String, IMetric>());
        }

        Map<String, IMetric> m3 = m2.get(_taskId);
        if (m3.containsKey(name)) {
            throw new RuntimeException("The same metric name `" + name + "` was registered twice.");
        } else {
            m3.put(name, metric);
        }

        return metric;
    }

    //......
}
```
- The Executor's intervalToTaskToMetricToRegistry thus ends up as the TopologyContext's _registeredMetrics
- The registerMetric method adds entries to _registeredMetrics, keyed by timeBucketSizeInSecs
- Built-in metrics read their timeBucketSizeInSecs from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60 in defaults.yaml, so the Executor emits a metricsTickTuple with streamId Constants.METRICS_TICK_STREAM_ID every 60 seconds (a registration sketch follows this list)
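User metrics go through the same registerMetric path. A minimal sketch, assuming a hypothetical MyBolt; CountMetric is one of the stock IMetric implementations in org.apache.storm.metric.api:

```java
import java.util.Map;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class MyBolt extends BaseRichBolt {
    private transient CountMetric processedCount;
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // registerMetric may only be called here (or in a spout's open());
        // 60 is the timeBucketSizeInSecs, i.e. getValueAndReset() runs every 60s
        processedCount = context.registerMetric("processed-count", new CountMetric(), 60);
    }

    @Override
    public void execute(Tuple input) {
        processedCount.incr();
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
```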
Executor.metricsTick
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java
```java
public void metricsTick(Task task, TupleImpl tuple) {
    try {
        Integer interval = tuple.getInteger(0);
        int taskId = task.getTaskId();
        Map<Integer, Map<String, IMetric>> taskToMetricToRegistry = intervalToTaskToMetricToRegistry.get(interval);
        Map<String, IMetric> nameToRegistry = null;
        if (taskToMetricToRegistry != null) {
            nameToRegistry = taskToMetricToRegistry.get(taskId);
        }
        if (nameToRegistry != null) {
            IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
                hostname, workerTopologyContext.getThisWorkerPort(),
                componentId, taskId, Time.currentTimeSecs(), interval);
            List<IMetricsConsumer.DataPoint> dataPoints = new ArrayList<>();
            for (Map.Entry<String, IMetric> entry : nameToRegistry.entrySet()) {
                IMetric metric = entry.getValue();
                Object value = metric.getValueAndReset();
                if (value != null) {
                    IMetricsConsumer.DataPoint dataPoint = new IMetricsConsumer.DataPoint(entry.getKey(), value);
                    dataPoints.add(dataPoint);
                }
            }
            if (!dataPoints.isEmpty()) {
                task.sendUnanchored(Constants.METRICS_STREAM_ID,
                                    new Values(taskInfo, dataPoints), executorTransfer, pendingEmits);
                executorTransfer.flush();
            }
        }
    } catch (Exception e) {
        throw Utils.wrapInRuntime(e);
    }
}
```
- When SpoutExecutor or BoltExecutor receives a tuple with streamId Constants.METRICS_TICK_STREAM_ID in tupleActionFn, it calls the parent class's Executor.metricsTick method
- metricsTick emits data with task.sendUnanchored(Constants.METRICS_STREAM_ID, new Values(taskInfo, dataPoints), executorTransfer, pendingEmits), i.e. to Constants.METRICS_STREAM_ID with taskInfo and dataPoints as the values; the dataPoints are read from the TopologyContext's _registeredMetrics (this is the legacy metrics API, not the V2 one)
- Once MetricsConsumerBolt receives the data, it puts it onto the _taskQueue; meanwhile the MetricsHandlerRunnable thread blocks taking from _taskQueue and calls back into _metricsConsumer.handleDataPoints to consume the data
Summary
- LoggingMetricsConsumer belongs to storm's legacy metric API; metrics2 has no counterpart. nimbus and supervisor use -Dlog4j.configurationFile=/apache-storm/log4j2/cluster.xml, workers use -Dlog4j.configurationFile=/apache-storm/log4j2/worker.xml, and the -Dlogfile.name of each component is nimbus.log, supervisor.log, and worker.log respectively
- When Storm builds a topology, it adds system components, among them the metricsConsumerBolt and the metric streams. The Executor calls setupMetrics in its init method to periodically emit metricsTickTuple; when SpoutExecutor or BoltExecutor receives a metricsTickTuple in tupleActionFn, it calls metricsTick to produce data and emit it to Constants.METRICS_STREAM_ID, after which MetricsConsumerBolt receives the data and calls back into _metricsConsumer.handleDataPoints to consume it
- Two parameters deserve attention. One is max.retain.metric.tuples, used by MetricsConsumerBolt and configured under topology.metrics.consumer.register with a default of 100; it is the size of MetricsConsumerBolt's _taskQueue, and a value of 0 makes the queue unbounded. The other is the interval of built-in metrics, read from Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS (topology.builtin.metrics.bucket.size.secs), which defaults to 60, i.e. a metricsTickTuple is emitted every 60 seconds
doc
- Storm Metrics
- New Metrics Reporting API