Rate-limiting Spark writes to Elasticsearch
Table of Contents
- 1. Spark bulk writes to ES
- 2. Writing to Elasticsearch from Java Spark
- 3. Extending the es-hadoop source
  - 1. MyEsSparkSQL
  - 2. MyEsDataFrameWriter
1. Spark bulk writes to ES
Normally, when a Spark job needs to write to ES, we use ES-Hadoop. Check the official documentation and pick the version that matches your stack. If you use Hive, Spark, and the rest of the ecosystem, you can simply declare the umbrella dependency:
```xml
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>7.1.1</version>
</dependency>
```

Since we only use Spark here (Spark 2.3, Scala 2.11, Elasticsearch 7.1.1), pulling in just the Spark artifact is enough:
```xml
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.1.1</version>
</dependency>
```

2. Writing to Elasticsearch from Java Spark
The Java code for writing to ES can look like this:
```java
@Data
public class UserProfileRecord {
    public String uid;
    public String want_val;
}

SparkConf sparkConf = new SparkConf()
        .setAppName(JOB_NAME)
        .set(ConfigurationOptions.ES_NODES, esHost)
        .set(ConfigurationOptions.ES_PORT, esPort)
        .set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
        .set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPass)
        .set(ConfigurationOptions.ES_BATCH_SIZE_ENTRIES, "500")
        .set(ConfigurationOptions.ES_MAPPING_ID, "uid");
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
Dataset<Row> wantedCols = sparkSession.read().parquet(path);
Dataset<UserProfileRecord> searchUserProfile = wantedCols.mapPartitions(
        new MapPartitionsFunction<Row, UserProfileRecord>() {
            @Override
            public Iterator<UserProfileRecord> call(Iterator<Row> input) throws Exception {
                List<UserProfileRecord> cleanProfileList = new LinkedList<>();
                while (input.hasNext()) {
                    UserProfileRecord aRecord = new UserProfileRecord();
                    // ......... (populate aRecord from the current Row)
                    cleanProfileList.add(aRecord);
                }
                return cleanProfileList.iterator();
            }
        }, Encoders.bean(UserProfileRecord.class));
EsSparkSQL.saveToEs(searchUserProfile.repartition(3), this.writeIndex);
```

Because the ES cluster currently has only 3 nodes, `repartition(3)` is used so that only 3 tasks write to ES, reducing the pressure on the cluster. In practice the primary shards ingest at roughly 30k docs/s on average, but when a job produces a large amount of data the write phase runs for a long time and still puts heavy load on the cluster, causing some queries to time out.
After combing through the official documentation, I found that the tunable options are quite limited: essentially the number of partitions and `ConfigurationOptions.ES_BATCH_SIZE_ENTRIES` are all you have to throttle the write rate to ES. I experimented with both, with very little to show for it.
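For reference, a minimal Scala sketch of that standard tuning (host, path, and index name below are placeholders); it only shrinks the bulk requests and the number of concurrent writer tasks, it cannot pause between requests:

```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

// The only stock knobs: smaller bulk batches and fewer writer tasks.
val spark = SparkSession.builder()
  .appName("es-write-standard-tuning")            // placeholder app name
  .config("es.nodes", "es-host")                  // placeholder host
  .config("es.port", "9200")
  .config("es.batch.size.entries", "500")         // docs per bulk request (default 1000)
  .config("es.batch.size.bytes", "1mb")           // payload per bulk request (default 1mb)
  .getOrCreate()

val df = spark.read.parquet("/path/to/parquet")   // placeholder path
// Fewer partitions => fewer tasks writing to ES concurrently
EsSparkSQL.saveToEs(df.repartition(3), "target_index")
```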
My first thought was to use Elasticsearch's Java client and issue the REST requests myself (which would make rate control straightforward), but after browsing the es-hadoop source I saw that its writer already maps Spark partitions onto the index's shards and replicas and batches requests carefully, so a lot of optimization work is already done there. Sticking with es-hadoop therefore made more sense, which left only one option: modify its source code.
3. Extending the es-hadoop source
Two Scala files were added (yes, forcing Scala in 😂):
- MyEsSparkSQL
- MyEsDataFrameWriter
Note that the package name must be org.elasticsearch.spark.sql, otherwise the package-private es-hadoop classes referenced below will not be visible.
1. MyEsSparkSQL
```scala
package org.elasticsearch.spark.sql

import org.apache.commons.logging.LogFactory
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_QUERY
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_READ
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_RESOURCE_WRITE
import org.elasticsearch.hadoop.cfg.PropertiesSettings
import org.elasticsearch.hadoop.rest.InitializationUtils
import org.elasticsearch.hadoop.util.ObjectUtils
import org.elasticsearch.spark.cfg.SparkSettingsManager

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import scala.collection.Map

object MyEsSparkSQL {

  private val init = { ObjectUtils.loadClass("org.elasticsearch.spark.rdd.CompatUtils", classOf[ObjectUtils].getClassLoader) }

  @transient private[this] val LOG = LogFactory.getLog(EsSparkSQL.getClass)

  //
  // Read
  //

  def esDF(sc: SQLContext): DataFrame = esDF(sc, Map.empty[String, String])
  def esDF(sc: SQLContext, resource: String): DataFrame = esDF(sc, Map(ES_RESOURCE_READ -> resource))
  def esDF(sc: SQLContext, resource: String, query: String): DataFrame = esDF(sc, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query))
  def esDF(sc: SQLContext, cfg: Map[String, String]): DataFrame = {
    val esConf = new SparkSettingsManager().load(sc.sparkContext.getConf).copy()
    esConf.merge(cfg.asJava)
    sc.read.format("org.elasticsearch.spark.sql").options(esConf.asProperties.asScala.toMap).load
  }
  def esDF(sc: SQLContext, resource: String, query: String, cfg: Map[String, String]): DataFrame = {
    esDF(sc, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_READ -> resource, ES_QUERY -> query))
  }
  def esDF(sc: SQLContext, resource: String, cfg: Map[String, String]): DataFrame = {
    esDF(sc, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_READ -> resource))
  }

  // SparkSession variant
  def esDF(ss: SparkSession): DataFrame = esDF(ss.sqlContext, Map.empty[String, String])
  def esDF(ss: SparkSession, resource: String): DataFrame = esDF(ss.sqlContext, Map(ES_RESOURCE_READ -> resource))
  def esDF(ss: SparkSession, resource: String, query: String): DataFrame = esDF(ss.sqlContext, Map(ES_RESOURCE_READ -> resource, ES_QUERY -> query))
  def esDF(ss: SparkSession, cfg: Map[String, String]): DataFrame = esDF(ss.sqlContext, cfg)
  def esDF(ss: SparkSession, resource: String, query: String, cfg: Map[String, String]): DataFrame = esDF(ss.sqlContext, resource, query, cfg)
  def esDF(ss: SparkSession, resource: String, cfg: Map[String, String]): DataFrame = esDF(ss.sqlContext, resource, cfg)

  //
  // Write
  //

  def saveToEs(srdd: Dataset[_], resource: String): Unit = {
    saveToEs(srdd, Map(ES_RESOURCE_WRITE -> resource))
  }
  def saveToEs(srdd: Dataset[_], resource: String, cfg: Map[String, String]): Unit = {
    saveToEs(srdd, collection.mutable.Map(cfg.toSeq: _*) += (ES_RESOURCE_WRITE -> resource))
  }
  def saveToEs(srdd: Dataset[_], cfg: Map[String, String]): Unit = {
    if (srdd != null) {
      if (srdd.isStreaming) {
        throw new EsHadoopIllegalArgumentException("Streaming Datasets should not be saved with 'saveToEs()'. Instead, use " +
          "the 'writeStream().format(\"es\").save()' methods.")
      }
      val sparkCtx = srdd.sqlContext.sparkContext
      val sparkCfg = new SparkSettingsManager().load(sparkCtx.getConf)
      val esCfg = new PropertiesSettings().load(sparkCfg.save())
      esCfg.merge(cfg.asJava)

      // Need to discover ES Version before checking index existence
      InitializationUtils.discoverClusterInfo(esCfg, LOG)
      InitializationUtils.checkIdForOperation(esCfg)
      InitializationUtils.checkIndexExistence(esCfg)

      sparkCtx.runJob(srdd.toDF().rdd, new MyEsDataFrameWriter(srdd.schema, esCfg.save()).write _)
    }
  }
}
```

This class is a straight copy of EsSparkSQL; the only change is the last statement of the `def saveToEs(srdd: Dataset[_], cfg: Map[String, String]): Unit` method,
from

```scala
sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)
```

to

```scala
sparkCtx.runJob(srdd.toDF().rdd, new MyEsDataFrameWriter(srdd.schema, esCfg.save()).write _)
```

2. MyEsDataFrameWriter
```scala
package org.elasticsearch.spark.sql

import java.util.concurrent.atomic.AtomicInteger

import lombok.extern.slf4j.Slf4j
import org.apache.spark.TaskContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.elasticsearch.hadoop.rest.RestService
import org.elasticsearch.hadoop.serialization.{BytesConverter, JdkBytesConverter}
import org.elasticsearch.hadoop.serialization.builder.ValueWriter
import org.elasticsearch.hadoop.serialization.field.FieldExtractor
import org.elasticsearch.spark.rdd.EsRDDWriter

/**
 * Created by chencc on 2020/8/31.
 */
@Slf4j
class MyEsDataFrameWriter(schema: StructType, override val serializedSettings: String)
  extends EsRDDWriter[Row](serializedSettings: String) {

  override protected def valueWriter: Class[_ <: ValueWriter[_]] = classOf[DataFrameValueWriter]

  override protected def bytesConverter: Class[_ <: BytesConverter] = classOf[JdkBytesConverter]

  override protected def fieldExtractor: Class[_ <: FieldExtractor] = classOf[DataFrameFieldExtractor]

  override protected def processData(data: Iterator[Row]): Any = { (data.next, schema) }

  override def write(taskContext: TaskContext, data: Iterator[Row]): Unit = {
    val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

    taskContext.addTaskCompletionListener((TaskContext) => writer.close())

    if (runtimeMetadata) {
      writer.repository.addRuntimeFieldExtractor(metaExtractor)
    }

    // Every 500 documents, pause for 100 ms so the cluster can keep up
    val counter = new AtomicInteger(0)
    while (data.hasNext) {
      counter.incrementAndGet()
      writer.repository.writeToIndex(processData(data))
      if (counter.get() >= 500) {
        Thread.sleep(100)
        counter.set(0)
        log.info("batch of 500 reached, sleeping 100 milliseconds")
      }
    }
  }
}
```

MyEsDataFrameWriter overrides EsRDDWriter's write method and adds a sleep; in practice the values can be tuned to whatever your production cluster tolerates.
The 500 and 100 here could also be exposed as SparkConf settings, which would make this more flexible.
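A minimal sketch of that idea, as a drop-in replacement for the `write` method in the class above. It assumes the throttle values are passed as extra "es."-prefixed properties on the SparkConf, which es-hadoop copies into the settings that reach the writer; the keys `es.batch.throttle.entries` and `es.batch.throttle.millis` are invented for this sketch, not built-in es-hadoop options:

```scala
// Hypothetical variant of MyEsDataFrameWriter.write: batch size and pause are
// read from the es-hadoop settings instead of being hard-coded. The keys
// "es.batch.throttle.entries" and "es.batch.throttle.millis" are made up for
// this sketch; they are not real es-hadoop options.
override def write(taskContext: TaskContext, data: Iterator[Row]): Unit = {
  val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)
  taskContext.addTaskCompletionListener((TaskContext) => writer.close())
  if (runtimeMetadata) {
    writer.repository.addRuntimeFieldExtractor(metaExtractor)
  }

  // Fall back to the original hard-coded values when the properties are absent
  val entriesProp = settings.getProperty("es.batch.throttle.entries")
  val millisProp  = settings.getProperty("es.batch.throttle.millis")
  val throttleEntries = if (entriesProp == null) 500 else entriesProp.toInt
  val throttleMillis  = if (millisProp == null) 100L else millisProp.toLong

  val counter = new AtomicInteger(0)
  while (data.hasNext) {
    writer.repository.writeToIndex(processData(data))
    if (counter.incrementAndGet() >= throttleEntries) {
      Thread.sleep(throttleMillis) // back off so ES can absorb the previous batch
      counter.set(0)
    }
  }
}
```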
With this in place, the rate at which Spark writes to ES can be throttled quite nicely.
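For completeness, a driver-side sketch in Scala (same assumptions as above: host, path, index name, and the `es.batch.throttle.*` keys are illustrative); it differs from the original job only in the object it calls:

```scala
// Driver-side usage sketch: identical call shape to EsSparkSQL.saveToEs, but
// routed through the throttled MyEsDataFrameWriter. The index name, paths, and
// the es.batch.throttle.* keys are illustrative, not real es-hadoop options.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.MyEsSparkSQL

val conf = new SparkConf()
  .setAppName("es-throttled-write")
  .set("es.nodes", "es-host")                 // placeholder host
  .set("es.port", "9200")
  .set("es.mapping.id", "uid")
  .set("es.batch.size.entries", "500")
  // hypothetical keys read by the modified writer in the sketch above
  .set("es.batch.throttle.entries", "500")
  .set("es.batch.throttle.millis", "100")

val spark = SparkSession.builder().config(conf).getOrCreate()
val profiles = spark.read.parquet("/path/to/profiles")   // placeholder path

// Only 3 writer tasks, each pausing periodically as configured above
MyEsSparkSQL.saveToEs(profiles.repartition(3), "user_profile_index")
```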