Spark 读 Elasticsearch
生活随笔
收集整理的這篇文章主要介紹了
Spark 读 Elasticsearch
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
2019獨角獸企業重金招聘Python工程師標準>>>
沒什麼好說的,直接貼上程式碼吧:
public?class?MySparkReadEs?implements?Serializable?{private?transient?JavaSparkContext?javaSparkContext?=?null;private?transient?SparkConf?sparkConf?=?null;private?transient?Configuration?esConf?=?null;private?String?esSource?=?"post/post";private?String?esNodes?=?"192.168.1.235,192.168.1.236";private?static?final?Log?LOG?=?LogFactory.getLog(MySparkReadEs.class);private?MySparkReadEs()?{}private?MySparkReadEs(String?esSource,?String?esNodes)?{this.esSource?=?esSource;this.esNodes?=?esNodes;}//初始化Spark?contextprivate?void?initSparkContext()?{this.sparkConf?=?new?SparkConf().setMaster("local[2]").setAppName("my?spark?rdd");this.javaSparkContext?=?new?JavaSparkContext(this.sparkConf);}//初始化查詢Es的配置文件private?void?initEsConfig(String?urn)?{this.esConf?=?new?Configuration((Configuration)?HBaseConfiguration.create());//指定讀取的索引名稱this.esConf.set("es.resource",?this.esSource);//指定es節點this.esConf.set("es.nodes",?this.esNodes);//加入ES查詢條件this.esConf.set("es.query",?buildQueryESCondition(urn));}//構建查詢Es的條件private?String?buildQueryESCondition(String?urn)?{//支持lucence的寫法?里面可以繼續添加其他字段StringBuilder?sb?=?new?StringBuilder();sb.append("postUrn:").append("(").append(urn).append(")");//查詢條件的構造ImmutableMap<String,?ImmutableMap<String,?ImmutableMap<String,?String>>>?conditionMap?=?ImmutableMap.of("query",?ImmutableMap.of("query_string",?ImmutableMap.of("query",?sb.toString())));//SPARK查詢ES的查詢條件LOG.info("SPARK?查詢?ES?的條件為:"?+?JSON.toJSONString(conditionMap));return?JSON.toJSONString(conditionMap);}//開始任務private?void?startJob()?{List<String>?extensionCollect?=?this.javaSparkContext.newAPIHadoopRDD(this.esConf,?EsInputFormat.class,?NullWritable.class,?MapWritable.class).map(new?Function<Tuple2<NullWritable,?MapWritable>,?String>()?{@Overridepublic?String?call(Tuple2<NullWritable,?MapWritable>?v1)?throws?Exception?{MapWritable?mapWritable?=?v1._2();Map<String,?Object>?o?=?(Map<String,?Object>)?WritableUtils.fromWritable(mapWritable);String?extension?=?(String)?o.get("extension");return?extension
;}}).collect();for?(String?extension?:?extensionCollect)?{LOG.info("Extension?:?"?+?extension);}}//主方法public?static?void?main(String[]?args)?{String?esNodes?=?"192.168.1.235,192.168.1.236";String?esSource?=?"post/post";String?userurn?=?"3136949-bf30c1da6f8b47d3d93a5c4f75194447";MySparkReadEs?mySparkRdd?=?new?MySparkReadEs(esSource,?esNodes);//初始化?spark?contextmySparkRdd.initSparkContext();//初始化?es?配置mySparkRdd.initEsConfig(userurn);//開始作業mySparkRdd.startJob();} }轉載于:https://my.oschina.net/momisabuilder/blog/605956
總結
以上是生活随笔為你收集整理的Spark 读 Elasticsearch的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 黄聪:C# 开发Chrome内核浏览器(
- 下一篇: Mac下github的使用