采集文件到kafka
采集指定目錄下文本數據到kafka
package com.shenyuchong;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Collects the content of text files from one or more directories into Kafka.
 *
 * <p>Purpose:
 * <ul>
 *   <li>files are sent line by line;</li>
 *   <li>a notification URL can be called once everything has been sent;</li>
 *   <li>a file that was sent successfully gets the {@code .COMPLETED} suffix;</li>
 *   <li>only files with one of the configured suffixes are collected;</li>
 *   <li>lines that do not match the line regex are dropped;</li>
 *   <li>lines are split with a delimiter regex and the fields are re-joined
 *       with a second delimiter before being sent.</li>
 * </ul>
 *
 * <p>Usage: build the jar with {@code mvn package} (file2kafka-2.0.jar), then write a
 * configuration file {@code xxx.conf} such as:
 * <pre>
 * ip=192.168.1.91
 * threadnum=20
 * port=9092
 * topic=customertopic
 * path=/home/ftpuser/customer
 * includesuffix=txt
 * lineregex=^#\d.*$
 * delimiter=\s+
 * noticeurl=http://192.168.1.92:6009/schedule/customer
 * fieldsquence=id,name,score
 * </pre>
 * and run {@code java -jar file2kafka-2.0.jar xxx.conf}.
 *
 * <p>Suggestion: schedule it with linux crontab. Files that were sent completely are
 * renamed and therefore not picked up again; a file whose transfer failed keeps its
 * name and is retried as a whole on the next run.
 */
public class App {
    /** Field names, comma separated; only the number of names is used. */
    public static String fieldSquence = "";
    /** Minimum number of fields a line must have to be sent. */
    public static int fieldNum = 0;
    /** Kafka host. */
    public static String ip = "";
    /** Kafka port. */
    public static String port = "";
    /** Directories to collect, comma separated. */
    public static String path = "";
    /** Size of the sender thread pool. */
    public static String threadNum = "5";
    /** Kafka topic. */
    public static String topic = "";
    /** Line regex; lines that do not match are dropped. */
    public static String lineRegex = "^.*$";
    /** Regex used to split a line into fields. */
    public static String delimiter = "\\s+";
    /** Delimiter used to re-join the fields in the Kafka message. */
    public static String delimiter2 = "|||";
    /** File suffixes to collect, comma separated. */
    public static String includeSuffix = "aSuffix,bSuffix";
    /** Compiled form of {@link #lineRegex}; set by {@link #main(String[])}. */
    public static Pattern linePattern = null;
    /** Kafka producer configuration; set by {@link #main(String[])}. */
    public static Properties props = null;
    /** URL requested once all files have been processed; may be null. */
    public static String noticeUrl;

