Using WinDbg to Track Down a Memory Leak in Kafka.Client
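I have never written a blog before. Come to think of it, after more than ten years of work on all sorts of architectures and technologies, it would be a pity not to share any of it. Also, moving from the traditional ERP industry to the Internet, I have run into many problems I had never seen before. I knew there would be pits, but not that there would be so many or that they would be so deep. So I am taking the chance to log the process while filling them in.
Back to the point: some background first. Teld's business platform holds a large volume of sensor data from IoT terminals and vehicle operation data, and that data is valuable. So it has to be stored. Teld's charging terminals are quite impressive: there are already more than 20,000 of them, each reporting data every 30 seconds, although a single record is small. And this is only the beginning: under the national plan, we should reach the million level by 2020. But I digress. By my estimate, the TPS required just for the data reported by the charging terminals is fairly high (20,000 terminals reporting every 30 seconds is already about 670 messages per second). After two months of research and technology selection, we chose Kafka as the middleware for processing massive data.
Fine. Kafka it is, so let's start filling in the pits. Since we are on the .NET technology stack, the Kafka client has to be .NET as well. … (10,000 words omitted here.) The Kafka environment was debugged successfully, but the Consumer program written on top of Kafka.Client showed a serious memory leak.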
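[Figure: memory usage of the Consumer process after 2 hours]
The Consumer program has to run for long periods, yet after only 2 hours its memory had already reached 570 MB. Time to grab a dump and analyze it with WinDbg.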
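Start WinDbg, set the symbol path, and load the dump.
Run the following commands:
    .loadby sos clr    (note: the program targets .NET 4.0; for 2.0 the runtime module is mscorwks rather than clr)
    !dumpheap -stat    (note: lists the object count and total memory size for each type on the heap)
Result: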
00007ff947e2f2e8  1215019     29160456 Kafka.Client.Common.NoOpTimer
00007ff947e2f1a8  1215019     29160456 Kafka.Client.Metrics.KafkaTimer
00007ff947e39600  1215018     38880576 Kafka.Client.Consumers.FetchRequestAndResponseMetrics
00007ff947e2df70  1215018     38880576 Kafka.Client.Common.ClientIdAndBroker
00007ff947e3a058  1215007     58320336 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
00007ff9a5cc3d60  1267853     86313134 System.String
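The result shows that NoOpTimer, KafkaTimer, FetchRequestAndResponseMetrics, ClientIdAndBroker, and ConcurrentDictionary node objects each number more than 1.2 million, occupying nearly 200 MB in total. Well, these guys look like the culprits, and the finger points straight at Kafka.Client. Let's pick NoOpTimer and look at its GC roots first. Onward!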
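As a quick sanity check on the "nearly 200 MB" figure, the five suspicious rows of the !dumpheap -stat output can simply be added up. The sketch below copies the counts and sizes from the table above; the names HeapRow and HeapStats are made up for this illustration and are not part of WinDbg or Kafka.Client.

```csharp
using System;
using System.Linq;

// One row of the !dumpheap -stat output quoted above (illustrative type).
public sealed class HeapRow
{
    public string TypeName { get; }
    public long Count { get; }
    public long TotalBytes { get; }

    public HeapRow(string typeName, long count, long totalBytes)
    {
        TypeName = typeName;
        Count = count;
        TotalBytes = totalBytes;
    }

    // Average size of a single instance, in bytes.
    public long BytesPerObject => TotalBytes / Count;
}

public static class HeapStats
{
    // Counts and sizes copied from the dump output; System.String is left out.
    public static readonly HeapRow[] Rows =
    {
        new HeapRow("NoOpTimer", 1215019, 29160456),
        new HeapRow("KafkaTimer", 1215019, 29160456),
        new HeapRow("FetchRequestAndResponseMetrics", 1215018, 38880576),
        new HeapRow("ClientIdAndBroker", 1215018, 38880576),
        new HeapRow("ConcurrentDictionary`2+Node", 1215007, 58320336),
    };

    // Sum of the TotalSize column over the five rows.
    public static long TotalBytes() => Rows.Sum(r => r.TotalBytes);

    // The same total expressed in MiB.
    public static double TotalMiB() => TotalBytes() / (1024.0 * 1024.0);
}
```

HeapStats.TotalBytes() comes to 194,402,400 bytes, roughly 185 MiB, and the per-object sizes work out to 24, 24, 32, 32, and 48 bytes. These are tiny objects; it is purely their number that adds up.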
Run the command (there are too many objects; let it run for a moment, then break):
!dumpheap -mt 00007ff947e2f3b0
Result:
0000021a972f8538 00007ff947e2f3b0       24
0000021a972f86e0 00007ff947e2f3b0       24
0000021a972f8828 00007ff947e2f3b0       24
0000021a972f89b8 00007ff947e2f3b0       24
0000021a972f8b10 00007ff947e2f3b0       24
The first column of the output is the address of each NoOpTimer object. Next, check the GC roots.
Run the command:
!gcroot 0000021a972f8538
Result:
0000021ae58965a8 Teld.Core.Log.Processor.ProcessService
    ->  0000021ae58966a8 System.Collections.Generic.List`1[[Teld.Core.Log.Processor.LogListener, Teld.Core.Log.Processor]]
    ->  0000021ae5898068 Teld.Core.Log.Processor.LogListener[]
    ->  0000021ae5896b78 Teld.Core.Log.Processor.LogListener
    ->  0000021ae5896bb8 Teld.Core.Log.Processor.KafkaConsumer
    ->  0000021a8ae4a2f8 Kafka.Client.Consumers.ZookeeperConsumerConnector
    ->  0000021a94f6c0e8 Kafka.Client.Consumers.ConsumerFetcherManager
    ->  0000021a94f6c1f0 System.Collections.Generic.Dictionary`2[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]]
    ->  0000021a958fc328 System.Collections.Generic.Dictionary`2+Entry[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]][]
    ->  0000021a962decf0 Kafka.Client.Consumers.ConsumerFetcherThread
    ->  0000021a962df050 Kafka.Client.Consumers.SimpleConsumer
    ->  0000021ae58f6348 Kafka.Client.Consumers.FetchRequestAndResponseStats
    ->  0000021ae58f6378 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
    ->  0000021a8d531598 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
    ->  0000021af58130c8 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]][]
    ->  0000021a972f8550 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
    ->  0000021a972f84e8 Kafka.Client.Consumers.FetchRequestAndResponseMetrics
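The result shows that the NoOpTimer object is held by a FetchRequestAndResponseMetrics, and that FetchRequestAndResponseMetrics in turn appears to be cached in a ConcurrentDictionary. That ConcurrentDictionary lump looks awfully familiar... damn, it was in the !dumpheap -stat output just now! So let's analyze the ConcurrentDictionary type next. Onward!
Run the command (00007ff947e3a058 is the value in the first column (MT) of the ConcurrentDictionary row in the first !dumpheap -stat output):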
!dumpheap -mt 00007ff947e3a058
Result (a random excerpt):
0000021aefcd5a90 00007ff947e3a058       48
0000021aefcd5c20 00007ff947e3a058       48
0000021aefcd5d60 00007ff947e3a058       48
0000021aefcd5ef0 00007ff947e3a058       48
0000021aefcd6030 00007ff947e3a058       48
0000021aefcd65e8 00007ff947e3a058       48
0000021aefcd6790 00007ff947e3a058       48
0000021aefcd68d8 00007ff947e3a058       48
0000021aefcd6a68 00007ff947e3a058       48
Pick one at random and look at its GC roots.
Run the command:
!gcroot 0000021aefcd6a68
Result:
0000021ae58965a8 Teld.Core.Log.Processor.ProcessService
    ->  0000021ae58966a8 System.Collections.Generic.List`1[[Teld.Core.Log.Processor.LogListener, Teld.Core.Log.Processor]]
    ->  0000021ae5898068 Teld.Core.Log.Processor.LogListener[]
    ->  0000021ae58970a8 Teld.Core.Log.Processor.LogListener
    ->  0000021ae58970e8 Teld.Core.Log.Processor.KafkaConsumer
    ->  0000021a8cedba08 Kafka.Client.Consumers.ZookeeperConsumerConnector
    ->  0000021a94f56710 Kafka.Client.Consumers.ConsumerFetcherManager
    ->  0000021a94f56818 System.Collections.Generic.Dictionary`2[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]]
    ->  0000021a94f5bd20 System.Collections.Generic.Dictionary`2+Entry[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]][]
    ->  0000021a962e5e80 Kafka.Client.Consumers.ConsumerFetcherThread
    ->  0000021a962e61e0 Kafka.Client.Consumers.SimpleConsumer
    ->  0000021ae58f60e8 Kafka.Client.Consumers.FetchRequestAndResponseStats
    ->  0000021ae58f6118 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
    ->  0000021a89deda70 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
    ->  0000021af5a43128 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]][]
    ->  0000021aefcd6a68 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
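The result shows that the ConcurrentDictionary nodes are referenced by a Pool, and the Pool in turn is referenced by FetchRequestAndResponseStats. This closely resembles the reference chain for NoOpTimer!
Searching the first !dumpheap -stat output shows that there are only 11 objects each of the FetchRequestAndResponseStats and Pool types.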
00007ff947e387f8       11          528 Kafka.Client.Consumers.FetchRequestAndResponseStats
00007ff947e397d8       11          792 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
So it seems the million-plus objects all hang off the Pool. Time to open the Kafka.Client source code.
internal class FetchRequestAndResponseStats
{
    private string clientId;
    private Func<ClientIdAndBroker, FetchRequestAndResponseMetrics> valueFactory;
    private Pool<ClientIdAndBroker, FetchRequestAndResponseMetrics> stats;
    private FetchRequestAndResponseMetrics allBrokerStats;

    public FetchRequestAndResponseStats(string clientId)
    {
        this.clientId = clientId;
        this.valueFactory = k => new FetchRequestAndResponseMetrics(k);
        this.stats = new Pool<ClientIdAndBroker, FetchRequestAndResponseMetrics>(this.valueFactory);
        this.allBrokerStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"));
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseAllBrokersStats()
    {
        return this.allBrokerStats;
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseStats(string brokerInfo)
    {
        return this.stats.GetAndMaybePut(new ClientIdAndBroker(this.clientId, brokerInfo + "-"));
    }
}
The Pool object is a field of FetchRequestAndResponseStats, and Pool inherits from ConcurrentDictionary, with ClientIdAndBroker as its key type. Pool is defined as follows:
public class Pool<TKey, TValue> : ConcurrentDictionary<TKey, TValue>
{
    public Func<TKey, TValue> ValueFactory { get; set; }

    public Pool(Func<TKey, TValue> valueFactory = null)
    {
        this.ValueFactory = valueFactory;
    }

    public TValue GetAndMaybePut(TKey key)
    {
        if (this.ValueFactory == null)
        {
            throw new KafkaException("Empty value factory in pool");
        }
        return this.GetOrAdd(key, this.ValueFactory);
    }
}
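Here is the problem: FetchRequestAndResponseStats.GetFetchRequestAndResponseStats creates a new ClientIdAndBroker object on every call and then calls Pool.GetAndMaybePut. Damn!!! Every access uses a brand-new object, and that object is used as the ConcurrentDictionary key, stored via ConcurrentDictionary.GetOrAdd(). Assuming ClientIdAndBroker does not override Equals and GetHashCode (its source is not shown here, but the dump is consistent with that), keys are compared by reference, so a newly created object can only ever be added and can never match an existing entry. That Kafka.Client contains such a basic mistake instantly gave me a new view of open source components: the pits are deep, and you don't know until you fill them in.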
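The effect is easy to reproduce outside Kafka.Client. The following is a minimal sketch, not the library's code: ReferenceKey is a hypothetical stand-in for ClientIdAndBroker that, by assumption, does not override Equals or GetHashCode, and PoolLeakDemo is a made-up helper that calls GetOrAdd repeatedly and reports how many entries the dictionary ends up with.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for ClientIdAndBroker. It deliberately does NOT
// override Equals/GetHashCode, so dictionaries compare it by reference.
public sealed class ReferenceKey
{
    public string ClientId { get; }
    public string BrokerInfo { get; }

    public ReferenceKey(string clientId, string brokerInfo)
    {
        ClientId = clientId;
        BrokerInfo = brokerInfo;
    }
}

public static class PoolLeakDemo
{
    // Looks up the "same" logical key 'calls' times, allocating a fresh key
    // object each time, and returns the resulting number of entries.
    public static int CountWithReferenceKeys(string clientId, string brokerInfo, int calls)
    {
        var pool = new ConcurrentDictionary<ReferenceKey, object>();
        for (int i = 0; i < calls; i++)
        {
            // A new key never equals an earlier one, so this always adds.
            pool.GetOrAdd(new ReferenceKey(clientId, brokerInfo + "-"), k => new object());
        }
        return pool.Count;
    }

    // Same loop, but with a string key, which is compared by value.
    public static int CountWithStringKeys(string clientId, string brokerInfo, int calls)
    {
        var pool = new ConcurrentDictionary<string, object>();
        for (int i = 0; i < calls; i++)
        {
            // Equal strings hash and compare equal, so only the first call adds.
            pool.GetOrAdd(string.Concat(clientId, "|", brokerInfo, "-"), k => new object());
        }
        return pool.Count;
    }
}
```

Calling PoolLeakDemo.CountWithReferenceKeys("c1", "b1", 1000) yields 1000 entries, while PoolLeakDemo.CountWithStringKeys("c1", "b1", 1000) yields a single entry. A long-running consumer that hits the first pattern on every fetch grows without bound, which matches the picture in the dump.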
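Let's fix the open source component's code right away: change the Pool key type from ClientIdAndBroker to string, which is compared by value. After debugging and running it, the figure below shows the memory usage of the Consumer program after running for 2 days, during which it processed 600,000 log entries.
[Figure: memory usage of the Consumer process after running for 2 days]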
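One way the string-key change could look is sketched below. This is not the actual patch: MetricsStub stands in for FetchRequestAndResponseMetrics, StatsWithStringKeys for FetchRequestAndResponseStats, a plain ConcurrentDictionary replaces Pool, and the way the key string is composed from clientId and brokerInfo is an assumption made for the example.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for FetchRequestAndResponseMetrics.
public sealed class MetricsStub
{
    public string Name { get; }

    public MetricsStub(string name)
    {
        Name = name;
    }
}

// Illustrative counterpart of FetchRequestAndResponseStats with a string key.
public sealed class StatsWithStringKeys
{
    private readonly string clientId;
    private readonly ConcurrentDictionary<string, MetricsStub> stats;

    public StatsWithStringKeys(string clientId)
    {
        this.clientId = clientId;
        this.stats = new ConcurrentDictionary<string, MetricsStub>();
    }

    // Number of cached metrics objects; stays bounded by the number of brokers.
    public int EntryCount => stats.Count;

    public MetricsStub GetStats(string brokerInfo)
    {
        // Assumed key layout: "<clientId>-<brokerInfo>-". Strings with the
        // same content are equal, so repeated calls reuse the cached entry.
        string key = clientId + "-" + brokerInfo + "-";
        return stats.GetOrAdd(key, k => new MetricsStub(k));
    }
}
```

With this shape, calling GetStats("b1") any number of times on one instance returns the same MetricsStub object and EntryCount stays at 1. Giving the key class value-based Equals and GetHashCode overrides would be another way to get the same behavior while keeping a typed key.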
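The problem is finally solved! Lastly, as is customary, thanks to JuQiang for the guidance. I am a newcomer to the Internet field, so this post inevitably contains some views that are not objective or mature; please bear with me!
vveiliang 2015-12-3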
Reposted from: https://www.cnblogs.com/teld/p/5016891.html