SaltStack源码分析之:master端执行salt模块大致流程
2019獨角獸企業重金招聘Python工程師標準>>>
##JOB執行流程 先看下官網對于master端的工作流程的介紹:
The Salt master works by always publishing commands to all connected minions and the minions decide if the command is meant for them by checking themselves against the command target.The typical lifecycle of a salt job from the perspective of the master might be as follows:1) A command is issued on the CLI. For example, 'salt my_minion test.ping'. 使用命令行工具生成一個條命令,如:'salt my_minion test.ping'。2) The 'salt' command uses LocalClient to generate a request to the salt master by connecting to the ReqServer on TCP:4506 and issuing the job. 'salt' 命令使用LocalClient連接本地的4506端口來發送命令。3) The salt-master ReqServer sees the request and passes it to an available MWorker over workers.ipc. salt-master ReqServer接收請求,然后把請求通過workers.ipc分發到一個可用的MWorker中去。4) A worker picks up the request and handles it. First, it checks to ensure that the requested user has permissions to issue the command. Then, it sends the publish command to all connected minions. For the curious, this happens in ClearFuncs.publish(). 一個worker線程認領請求并且處理它。首先,它檢查用戶是否有權限發送命令。然后,它發送一個publish類型的命令到所有連接的minions。這一步發生在ClearFuncs.publish()中。5) The worker announces on the master event bus that it is about to publish a job to connected minions. This happens by placing the event on the master event bus (master_event_pull.ipc) where the EventPublisher picks it up and distributes it to all connected event listeners on master_event_pub.ipc. worker線程生成一個事件,說它準備將命令發送給minons。步驟是(1)worker將事件發送到master的事件總線中去(master_event_pull.ipc)。(2)EventPublisher獲取這個事件,并通過master_event_pub.ipc分發給所有的訂閱者。6) The message to the minions is encrypted and sent to the Publisher via IPC on publish_pull.ipc. 發送個minions的消息加密后通過publish_pull.ipc發送給Publisher。7) Connected minions have a TCP session established with the Publisher on TCP port 4505 where they await commands. When the Publisher receives the job over publish_pull, it sends the jobs across the wire to the minions for processing. 在線的minions通過TCP會話連接到master端的4505端口來等待命令。當Publisher在publish_pull接收到命令后,便把命令通過4505端口發送給minions。8) After the minions receive the request, they decrypt it and perform any requested work, if they determine that they are targeted to do so. minions接收到請求后,首先解密請求,如果確定命令是發送給自己的,便去執行命令。9) When the minion is ready to respond, it publishes the result of its job back to the master by sending the encrypted result back to the master on TCP 4506 where it is again picked up by the ReqServer and forwarded to an available MWorker for processing. (Again, this happens by passing this message across workers.ipc to an available worker.) 當minion處理完命令后,便通過master的4506端口返回執行結果。master端的ReqServer接收到結果,再次將結果發送給MWorker去處理。(ReqServer是通過workers.ipc將消息分發給一個可用的worker線程的。)10) When the MWorker receives the job it decrypts it and fires an event onto the master event bus (master_event_pull.ipc). (Again for the curious, this happens in AESFuncs._return(). MWorker接收這個job并解密它,然后它會在master的事件總線中發布一個事件(master_event_pull.ipc)(這一步發生在AESFuncs._return()中)。11) The EventPublisher sees this event and re-publishes it on the bus to all connected listeners of the master event bus (on master_event_pub.ipc). This is where the LocalClient has been waiting, listening to the event bus for minion replies. It gathers the job and stores the result. EventPublisher接收到這個事件,再次把它分發給所有的訂閱者(通過master_event_pub.ipc)。LocalClient就在這里監聽事件,等待自己需要的結果。它搜集并存儲命令執行結果。12) When all targeted minions have replied or the timeout has been exceeded, the salt client displays the results of the job to the user on the CLI. 當所有的minions返回結果或者執行超時,salt客戶端在界面顯示結果。##源碼分析
下面介紹master執行salt模塊用到的幾個類,參照上面的流程閱讀源碼。
###salt.master.Master
創建ReqServer的代碼在run_reqserver()中:
def run_reqserver(self):reqserv = ReqServer(self.opts,self.key,self.master_key)reqserv.run()###salt.master.ReqServer
打開salt.master.ReqServer:
class ReqServer(object):'''Starts up the master request server, minions send results to thisinterface.'''def __init__(self, opts, key, mkey):'''Create a request server:param dict opts: The salt options dictionary:key dict: The user starting the server and the AES key:mkey dict: The user starting the server and the RSA key:rtype: ReqServer:returns: Request server'''self.opts = optsself.master_key = mkey# Prepare the AES keyself.key = keydef __bind(self):'''Binds the reply server'''dfn = os.path.join(self.opts['cachedir'], '.dfn')if os.path.isfile(dfn):try:os.remove(dfn)except os.error:passself.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager')req_channels = []for transport, opts in iter_transport_opts(self.opts):chan = salt.transport.server.ReqServerChannel.factory(opts)chan.pre_fork(self.process_manager)req_channels.append(chan)for ind in range(int(self.opts['worker_threads'])):self.process_manager.add_process(MWorker,args=(self.opts,self.master_key,self.key,req_channels,),)self.process_manager.run()def run(self):'''Start up the ReqServer'''try:self.__bind()except KeyboardInterrupt:log.warn('Stopping the Salt Master')raise SystemExit('\nExiting on Ctrl-c')def destroy(self):if hasattr(self, 'clients') and self.clients.closed is False:self.clients.setsockopt(zmq.LINGER, 1)self.clients.close()if hasattr(self, 'workers') and self.workers.closed is False:self.workers.setsockopt(zmq.LINGER, 1)self.workers.close()if hasattr(self, 'context') and self.context.closed is False:self.context.term()# Also stop the workersif hasattr(self, 'process_manager'):self.process_manager.kill_children()def __del__(self):self.destroy()代碼比較簡單,主要的功能在_bind()方法中,它根據配置文件的中worker_threads生成數個worker線程。
###salt.master.MWorker
在salt.master.MWorker類中,也是通過_bind()方法來接收請求的:
def __bind(self):'''Bind to the local port'''# using ZMQIOLoop since we *might* need zmq in therezmq.eventloop.ioloop.install()self.io_loop = zmq.eventloop.ioloop.ZMQIOLoop()for req_channel in self.req_channels:req_channel.post_fork(self._handle_payload, io_loop=self.io_loop) # TODO: cleaner? Maybe lazily?self.io_loop.start()核心語句在req_channel.post_fork(self._handle_payload, io_loop=self.io_loop),它將接收到的請求交給self._handle_payload處理,我們看下_handle_payload方法:
@tornado.gen.coroutine def _handle_payload(self, payload):'''The _handle_payload method is the key method used to figure out whatneeds to be done with communication to the serverExample cleartext payload generated for 'salt myminion test.ping':{'enc': 'clear','load': {'arg': [],'cmd': 'publish','fun': 'test.ping','jid': '','key': 'alsdkjfa.,maljf-==adflkjadflkjalkjadfadflkajdflkj','kwargs': {'show_jid': False, 'show_timeout': False},'ret': '','tgt': 'myminion','tgt_type': 'glob','user': 'root'}}:param dict payload: The payload route to the appropriate handler'''key = payload['enc']load = payload['load']ret = {'aes': self._handle_aes,'clear': self._handle_clear}[key](load)raise tornado.gen.Return(ret)在代碼的最后一行可以看到,如果key是'aes'的話就調用self._handle_aes方法,它是用來處理minion返回的結果的;如果key是'clear'的話就調用self._handle_clear方法,它是用來處理master發送的命令的。
看下self. _handle_clear方法:
def _handle_clear(self, load):'''Process a cleartext command:param dict load: Cleartext payload:return: The result of passing the load to a function in ClearFuncs corresponding tothe command specified in the load's 'cmd' key.'''log.trace('Clear payload received with command {cmd}'.format(**load))if load['cmd'].startswith('__'):return Falsereturn getattr(self.clear_funcs, load['cmd'])(load), {'fun': 'send_clear'}重點是最后一句,它根據load['cmd']的值來調用self.clear_funcs中的對應方法,執行salt模塊時,load['cmd']的值是publish。self.clear_funcs是salt.master.ClearFuncs的實例化對象,salt.master.ClearFuncs介紹見下文。
self. _handle_aes方法跟self. _handle_clear方法類似:
def _handle_aes(self, data):'''Process a command sent via an AES key:param str load: Encrypted payload:return: The result of passing the load to a function in AESFuncs corresponding tothe command specified in the load's 'cmd' key.'''if 'cmd' not in data:log.error('Received malformed command {0}'.format(data))return {}log.trace('AES payload received with command {0}'.format(data['cmd']))if data['cmd'].startswith('__'):return Falsereturn self.aes_funcs.run_func(data['cmd'], data)當salt-minion返回命令的結果時data['cmd']的值是_return,看下run_func的源碼可知其調用的是salt.master.AESFuncs的_return方法,salt.master.AESFuncs介紹見下文。
###salt.master.ClearFuncs
ClearFuncs.publish方法開始的部分是進行身份認證,認證通過后會生成一條事件來說明即將發送消息:
payload = self._prep_pub(minions, jid, clear_load, extra)self._prep_pub中核心代碼是這一行:
self.event.fire_event(new_job_load, tagify([clear_load['jid'], 'new'], 'job'))最后發送消息給minions:
self._send_pub(payload)self._send_pub方法很簡單,調用底層的消息隊列發送消息:
def _send_pub(self, load):'''Take a load and send it across the network to connected minions'''for transport, opts in iter_transport_opts(self.opts):chan = salt.transport.server.PubServerChannel.factory(opts)chan.publish(load)###salt.master.AESFuncs
看下_return方法源碼:
def _return(self, load):'''Handle the return data sent from the minions.Takes the return, verifies it and fires it on the master event bus.Typically, this event is consumed by the Salt CLI waiting on the otherend of the event bus but could be heard by any listener on the bus.:param dict load: The minion payload'''try:salt.utils.job.store_job(self.opts, load, event=self.event, mminion=self.mminion)except salt.exception.SaltCacheError:log.error('Could not store job information for load: {0}'.format(load))可以看到,主要代碼在salt.utils.job.store_job中,核心代碼在這里:
if event:# If the return data is invalid, just ignore itlog.info('Got return from {id} for job {jid}'.format(**load))event.fire_event(load, tagify([load['jid'], 'ret', load['id']], 'job'))event.fire_ret_load(load)往事件總線里面發送消息。
##總結 這里只是大致介紹了大致的流程,其中關于數據如何在消息隊列間流轉的,沒有細寫,以后有機會再單獨寫篇博客介紹下。
轉載于:https://my.oschina.net/fmnisme/blog/553004
總結
以上是生活随笔為你收集整理的SaltStack源码分析之:master端执行salt模块大致流程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 做梦梦到狐狸是怎么回事啊
- 下一篇: 梦到好几个小孩儿