spark rdd map java_如何在spark RDD(JavaRDD)中获取记录的文件名
我正在使用多個文件加載到
JavaRDD中
JavaRDD allLines = sc.textFile(hdfs://path/*.csv);
加載文件后,我修改每條記錄并想保存它們.但是,我還需要將原始文件名(ID)與記錄一起保存以供將來參考.無論如何我可以從RDD中的單個記錄中獲取原始文件名嗎?
謝謝
您可以嘗試執行以下代碼段中的操作:
JavaPairRDD javaPairRDD = sc.newAPIHadoopFile(
"hdfs://path/*.csv",
TextInputFormat.class,
LongWritable.class,
Text.class,
new Configuration()
);
JavaNewHadoopRDD hadoopRDD = (JavaNewHadoopRDD) javaPairRDD;
JavaRDD> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
FileSplit fileSplit = (FileSplit) inputSplit;
String fileName = fileSplit.getPath().getName();
Stream> stream =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
.map(line -> {
String lineText = line._2().toString();
// emit file name as key and line as a value
return new Tuple2(fileName, lineText);
});
return stream.iterator();
}, true);
更新(適用于java7)
JavaRDD> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
new Function2>, Iterator>>() {
@Override
public Iterator> call(InputSplit inputSplit, final Iterator> lines) throws Exception {
FileSplit fileSplit = (FileSplit) inputSplit;
final String fileName = fileSplit.getPath().getName();
return new Iterator>() {
@Override
public boolean hasNext() {
return lines.hasNext();
}
@Override
public Tuple2 next() {
Tuple2 entry = lines.next();
return new Tuple2(fileName, entry._2().toString());
}
};
}
},
true
);
總結
以上是生活随笔為你收集整理的spark rdd map java_如何在spark RDD(JavaRDD)中获取记录的文件名的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java outofmemory jsp
- 下一篇: java最长回文子序列_LeetCode