Spark加载hadoop配置原理
0x0 背景
最近為了將hadoop&hive的五大配置文件,即:
core-site.xml
hdfs-site.xml
yarn-site.xml
mapred-site.xml
hive-site.xml
從項目中(classpath)移到項目外(任意位置),研究了spark啟動過程的源碼,在此記錄一下。
0x1 Hadoop及Hive獲取默認配置過程
Hadoop有一個類?
Configuration implementsIterable<Map.Entry<String,String>>,Writable?
這個類就是用于處理hadoop的配置,其內部有靜態代碼塊:
static{
? ? //print deprecation warning if hadoop-site.xml is found in classpath
? ? ClassLoader cL = Thread.currentThread().getContextClassLoader();
? ? if (cL == null) {
? ? ? cL = Configuration.class.getClassLoader();
? ? }
? ? if(cL.getResource("hadoop-site.xml")!=null) {
? ? ? LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
? ? ? ? ? "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
? ? ? ? ? + "mapred-site.xml and hdfs-site.xml to override properties of " +
? ? ? ? ? "core-default.xml, mapred-default.xml and hdfs-default.xml " +
? ? ? ? ? "respectively");
? ? }
? ? addDefaultResource("core-default.xml");
? ? addDefaultResource("core-site.xml");
? }
可見,當Configuration加載后,就會從classpath讀取
hadoop-site.xml
core-default.xml
core-site.xml
這三個配置文件。?
同時,Configuration類有四個子類:?
?
分別是:
HdfsConfiguration
HiveConf
JobConf
YarnConfiguration
進入這四個類內部同樣可以見到類似的靜態代碼,?
HdfsConfiguration中:
static {
? ? addDeprecatedKeys();
? ? // adds the default resources
? ? Configuration.addDefaultResource("hdfs-default.xml");
? ? Configuration.addDefaultResource("hdfs-site.xml");
}
YarnConfiguration中:
static {
? ? ? ? addDeprecatedKeys();
? ? ? ? Configuration.addDefaultResource("yarn-default.xml");
? ? ? ? Configuration.addDefaultResource("yarn-site.xml");
? ? ? ? ...
}
JobConf中:
public static void loadResources() {
? ? ? ? addDeprecatedKeys();
? ? ? ? Configuration.addDefaultResource("mapred-default.xml");
? ? ? ? Configuration.addDefaultResource("mapred-site.xml");
? ? ? ? Configuration.addDefaultResource("yarn-default.xml");
? ? ? ? Configuration.addDefaultResource("yarn-site.xml");
}
但是HiveConf并未在靜態代碼塊中讀取配置文件,然而在CarbonData的啟動過程中,會讀取hive-site.xml:
val hadoopConf = new Configuration()
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
? ? hadoopConf.addResource(configFile)
}
可見,Hadoop在啟動過程中,各組件會首先在classpath下讀取相應的配置文件。?
我們也可以通過Configuration的set(String name, String value)或者addResource(Path file)方法來添加配置,addResource內部執行流程如下:
? ? //將資源添加到resources列表(存儲配置文件資源的列表)
? ? resources.add(resource); ? // add to resources
? ? //將已有的屬性清空
? ? properties = null; ? ? ? ? // trigger reload
? ? finalParameters.clear(); ? // clear site-limits
? ? //重新加載所有配置
? ? loadResources(Properties properties,
? ? ? ? ? ? ? ? ? ArrayList<Resource> resources,
? ? ? ? ? ? ? ? ? boolean quiet)
0x2 Spark啟動過程中設置Hadoop配置
Spark Application啟動過程中首先要實啟動一個SparkContext,其實SparkContext本質上可以理解為Spark運行的配置集合。
val sc = SparkContext.getOrCreate(sparkConf)
而在SparkContext創建過程中會啟動一個調度任務,用于連接遠程集群:
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
如果是Spark on Yarn,會調用YarnClusterManager的createSchedulerBackend方法:
override def createSchedulerBackend(sc: SparkContext,
? ? ? masterURL: String,
? ? ? scheduler: TaskScheduler): SchedulerBackend = {
? ? sc.deployMode match {
? ? ? case "cluster" =>
? ? ? ? new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
? ? ? case "client" =>
? ? ? ? new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
? ? ? case ?_ =>
? ? ? ? throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
? ? }
? }
然后在YarnClientSchedulerBackend中創建了YarnClient,可見看Client中的構造函數:
private[spark] class Client(
? ? val args: ClientArguments,
? ? val hadoopConf: Configuration,
? ? val sparkConf: SparkConf)
? extends Logging {
? import Client._
? import YarnSparkHadoopUtil._
? def this(clientArgs: ClientArguments, spConf: SparkConf) =
? ? this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
? private val yarnClient = YarnClient.createYarnClient
? private val yarnConf = new YarnConfiguration(hadoopConf)
可見,Spark將利用SparkConf中的配置,調用SparkHadoopUtil.get.newConfiguration(spConf)方法生成相應的Hadoop配置。?
其實,在SparkContext中,有2個成員變量(本質上是一個):
private var _hadoopConfiguration: Configuration = _
def hadoopConfiguration: Configuration = _hadoopConfiguration
....
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
這個_hadoopConfiguration 也是通過SparkHadoopUtil.get.newConfiguration(_conf)方法獲取到hadoop的配置。?
進入SparkHadoopUtil.get.newConfiguration(_conf)方法,可見到:
? ? ?conf.getAll.foreach { case (key, value) =>
? ? ? ? if (key.startsWith("spark.hadoop.")) {
? ? ? ? ? hadoopConf.set(key.substring("spark.hadoop.".length), value)
? ? ? ? }
? ? ? }
1
2
3
4
5
也就是說,在SparkConf中所有以spark.hadoop.開頭的屬性,都會被轉換為hadoop的配置。
那么我們通過解析hadoop的xml配置文件,轉換為相應的鍵值對,傳給spark就可以了。代碼如下:
? ? /**
? ? ?* 讀取hadoopConfPath下所有hadoop相關配置文件,并轉換為SparkConf
? ? ?*
? ? ?* @param hadoopConfPath hadoop配置文件所在的文件夾
? ? ?* @return?
? ? ?*/
? ? public SparkConf getHadoopConf(String hadoopConfPath) {
? ? ? ? SparkConf hadoopConf = new SparkConf();
? ? ? ? try {
? ? ? ? ? ? Map<String, String> hiveConfMap = parseXMLToMap(hadoopConfPath + "/hive-site.xml");
? ? ? ? ? ? Map<String, String> hadoopConfMap = parseXMLToMap(hadoopConfPath + "/core-site.xml");
? ? ? ? ? ? hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/hdfs-site.xml"));
? ? ? ? ? ? hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/yarn-site.xml"));
? ? ? ? ? ? hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/mapred-site.xml"));
? ? ? ? ? ? for (Map.Entry<String, String> entry : hiveConfMap.entrySet()) {
? ? ? ? ? ? ? ? hadoopConf.set(entry.getKey(), entry.getValue());
? ? ? ? ? ? }
? ? ? ? ? ? for (Map.Entry<String, String> entry : hadoopConfMap.entrySet()) {
? ? ? ? ? ? ? ? hadoopConf.set("spark.hadoop." + entry.getKey(), entry.getValue());
? ? ? ? ? ? }
? ? ? ? ? ? return hadoopConf;
? ? ? ? } catch (DocumentException e) {
? ? ? ? ? ? logger.error("讀取xml文件失敗!");
? ? ? ? ? ? throw new RuntimeException(e);
? ? ? ? }
? ? }
? ? //將xml解析為HashMap
? ? private Map<String, String> parseXMLToMap(String xmlFilePath) throws DocumentException {
? ? ? ? Map<String, String> confMap = new HashMap<>();
? ? ? ? SAXReader reader = new SAXReader();
? ? ? ? Document document = reader.read(new File(xmlFilePath));
? ? ? ? Element configuration = document.getRootElement();
? ? ? ? Iterator iterator = configuration.elementIterator();
? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? Element property = (Element) iterator.next();
? ? ? ? ? ? String name = property.element("name").getText();
? ? ? ? ? ? String value = property.element("value").getText();
? ? ? ? ? ? confMap.put(name, value);
? ? ? ? }
? ? ? ? return confMap;
? ? }
注意:?
經測試,如果集群有kerberos加密,該方法無效!?
原因可能是:
class SparkHadoopUtil extends Logging {
? ? ? private val sparkConf = new SparkConf(false).loadFromSystemProperties(true)
? ? ? val conf: Configuration = newConfiguration(sparkConf)
? ? ? UserGroupInformation.setConfiguration(conf)
在該類中設置了一個new的SparkConf,這個SparkConf只從System.getProperty讀取spark開頭的屬性,因此不是正確的屬性,導致kerberos登錄異常。
?
總結
以上是生活随笔為你收集整理的Spark加载hadoop配置原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark运行时加载hive,hdfs配
- 下一篇: kerberos体系下的应用(yarn,