A Summary of hive2solr Issues
After working on it for a while, the hive2solr job finally runs stably. It loads data from Hive into Solr, mainly by implementing the RecordWriter interface and overriding its write and close methods. The problems encountered are listed below one by one.
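For context, here is a minimal, hedged skeleton of such a writer based on Hive's FileSinkOperator.RecordWriter interface (the class name and comments are illustrative; the real SolrWriter discussed in this post carries more state, such as buffer sizes and the routed SolrTable):

```java
import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.Writable;

// Illustrative skeleton only: shows the two methods a Hive record writer
// overrides in order to push rows into Solr.
public class SolrWriterSkeleton implements FileSinkOperator.RecordWriter {

    @Override
    public void write(Writable row) throws IOException {
        // convert the Hive row (a MapWritable) into a SolrInputDocument and buffer it
    }

    @Override
    public void close(boolean abort) throws IOException {
        // flush any buffered documents and commit, or roll back when abort == true
    }
}
```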
1. Data overwrite problem: use atomic updates
Reference: http://caiguangguang.blog.51cto.com/1652935/1599137
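As a quick illustration, a minimal sketch of a SolrJ atomic ("set") update, which updates a single field of an existing document instead of overwriting the whole document (the URL and field names below are made up for the example):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class AtomicUpdateSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative URL and field names, not taken from the original post.
        SolrServer server = new HttpSolrServer("http://localhost:8983/solr/userinfo");

        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", 1001); // unique key of the document to update

        Map<String, Object> setOper = new LinkedHashMap<String, Object>();
        setOper.put("set", "new value");            // "set" replaces only this field, keeping the rest
        doc.addField("userinfo.nickname", setOper); // hypothetical field name

        server.add(doc);
        server.commit();
    }
}
```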
2. Repeated construction of SolrServer and SolrTable objects: build them once in a static initializer and reuse them afterwards
Construction:
```java
public static Map<Integer,SolrServer> solrServers = new HashMap<Integer,SolrServer>();
public static Map<Integer,SolrTable> solrTables = new HashMap<Integer,SolrTable>();
public static String[] iparray;
public static String ipstring;
public static String collec;
static {
    LOG.warn("in SolrServerCustom start initialize ip maps");
    ipstring = "xxxx,xxxxxx";
    collec = "userinfo";
    LOG.warn("in SolrServerCustom ipstring and collec: " + ipstring + "," + collec);
    iparray = ipstring.split(",");
    Arrays.sort(iparray);
    for (int i = 0; i < iparray.length; i++) {
        String urlx = "http://" + iparray[i] + "/solr/" + collec;
        solrServers.put(i, new HttpSolrServer(urlx));
        solrTables.put(i, new SolrTable(String.valueOf(i)));
    }
    LOG.warn("in SolrServerCustom end initialize ip maps, maps size " + solrServers.size());
    LOG.warn("in SolrServerCustom end initialize ip mapsx, mapsx size " + solrTables.size());
}
```
Referencing them:
```java
public void write(Writable w) throws IOException {
    MapWritable map = (MapWritable) w;
    SolrInputDocument doc = new SolrInputDocument();
    String key;
    String value;
    String newkey;
    int idx;
    for (final Map.Entry<Writable, Writable> entry : map.entrySet()) {
        key = entry.getKey().toString();
        newkey = this.tableName + "." + entry.getKey().toString();
        value = entry.getValue().toString();
        if (key.equals("id")) {
            idx = SolrUtil.getIntServer(value, SolrServerCustom.solrServers); // use the static SolrServerCustom.solrServers
            table = SolrServerCustom.solrTables.get(idx);                     // use the static SolrServerCustom.solrTables
            table.setNumInputBufferRows(this.numInputBufferRows);
        }
        if (key.equals("id")) {
            doc.addField("id", Integer.valueOf(value));
        } else {
            if (value.equals("(null)")) {
                value = "";
            }
            setOper = new LinkedHashMap<String, Object>();
            setOper.put("set", value);
            if (!doc.keySet().contains(newkey)) {
                doc.addField(newkey, setOper);
            }
        }
    }
    table.save(doc);
}
```
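The SolrUtil.getIntServer helper referenced above is not shown in the post; a plausible minimal sketch is to hash the id over the number of servers so that the same id always routes to the same SolrServer/SolrTable pair (hypothetical implementation):

```java
import java.util.Map;
import org.apache.solr.client.solrj.SolrServer;

public class SolrUtil {
    // Hypothetical sketch only; the real getIntServer is not shown in the post.
    // Mask off the sign bit before the modulo so negative hash codes are handled.
    public static int getIntServer(String id, Map<Integer, SolrServer> servers) {
        return (id.hashCode() & Integer.MAX_VALUE) % servers.size();
    }
}
```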
3. Memory leak in the code
1) Move object declarations outside the loop and adjust the output buffer size.
Symptom: the YARN map/reduce task's Java heap fills up and the job hangs.
Error log:
```
2015-01-26 14:01:10,000 FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:45)
        at java.lang.StringBuilder.<init>(StringBuilder.java:68)
        at com.chimpler.hive.solr.SolrWriter.write(SolrWriter.java:71)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:621)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:87)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
        at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:92)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:793)
        at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:540)
        at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:177)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:428)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:160)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:155)
```
2) Use try...catch...finally (clear the buffer in finally).
At first there was no finally block, so when an exception occurred the buffer grew beyond its configured size, eventually filling the job's memory and hanging it.
4. Exception handling
The program must not exit when one SolrServer fails or Solr is temporarily unresponsive. By default the exception propagates up and eventually fails the job, for example:
```
Caused by: org.apache.solr.client.solrj.impl.HttpSolrServer$RemoteSolrException: Expected content type application/octet-stream but got text/html. <html>
<head><title>504 Gateway Time-out</title></head>
<body bgcolor="white">
<center><h1>504 Gateway Time-out</h1></center>
<hr><center>nginx/1.6.2</center>
</body>
</html>
```
To keep the thrown exception from turning into a runtime error that fails the job, catch it and do nothing beyond logging:
```java
public void flush() {
    try {
        if (!outputBuffer.isEmpty()) {
            server.add(outputBuffer);
        }
    } catch (Exception e) {
        LOG.warn("solrtable add error, Exception log is " + e);
    } finally {
        outputBuffer.clear(); // clear the buffer in finally; otherwise it keeps growing when exceptions are thrown, eventually causing a JVM OOM
    }
}
```
5. Commit problem. When the close method is called, only the last SolrTable gets closed. At first we committed after every inserted row, but performance was very poor (roughly a 50% slowdown), so commits were later controlled on the Solr server side.
solrconfig.xml:
```xml
<autoCommit>
  <!--<maxTime>${solr.autoCommit.maxTime:15000}</maxTime>-->
  <maxDocs>15000</maxDocs>          <!-- once this many docs are in the in-memory index, dump it to disk and notify the searcher to load the new index -->
  <maxTime>1000</maxTime>           <!-- commit the in-memory index automatically after this interval and notify the searcher; whichever condition is reached first wins -->
  <openSearcher>true</openSearcher> <!-- if false, the commit still flushes index changes to disk, but clients will not see the updates -->
</autoCommit>

<autoSoftCommit>
  <maxTime>${solr.autoSoftCommit.maxTime:10000}</maxTime>
</autoSoftCommit>
```
Here autoCommit means a hard commit. If you do not use autoCommit, you can instead pass a commitWithin parameter when adding documents. autoSoftCommit is similar to autoCommit but is a soft commit: it makes the data visible without flushing it to disk, so a machine crash can lose data.
save also costs performance: each save takes roughly 6 ms, so documents should be collected into a list and saved in batches (a sketch of this follows below).
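As an illustration of that batching approach, here is a hedged sketch that collects documents into a list and lets Solr commit them via commitWithin instead of committing per row (the buffer size, commit window, and URL are arbitrary example values):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class BatchAddSketch {
    private static final int BUFFER_SIZE = 1000;        // illustrative batch size
    private static final int COMMIT_WITHIN_MS = 10000;  // illustrative: let Solr commit within 10 s

    private final List<SolrInputDocument> buffer = new ArrayList<SolrInputDocument>();
    private final HttpSolrServer server =
            new HttpSolrServer("http://localhost:8983/solr/userinfo"); // illustrative URL

    public void save(SolrInputDocument doc) throws SolrServerException, IOException {
        buffer.add(doc);
        if (buffer.size() >= BUFFER_SIZE) {
            flush();
        }
    }

    public void flush() throws SolrServerException, IOException {
        if (buffer.isEmpty()) {
            return;
        }
        try {
            // commitWithin: Solr commits the batch on its own within the window,
            // so the client never has to call commit() per row.
            server.add(buffer, COMMIT_WITHIN_MS);
        } finally {
            buffer.clear(); // always clear, mirroring the finally-based cleanup described earlier
        }
    }
}
```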
6. Output buffer problem
In the initial code, SolrTable had only a single entry point (the same with SolrCloud), so there was only one SolrTable instance, and static variables were used, which meant each SolrTable could not work with its own buffer.
The fix is to make the buffer a non-static field, initialize the tables and servers in a static block, put them into a HashMap, and fetch them when needed, so that only a fixed small number of instances exist. Otherwise, if instances were created at use time, every call would get a different object and the buffer size would always stay at 1.
7. The close problem
If a buffer is configured, documents may never get flushed:
```java
public void save(SolrInputDocument doc) {
    outputBuffer.add(doc); // save only appends to the buffer list
    if (outputBuffer.size() >= numOutputBufferRows) { // flush is triggered only once the list reaches the configured buffer size
        flush();
    }
}
```
flush in turn calls server.add(outputBuffer). When the file sink is closed, SolrWriter.close is called,
which calls SolrTable's commit (commit calls flush and server.commit). It turned out that only the last table instance had its commit called.
The fix is to loop over all SolrTable instances in SolrWriter.close and call commit on each:
```java
public void close(boolean abort) throws IOException {
    if (!abort) {
        Map<Integer, SolrTable> maps = new SolrServerCustom().solrTable;
        for (Map.Entry<Integer, SolrTable> entry : maps.entrySet()) {
            entry.getValue().commit();
        }
    } else {
        table.rollback();
    }
}
```
8. Lock problem. Large numbers of 302 responses were seen on the nginx side and the Solr logs showed lock errors; the fix is to adjust the configuration so the lock is released when Solr starts.
Solr-side log:
```
userinfo: org.apache.solr.common.SolrException:org.apache.solr.common.SolrException: Index locked for write for core userinfo
```
Fix: set the following in solrconfig.xml:
```xml
<unlockOnStartup>true</unlockOnStartup>
```
Cause:
When org.apache.solr.core.SolrCore initializes, it uses IndexWriter.isLocked(dir) to check whether the index is locked. If it is locked there are two cases: if unlockOnStartup is configured in solrconfig.xml, it tries to unlock; if unlockOnStartup is not configured, it throws the "Index locked for write for core" exception.
The corresponding code can be found from the stack trace:
The org.apache.solr.core.SolrCore constructor calls the initIndex method:
```java
void initIndex(boolean reload) throws IOException {
    String indexDir = getNewIndexDir();
    boolean indexExists = getDirectoryFactory().exists(indexDir);
    boolean firstTime;
    synchronized (SolrCore.class) {
      firstTime = dirs.add(getDirectoryFactory().normalize(indexDir));
    }
    boolean removeLocks = solrConfig.unlockOnStartup; // unlockOnStartup = getBool(indexConfigPrefix+"/unlockOnStartup", false); defaults to false
    initIndexReaderFactory();

    if (indexExists && firstTime && !reload) {

      Directory dir = directoryFactory.get(indexDir, DirContext.DEFAULT,
          getSolrConfig().indexConfig.lockType);
      try {
        if (IndexWriter.isLocked(dir)) {
          if (removeLocks) {
            log.warn(
                logid
                    + "WARNING: Solr index directory '{}' is locked.  Unlocking...",
                indexDir);
            IndexWriter.unlock(dir); // release the lock
          } else {
            log.error(logid
                + "Solr index directory '{}' is locked.  Throwing exception",
                indexDir);
            throw new LockObtainFailedException(
                "Index locked for write for core " + name);
          }
        }
      } finally {
        directoryFactory.release(dir);
      }
    }

    // Create the index if it doesn't exist.
    if (!indexExists) {
      log.warn(logid + "Solr index directory '" + new File(indexDir) + "' doesn't exist."
              + " Creating new index...");
      SolrIndexWriter writer = SolrIndexWriter.create("SolrCore.initIndex", indexDir, getDirectoryFactory(), true,
                                                      getLatestSchema(), solrConfig.indexConfig, solrDelPolicy, codec);
      writer.close();
    }
}
```
9. Problem caused by the Tomcat configuration. Each machine runs two Solr instances and one of them could never start (instantiating the core tries to acquire the index lock and fails here; write.lock can be deleted manually).
It turned out that the two Tomcats were writing to the same Solr directory.
Error log:
```
Caused by: org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: NativeFSLock@/apps/dat/web/working/solr/cloud/storage/data/userinfo/data/index/write.lock
     at org.apache.lucene.store.Lock.obtain(Lock.java:89)
     at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:710)
     at org.apache.solr.update.SolrIndexWriter.<init>(SolrIndexWriter.java:77)
     at org.apache.solr.update.SolrIndexWriter.create(SolrIndexWriter.java:64)
     at org.apache.solr.update.DefaultSolrCoreState.createMainIndexWriter(DefaultSolrCoreState.java:267)
     at org.apache.solr.update.DefaultSolrCoreState.getIndexWriter(DefaultSolrCoreState.java:110)
     at org.apache.solr.core.SolrCore.openNewSearcher(SolrCore.java:1513)
     ... 12 more
```
10. Some jobs ran very slowly; one of them ran for 11 hours.
Cause:
Writes happen inside the map operator or the reduce operator, so the number of maps or reduces equals the number of concurrent writer threads. This job had only one reducer, which made writing slow. After raising the number of reducers to 100 (set mapreduce.job.reduces=100), performance improved dramatically: importing 30 million rows dropped from 40916 s to 993 s.
This article is reposted from 菜菜光's 51CTO blog. Original link: http://blog.51cto.com/caiguangguang/1612601. Please contact the original author for reprint permission.