    /**
     * Entry point: loads the configuration, sends every matching file on a thread pool
     * and finally notifies the configured URL.
     *
     * @param args {@code args[0]} is the path of the configuration file
     * @throws IllegalArgumentException if no configuration file is given
     * @throws IllegalStateException if the configuration file cannot be read
     */
    public static void main(String[] args) {
        // Without a configuration file nothing sensible can be done: fail fast.
        if (args.length < 1) {
            throw new IllegalArgumentException("無配置文件");
        }
        try {
            loadConfig(new File(args[0]));
        } catch (IOException e) {
            throw new IllegalStateException("cannot read config file " + args[0], e);
        }

        // Kafka configuration.
        props = new Properties();
        props.put("bootstrap.servers", ip + ":" + port);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Number of expected fields and the compiled line filter.
        fieldNum = fieldSquence.split(",").length;
        linePattern = Pattern.compile(lineRegex);

        // One task per file; the number of concurrent tasks is bounded by threadNum.
        ExecutorService es = Executors.newFixedThreadPool(Integer.parseInt(threadNum));
        String[] suffixes = includeSuffix.split(",");
        for (String dirPath : path.split(",")) {
            File[] files = new File(dirPath).listFiles();
            if (files == null) {
                // listFiles() returns null for a missing or unreadable directory.
                System.err.println("skip " + dirPath + ": not a readable directory");
                continue;
            }
            for (final File file : files) {
                if (file.isFile() && hasSuffix(file, suffixes)) {
                    es.submit(new Runnable() {
                        @Override
                        public void run() {
                            send(file);
                        }
                    });
                }
            }
        }
        es.shutdown();

        try {
            // Wait until every file has been processed.
            while (!es.awaitTermination(2, TimeUnit.SECONDS)) {
                // keep waiting
            }
            // No notification configured: nothing left to do.
            if (noticeUrl == null || noticeUrl.isEmpty()) {
                return;
            }
            // Notify the follow-up service until it accepts the request.
            while (!connectSuccess(noticeUrl)) {
                Thread.sleep(2000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            es.shutdownNow();
        }
    }

    /**
     * Reads {@code key=value} lines from the configuration file into the static fields.
     * All whitespace is stripped from each line; the value is everything after the
     * first {@code =}. Lines without a key or without a value are ignored.
     */
    private static void loadConfig(File configFile) throws IOException {
        try (BufferedReader br = new BufferedReader(new FileReader(configFile))) {
            String line;
            while ((line = br.readLine()) != null) {
                line = line.replaceAll("\\s+", "");
                int eq = line.indexOf('=');
                if (eq <= 0 || eq == line.length() - 1) {
                    continue;
                }
                String k = line.substring(0, eq);
                // Split on the first '=' only: URLs and regexes may contain '='.
                String v = line.substring(eq + 1);
                switch (k) {
                    case "port":          port = v;          break; // kafka port
                    case "ip":            ip = v;            break; // kafka host
                    case "topic":         topic = v;         break; // kafka topic
                    case "fieldsquence":  fieldSquence = v;  break; // field names, comma separated
                    case "threadnum":     threadNum = v;     break; // number of sender threads
                    case "path":          path = v;          break; // directories, comma separated
                    case "lineregex":     lineRegex = v;     break; // non-matching lines are dropped
                    case "delimiter":     delimiter = v;     break; // field delimiter (regex)
                    case "delimiter2":    delimiter2 = v;    break; // delimiter of the Kafka message
                    case "includesuffix": includeSuffix = v; break; // file suffixes to collect
                    case "noticeurl":     noticeUrl = v;     break; // URL notified on completion
                    default:              break;
                }
            }
        }
    }

    /** Returns true if the absolute path of {@code file} ends with any of the suffixes. */
    private static boolean hasSuffix(File file, String[] suffixes) {
        String name = file.getAbsolutePath();
        for (String suffix : suffixes) {
            if (name.endsWith(suffix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reads the file and sends it line by line to Kafka. Each message consists of the
     * fields of the line joined by {@link #delimiter2}, followed by the absolute path of
     * the file. The file gets the {@code .COMPLETED} suffix only if it was read without
     * error and no send was reported as failed; otherwise it keeps its name so that a
     * later run can retry it.
     *
     * @param file the file to send
     */
    public static void send(File file) {
        final AtomicInteger failedSends = new AtomicInteger();
        boolean readOk = false;
        Callback onSent = new Callback() {
            @Override
            public void onCompletion(RecordMetadata rm, Exception e) {
                if (e != null) {
                    failedSends.incrementAndGet();
                    // String concatenation tolerates null metadata on the failure path.
                    System.out.println("send fail" + rm + ",e:" + e.getMessage());
                }
            }
        };
        // Resources are closed in reverse order; closing the producer flushes pending
        // records, so all callbacks have run once the try statement is left.
        try (BufferedReader reader = new BufferedReader(new FileReader(file));
             Producer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Pattern splitter = Pattern.compile(delimiter);
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (!linePattern.matcher(line).matches()) {
                    continue;
                }
                String[] fields = splitter.split(line);
                if (fields.length < fieldNum) {
                    continue;
                }
                StringBuilder sb = new StringBuilder();
                for (String fieldValue : fields) {
                    sb.append(fieldValue).append(delimiter2);
                }
                sb.append(file.getAbsolutePath());
                producer.send(
                        new ProducerRecord<String, String>(
                                topic, String.valueOf((new Date()).getTime()), sb.toString()),
                        onSent);
            }
            readOk = true;
        } catch (Exception e) {
            System.out.println(e.toString());
        }

        if (!readOk || failedSends.get() > 0) {
            System.err.println("not marking " + file.getAbsolutePath()
                    + " as completed: " + failedSends.get() + " failed sends, readOk=" + readOk);
            return;
        }
        if (!file.renameTo(new File(file.getAbsolutePath() + ".COMPLETED"))) {
            System.err.println("cannot rename " + file.getAbsolutePath());
        }
    }

    /**
     * Requests the given address and reports whether the service answered with HTTP 200.
     *
     * @param path the URL to request
     * @return true if the response code is 200; false on any other code or any error
     */
    public static boolean connectSuccess(String path) {
        if (path == null || path.isEmpty()) {
            return false;
        }
        HttpURLConnection con = null;
        try {
            con = (HttpURLConnection) new URL(path).openConnection();
            // Bound the wait so an unresponsive service cannot hang the retry loop.
            con.setConnectTimeout(10000);
            con.setReadTimeout(10000);
            return con.getResponseCode() == 200;
        } catch (Exception e) {
            return false;
        } finally {
            if (con != null) {
                con.disconnect();
            }
        }
    }
}
配置文件編寫customer2kafka.conf:
ip=192.168.1.91
threadnum=20
port=9092
topic=customertopic
path=/home/ftpuser/customer
includesuffix=txt
lineregex=^#\d.*$
delimiter=\s+
noticeurl=http://192.168.1.92:6009/schedule/customer
fieldsquence=id,name,score

maven打包執行:
java -jar file2kafka-2.0.jar /opt/app/file2kafka/customer2kafka.conf

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.shenyuchong</groupId>
  <artifactId>file2kafka</artifactId>
  <version>2.0</version>
  <packaging>jar</packaging>
  <name>file2kafka</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version><!--$NO-MVN-MAN-VER$ -->
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <!-- Builds a single runnable jar (file2kafka-2.0.jar) that bundles kafka-clients. -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <!-- Keep the plain artifact name instead of appending "-jar-with-dependencies". -->
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <!-- Must match the package of App.java, otherwise "java -jar" cannot find the main class. -->
              <mainClass>com.shenyuchong.App</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <!-- "single" is the goal intended for lifecycle binding; "assembly" is for command-line use. -->
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
轉載于:https://www.cnblogs.com/shenyuchong/p/11454506.html
總結
以上是生活随笔為你收集整理的采集文件到kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 国王与金矿
- 下一篇: 解决webpack打包vue项目后,部署