【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列
前言
上文【從入門到放棄-ZooKeeper】ZooKeeper入門中,我們學(xué)習(xí)了ZooKeeper的簡單安裝和cli使用。
接下來我們開始基于java API的實戰(zhàn)編程。本文先來寫一個分布式隊列的代碼實現(xiàn)。
設(shè)計
我們來寫一個先進(jìn)先出的分布式無界公平隊列。參考我們之前介紹的【從入門到放棄-Java】并發(fā)編程-JUC-ConcurrentLinkedQueue和【從入門到放棄-Java】并發(fā)編程-JUC-LinkedBlockingQueue。我們直接繼承AbstractQueue類,并實現(xiàn)Queue接口。
主要重寫offer、poll、peek、size方法。
我們使用ZooKeeper的持久化順序節(jié)點來實現(xiàn)分布式隊列。
offer是入隊,入隊時新創(chuàng)建一個持久化順序節(jié)點,節(jié)點后綴會根據(jù)ZooKeeper的特性自動累加。
poll的出隊,獲取根節(jié)點下的所有節(jié)點,根據(jù)后綴數(shù)字排序,數(shù)組最小的是最先入隊的,因此要最先出隊。
peek,獲取到最下入隊的數(shù)據(jù),和poll的區(qū)別是,peek只獲取數(shù)據(jù),不出隊,不刪除已經(jīng)消費(fèi)的節(jié)點。
size獲取隊列長度,實現(xiàn)方式是,獲取根節(jié)點下的節(jié)點數(shù)量即可。這個方法在并發(fā)時可能會有問題。慎用。
DistributedQueue
//繼承AbstractQueue類并實現(xiàn)Queue接口 public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);//ZooKeeper客戶端,進(jìn)行ZooKeeper操作private ZooKeeper zooKeeper;//根節(jié)點名稱private String dir;//數(shù)據(jù)節(jié)點名稱,順序節(jié)點在插入口會變?yōu)?node{00000000xx} 格式private String node;//ZooKeeper鑒權(quán)信息private List<ACL> acls;/*** Constructor.** @param zooKeeper the zoo keeper* @param dir the dir* @param node the node* @param acls the acls*/public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {this.zooKeeper = zooKeeper;this.dir = dir;this.node = node;this.acls = acls;init();}private void init() {//需要先判斷根節(jié)點是否存在,不存在的話,創(chuàng)建子節(jié)點時會出錯。try {Stat stat = zooKeeper.exists(dir, false);if (stat == null) {zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);}} catch (Exception e) {logger.error("[DistributedQueue#init] error : " + e.toString(), e);}} }offer
/*** Offer boolean.** @param o the o* @return the boolean*/ @Override public boolean offer(E o) {//構(gòu)建要插入的節(jié)點名稱String fullPath = dir.concat("/").concat(node);try {//創(chuàng)建子節(jié)點成功則返回入隊成功zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);return true;} catch (Exception e) {logger.error("[DistributedQueue#offer] error : " + e.toString(), e);}return false; }poll
/*** Poll e.** @return the e*/ @Override public E poll() {try {//獲取根節(jié)點所有子節(jié)點信息。List<String> children = zooKeeper.getChildren(dir, null);//如果隊列是空的則返回nullif (children == null || children.isEmpty()) {return null;}//將子節(jié)點名稱排序Collections.sort(children);for (String child : children) {//拼接子節(jié)點的具體名稱String fullPath = dir.concat("/").concat(child);try {//如果獲取數(shù)據(jù)成功,則類型轉(zhuǎn)換后,返回,并刪除改隊列中該節(jié)點byte[] bytes = zooKeeper.getData(fullPath, false, null);E data = (E) bytesToObject(bytes);zooKeeper.delete(fullPath, -1);return data;} catch (Exception e) {logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);}}} catch (Exception e) {logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);}return null; }peek
/*** Peek e.** @return the e*/ @Override public E peek() {try {//獲取根節(jié)點所有子節(jié)點信息。List<String> children = zooKeeper.getChildren(dir, null);//如果隊列是空的則返回nullif (children == null || children.isEmpty()) {return null;}//將子節(jié)點名稱排序Collections.sort(children);for (String child : children) {//拼接子節(jié)點的具體名稱String fullPath = dir.concat("/").concat(child);try {//如果獲取數(shù)據(jù)成功,則類型轉(zhuǎn)換后,返回,不會刪除改隊列中該節(jié)點byte[] bytes = zooKeeper.getData(fullPath, false, null);E data = (E) bytesToObject(bytes);return data;} catch (Exception e) {logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);}}} catch (Exception e) {logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);}return null; }size
/*** Size int.** @return the int*/ @Override public int size() {try {//獲取根節(jié)點的子節(jié)點名稱List<String> children = zooKeeper.getChildren(dir, null);//返回子結(jié)點信息數(shù)量return children.size();} catch (Exception e) {logger.error("[DistributedQueue#offer] size : " + e.toString(), e);}return 0; }總結(jié)
上面我們一起學(xué)習(xí)了如何利用持久性順序節(jié)點,創(chuàng)建一個分布式先進(jìn)先出隊列。源代碼可見:aloofJr。
如果有好的優(yōu)化建議,歡迎一起討論。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 玩转运维编排服务的权限:Assume R
- 下一篇: 为了实现在线库的复杂查询,你还在双写吗?