Flume: Gathering Customer Product Search Clicks Data Using Apache Flume
This post covers using Apache Flume to gather customer product search clicks and store the information using hadoop and elasticsearch sinks. The data may consist of different product search events such as filtering based on different facets, sorting information, pagination information, products viewed further, and certain products marked as favourite by customers. In later posts we will analyze the same data further for display and analytics purposes.
Product Search Functionality
Any e-commerce platform offers various products to customers, and search functionality is one of its foundations. Allowing users guided navigation through different facets/filters, or a free-text search over the content, is a standard part of any existing search functionality.
SearchQueryInstruction
Consider a scenario where customers can search for products, and the system allows us to capture their product search behaviour with the following information:
public class SearchQueryInstruction implements Serializable {

    @JsonIgnore
    private final String _eventIdSuffix;
    private String eventId;
    private String hostedMachineName;
    private String pageUrl;
    private Long customerId;
    private String sessionId;
    private String queryString;
    private String sortOrder;
    private Long pageNumber;
    private Long totalHits;
    private Long hitsShown;
    private final Long createdTimeStampInMillis;
    private String clickedDocId;
    private Boolean favourite;
    @JsonIgnore
    private Map<String, Set<String>> filters;
    @JsonProperty(value = "filters")
    private List<FacetFilter> _filters;

    public SearchQueryInstruction() {
        _eventIdSuffix = UUID.randomUUID().toString();
        createdTimeStampInMillis = new Date().getTime();
    }

    ......

    private static class FacetFilter implements Serializable {
        private String code;
        private String value;

        public FacetFilter(String code, String value) {
            this.code = code;
            this.value = value;
        }

        ......
    }
}

Check SearchQueryInstruction for further source information. The data is serialized in JSON format so it can be used directly with ElasticSearch for further display purposes.
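As a minimal sketch of the serialization step, assuming Jackson 2 based on the annotations above (the actual service code may differ):

// com.fasterxml.jackson.databind.ObjectMapper honours the @JsonIgnore/@JsonProperty annotations.
ObjectMapper mapper = new ObjectMapper();
SearchQueryInstruction instruction = new SearchQueryInstruction();
// ... populate the instruction via setters ...
String jsonString = mapper.writeValueAsString(instruction); // JSON string handed to the flume agent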
Sample data showing how the click information looks based on user clicks. The data is converted to JSON format before being sent to the embedded flume agent.
{"eventid":"629e9b5f-ff4a-4168-8664-6c8df8214aa7-1399386809805-24","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/5","customerid":24,"sessionid":"648a011d-570e-48ef-bccc-84129c9fa400","querystring":null,"sortorder":"desc","pagenumber":3,"totalhits":28,"hitsshown":7,"createdtimestampinmillis":1399386809805,"clickeddocid":"41","favourite":null,"eventidsuffix":"629e9b5f-ff4a-4168-8664-6c8df8214aa7","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"searchfacettype_age_level_2","value":"12-18 years"}]} {"eventid":"648b5cf7-7ca9-4664-915d-23b0d45facc4-1399386809782-298","hostedmachinename":"192.168.182.1333","pageurl":"http://jaibigdata.com/4","customerid":298,"sessionid":"7bf042ea-526a-4633-84cd-55e0984ea2cb","querystring":"queryString48","sortorder":"desc","pagenumber":0,"totalhits":29,"hitsshown":19,"createdtimestampinmillis":1399386809782,"clickeddocid":"9","favourite":null,"eventidsuffix":"648b5cf7-7ca9-4664-915d-23b0d45facc4","filters":[{"code":"searchfacettype_color_level_2","value":"Green"}]} {"eventid":"74bb7cfe-5f8c-4996-9700-0c387249a134-1399386809799-440","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/1","customerid":440,"sessionid":"940c9a0f-a9b2-4f1d-b114-511ac11bf2bb","querystring":"queryString16","sortorder":"asc","pagenumber":3,"totalhits":5,"hitsshown":32,"createdtimestampinmillis":1399386809799,"clickeddocid":null,"favourite":null,"eventidsuffix":"74bb7cfe-5f8c-4996-9700-0c387249a134","filters":[{"code":"searchfacettype_brand_level_2","value":"Apple"}]} {"eventid":"9da05913-84b1-4a74-89ed-5b6ec6389cce-1399386809828-143","hostedmachinename":"192.168.182.1332","pageurl":"http://jaibigdata.com/1","customerid":143,"sessionid":"08a4a36f-2535-4b0e-b86a-cf180202829b","querystring":null,"sortorder":"desc","pagenumber":0,"totalhits":21,"hitsshown":34,"createdtimestampinmillis":1399386809828,"clickeddocid":"38","favourite":true,"eventidsuffix":"9da05913-84b1-4a74-89ed-5b6ec6389cce","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"product_price_range","value":"10.0 - 20.0"}]}阿帕奇水槽
Apache Flume is used to collect and aggregate the data. Here, an embedded Flume agent is used to capture the search query instruction events. Depending on your actual usage,

- You can use an embedded agent to collect the data
- Or push the data from the page through a REST API to a backend API service dedicated to event collection
- Or you can use your application's logging functionality to log all search events and tail the log files to collect the data
Consider a scenario where, depending on the application, multiple web/app servers send event data to a collector flume agent. As depicted below, the search click events are collected from multiple web/app servers, and a collector/consolidator agent gathers the data from all agents. The data is further divided based on a selector, using a multiplexing strategy, to be stored in Hadoop HDFS, while relevant data is also directed to ElasticSearch, e.g. for recently viewed items.
Embedded Flume Agent
The embedded Flume agent allows us to include the flume agent within the application itself, collect the data, and send it further to the collector agent.
private static EmbeddedAgent agent;

private void createAgent() {
    final Map<String, String> properties = new HashMap<String, String>();
    properties.put("channel.type", "memory");
    properties.put("channel.capacity", "100000");
    properties.put("channel.transactionCapacity", "1000");
    properties.put("sinks", "sink1");
    properties.put("sink1.type", "avro");
    properties.put("sink1.hostname", "localhost");
    properties.put("sink1.port", "44444");
    properties.put("processor.type", "default");
    try {
        agent = new EmbeddedAgent("searchqueryagent");
        agent.configure(properties);
        agent.start();
    } catch (final Exception ex) {
        LOG.error("Error creating agent!", ex);
    }
}
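Once the agent is started, serialized events can be pushed to it. A minimal sketch of that step (the "State" header value is an assumption here, mirroring the multiplexing selector mapping shown further below):

// Uses org.apache.flume.event.EventBuilder; jsonString is the serialized SearchQueryInstruction.
Map<String, String> headers = new HashMap<String, String>();
headers.put("State", "VIEWED"); // or "FAVOURITE"; used later for channel routing
Event event = EventBuilder.withBody(jsonString.getBytes(StandardCharsets.UTF_8), headers);
try {
    agent.put(event);
} catch (EventDeliveryException e) {
    LOG.error("Failed to deliver event to agent!", e);
}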
Storing Search Events Data

Flume provides multiple sink options to store the data for future analysis. As shown in the diagram, we will take the scenario of storing the data in Apache Hadoop as well as in ElasticSearch for the recently viewed items functionality.
Hadoop Sink
Allows storing the data permanently in HDFS so it can be analyzed later for analytics purposes.
Based on the incoming event data, let's say we want to store the data on an hourly basis. The "/searchevents/2014/05/15/16" directory will store all incoming events for hour 16.
Check FlumeHDFSSinkServiceImpl.java for detailed start/stop of the hdfs sink.
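As a rough sketch of how such a sink could be wired up programmatically, under the assumption that it mirrors the flume.conf values shown later (the actual FlumeHDFSSinkServiceImpl may differ):

// Uses org.apache.flume.sink.hdfs.HDFSEventSink, org.apache.flume.channel.MemoryChannel,
// org.apache.flume.Context and org.apache.flume.conf.Configurables.
HDFSEventSink hdfsSink = new HDFSEventSink();
hdfsSink.setName("HDFSEventSink-" + UUID.randomUUID());
Map<String, String> props = new HashMap<String, String>();
props.put("hdfs.path", "hdfs://localhost.localdomain:54321/searchevents/%Y/%m/%d/%H");
props.put("hdfs.filePrefix", "searchevents");
props.put("hdfs.fileType", "DataStream");
props.put("hdfs.rollInterval", "0");
props.put("hdfs.rollSize", "134217728");
props.put("hdfs.idleTimeout", "60");
props.put("hdfs.rollCount", "0");
props.put("hdfs.batchSize", "10");
props.put("hdfs.useLocalTimeStamp", "true");
hdfsSink.configure(new Context(props));

// Memory channel shared with the Avro source shown later (an assumption).
MemoryChannel hdfsChannel = new MemoryChannel();
hdfsChannel.setName("HDFSChannel-" + UUID.randomUUID());
Context channelContext = new Context();
channelContext.put("capacity", "100000");
channelContext.put("transactionCapacity", "1000");
Configurables.configure(hdfsChannel, channelContext);
hdfsChannel.start();

hdfsSink.setChannel(hdfsChannel);
hdfsSink.start();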
The sample data below is stored in hadoop as,
Check:hdfs://localhost.localdomain:54321/searchevents/2014/05/06/16/searchevents.1399386809864
body is:{"eventid":"e8470a00-c869-4a90-89f2-f550522f8f52-1399386809212-72","hostedmachinename":"192.168.182.1334","pageurl":"http://jaibigdata.com/0","customerid":72,"sessionid":"7871a55c-a950-4394-bf5f-d2179a553575","querystring":null,"sortorder":"desc","pagenumber":0,"totalhits":8,"hitsshown":44,"createdtimestampinmillis":1399386809212,"clickeddocid":"23","favourite":null,"eventidsuffix":"e8470a00-c869-4a90-89f2-f550522f8f52","filters":[{"code":"searchfacettype_brand_level_2","value":"Apple"},{"code":"searchfacettype_color_level_2","value":"Blue"}]}
body is:{"eventid":"2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0-1399386809743-61","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/0","customerid":61,"sessionid":"78286f6d-cc1e-489c-85ce-a7de8419d628","querystring":"queryString59","sortorder":"asc","pagenumber":3,"totalhits":32,"hitsshown":9,"createdtimestampinmillis":1399386809743,"clickeddocid":null,"favourite":null,"eventidsuffix":"2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0","filters":[{"code":"searchfacettype_age_level_2","value":"0-12 years"}]}

ElasticSearch Sink
For viewing purposes, to display recently viewed items to end users, the ElasticSearch sink allows automatic creation of a daily recently-viewed-items index. This functionality can be used to display a customer's recently viewed items.
Let's say you already have an ES instance running at localhost:9310.
Check FlumeESSinkServiceImpl.java for details to start/stop the ElasticSearch sink.
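Similarly, a minimal sketch of a programmatic ElasticSearch sink setup, with property values borrowed from the flume.conf shown later (the actual service code may differ):

// Uses org.apache.flume.sink.elasticsearch.ElasticSearchSink (flume-ng-elasticsearch-sink module).
ElasticSearchSink esSink = new ElasticSearchSink();
esSink.setName("ElasticSearchSink-" + UUID.randomUUID());
Map<String, String> props = new HashMap<String, String>();
props.put("hostNames", "127.0.0.1:9310");
props.put("indexName", "recentlyviewed");
props.put("indexType", "clickevent");
props.put("clusterName", "jai-testclusterName");
props.put("batchSize", "10");
props.put("ttl", "5");
props.put("serializer", "org.jai.flume.sinks.elasticsearch.serializer.ElasticSearchJsonBodyEventSerializer");
esSink.configure(new Context(props));
esSink.setChannel(esChannel); // memory channel shared with the Avro source (assumed)
esSink.start();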
The sample data in elasticsearch is stored as,
{timestamp=1399386809743, body={pageurl=http://jaibigdata.com/0, querystring=queryString59, pagenumber=3, hitsshown=9, hostedmachinename=192.168.182.1330, createdtimestampinmillis=1399386809743, sessionid=78286f6d-cc1e-489c-85ce-a7de8419d628, eventid=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0-1399386809743-61, totalhits=32, clickeddocid=null, customerid=61, sortorder=asc, favourite=null, eventidsuffix=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0, filters=[{value=0-12 years, code=searchfacettype_age_level_2}]}, eventId=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0}
{timestamp=1399386809757, body={pageurl=http://jaibigdata.com/1, querystring=null, pagenumber=1, hitsshown=34, hostedmachinename=192.168.182.1330, createdtimestampinmillis=1399386809757, sessionid=e6a3fd51-fe07-4e21-8574-ce5ab8bfbd68, eventid=fe5279b7-0bce-4e2b-ad15-8b94107aa792-1399386809757-134, totalhits=9, clickeddocid=22, customerid=134, sortorder=desc, favourite=null, eventidsuffix=fe5279b7-0bce-4e2b-ad15-8b94107aa792, filters=[{value=Blue, code=searchfacettype_color_level_2}]}, State=VIEWED, eventId=fe5279b7-0bce-4e2b-ad15-8b94107aa792}
{timestamp=1399386809765, body={pageurl=http://jaibigdata.com/0, querystring=null, pagenumber=4, hitsshown=2, hostedmachinename=192.168.182.1331, createdtimestampinmillis=1399386809765, sessionid=29864de8-5708-40ab-a78b-4fae55698b01, eventid=886e9a28-4c8c-4e8c-a866-e86f685ecc54-1399386809765-317, totalhits=2, clickeddocid=null, customerid=317, sortorder=asc, favourite=null, eventidsuffix=886e9a28-4c8c-4e8c-a866-e86f685ecc54, filters=[{value=0-12 years, code=searchfacettype_age_level_2}, {value=0.0 - 10.0, code=product_price_range}]}, eventId=886e9a28-4c8c-4e8c-a866-e86f685ecc54}

ElasticSearchJsonBodyEventSerializer
This controls how the data is indexed in ElasticSearch. Update the event serializer as per your own strategy to control how the data should be indexed.
public class ElasticSearchJsonBodyEventSerializer implements ElasticSearchEventSerializer {

    @Override
    public BytesStream getContentBuilder(final Event event) throws IOException {
        final XContentBuilder builder = jsonBuilder().startObject();
        appendBody(builder, event);
        appendHeaders(builder, event);
        return builder;
    }

    ......
}

Check ElasticSearchJsonBodyEventSerializer.java to configure the serializer to index the data.
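The elided appendBody/appendHeaders helpers might look roughly like this (an illustrative assumption; check the linked source for the real implementation):

private void appendBody(XContentBuilder builder, Event event) throws IOException {
    // The event body already holds the SearchQueryInstruction JSON, so embed it as a raw field.
    builder.rawField("body", event.getBody());
}

private void appendHeaders(XContentBuilder builder, Event event) throws IOException {
    // Copy flume event headers (e.g. State, timestamp) as top-level fields.
    for (Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
        builder.field(entry.getKey(), entry.getValue());
    }
}

This matches the stored sample above, where the body appears as a nested object and the headers (timestamp, State, eventId) appear as top-level fields.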
As an example, let's create the Flume source in Java to process the above SearchQueryInstruction in test cases and store the data.
Avro Source with Channel Selector
For testing purposes, let's create an Avro source to redirect data to the relevant sinks based on flume multiplexing functionality.
//Avro source to start at below port and process incoming data.
private AvroSource avroSource;

final Map<String, String> properties = new HashMap<String, String>();
properties.put("type", "avro");
properties.put("bind", "localhost");
properties.put("port", "44444");

avroSource = new AvroSource();
avroSource.setName("AvroSource-" + UUID.randomUUID());
Context sourceContext = new Context(properties);
avroSource.configure(sourceContext);
ChannelSelector selector = new MultiplexingChannelSelector();

//Channels from above services
Channel ESChannel = flumeESSinkService.getChannel();
Channel HDFSChannel = flumeHDFSSinkService.getChannel();
List<Channel> channels = new ArrayList<>();
channels.add(ESChannel);
channels.add(HDFSChannel);
selector.setChannels(channels);

final Map<String, String> selectorProperties = new HashMap<String, String>();
selectorProperties.put("type", "multiplexing");
selectorProperties.put("header", "State");
selectorProperties.put("mapping.VIEWED", HDFSChannel.getName() + " " + ESChannel.getName());
selectorProperties.put("mapping.FAVOURITE", HDFSChannel.getName() + " " + ESChannel.getName());
selectorProperties.put("default", HDFSChannel.getName());
Context selectorContext = new Context(selectorProperties);
selector.configure(selectorContext);
ChannelProcessor cp = new ChannelProcessor(selector);
avroSource.setChannelProcessor(cp);
avroSource.start();

Check FlumeAgentServiceImpl.java to store the data directly to the sinks configured above, or even to log all the data to a log file.
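To push a test event into this source over Avro RPC, one illustrative option (not taken from the original code) is Flume's RPC client API:

// Uses org.apache.flume.api.RpcClient / RpcClientFactory; jsonString is a serialized SearchQueryInstruction.
RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 44444);
try {
    Map<String, String> headers = new HashMap<String, String>();
    headers.put("State", "VIEWED"); // routed to both hdfs and es channels per the selector mapping
    Event event = EventBuilder.withBody(jsonString.getBytes(StandardCharsets.UTF_8), headers);
    client.append(event);
} catch (EventDeliveryException e) {
    LOG.error("Failed to deliver event!", e);
} finally {
    client.close();
}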
Standalone Flume/Hadoop/ElasticSearch Environment
The application can be used to generate SearchQueryInstruction data, and you can use your own standalone environment to process the data further. If you are already running a Flume/Hadoop/ElasticSearch environment, use the settings below to process the data further.
If you already have a Flume instance running, you can also use the following configuration (flume.conf):
# Name the components on this agent
searcheventscollectoragent.sources = eventsavrosource
searcheventscollectoragent.sinks = hdfssink essink
searcheventscollectoragent.channels = hdfschannel eschannel

# Bind the source and sink to the channel
searcheventscollectoragent.sources.eventsavrosource.channels = hdfschannel eschannel
searcheventscollectoragent.sinks.hdfssink.channel = hdfschannel
searcheventscollectoragent.sinks.essink.channel = eschannel

#Avro source. This is where data will send data to.
searcheventscollectoragent.sources.eventsavrosource.type = avro
searcheventscollectoragent.sources.eventsavrosource.bind = 0.0.0.0
searcheventscollectoragent.sources.eventsavrosource.port = 44444
searcheventscollectoragent.sources.eventsavrosource.selector.type = multiplexing
searcheventscollectoragent.sources.eventsavrosource.selector.header = State
searcheventscollectoragent.sources.eventsavrosource.selector.mapping.VIEWED = hdfschannel eschannel
searcheventscollectoragent.sources.eventsavrosource.selector.mapping.default = hdfschannel

# Use a channel which buffers events in memory. This will keep all incoming stuff in memory.
# You may change this to file etc. in case too much data is coming in and memory is an issue.
searcheventscollectoragent.channels.hdfschannel.type = memory
searcheventscollectoragent.channels.hdfschannel.capacity = 100000
searcheventscollectoragent.channels.hdfschannel.transactionCapacity = 1000

searcheventscollectoragent.channels.eschannel.type = memory
searcheventscollectoragent.channels.eschannel.capacity = 100000
searcheventscollectoragent.channels.eschannel.transactionCapacity = 1000

#HDFS sink. Store events directly to hadoop file system.
searcheventscollectoragent.sinks.hdfssink.type = hdfs
searcheventscollectoragent.sinks.hdfssink.hdfs.path = hdfs://localhost.localdomain:54321/searchevents/%Y/%m/%d/%H
searcheventscollectoragent.sinks.hdfssink.hdfs.filePrefix = searchevents
searcheventscollectoragent.sinks.hdfssink.hdfs.fileType = DataStream
searcheventscollectoragent.sinks.hdfssink.hdfs.rollInterval = 0
searcheventscollectoragent.sinks.hdfssink.hdfs.rollSize = 134217728
searcheventscollectoragent.sinks.hdfssink.hdfs.idleTimeout = 60
searcheventscollectoragent.sinks.hdfssink.hdfs.rollCount = 0
searcheventscollectoragent.sinks.hdfssink.hdfs.batchSize = 10
searcheventscollectoragent.sinks.hdfssink.hdfs.useLocalTimeStamp = true

#Elastic search
searcheventscollectoragent.sinks.essink.type = elasticsearch
searcheventscollectoragent.sinks.essink.hostNames = 127.0.0.1:9310
searcheventscollectoragent.sinks.essink.indexName = recentlyviewed
searcheventscollectoragent.sinks.essink.indexType = clickevent
searcheventscollectoragent.sinks.essink.clusterName = jai-testclusterName
searcheventscollectoragent.sinks.essink.batchSize = 10
searcheventscollectoragent.sinks.essink.ttl = 5
searcheventscollectoragent.sinks.essink.serializer = org.jai.flume.sinks.elasticsearch.serializer.ElasticSearchJsonBodyEventSerializer

To test the application search query instruction behaviour on existing hadoop/elasticsearch instances, set up the hadoop and elasticsearch instances separately. The application has been tested with Cloudera hadoop distribution 5.0.
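Assuming a standard Flume installation, an agent using the above configuration (saved, for example, as flume.conf) can typically be started with:

flume-ng agent --conf conf --conf-file flume.conf --name searcheventscollectoragent -Dflume.root.logger=INFO,console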
In later posts, we will cover analyzing the generated data further,

- Using Hive to query the data for top customer queries and number of product views.
- Using ElasticSearch Hadoop to index customer top queries and product views data
- Using Pig to count the total number of unique customers
- Using Oozie to schedule coordinated jobs for hive partitions and bundle jobs to index the data to ElasticSearch.
翻譯自: https://www.javacodegeeks.com/2014/05/flume-gathering-customer-product-search-clicks-data-using-apache-flume.html
總結
以上是生活随笔為你收集整理的apache flume_Flume:使用Apache Flume收集客户产品搜索点击数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: war解压缩(war解压 linux)
- 下一篇: ddos攻击服务提供商有哪些(ddos攻