xxl-job 2.1.1执行器源码解读
?
XxlJobConfig
執行器端通過XxlJobConfig類作為xxl-job的啟動入口。通過注解@bean的initMethod的方法來啟動xxl-job。
/*** xxl-job config** @author xuxueli 2017-04-28*/ @Configuration public class XxlJobConfig {@Bean(initMethod = "start", destroyMethod = "destroy")public XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppName(appName);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}}XxlJobSpringExecutor
XxlJobSpringExecutor的start()方法,首先初始化執行器端的所有執行器,再調用XxlJobExecutor的start()方法。
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {@Overridepublic void start() throws Exception {// init JobHandler RepositoryinitJobHandlerRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super startsuper.start();}由于XxlJobSpringExecutor實現了ApplicationContextAware接口,在Spring啟動時,會調用setApplicationContext()方法,設置ApplicationContext,用于訪問spring bean。
initJobHandlerRepository方法獲取所有應用了@JobHandler注解的JobHandler,并注冊。
private void initJobHandlerRepository(ApplicationContext applicationContext){if (applicationContext == null) {return;}// init job handler actionMap<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);if (serviceBeanMap!=null && serviceBeanMap.size()>0) {for (Object serviceBean : serviceBeanMap.values()) {if (serviceBean instanceof IJobHandler){String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();IJobHandler handler = (IJobHandler) serviceBean;if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler["+ name +"] naming conflicts.");}registJobHandler(name, handler);}}}}XxlJobExecutor
XxlJobExecutor的start()方法
public void start() throws Exception {// init logpathXxlJobFileAppender.initLogPath(logPath);// init invoker, admin-clientinitAdminBizList(adminAddresses, accessToken);//開啟日志清理線程JobLogFileCleanThread.getInstance().start(logRetentionDays);// 初始化觸發器回調線程(用RPC回調調度中心接口)TriggerCallbackThread.getInstance().start();//初始化Rpc服務port = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();initRpcProvider(ip, port, appName, accessToken);}初始化RPC服務
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {// init, provider factoryString address = IpUtil.getIpPort(ip, port);Map<String, String> serviceRegistryParam = new HashMap<String, String>();serviceRegistryParam.put("appName", appName);serviceRegistryParam.put("address", address);xxlRpcProviderFactory = new XxlRpcProviderFactory(); //初始化xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);// add servicesxxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());// startxxlRpcProviderFactory.start();}Rpc服務啟動
- 構造server。
- 設置started回調,stoped回調。
- start server。
提供了幾種server:
public enum NetEnum {/*** netty tcp server*/NETTY(NettyServer.class, NettyClient.class),/*** netty http server (servlet no server, ServletServerHandler)*/NETTY_HTTP(NettyHttpServer.class, NettyHttpClient.class),/*** mina tcp server*/MINA(MinaServer.class, MinaClient.class),/*** jetty http server*/JETTY(JettyServer .class, JettyClient .class);public final Class<? extends Server> serverClass;public final Class<? extends Client> clientClass;NetEnum(Class<? extends Server> serverClass, Class<? extends Client> clientClass) {this.serverClass = serverClass;this.clientClass = clientClass;}}NettyHttpServer
NettyHttpServer使用Netty作為底層通信框架。注冊了4個handler:
- IdleStateHandler
- HttpServerCodec
- HttpObjectAggregator
- NettyHttpServerHandler
其中NettyHttpServerHandler為xxl-rpc實現的消息處理方法。反序列化請求,處理消息,再序列化響應。以上步驟都由XxlRpcProviderFactory處理。
// request deserializeXxlRpcRequest xxlRpcRequest = (XxlRpcRequest) xxlRpcProviderFactory.getSerializer().deserialize(requestBytes, XxlRpcRequest.class);requestId = xxlRpcRequest.getRequestId();// invoke + responseXxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);// response serializebyte[] responseBytes = xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);// response-writewriteResponse(ctx, keepAlive, responseBytes);啟動server之后,會觸發onStart()事件,通過serviceRegistryClass注冊服務。
server.setStartedCallback(new BaseCallback() { // serviceRegistry started@Overridepublic void run() throws Exception {// start registryif (serviceRegistryClass != null) {serviceRegistry = serviceRegistryClass.newInstance();serviceRegistry.start(serviceRegistryParam);if (serviceData.size() > 0) {//注冊服務。serviceRegistry.registry(serviceData.keySet(), serviceAddress);}}}});serviceRegistryClass啟動一個線程,向Admin(可以是多個管理端)注冊服務。
registryThread = new Thread(new Runnable() {@Overridepublic void run() {// 注冊,此處為守護線程,一直不退出。while (!toStop) {try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); //注冊一個AdminBiz就跳出??看不明白。break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {//每30秒注冊一次,心跳。TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}//線程停止時,取消注冊。try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");}});向Admin注冊Job,使用AdminBiz的代理實現類,向Admin服務器發送方法。
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.NETTY_HTTP,serializer,CallType.SYNC,LoadBalance.ROUND,AdminBiz.class,null,3000,addressUrl,accessToken,null,null).getObject(); public Object getObject() {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { iface },new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {........// 封裝方法調用為request,發送到AdminXxlRpcRequest xxlRpcRequest = new XxlRpcRequest();xxlRpcRequest.setRequestId(UUID.randomUUID().toString());xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());xxlRpcRequest.setAccessToken(accessToken);xxlRpcRequest.setClassName(className);xxlRpcRequest.setMethodName(methodName);xxlRpcRequest.setParameterTypes(parameterTypes);xxlRpcRequest.setParameters(parameters);// 同步,異步,回調方式調用方法。if (CallType.SYNC == callType) {} else if (CallType.FUTURE == callType) {} else if (CallType.CALLBACK == callType) {} else if (CallType.ONEWAY == callType) {} else {throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");}}});}?
總結
以上是生活随笔為你收集整理的xxl-job 2.1.1执行器源码解读的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMq链接
- 下一篇: Spring注解编程基石(四)