Flume: Gathering Customer Product Search Clicks Data Using Apache Flume
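This post covers using Apache Flume to gather customer product search clicks and to store the information using Hadoop and ElasticSearch sinks. The data may consist of different product search events, such as filtering on different facets, sorting information, pagination information, the products viewed afterwards, and the products a customer has marked as favourite. In later posts we will analyze the data further, to use the same information for display and analytics.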
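Product Search Functionality
Any e-commerce platform offers different products to its customers, and search functionality is one of its foundations. Letting users navigate with different facets/filters, or search the content with free text, is a standard part of any existing search functionality.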
SearchQueryInstruction
Consider a similar scenario, where a customer can search for products and we capture the product search behavior with the following information:
```java
public class SearchQueryInstruction implements Serializable {
    @JsonIgnore
    private final String _eventIdSuffix;
    private String eventId;
    private String hostedMachineName;
    private String pageUrl;
    private Long customerId;
    private String sessionId;
    private String queryString;
    private String sortOrder;
    private Long pageNumber;
    private Long totalHits;
    private Long hitsShown;
    private final Long createdTimeStampInMillis;
    private String clickedDocId;
    private Boolean favourite;
    // Facet code -> selected values; not serialized directly
    @JsonIgnore
    private Map<String, Set<String>> filters;
    // Flattened facet filters, serialized under the "filters" key
    @JsonProperty(value = "filters")
    private List<FacetFilter> _filters;

    public SearchQueryInstruction() {
        _eventIdSuffix = UUID.randomUUID().toString();
        createdTimeStampInMillis = new Date().getTime();
    }
    ......
    private static class FacetFilter implements Serializable {
        private String code;
        private String value;

        public FacetFilter(String code, String value) {
            this.code = code;
            this.value = value;
        }
        ......
    }
}
```
For the full source, check SearchQueryInstruction. The data is serialized in JSON format so that it can be used directly with ElasticSearch for further display.
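The class keeps the selected facets as a `Map<String, Set<String>>` (hidden from Jackson via `@JsonIgnore`) and serializes a flat `_filters` list under "filters", which is the code/value shape visible in the sample data. Judging from that sample, `eventId` also looks like `eventIdSuffix-createdTimeStampInMillis-customerId`. The methods doing this are elided above, so here is a minimal, hypothetical sketch of both conversions in plain Java; the class `SearchQueryInstructionSketch` and the helpers `flattenFilters` and `buildEventId` are our own names, not the original source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class SearchQueryInstructionSketch {

    // Hypothetical stand-in for the private FacetFilter class: one code/value pair.
    static final class FacetFilter {
        final String code;
        final String value;

        FacetFilter(String code, String value) {
            this.code = code;
            this.value = value;
        }

        @Override
        public String toString() {
            return code + "=" + value;
        }
    }

    // Emits one FacetFilter per selected value of each facet code,
    // matching the [{"code":..,"value":..}, ...] list in the sample JSON.
    static List<FacetFilter> flattenFilters(Map<String, Set<String>> filters) {
        List<FacetFilter> result = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : filters.entrySet()) {
            for (String value : entry.getValue()) {
                result.add(new FacetFilter(entry.getKey(), value));
            }
        }
        return result;
    }

    // Hypothetical: composes the id as suffix-timestamp-customerId,
    // the pattern observed in the sample events.
    static String buildEventId(String suffix, long createdMillis, long customerId) {
        return suffix + "-" + createdMillis + "-" + customerId;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> filters = new TreeMap<>();
        filters.put("searchfacettype_color_level_2", new TreeSet<>(Set.of("Blue")));
        filters.put("searchfacettype_age_level_2", new TreeSet<>(Set.of("12-18 years")));
        System.out.println(flattenFilters(filters));
        System.out.println(buildEventId("629e9b5f-ff4a-4168-8664-6c8df8214aa7", 1399386809805L, 24L));
    }
}
```

`TreeMap`/`TreeSet` are used here only to make the output order predictable; the concrete map type in the original class is not shown.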
Sample data showing what the click information looks like, based on user clicks. The data is first converted to JSON format and then sent to the embedded Flume agent.
```json
{"eventid":"629e9b5f-ff4a-4168-8664-6c8df8214aa7-1399386809805-24","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/5","customerid":24,"sessionid":"648a011d-570e-48ef-bccc-84129c9fa400","querystring":null,"sortorder":"desc","pagenumber":3,"totalhits":28,"hitsshown":7,"createdtimestampinmillis":1399386809805,"clickeddocid":"41","favourite":null,"eventidsuffix":"629e9b5f-ff4a-4168-8664-6c8df8214aa7","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"searchfacettype_age_level_2","value":"12-18 years"}]}
{"eventid":"648b5cf7-7ca9-4664-915d-23b0d45facc4-1399386809782-298","hostedmachinename":"192.168.182.1333","pageurl":"http://jaibigdata.com/4","customerid":298,"sessionid":"7bf042ea-526a-4633-84cd-55e0984ea2cb","querystring":"queryString48","sortorder":"desc","pagenumber":0,"totalhits":29,"hitsshown":19,"createdtimestampinmillis":1399386809782,"clickeddocid":"9","favourite":null,"eventidsuffix":"648b5cf7-7ca9-4664-915d-23b0d45facc4","filters":[{"code":"searchfacettype_color_level_2","value":"Green"}]}
{"eventid":"74bb7cfe-5f8c-4996-9700-0c387249a134-1399386809799-440","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/1","customerid":440,"sessionid":"940c9a0f-a9b2-4f1d-b114-511ac11bf2bb","querystring":"queryString16","sortorder":"asc","pagenumber":3,"totalhits":5,"hitsshown":32,"createdtimestampinmillis":1399386809799,"clickeddocid":null,"favourite":null,"eventidsuffix":"74bb7cfe-5f8c-4996-9700-0c387249a134","filters":[{"code":"searchfacettype_brand_level_2","value":"Apple"}]}
{"eventid":"9da05913-84b1-4a74-89ed-5b6ec6389cce-1399386809828-143","hostedmachinename":"192.168.182.1332","pageurl":"http://jaibigdata.com/1","customerid":143,"sessionid":"08a4a36f-2535-4b0e-b86a-cf180202829b","querystring":null,"sortorder":"desc","pagenumber":0,"totalhits":21,"hitsshown":34,"createdtimestampinmillis":1399386809828,"clickeddocid":"38","favourite":true,"eventidsuffix":"9da05913-84b1-4a74-89ed-5b6ec6389cce","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"product_price_range","value":"10.0 - 20.0"}]}
```
Apache Flume
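Apache Flume is used to collect and aggregate data. Here, an embedded Flume agent is used to capture the search query instruction events. Depending on your actual use case, you can:
- use the embedded agent to collect the data,
- or push the data from the page via a REST API to a backend API service dedicated to event collection,
- or use the application's logging to log all search events and tail the log files to collect the data.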
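Consider a scenario, depending on the application, where multiple web/application servers send event data to a collector Flume agent. As shown in the diagram, search click events are collected from multiple web/application servers, and a collector/consolidator agent gathers the data from all of those agents. The data is further divided by a selector using the multiplexing strategy, so that it is stored in Hadoop HDFS while the relevant data is also directed to ElasticSearch, e.g. for recently viewed items.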
Embedded Flume Agent
The embedded Flume agent lets us include a Flume agent within the application itself, so that it can collect data and send it on to the collector agent.
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.agent.embedded.EmbeddedAgent;

private static EmbeddedAgent agent;

// LOG is the enclosing class's logger.
private void createAgent() {
    final Map<String, String> properties = new HashMap<String, String>();
    // In-memory channel buffering up to 100000 events
    properties.put("channel.type", "memory");
    properties.put("channel.capacity", "100000");
    properties.put("channel.transactionCapacity", "1000");
    // A single Avro sink forwarding events to the collector agent's Avro source on port 44444
    properties.put("sinks", "sink1");
    properties.put("sink1.type", "avro");
    properties.put("sink1.hostname", "localhost");
    properties.put("sink1.port", "44444");
    properties.put("processor.type", "default");
    try {
        agent = new EmbeddedAgent("searchqueryagent");
        agent.configure(properties);
        agent.start();
    } catch (final Exception ex) {
        LOG.error("Error creating agent!", ex);
    }
}
```
Storing Search Events Data
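Flume provides multiple sink options to store data for future analysis. As shown in the diagram, we will take the approach of storing the data in Apache Hadoop and in ElasticSearch, the latter for the recently viewed items functionality.
Hadoop Sink
Allows storing the data permanently in HDFS so that it can be analyzed later.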
Based on the incoming event data, let's say we want to store it hourly. The "/searchevents/2014/05/15/16" directory will then store all incoming events for hour 16 of that day.
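The hourly layout comes from the %Y/%m/%d/%H escapes in hdfs.path of the flume.conf further down; with useLocalTimeStamp = true the sink fills them in from the agent's clock and time zone. To see which directory a given event lands in, here is a minimal sketch of the same bucketing using java.time. The class `HourlyBucketPath` and its `bucketFor` helper are hypothetical and only mimic the path layout; they are not part of Flume.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class HourlyBucketPath {

    // Same layout as %Y/%m/%d/%H: 4-digit year, then zero-padded month, day and 24h hour.
    private static final DateTimeFormatter HOURLY = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH");

    // Returns the hourly directory for an event timestamp, as seen in the given time zone.
    static String bucketFor(String root, long epochMillis, ZoneId zone) {
        return root + "/" + HOURLY.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        long created = 1399386809805L; // createdtimestampinmillis from the sample data
        System.out.println(bucketFor("/searchevents", created, ZoneOffset.UTC));
        System.out.println(bucketFor("/searchevents", created, ZoneOffset.ofHours(2)));
    }
}
```

The time zone matters: the sample timestamp falls in hour 14 in UTC but in hour 16 at a UTC+2 offset, which matches the .../2014/05/06/16 directory in the HDFS sample below.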
Check FlumeHDFSSinkServiceImpl.java for details on starting/stopping the HDFS sink.
Sample data as stored in Hadoop:
```
Check:hdfs://localhost.localdomain:54321/searchevents/2014/05/06/16/searchevents.1399386809864
body is:{"eventid":"e8470a00-c869-4a90-89f2-f550522f8f52-1399386809212-72","hostedmachinename":"192.168.182.1334","pageurl":"http://jaibigdata.com/0","customerid":72,"sessionid":"7871a55c-a950-4394-bf5f-d2179a553575","querystring":null,"sortorder":"desc","pagenumber":0,"totalhits":8,"hitsshown":44,"createdtimestampinmillis":1399386809212,"clickeddocid":"23","favourite":null,"eventidsuffix":"e8470a00-c869-4a90-89f2-f550522f8f52","filters":[{"code":"searchfacettype_brand_level_2","value":"Apple"},{"code":"searchfacettype_color_level_2","value":"Blue"}]}
body is:{"eventid":"2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0-1399386809743-61","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/0","customerid":61,"sessionid":"78286f6d-cc1e-489c-85ce-a7de8419d628","querystring":"queryString59","sortorder":"asc","pagenumber":3,"totalhits":32,"hitsshown":9,"createdtimestampinmillis":1399386809743,"clickeddocid":null,"favourite":null,"eventidsuffix":"2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0","filters":[{"code":"searchfacettype_age_level_2","value":"0-12 years"}]}
```
ElasticSearch Sink
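For viewing purposes, we want to display recently viewed items to the end user. The ElasticSearch sink automatically creates a new index per day (the configured index name with the date appended), which gives us daily recently viewed items. This can be used to display a customer's recently viewed items.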
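Flume's ElasticSearch sink appends the date to the configured indexName ("flume" becomes "flume-yyyy-MM-dd"), so with indexName = recentlyviewed from the flume.conf below, each day gets its own index. Here is a small sketch of that naming with java.time; we assume the date is taken from the event timestamp and formatted in UTC, which is our understanding of the sink's default time-based index naming, so verify it for your Flume version. `DailyIndexName` and `indexFor` are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DailyIndexName {

    // Assumption: the date suffix is yyyy-MM-dd computed in UTC.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Builds "<indexName>-<yyyy-MM-dd>" for the day the event falls in.
    static String indexFor(String indexName, long eventTimestampMillis) {
        return indexName + "-" + DAY.format(Instant.ofEpochMilli(eventTimestampMillis));
    }

    public static void main(String[] args) {
        // timestamp of the first document in the elasticsearch sample
        System.out.println(indexFor("recentlyviewed", 1399386809743L));
    }
}
```

One index per day keeps "recently viewed" queries cheap and makes expiring old data a matter of dropping whole indices; the ttl setting in flume.conf additionally expires individual documents.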
Let's assume you already have an ES instance running at localhost:9310.
Check FlumeESSinkServiceImpl.java for details on starting/stopping the ElasticSearch sink.
Sample data as stored in ElasticSearch:
```
{timestamp=1399386809743, body={pageurl=http://jaibigdata.com/0, querystring=queryString59, pagenumber=3, hitsshown=9, hostedmachinename=192.168.182.1330, createdtimestampinmillis=1399386809743, sessionid=78286f6d-cc1e-489c-85ce-a7de8419d628, eventid=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0-1399386809743-61, totalhits=32, clickeddocid=null, customerid=61, sortorder=asc, favourite=null, eventidsuffix=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0, filters=[{value=0-12 years, code=searchfacettype_age_level_2}]}, eventId=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0}
{timestamp=1399386809757, body={pageurl=http://jaibigdata.com/1, querystring=null, pagenumber=1, hitsshown=34, hostedmachinename=192.168.182.1330, createdtimestampinmillis=1399386809757, sessionid=e6a3fd51-fe07-4e21-8574-ce5ab8bfbd68, eventid=fe5279b7-0bce-4e2b-ad15-8b94107aa792-1399386809757-134, totalhits=9, clickeddocid=22, customerid=134, sortorder=desc, favourite=null, eventidsuffix=fe5279b7-0bce-4e2b-ad15-8b94107aa792, filters=[{value=Blue, code=searchfacettype_color_level_2}]}, State=VIEWED, eventId=fe5279b7-0bce-4e2b-ad15-8b94107aa792}
{timestamp=1399386809765, body={pageurl=http://jaibigdata.com/0, querystring=null, pagenumber=4, hitsshown=2, hostedmachinename=192.168.182.1331, createdtimestampinmillis=1399386809765, sessionid=29864de8-5708-40ab-a78b-4fae55698b01, eventid=886e9a28-4c8c-4e8c-a866-e86f685ecc54-1399386809765-317, totalhits=2, clickeddocid=null, customerid=317, sortorder=asc, favourite=null, eventidsuffix=886e9a28-4c8c-4e8c-a866-e86f685ecc54, filters=[{value=0-12 years, code=searchfacettype_age_level_2}, {value=0.0 - 10.0, code=product_price_range}]}, eventId=886e9a28-4c8c-4e8c-a866-e86f685ecc54}
```
ElasticSearchJsonBodyEventSerializer
Controls how the data is indexed in ElasticSearch. Update the event serializer according to your strategy for how the data should be indexed.
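```java
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;

import org.apache.flume.Event;
import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.xcontent.XContentBuilder;

public class ElasticSearchJsonBodyEventSerializer implements ElasticSearchEventSerializer {
    @Override
    public BytesStream getContentBuilder(final Event event) throws IOException {
        // One JSON document per event: the event body first, then the event headers
        final XContentBuilder builder = jsonBuilder().startObject();
        appendBody(builder, event);
        appendHeaders(builder, event);
        return builder;
    }
    ......
}
```
Check ElasticSearchJsonBodyEventSerializer.java for how to configure the serializer to index the data.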
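Judging from the ElasticSearch sample above, the resulting document nests the parsed JSON body under "body" and copies the event headers (timestamp, State, eventId) to the top level. appendBody and appendHeaders are elided, so the following is only a plain-Java sketch of that document shape using ordinary maps, not the Flume or ElasticSearch API; `IndexedDocumentShape` and `toDocument` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IndexedDocumentShape {

    // Hypothetical model of the indexed document: the parsed body under "body",
    // followed by each event header as a top-level field.
    static Map<String, Object> toDocument(Map<String, Object> parsedBody, Map<String, String> headers) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("body", parsedBody);
        doc.putAll(headers);
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("customerid", 134);
        body.put("clickeddocid", "22");

        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("timestamp", "1399386809757");
        headers.put("State", "VIEWED");
        headers.put("eventId", "fe5279b7-0bce-4e2b-ad15-8b94107aa792");

        System.out.println(toDocument(body, headers));
    }
}
```

Keeping the body as a nested object means ElasticSearch can map the search fields individually, while the headers stay available as top-level fields for filtering, e.g. on State.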
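Let's create a Flume source example in Java to process the above SearchQueryInstruction in test cases and store the data.
Avro Source with Channel Selector
For testing purposes, let's create an Avro source that redirects the data to the relevant sinks using Flume's multiplexing feature.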
```java
// Avro source that listens on the port below and processes incoming data.
private AvroSource avroSource;

final Map<String, String> properties = new HashMap<String, String>();
properties.put("type", "avro");
properties.put("bind", "localhost");
properties.put("port", "44444");

avroSource = new AvroSource();
avroSource.setName("AvroSource-" + UUID.randomUUID());
Context sourceContext = new Context(properties);
avroSource.configure(sourceContext);

ChannelSelector selector = new MultiplexingChannelSelector();
// Channels from the above services
Channel ESChannel = flumeESSinkService.getChannel();
Channel HDFSChannel = flumeHDFSSinkService.getChannel();
List<Channel> channels = new ArrayList<>();
channels.add(ESChannel);
channels.add(HDFSChannel);
selector.setChannels(channels);

// Route on the "State" header: VIEWED and FAVOURITE events go to both channels,
// everything else goes to the HDFS channel only.
final Map<String, String> selectorProperties = new HashMap<String, String>();
selectorProperties.put("type", "multiplexing");
selectorProperties.put("header", "State");
selectorProperties.put("mapping.VIEWED", HDFSChannel.getName() + " " + ESChannel.getName());
selectorProperties.put("mapping.FAVOURITE", HDFSChannel.getName() + " " + ESChannel.getName());
selectorProperties.put("default", HDFSChannel.getName());
Context selectorContext = new Context(selectorProperties);
selector.configure(selectorContext);

ChannelProcessor cp = new ChannelProcessor(selector);
avroSource.setChannelProcessor(cp);
avroSource.start();
```
Check FlumeAgentServiceImpl.java for storing the data directly to the sinks configured above, or even logging all data to a log file.
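The selector routes on the State header, so the sending side has to put that header on each event. The original does not show where State is set; as an assumption, the sketch below derives it from the instruction (FAVOURITE when favourite is true, otherwise VIEWED when a document was clicked), which agrees with the ElasticSearch sample where only the event with a clickeddocid carries State=VIEWED. It then models, in simplified form, how the multiplexing selector picks channels: the mapping.&lt;value&gt; entry if one exists, otherwise default. The real MultiplexingChannelSelector also supports optional channels and resolves names to Channel objects, which is left out here. `MultiplexingRoutingSketch`, `stateFor` and `channelsFor` are hypothetical, and the channel names are the ones from the flume.conf below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiplexingRoutingSketch {

    // Hypothetical rule for the "State" header value; null means no header is set.
    static String stateFor(String clickedDocId, Boolean favourite) {
        if (Boolean.TRUE.equals(favourite)) {
            return "FAVOURITE";
        }
        return clickedDocId != null ? "VIEWED" : null;
    }

    // Simplified selector: use "mapping.<value>" if present, else "default";
    // each entry is a space-separated list of channel names.
    static List<String> channelsFor(String headerValue, Map<String, String> selectorProps) {
        String mapped = headerValue == null ? null : selectorProps.get("mapping." + headerValue);
        String names = mapped != null ? mapped : selectorProps.get("default");
        if (names == null) {
            return new ArrayList<>();
        }
        return Arrays.asList(names.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("header", "State");
        props.put("mapping.VIEWED", "hdfschannel eschannel");
        props.put("mapping.FAVOURITE", "hdfschannel eschannel");
        props.put("default", "hdfschannel");

        System.out.println(channelsFor(stateFor("22", null), props)); // [hdfschannel, eschannel]
        System.out.println(channelsFor(stateFor(null, null), props)); // [hdfschannel]
    }
}
```

With this setup every event reaches HDFS for later analytics, while only the events relevant to "recently viewed" are duplicated into ElasticSearch.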
Standalone Flume/Hadoop/ElasticSearch Environment
The application can be used to generate SearchQueryInstruction data, and you can use your own standalone environment to process the data further. If you already have a running Flume/Hadoop/ElasticSearch environment, use the settings below to process the data.
If you already have a Flume instance running, you can also use the following configuration (flume.conf):
```properties
# Name the components on this agent
searcheventscollectoragent.sources = eventsavrosource
searcheventscollectoragent.sinks = hdfssink essink
searcheventscollectoragent.channels = hdfschannel eschannel

# Bind the source and sinks to the channels
searcheventscollectoragent.sources.eventsavrosource.channels = hdfschannel eschannel
searcheventscollectoragent.sinks.hdfssink.channel = hdfschannel
searcheventscollectoragent.sinks.essink.channel = eschannel

# Avro source. This is where the data will be sent to.
searcheventscollectoragent.sources.eventsavrosource.type = avro
searcheventscollectoragent.sources.eventsavrosource.bind = 0.0.0.0
searcheventscollectoragent.sources.eventsavrosource.port = 44444
searcheventscollectoragent.sources.eventsavrosource.selector.type = multiplexing
searcheventscollectoragent.sources.eventsavrosource.selector.header = State
searcheventscollectoragent.sources.eventsavrosource.selector.mapping.VIEWED = hdfschannel eschannel
# Events without a matching State value go to HDFS only
searcheventscollectoragent.sources.eventsavrosource.selector.default = hdfschannel

# Use a channel which buffers events in memory. This keeps all incoming events in memory.
# Consider a file channel instead if the data volume is high and memory becomes an issue.
searcheventscollectoragent.channels.hdfschannel.type = memory
searcheventscollectoragent.channels.hdfschannel.capacity = 100000
searcheventscollectoragent.channels.hdfschannel.transactionCapacity = 1000

searcheventscollectoragent.channels.eschannel.type = memory
searcheventscollectoragent.channels.eschannel.capacity = 100000
searcheventscollectoragent.channels.eschannel.transactionCapacity = 1000

# HDFS sink. Store events directly to the Hadoop file system, one directory per hour.
searcheventscollectoragent.sinks.hdfssink.type = hdfs
searcheventscollectoragent.sinks.hdfssink.hdfs.path = hdfs://localhost.localdomain:54321/searchevents/%Y/%m/%d/%H
searcheventscollectoragent.sinks.hdfssink.hdfs.filePrefix = searchevents
searcheventscollectoragent.sinks.hdfssink.hdfs.fileType = DataStream
# Roll files by size only (128 MB); time- and count-based rolling are disabled (0)
searcheventscollectoragent.sinks.hdfssink.hdfs.rollInterval = 0
searcheventscollectoragent.sinks.hdfssink.hdfs.rollSize = 134217728
searcheventscollectoragent.sinks.hdfssink.hdfs.idleTimeout = 60
searcheventscollectoragent.sinks.hdfssink.hdfs.rollCount = 0
searcheventscollectoragent.sinks.hdfssink.hdfs.batchSize = 10
searcheventscollectoragent.sinks.hdfssink.hdfs.useLocalTimeStamp = true

# ElasticSearch sink
searcheventscollectoragent.sinks.essink.type = elasticsearch
searcheventscollectoragent.sinks.essink.hostNames = 127.0.0.1:9310
searcheventscollectoragent.sinks.essink.indexName = recentlyviewed
searcheventscollectoragent.sinks.essink.indexType = clickevent
searcheventscollectoragent.sinks.essink.clusterName = jai-testclusterName
searcheventscollectoragent.sinks.essink.batchSize = 10
# Documents expire after 5 days
searcheventscollectoragent.sinks.essink.ttl = 5
searcheventscollectoragent.sinks.essink.serializer = org.jai.flume.sinks.elasticsearch.serializer.ElasticSearchJsonBodyEventSerializer
```
To test how the application's search query instructions behave on an existing Hadoop instance, set up the Hadoop and ElasticSearch instances separately. The application was tested with Cloudera Hadoop distribution 5.0.
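In later posts, we will cover further analysis of the generated data:
- Using Hive to query the data for the top customer queries and the number of times each product was viewed.
- Using ElasticSearch Hadoop to index the customers' top queries and product views data.
- Using Pig to count the total number of unique customers.
- Using Oozie to schedule coordinated jobs over Hive partitions, and bundling jobs to index the data into ElasticSearch.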
Original article: https://www.javacodegeeks.com/2014/05/flume-gathering-customer-product-search-clicks-data-using-apache-flume.html