apache mesos_Apache Mesos:编写自己的分布式框架
apache mesos
在上一篇文章中 ,我們了解了mesos是什么,它如何有用,并開始使用它。 在本文中,我們將看到如何在mesos上編寫自己的框架。 (在mesos中,框架是在其上運行的任何應用程序。)本文介紹了一個名為“ mesos-pinspider”的框架,該框架獲取用戶的pinterest頁面的用戶配置文件信息和用戶面板信息。
Mesos框架
通常,Mesos框架具有三個基本組件。
- 將任務提交給框架的驅動程序
- 向主服務器注冊要提供資源的調度程序 ,執行任務并在執行程序上運行它們
- 在從屬節點上啟動以執行框架任務的執行程序進程
Pinspider框架示例
您可以在github上檢查代碼。 讓我們將其細分為PinDriver,PinScheduler和Pin UserProfileExecutor。
司機
該框架的驅動程序組件是PinDriver。
- 創建執行人信息
使用Builder模式描述有關執行程序的信息,而mesos使用Google協議緩沖區進行數據交換。 在這里,我們需要設置executorID,該命令基本上是一個shell命令,通過以下命令執行:'/ bin / sh -c value'。 在執行命令之前,將獲取指定的所有URI。 名稱由setName()設置。 來源由
Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor")).setCommand(commandInfoUserProfile).setName("PinUserProfileExecutor Java").setSource("java").build();
setSource(),框架用來跟蹤執行程序源的標識符樣式字符串。 當不同的執行者ID可能在語義上相關時,這很有用。 - 創建框架信息
描述框架信息。 用戶字段用于確定執行程序/任務應以其啟動的Unix用戶。 如果用戶字段設置為空字符串,Mesos將自動將其設置為當前用戶。 主機在刪除框架之前等待調度程序進行故障轉移的時間由以下方式指定:
Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder().setFailoverTimeout(120000).setUser("").setName("Pinspider Framework");
setFailoverTimeout()。 框架的名稱由setName()設置 - 實例化調度程序
您需要使用需要提交的數量實例化調度程序才能使執行程序運行。
Scheduler scheduler = args.length == 1 ?new PinScheduler(userProfileExecutorInfo,userBoardExecutorInfo) :new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);注意:請注意,使用了兩個ExecutorInfo。 一個用于獲取用戶個人資料信息,另一個用于演示用戶板信息。 此說明僅涉及一個executorinfo – userProfileExecutorInfo
- 啟動mesos調度程序驅動程序。
MesosSchedulerDriver是SchedulerDriver的實現,SchedulerDriver是將調度程序連接到mesos的抽象接口。 這是通過管理調度程序的生命周期(開始,停止和等待任務完成)以及與Mesos交互(啟動任務,終止任務等)來完成的。
MesosSchedulerDriver schedulerDriver =new MesosSchedulerDriver(scheduler,frameworkBuilder.build(), args[0]);int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;schedulerDriver.stop();System.exit(status);
執行器執行
框架的執行器組件是PinUserProfileExecutor。
執行程序是由框架的執行程序實現的回調接口。 在我們的實現中,讓我們專注于launchTask()
@Override public void launchTask(final ExecutorDriver executorDriver final Protos.TaskInfo taskInfo) { }- 通過使用構建器模式設置ID和狀態來設置任務狀態。 Protos.TaskStatus taskStatus =Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()) .setState(Protos.TaskState.TASK_RUNNING).build();
- 將狀態更新發送到框架調度程序,根據需要進行重試,直到收到確認或執行程序終止為止,在這種情況下,將發送TASK_LOST狀態更新。 executorDriver.sendStatusUpdate(taskStatus);
- 從任務中獲取數據并運行邏輯。 try {message = ("userprofile :" + getUserProfileInfo(url)).getBytes(); } catch (IOException e) {LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage()); }
- 向框架發送消息。 executorDriver.sendFrameworkMessage(message);
- 將任務的狀態標記為已完成,然后將狀態更新發送到框架調度程序。 taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_FINISHED).build(); executorDriver.sendStatusUpdate(taskStatus);
- main()方法創建MesosExecutorDriver實例并運行 mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1
調度程序實施
該框架的Scheduler組件是Pin Scheduler。
調度程序是由框架的調度程序實現的回調接口。 在我們的實現中,讓我們專注于resourceOffers(),statusUpdate()和frameworkMessage()
- 構造函數:使用執行程序信息和啟動任務數進行構造。 public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor , Protos.ExecutorInfo pinUserBoardExecutor ) {this(pinUserProfileExecutor,pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch"); } public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor,Protos.ExecutorInfo pinUserBoardExecutor, int totalTasks, String url) { this.pinUserProfileExecutor = pinUserProfileExecutor;this.pinUserBoardExecutor = pinUserBoardExecutor;this.totalTasks = totalTasks; this.crawlQueue =Collections.synchronizedList(new ArrayList<String>());this.crawlQueue.add(url); }
- 資源報價
- 資源商品可以是CPU,內存等資源。從商品列表中,獲取資源的標量值。 在設置任務信息時,我們需要提供任務資源的需求。 for (Protos.Offer offer : list) {List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>();double offerCpus = 0; double offerMem = 0;for (Protos.Resource resource : offer.getResourcesList()) {if (resource.getName().equals("cpus")) {offerCpus += resource.getScalar().getValue();}else if (resource.getName().equals("mem")) {offerMem += resource.getScalar().getValue();}}LOGGER.info("Received Offer : " + offer.getId().getValue() +" with cpus = " + offerCpus + " and mem =" + offerMem);
- 創建任務ID。 Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
- 通過設置任務ID,添加資源,設置數據和設置執行程序來創建任務信息。 Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder().setName("task " + taskID.getValue()).setTaskId(taskID).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(CPUS_PER_TASK))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK))).setData(ByteString.copyFromUtf8(crawlQueue.get(0))).setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor)).build();
- 通過SchedulerDriver啟動任務。 ... taskInfoList.add(pinUserProfileTaskInfo); taskInfoList.add(pinUserBoardTaskInfo); } schedulerDriver.launchTasks(offer.getId(), taskInfoList);
- 狀態更新
當任務的狀態已更改(即,從屬丟失且任務丟失),任務完成且執行者發送狀態更新時,調用此方法。
@Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) { ... } - 如果任務完成,請停止SchedulerDriver if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) {finishedTasks++;LOGGER.info("Finished tasks : " + finishedTasks);if (finishedTasks == totalTasks) {schedulerDriver.stop();}}
- 如果任務被殺死,丟失或失敗,則中止SchedulerDriver if (taskStatus.getState() == Protos.TaskState.TASK_FAILED || taskStatus.getState() == Protos.TaskState.TASK_KILLED || taskStatus.getState() == Protos.TaskState.TASK_LOST) {LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue() +" is in unexpected state : " + taskStatus.getState().getValueDescriptor().getName() +"with reason : " + taskStatus.getReason().getValueDescriptor().getName()+ " from source : " + taskStatus.getSource().getValueDescriptor().getName() + " with message : " + taskStatus.getMessage());schedulerDriver.abort(); }
- 框架訊息
當執行程序發送消息時,將調用此函數。
- 處理您的訊息 @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) {String data = new String(bytes);System.out.println(data);LOGGER.info("User Profile Information : " + data); }
此處提供了完整的代碼,并提供了運行和輸出示例的說明。
翻譯自: https://www.javacodegeeks.com/2015/01/apache-mesos-writing-your-own-distributed-frameworks.html
apache mesos
總結
以上是生活随笔為你收集整理的apache mesos_Apache Mesos:编写自己的分布式框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安全事故备案(备案级事故)
- 下一篇: 安卓效果器软件(安卓效果器)