大数据技术_ 基础理论 之 数据采集与预处理
2.1 大數據采集架構
2.1.1概述
如今,社會中各個機構、部門、公司、團體等正在實時不斷地產生大量的信息,這些信息需要以簡單的方式進行處理,同時又要十分準確且能迅速滿足各種類型的數據(信息)需求者。這給我們帶來了許多挑戰,第一個挑戰就是在大量的數據中收集需要的數據,下面介紹常用的大數據采集工具。
2.1.2 常用大數據采集工具
數據采集最傳統的方式是企業自己的生產系統產生的數據,除上述生產系統中的數據外,企業的信息系統還充斥著大量的用戶行為數據、日志式的活動數據、事件信息等,越來越多的企業通過架設日志采集系統來保存這些數據,希望通過這些數據獲取其商業或社會價值。
2.1.3 Apache Kafka數據采集
6、使用Java來編寫Kafka的實例
首先,編寫KafkaProducer.properties文件:
zk.connect = localhost:2181 broker.list = localhost:9092 serializer.class = kafka.serializer.StringEncoder request.required.acks = 1下面的代碼是使用Java編寫了一個Kafka消息發布者:
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyKafkaProducer {private Producer<String, String> producer;private final String topic;public MyKafkaProducer(String topic) throws Exception {InputStream in = Properties.class.getResourceAsStream("KafkaProducer.properties");Properties props = new Properties();props.load(in);ProducerConfig config = new ProducerConfig(props);producer = new Producer<String, String>(config);}public void sendMessage(String msg){KeyedMessage<String, String> data =new KeyedMessage<String, String>( topic, msg);producer.send(data);producer.close();}public static void main(String[] args) throws Exception{MyKafkaProducer producer = new MyKafkaProducer("HelloTopic");String msg = "Hello Kafka!";producer. sendMessage(msg);} }下面創建Comsumer,首先編寫KafkaProperties文件:
zk.connect = localhost:2181 group.id = testgroup zookeeper.session.timeout.ms = 500 zookeeper.sync.time.ms = 250 auto.commit.interval.ms = 1000上述參數配置,十分容易理解,具體的詳細說明,可以參考Kafka的官方文檔。下面的代碼是使用Java編寫了一個Kafka的Comsumer。
import java.io.InputStream; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.consumer.Consumer;public class MyKafkaConsumer {private final ConsumerConnector consumer;private final String topic;public MyKafkaConsumer(String topic) throws Exception{InputStream in = Properties.class.getResourceAsStream("KafkaProducer.properties");Properties props = new Properties();props.load(in);ConsumerConfig config = new ConsumerConfig(props);consumer = Consumer.createJavaConsumerConnector(config);this.topic = topic;}public void consumeMessage() {Map<String, String> topicMap = new HashMap<String, String>();topicMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap =consumer.createMessageStreams(topicMap);List<KafkaStream<byte[], byte[]>> streamList =consumerStreamsMap.get(topic);for (final KafkaStream<byte[], byte[]> stream : streamList) {ConsumerIterator<byte[], byte[]> consumerIte =stream.iterator();while (consumerIte.hasNext())System.out.println("message :: "+ new String(consumerIte.next().message()));}if (consumer != null)consumer.shutdown();}public static void main(String[] args) throws Exception{String groupId = "testgroup";String topic = "HelloTopic";MyKafkaConsumer consumer = new MyKafkaConsumer(topic);consumer.consumeMessage();} }2.2 數據預處理原理
通過數據預處理工作,可以使殘缺的數據完整,并將錯誤的數據糾正、多余的數據去除,進而將所需的數據挑選出來,并且進行數據集成。數據預處理的常見方法有數據清洗、數據集成與數據變換。
2.2.1 數據清洗
2.2.2 數據集成
2.2.3 數據變換
2.3 數據倉庫與ETL工具
2.3.1 數據倉庫與ETL工具
數據倉庫,是在企業管理和決策中面向主題的、集成的、隨時間變化的、非易失性數據的集合。
數據倉庫中的數據來自于多種業務數據源,這些數據源可能處于不同硬件平臺上,使用不同的操作系統,數據模型也相差很遠。如何獲取并向數據倉庫加載這些數據量大、種類多的數據,已成為建立數據倉庫所面臨的一個關鍵問題。
2.3.2 常用ETL工具
2.3.3 案例:Kettle數據遷移
總結
以上是生活随笔為你收集整理的大数据技术_ 基础理论 之 数据采集与预处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 装饰器函数
- 下一篇: 12.4日团队工作总结