Talking about storm supervisor startup
Preface
This article takes a look at how the storm supervisor starts up.
Supervisor.launch
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Supervisor.java
```java
/**
 * Launch the supervisor
 */
public void launch() throws Exception {
    LOG.info("Starting Supervisor with conf {}", conf);
    String path = ConfigUtils.supervisorTmpDir(conf);
    FileUtils.cleanDirectory(new File(path));

    Localizer localizer = getLocalizer();

    SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
    hb.run();
    // should synchronize supervisor so it doesn't launch anything after being down (optimization)
    Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
    heartbeatTimer.scheduleRecurring(0, heartbeatFrequency, hb);

    this.eventManager = new EventManagerImp(false);
    this.readState = new ReadClusterState(this);

    Set<String> downloadedTopoIds = SupervisorUtils.readDownloadedTopologyIds(conf);
    Map<Integer, LocalAssignment> portToAssignments = localState.getLocalAssignmentsMap();
    if (portToAssignments != null) {
        Map<String, LocalAssignment> assignments = new HashMap<>();
        for (LocalAssignment la : localState.getLocalAssignmentsMap().values()) {
            assignments.put(la.get_topology_id(), la);
        }
        for (String topoId : downloadedTopoIds) {
            LocalAssignment la = assignments.get(topoId);
            if (la != null) {
                SupervisorUtils.addBlobReferences(localizer, topoId, conf, la.get_owner());
            } else {
                LOG.warn("Could not find an owner for topo {}", topoId);
            }
        }
    }
    // do this after adding the references so we don't try to clean things being used
    localizer.startCleaner();

    UpdateBlobs updateBlobsThread = new UpdateBlobs(this);

    if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
        // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
        // to date even if callbacks don't all work exactly right
        eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(readState, eventManager));

        // Blob update thread. Starts with 30 seconds delay, every 30 seconds
        blobUpdateTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, eventManager));

        // supervisor health check
        eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(this));
    }
    LOG.info("Starting supervisor with id {} at host {}.", getId(), getHostName());
}
```

- Besides scheduling the heartbeat, blob-update and health-check timers, launch creates a ReadClusterState (new ReadClusterState(this)), whose constructor sets up the slots
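The loop that re-registers blob references is essentially a join between two maps: the persisted port-to-assignment map and the set of downloaded topology ids. A minimal sketch of that join, assuming a simplified hypothetical `Assignment` class (just a topology id and an owner) in place of Storm's Thrift-generated `LocalAssignment`, and returning the owners instead of calling `SupervisorUtils.addBlobReferences`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for Storm's LocalAssignment: only the two fields the join needs.
final class Assignment {
    final String topologyId;
    final String owner;

    Assignment(String topologyId, String owner) {
        this.topologyId = topologyId;
        this.owner = owner;
    }
}

final class BlobReferenceSketch {
    // Returns topoId -> owner for every downloaded topology that still has a local assignment.
    // Topologies without an assignment go into `missing` (where Storm logs a warning instead).
    static Map<String, String> ownersForDownloaded(Map<Integer, Assignment> portToAssignment,
                                                  Set<String> downloadedTopoIds,
                                                  List<String> missing) {
        Map<String, String> result = new LinkedHashMap<>();
        if (portToAssignment == null) {
            return result; // nothing persisted yet, same as the null check in launch()
        }
        // Re-key the assignments by topology id instead of by port.
        Map<String, Assignment> byTopo = new HashMap<>();
        for (Assignment a : portToAssignment.values()) {
            byTopo.put(a.topologyId, a);
        }
        for (String topoId : downloadedTopoIds) {
            Assignment a = byTopo.get(topoId);
            if (a != null) {
                result.put(topoId, a.owner);
            } else {
                missing.add(topoId);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Assignment> ports = new HashMap<>();
        ports.put(6700, new Assignment("topo-a", "alice"));
        ports.put(6701, new Assignment("topo-b", "bob"));
        List<String> missing = new ArrayList<>();
        Map<String, String> owners =
            ownersForDownloaded(ports, new TreeSet<>(Set.of("topo-a", "topo-c")), missing);
        System.out.println(owners + " missing=" + missing); // {topo-a=alice} missing=[topo-c]
    }
}
```

Re-keying by topology id first keeps the lookup per downloaded topology constant-time, which is also why the original code builds the intermediate `assignments` map.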
ReadClusterState
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/ReadClusterState.java
```java
public ReadClusterState(Supervisor supervisor) throws Exception {
    this.superConf = supervisor.getConf();
    this.stormClusterState = supervisor.getStormClusterState();
    this.syncSupEventManager = supervisor.getEventManger();
    this.assignmentVersions = new AtomicReference<Map<String, VersionedData<Assignment>>>(new HashMap<String, VersionedData<Assignment>>());
    this.assignmentId = supervisor.getAssignmentId();
    this.iSuper = supervisor.getiSupervisor();
    this.localizer = supervisor.getAsyncLocalizer();
    this.host = supervisor.getHostName();
    this.localState = supervisor.getLocalState();
    this.clusterState = supervisor.getStormClusterState();
    this.cachedAssignments = supervisor.getCurrAssignment();

    this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext());

    @SuppressWarnings("unchecked")
    List<Number> ports = (List<Number>)superConf.get(Config.SUPERVISOR_SLOTS_PORTS);
    for (Number port: ports) {
        slots.put(port.intValue(), mkSlot(port.intValue()));
    }

    try {
        Collection<String> workers = SupervisorUtils.supervisorWorkerIds(superConf);
        for (Slot slot: slots.values()) {
            String workerId = slot.getWorkerId();
            if (workerId != null) {
                workers.remove(workerId);
            }
        }
        if (!workers.isEmpty()) {
            supervisor.killWorkers(workers, launcher);
        }
    } catch (Exception e) {
        LOG.warn("Error trying to clean up old workers", e);
    }

    //All the slots/assignments should be recovered now, so we can clean up anything that we don't expect to be here
    try {
        localizer.cleanupUnusedTopologies();
    } catch (Exception e) {
        LOG.warn("Error trying to clean up old topologies", e);
    }

    for (Slot slot: slots.values()) {
        slot.start();
    }
}

private Slot mkSlot(int port) throws Exception {
    return new Slot(localizer, superConf, launcher, host, port,
                    localState, clusterState, iSuper, cachedAssignments);
}
```

- It reads SUPERVISOR_SLOTS_PORTS (supervisor.slots.ports), which defaults to [6700, 6701, 6702, 6703]
- It creates a ContainerLauncher via ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext())
- It calls mkSlot for each configured port to create a Slot, kills any leftover workers that no slot claims, and finally calls start on each slot in turn to start the slot threads
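The slot bookkeeping in the constructor boils down to two steps: one slot per configured port, then killing any worker that no slot has claimed. A minimal sketch of that logic, assuming a hypothetical `SlotInfo` holder in place of the real `Slot` thread and returning the orphan worker ids instead of calling `supervisor.killWorkers`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SlotSketch {
    // Hypothetical stand-in for Slot: a port plus the worker id recovered for it (may be null).
    static final class SlotInfo {
        final int port;
        final String workerId;

        SlotInfo(int port, String workerId) {
            this.port = port;
            this.workerId = workerId;
        }
    }

    // Default value of supervisor.slots.ports, as described in the text.
    static final List<Number> DEFAULT_PORTS = List.of(6700, 6701, 6702, 6703);

    // One slot per configured port, keyed by port like the `slots` map in ReadClusterState.
    static Map<Integer, SlotInfo> mkSlots(List<Number> ports, Map<Integer, String> recoveredWorkers) {
        Map<Integer, SlotInfo> slots = new LinkedHashMap<>();
        for (Number port : ports) {
            int p = port.intValue();
            slots.put(p, new SlotInfo(p, recoveredWorkers.get(p)));
        }
        return slots;
    }

    // Worker ids found on disk that no slot claims; these are the ones the supervisor kills.
    static List<String> orphanWorkers(Collection<String> workersOnDisk, Map<Integer, SlotInfo> slots) {
        List<String> orphans = new ArrayList<>(workersOnDisk);
        for (SlotInfo slot : slots.values()) {
            if (slot.workerId != null) {
                orphans.remove(slot.workerId);
            }
        }
        return orphans;
    }

    public static void main(String[] args) {
        Map<Integer, SlotInfo> slots = mkSlots(DEFAULT_PORTS, Map.of(6700, "w-1"));
        System.out.println(slots.keySet());                                // [6700, 6701, 6702, 6703]
        System.out.println(orphanWorkers(List.of("w-1", "w-old"), slots)); // [w-old]
    }
}
```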
ContainerLauncher.make
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/ContainerLauncher.java
```java
/**
 * Factory to create the right container launcher
 * for the config and the environment.
 * @param conf the config
 * @param supervisorId the ID of the supervisor
 * @param sharedContext Used in local mode to let workers talk together without netty
 * @return the proper container launcher
 * @throws IOException on any error
 */
public static ContainerLauncher make(Map<String, Object> conf, String supervisorId, IContext sharedContext) throws IOException {
    if (ConfigUtils.isLocalMode(conf)) {
        return new LocalContainerLauncher(conf, supervisorId, sharedContext);
    }

    if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
        return new RunAsUserContainerLauncher(conf, supervisorId);
    }
    return new BasicContainerLauncher(conf, supervisorId);
}
```

- Depending on the configuration, a different ContainerLauncher subclass is created: LocalContainerLauncher in local mode, RunAsUserContainerLauncher when SUPERVISOR_RUN_WORKER_AS_USER is enabled, and BasicContainerLauncher otherwise
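The factory's decision order can be restated without any Storm types. In this sketch the `LauncherKind` enum is hypothetical, and the config keys are assumptions: local mode is taken to mean `storm.cluster.mode` equals `local` (what `ConfigUtils.isLocalMode` is assumed to check), and `supervisor.run.worker.as.user` is assumed to be the key behind `Config.SUPERVISOR_RUN_WORKER_AS_USER`.

```java
import java.util.HashMap;
import java.util.Map;

final class LauncherFactorySketch {
    // Hypothetical labels for the three ContainerLauncher subclasses.
    enum LauncherKind { LOCAL, RUN_AS_USER, BASIC }

    static LauncherKind choose(Map<String, Object> conf) {
        // Local mode is checked first, mirroring the order in ContainerLauncher.make.
        if ("local".equals(conf.get("storm.cluster.mode"))) {
            return LauncherKind.LOCAL;
        }
        // Only a Boolean true enables run-as-user; a missing key counts as false.
        if (Boolean.TRUE.equals(conf.get("supervisor.run.worker.as.user"))) {
            return LauncherKind.RUN_AS_USER;
        }
        return LauncherKind.BASIC;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("storm.cluster.mode", "distributed");
        System.out.println(choose(conf)); // BASIC
        conf.put("supervisor.run.worker.as.user", true);
        System.out.println(choose(conf)); // RUN_AS_USER
        conf.put("storm.cluster.mode", "local");
        System.out.println(choose(conf)); // LOCAL
    }
}
```

Because local mode is tested first, a local cluster never runs workers as another user even if that flag is set.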
Slot
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Slot.java
```java
public void run() {
    try {
        while(!done) {
            Set<TopoProfileAction> origProfileActions = new HashSet<>(profiling.get());
            Set<TopoProfileAction> removed = new HashSet<>(origProfileActions);

            DynamicState nextState =
                stateMachineStep(dynamicState.withNewAssignment(newAssignment.get())
                                 .withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions), staticState);

            if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) {
                LOG.info("STATE {} -> {}", dynamicState, nextState);
            }
            //Save the current state for recovery
            if (!equivalent(nextState.currentAssignment, dynamicState.currentAssignment)) {
                LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment, nextState.currentAssignment);
                saveNewAssignment(nextState.currentAssignment);
            }

            if (equivalent(nextState.newAssignment, nextState.currentAssignment)
                && nextState.currentAssignment != null && nextState.currentAssignment.get_owner() == null
                && nextState.newAssignment != null && nextState.newAssignment.get_owner() != null) {
                //This is an odd case for a rolling upgrade where the user on the old assignment may be null,
                // but not on the new one. Although in all other ways they are the same.
                // If this happens we want to use the assignment with the owner.
                LOG.info("Updating assignment to save owner {}", nextState.newAssignment.get_owner());
                saveNewAssignment(nextState.newAssignment);
                nextState = nextState.withCurrentAssignment(nextState.container, nextState.newAssignment);
            }

            // clean up the profiler actions that are not being processed
            removed.removeAll(dynamicState.profileActions);
            removed.removeAll(dynamicState.pendingStopProfileActions);
            for (TopoProfileAction action: removed) {
                try {
                    clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
                } catch (Exception e) {
                    LOG.error("Error trying to remove profiling request, it will be retried", e);
                }
            }
            Set<TopoProfileAction> orig, copy;
            do {
                orig = profiling.get();
                copy = new HashSet<>(orig);
                copy.removeAll(removed);
            } while (!profiling.compareAndSet(orig, copy));
            dynamicState = nextState;
        }
    } catch (Throwable e) {
        if (!Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
            LOG.error("Error when processing event", e);
            Utils.exitProcess(20, "Error when processing an event");
        }
    }
}

private void saveNewAssignment(LocalAssignment assignment) {
    synchronized(staticState.localState) {
        Map<Integer, LocalAssignment> assignments = staticState.localState.getLocalAssignmentsMap();
        if (assignments == null) {
            assignments = new HashMap<>();
        }
        if (assignment == null) {
            assignments.remove(staticState.port);
        } else {
            assignments.put(staticState.port, assignment);
        }
        staticState.localState.setLocalAssignmentsMap(assignments);
    }
    Map<Long, LocalAssignment> update = null;
    Map<Long, LocalAssignment> orig = null;
    do {
        Long lport = new Long(staticState.port);
        orig = cachedCurrentAssignments.get();
        update = new HashMap<>(orig);
        if (assignment == null) {
            update.remove(lport);
        } else {
            update.put(lport, assignment);
        }
    } while (!cachedCurrentAssignments.compareAndSet(orig, update));
}

static DynamicState stateMachineStep(DynamicState dynamicState, StaticState staticState) throws Exception {
    LOG.debug("STATE {}", dynamicState.state);
    switch (dynamicState.state) {
        case EMPTY:
            return handleEmpty(dynamicState, staticState);
        case RUNNING:
            return handleRunning(dynamicState, staticState);
        case WAITING_FOR_WORKER_START:
            return handleWaitingForWorkerStart(dynamicState, staticState);
        case KILL_AND_RELAUNCH:
            return handleKillAndRelaunch(dynamicState, staticState);
        case KILL:
            return handleKill(dynamicState, staticState);
        case WAITING_FOR_BASIC_LOCALIZATION:
            return handleWaitingForBasicLocalization(dynamicState, staticState);
        case WAITING_FOR_BLOB_LOCALIZATION:
            return handleWaitingForBlobLocalization(dynamicState, staticState);
        default:
            throw new IllegalStateException("Code not ready to handle a state of "+dynamicState.state);
    }
}
```

- run keeps looping, calling stateMachineStep to move the slot from one state to the next, and persists the current assignment whenever it changes
- When the state is WAITING_FOR_BLOB_LOCALIZATION, stateMachineStep dispatches to handleWaitingForBlobLocalization
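The `run`/`stateMachineStep` pair is a classic loop-plus-switch state machine: each iteration hands the current state to a handler and keeps whatever state it returns. A minimal sketch of that shape, using a subset of Slot's state names and a hypothetical happy-path transition table (Storm's real handlers also react to assignment changes, timeouts and failures, none of which are modeled here):

```java
import java.util.ArrayList;
import java.util.List;

final class SlotStateMachineSketch {
    // A subset of Slot's states; the transitions below are simplified and hypothetical.
    enum MachineState {
        EMPTY, WAITING_FOR_BASIC_LOCALIZATION, WAITING_FOR_BLOB_LOCALIZATION,
        WAITING_FOR_WORKER_START, RUNNING, KILL
    }

    // One step: dispatch on the current state, like stateMachineStep's switch.
    static MachineState step(MachineState state, boolean hasAssignment) {
        switch (state) {
            case EMPTY:
                return hasAssignment ? MachineState.WAITING_FOR_BASIC_LOCALIZATION : MachineState.EMPTY;
            case WAITING_FOR_BASIC_LOCALIZATION:
                return MachineState.WAITING_FOR_BLOB_LOCALIZATION;
            case WAITING_FOR_BLOB_LOCALIZATION:
                // In Storm this is where containerLauncher.launchContainer is called.
                return MachineState.WAITING_FOR_WORKER_START;
            case WAITING_FOR_WORKER_START:
                return MachineState.RUNNING;
            case RUNNING:
                return hasAssignment ? MachineState.RUNNING : MachineState.KILL;
            case KILL:
                return MachineState.EMPTY;
            default:
                throw new IllegalStateException("Code not ready to handle a state of " + state);
        }
    }

    // Runs a fixed number of iterations instead of `while (!done)` and records each transition.
    static List<String> run(MachineState start, boolean hasAssignment, int iterations) {
        List<String> transitions = new ArrayList<>();
        MachineState current = start;
        for (int i = 0; i < iterations; i++) {
            MachineState next = step(current, hasAssignment);
            if (next != current) {
                transitions.add(current + " -> " + next);
            }
            current = next;
        }
        return transitions;
    }

    public static void main(String[] args) {
        run(MachineState.EMPTY, true, 6).forEach(System.out::println);
    }
}
```

Keeping each handler a pure function of the current state (as `stateMachineStep` is a static method) makes the transitions easy to unit-test in isolation from the thread that drives them.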
handleWaitingForBlobLocalization
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Slot.java
```java
/**
 * State Transitions for WAITING_FOR_BLOB_LOCALIZATION state.
 * PRECONDITION: neither pendingLocalization nor pendingDownload is null.
 * PRECONDITION: The slot should be empty
 * @param dynamicState current state
 * @param staticState static data
 * @return the next state
 * @throws Exception on any error
 */
static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception {
    assert(dynamicState.pendingLocalization != null);
    assert(dynamicState.pendingDownload != null);
    assert(dynamicState.container == null);

    //Ignore changes to scheduling while downloading the topology blobs
    // We don't support canceling the download through the future yet,
    // so to keep everything in sync, just wait
    try {
        dynamicState.pendingDownload.get(1000, TimeUnit.MILLISECONDS);
        //Downloading of all blobs finished.
        if (!equivalent(dynamicState.newAssignment, dynamicState.pendingLocalization)) {
            //Scheduling changed
            staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port);
            return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState);
        }
        Container c = staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState);
        return dynamicState.withCurrentAssignment(c, dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START).withPendingLocalization(null, null);
    } catch (TimeoutException e) {
        //We waited for 1 second loop around and try again....
        return dynamicState;
    }
}
```

- Once all blobs have been downloaded (and the assignment has not changed in the meantime), it launches the container through staticState.containerLauncher.launchContainer and moves the slot to WAITING_FOR_WORKER_START
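The key trick in this handler is the bounded wait: `pendingDownload.get(1000, TimeUnit.MILLISECONDS)` either returns once the download finishes or throws `TimeoutException`, in which case the handler returns the unchanged state and the `run` loop comes back around. A minimal sketch of that poll-with-timeout pattern using `java.util.concurrent`; the `State` enum is hypothetical and stands in for `DynamicState`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlobWaitSketch {
    // Hypothetical states: keep waiting, or move on to starting the worker.
    enum State { WAITING_FOR_BLOB_LOCALIZATION, WAITING_FOR_WORKER_START }

    static State handle(Future<Void> pendingDownload, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            pendingDownload.get(timeoutMs, TimeUnit.MILLISECONDS);
            // Download finished: this is where Storm calls launchContainer.
            return State.WAITING_FOR_WORKER_START;
        } catch (TimeoutException e) {
            // Not done yet: return the same state and let the caller loop around.
            return State.WAITING_FOR_BLOB_LOCALIZATION;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> download = new CompletableFuture<>();
        System.out.println(handle(download, 50)); // WAITING_FOR_BLOB_LOCALIZATION
        download.complete(null);
        System.out.println(handle(download, 50)); // WAITING_FOR_WORKER_START
    }
}
```

Polling with a short timeout, rather than blocking indefinitely, keeps the slot thread responsive: every iteration of the outer loop still gets a chance to log, save state and clean up profiler actions.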
BasicContainerLauncher.launchContainer
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/BasicContainerLauncher.java
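```java
@Override
public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
    LocalContainer ret = new LocalContainer(_conf, _supervisorId, port, assignment, _sharedContext);
    ret.setup();
    ret.launch();
    return ret;
}
```

- launchContainer first calls setup and then launch. Note that the snippet above builds a LocalContainer from _sharedContext, which matches the local-mode launcher (in make, only LocalContainerLauncher receives a sharedContext); in distributed mode BasicContainerLauncher launches a BasicContainer instead, whose launch method is shown below, following the same setup-then-launch sequence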
Container.setup
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Container.java
```java
/**
 * Setup the container to run. By default this creates the needed directories/links in the
 * local file system
 * PREREQUISITE: All needed blobs and topology, jars/configs have been downloaded and
 * placed in the appropriate locations
 * @throws IOException on any error
 */
protected void setup() throws IOException {
    _type.assertFull();
    if (!_ops.doRequiredTopoFilesExist(_conf, _topologyId)) {
        LOG.info("Missing topology storm code, so can't launch worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
                _supervisorId, _port, _workerId);
        throw new IllegalStateException("Not all needed files are here!!!!");
    }
    LOG.info("Setting up {}:{}", _supervisorId, _workerId);

    _ops.forceMkdir(new File(ConfigUtils.workerPidsRoot(_conf, _workerId)));
    _ops.forceMkdir(new File(ConfigUtils.workerTmpRoot(_conf, _workerId)));
    _ops.forceMkdir(new File(ConfigUtils.workerHeartbeatsRoot(_conf, _workerId)));

    File workerArtifacts = new File(ConfigUtils.workerArtifactsRoot(_conf, _topologyId, _port));
    if (!_ops.fileExists(workerArtifacts)) {
        _ops.forceMkdir(workerArtifacts);
        _ops.setupWorkerArtifactsDir(_assignment.get_owner(), workerArtifacts);
    }

    String user = getWorkerUser();
    writeLogMetadata(user);
    saveWorkerUser(user);
    createArtifactsLink();
    createBlobstoreLinks();
}
```

- setup mostly does preparation work: it checks that the topology files are present, creates the worker's pids, tmp, heartbeats and artifacts directories, records the worker user, and creates the artifacts and blobstore links
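Most of `setup` is directory creation that must tolerate a directory already being there. A rough equivalent with `java.nio.file`, using `Files.createDirectories` as a stand-in for `_ops.forceMkdir` and assuming a hypothetical layout of `<root>/workers/<workerId>/{pids,tmp,heartbeats}`; the real paths come from `ConfigUtils.workerPidsRoot`, `workerTmpRoot` and `workerHeartbeatsRoot` and are not reproduced here:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class WorkerDirsSketch {
    // Create the per-worker directories; safe to call repeatedly.
    static List<Path> setupWorkerDirs(Path stormLocalDir, String workerId) throws IOException {
        Path workerRoot = stormLocalDir.resolve("workers").resolve(workerId); // hypothetical layout
        List<Path> created = new ArrayList<>();
        for (String sub : new String[] {"pids", "tmp", "heartbeats"}) {
            // createDirectories creates missing parents and does not fail if the directory exists.
            created.add(Files.createDirectories(workerRoot.resolve(sub)));
        }
        return created;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("storm-local");
        setupWorkerDirs(root, "worker-1");
        setupWorkerDirs(root, "worker-1"); // second call is a no-op, not an error
        System.out.println(Files.isDirectory(root.resolve("workers").resolve("worker-1").resolve("heartbeats"))); // true
    }
}
```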
BasicContainer.launch
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/BasicContainer.java
```java
public void launch() throws IOException {
    _type.assertFull();
    LOG.info("Launching worker with assignment {} for this supervisor {} on port {} with id {}", _assignment,
            _supervisorId, _port, _workerId);
    String logPrefix = "Worker Process " + _workerId;
    ProcessExitCallback processExitCallback = new ProcessExitCallback(logPrefix);
    _exitedEarly = false;

    final WorkerResources resources = _assignment.get_resources();
    final int memOnheap = getMemOnHeap(resources);
    final String stormRoot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
    final String jlp = javaLibraryPath(stormRoot, _conf);

    List<String> commandList = mkLaunchCommand(memOnheap, stormRoot, jlp);

    Map<String, String> topEnvironment = new HashMap<String, String>();
    @SuppressWarnings("unchecked")
    Map<String, String> environment = (Map<String, String>) _topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
    if (environment != null) {
        topEnvironment.putAll(environment);
    }
    topEnvironment.put("LD_LIBRARY_PATH", jlp);

    LOG.info("Launching worker with command: {}. ", Utils.shellCmd(commandList));

    String workerDir = ConfigUtils.workerRoot(_conf, _workerId);

    launchWorkerProcess(commandList, topEnvironment, logPrefix, processExitCallback, new File(workerDir));
}

/**
 * Launch the worker process (non-blocking)
 *
 * @param command
 *            the command to run
 * @param env
 *            the environment to run the command
 * @param processExitcallback
 *            a callback for when the process exits
 * @param logPrefix
 *            the prefix to include in the logs
 * @param targetDir
 *            the working directory to run the command in
 * @return true if it ran successfully, else false
 * @throws IOException
 *             on any error
 */
protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
        ExitCodeCallback processExitCallback, File targetDir) throws IOException {
    SupervisorUtils.launchProcess(command, env, logPrefix, processExitCallback, targetDir);
}
```

- mkLaunchCommand builds the command used to start the worker, and the environment is assembled from TOPOLOGY_ENVIRONMENT plus LD_LIBRARY_PATH
- SupervisorUtils.launchProcess is then used to start the worker process in the worker's directory
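The environment handed to the worker is built in two layers: whatever the topology sets under `Config.TOPOLOGY_ENVIRONMENT`, then `LD_LIBRARY_PATH` forced to the computed java library path, so a topology's own value for that variable is overwritten. A minimal sketch of that merge; `workerEnvironment` is a hypothetical helper name:

```java
import java.util.HashMap;
import java.util.Map;

final class WorkerEnvSketch {
    // Mirrors the topEnvironment logic in BasicContainer.launch (hypothetical helper name).
    static Map<String, String> workerEnvironment(Map<String, String> topologyEnvironment, String jlp) {
        Map<String, String> env = new HashMap<>();
        if (topologyEnvironment != null) {
            env.putAll(topologyEnvironment); // user-supplied variables first
        }
        env.put("LD_LIBRARY_PATH", jlp); // then the computed library path, which always wins
        return env;
    }

    public static void main(String[] args) {
        Map<String, String> topo = Map.of("FOO", "bar", "LD_LIBRARY_PATH", "/ignored");
        System.out.println(workerEnvironment(topo, "/opt/storm/lib").get("LD_LIBRARY_PATH")); // /opt/storm/lib
    }
}
```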
mkLaunchCommand
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/BasicContainer.java
```java
/**
 * Create the command to launch the worker process
 * @param memOnheap the on heap memory for the worker
 * @param stormRoot the root dist dir for the topology
 * @param jlp java library path for the topology
 * @return the command to run
 * @throws IOException on any error.
 */
private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
        final String jlp) throws IOException {
    final String javaCmd = javaCmd("java");
    final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
    final String stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
    final String workerTmpDir = ConfigUtils.workerTmpRoot(_conf, _workerId);

    List<String> classPathParams = getClassPathParams(stormRoot);
    List<String> commonParams = getCommonParams();

    List<String> commandList = new ArrayList<>();
    //Log Writer Command...
    commandList.add(javaCmd);
    commandList.addAll(classPathParams);
    commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS)));
    commandList.addAll(commonParams);
    commandList.add("org.apache.storm.LogWriter"); //The LogWriter in turn launches the actual worker.

    //Worker Command...
    commandList.add(javaCmd);
    commandList.add("-server");
    commandList.addAll(commonParams);
    commandList.addAll(substituteChildopts(_conf.get(Config.WORKER_CHILDOPTS), memOnheap));
    commandList.addAll(substituteChildopts(_topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
    commandList.addAll(substituteChildopts(OR(
            _topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
            _conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
    commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
    commandList.add("-Djava.library.path=" + jlp);
    commandList.add("-Dstorm.conf.file=" + stormConfFile);
    commandList.add("-Dstorm.options=" + stormOptions);
    commandList.add("-Djava.io.tmpdir=" + workerTmpDir);
    commandList.addAll(classPathParams);
    commandList.add("org.apache.storm.daemon.worker");
    commandList.add(_topologyId);
    commandList.add(_supervisorId);
    commandList.add(String.valueOf(_port));
    commandList.add(_workerId);

    return commandList;
}
```

- The resulting command is really two JVM invocations in one list: the process that is started runs org.apache.storm.LogWriter, which in turn launches the actual worker JVM; the worker's main class is org.apache.storm.daemon.worker, followed by the topology id, supervisor id, port and worker id
- The org.apache.storm.daemon.worker main class lives at storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/worker.clj, i.e. in this version the worker is implemented in Clojure
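Because `mkLaunchCommand` returns a single `List<String>` holding two JVM invocations back to back, it helps to see its overall shape. The sketch below rebuilds only that shape, assuming `classPathParams` expands to `-cp <classpath>` and dropping the child opts, common params and `-D` system properties other than `java.library.path`; all argument values are hypothetical placeholders. It shows the ordering (LogWriter first, then the worker main class and its four positional arguments), not the real option values:

```java
import java.util.ArrayList;
import java.util.List;

final class LaunchCommandSketch {
    static List<String> shape(String javaCmd, String classpath, String jlp,
                              String topologyId, String supervisorId, int port, String workerId) {
        List<String> cmd = new ArrayList<>();
        // Log writer JVM: this is the process actually started; it launches the worker.
        cmd.add(javaCmd);
        cmd.add("-cp");
        cmd.add(classpath);
        cmd.add("org.apache.storm.LogWriter");
        // Worker JVM: everything after LogWriter is the worker's own command line.
        cmd.add(javaCmd);
        cmd.add("-server");
        cmd.add("-Djava.library.path=" + jlp);
        cmd.add("-cp");
        cmd.add(classpath);
        cmd.add("org.apache.storm.daemon.worker");
        // Positional arguments, in the same order as mkLaunchCommand.
        cmd.add(topologyId);
        cmd.add(supervisorId);
        cmd.add(String.valueOf(port));
        cmd.add(workerId);
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = shape("java", "lib/*", "/usr/lib", "topo-1", "sup-1", 6700, "w-1");
        System.out.println(String.join(" ", cmd));
    }
}
```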
SupervisorUtils.launchProcess
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/SupervisorUtils.java
```java
/**
 * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
 * callback.
 * @param command the command to be executed in the new process
 * @param environment the environment to be applied to the process. Can be
 *                    null.
 * @param logPrefix a prefix for log entries from the output of the process.
 *                  Can be null.
 * @param exitCodeCallback code to be called passing the exit code value
 *                         when the process completes
 * @param dir the working directory of the new process
 * @return the new process
 * @throws IOException
 * @see java.lang.ProcessBuilder
 */
public static Process launchProcess(List<String> command,
                                    Map<String,String> environment,
                                    final String logPrefix,
                                    final ExitCodeCallback exitCodeCallback,
                                    File dir)
        throws IOException {
    ProcessBuilder builder = new ProcessBuilder(command);
    Map<String,String> procEnv = builder.environment();
    if (dir != null) {
        builder.directory(dir);
    }
    builder.redirectErrorStream(true);
    if (environment != null) {
        procEnv.putAll(environment);
    }
    final Process process = builder.start();
    if (logPrefix != null || exitCodeCallback != null) {
        Utils.asyncLoop(new Callable<Object>() {
            public Object call() {
                if (logPrefix != null ) {
                    Utils.readAndLogStream(logPrefix,
                            process.getInputStream());
                }
                if (exitCodeCallback != null) {
                    try {
                        process.waitFor();
                        exitCodeCallback.call(process.exitValue());
                    } catch (InterruptedException ie) {
                        LOG.info("{} interrupted", logPrefix);
                        exitCodeCallback.call(-1);
                    }
                }
                return null; // Run only once.
            }
        });
    }
    return process;
}
```

- The process is started with ProcessBuilder: stderr is merged into stdout, and if a log prefix or exit-code callback is given, a background loop logs the process output and reports the exit code once the process ends (-1 if interrupted)
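The same `ProcessBuilder` recipe can be tried outside Storm: set the working directory and extra environment, merge stderr into stdout, start the process, then drain its output and wait for the exit code on a background thread. A minimal sketch, using a plain `Thread` and an `IntConsumer` where Storm uses `Utils.asyncLoop` and `ExitCodeCallback`, collecting lines into a list instead of logging them with a prefix, and running `java -version` from the current JDK (which prints to stderr) to show the stream merge:

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;

final class LaunchProcessSketch {
    static Thread launch(List<String> command, Map<String, String> environment, File dir,
                         List<String> outputSink, IntConsumer exitCodeCallback) throws IOException {
        ProcessBuilder builder = new ProcessBuilder(command);
        if (dir != null) {
            builder.directory(dir);
        }
        builder.redirectErrorStream(true); // stderr now shows up on getInputStream()
        if (environment != null) {
            builder.environment().putAll(environment);
        }
        Process process = builder.start();
        // Background thread: drain the merged output, then report the exit code once.
        Thread pump = new Thread(() -> {
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = r.readLine()) != null) {
                    outputSink.add(line);
                }
                exitCodeCallback.accept(process.waitFor());
            } catch (IOException e) {
                exitCodeCallback.accept(-1); // hypothetical fallback for a broken pipe
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                exitCodeCallback.accept(-1); // same value Storm reports when interrupted
            }
        });
        pump.start();
        return pump;
    }

    public static void main(String[] args) throws Exception {
        // Use the java binary of the running JDK so the demo does not depend on PATH.
        String java = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        List<String> out = new ArrayList<>();
        int[] exit = {Integer.MIN_VALUE};
        Thread pump = launch(List.of(java, "-version"), Map.of("DEMO", "1"), null, out, code -> exit[0] = code);
        pump.join(); // join makes the pump thread's writes to `out` and `exit` visible here
        System.out.println("exit=" + exit[0]);
        out.forEach(System.out::println);
    }
}
```

Draining the output before calling `waitFor` matters: a child that fills its output pipe buffer would otherwise block forever while the parent waits for it to exit.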
Summary
- When the storm supervisor starts, it creates a ContainerLauncher and creates slots according to SUPERVISOR_SLOTS_PORTS (supervisor.slots.ports)
- Each slot thread keeps looping through its state machine; in WAITING_FOR_BLOB_LOCALIZATION it calls the ContainerLauncher's launchContainer to create a Container and launch it
- When the container launches, it starts the worker process via SupervisorUtils.launchProcess, which uses ProcessBuilder
doc
- Storm Concepts