在python中使用Ansible实现Devops的相关总结
在python中使用Ansible實現Devops的相關總結
前言
? Ansible雖然底層是用python寫的,但是對python的API支持并不友好,官方文檔里只用了一個example來說明,想知道各個組件的參數功能可能只能去看源碼了,現在記錄一下自己的總結
環境——ansible==2.8.1
問題總結
-
如果需要有額外的參數,或者在playbook中需要傳入參數怎么辦?
在context.CLIARGS = ImmutableDict中通過extra_vars=[extra_vars]的方式
-
ansible.cfg文件應該放在哪里?
ansible.cfg這個文件pip install后并不會自動在/etc/ansible中創建,需要去github中將里面的example文件夾下的文件拷貝進本地才行。優先級問題:這個文件如果需要跟著項目走,那么在哪里執行run腳本文件,這個ansible.cfg文件就應該放在哪里。如果在這里找不到就去/etc/ansible里找,當然也可以設置環境變量,這個優先級最高
-
如果playbook一個命令執行時間太長怎么辦?
ssh鏈接最長好像只有5分鐘,5分鐘后還沒執行完,直接返回unreachable。我嘗試在代碼中添加constant,或者修改ansible.cfg里的各種timeout,或者retries都不管用,最后是通過在playbook中加入async解決的
例子:
--- - name: 666hosts: "666"gather_facts: notasks:- name: 6不6shell:chdir: /rootcmd: lsasync: 1200poll: 10- poll的意思是說每隔10秒重新連接一次,只要poll大于0,就是阻塞狀態,直到這個異步任務執行結束才會執行下一個task。如果設置poll=0,就會即可跳過這個task(轉到后臺執行),繼續執行下面的任務,使用于兩種不同的場景,詳情見官方手冊
參考:
https://stackoverflow.com/questions/41455002/long-running-command-in-ansible-ending-in-failed-status-with-host-unreachable
https://www.axelerant.com/resources/team-blog/how-handle-long-running-tasks-ansible
-
關于callback函數的改寫
callback函數是說每次有一個task任務被執行,會根據執行的狀態success,failed等等去調用這些函數,result就是執行中調用這個函數時的入參,通過下面的例子的方式可以保存下來這些內容
實例:
import json import shutil from ansible import constants from ansible.executor.playbook_executor import PlaybookExecutor from ansible.module_utils.common.collections import ImmutableDict from ansible.parsing.dataloader import DataLoader from ansible.vars.manager import VariableManager from ansible.inventory.manager import InventoryManager from ansible.playbook.play import Play from ansible.inventory.group import Group from ansible.inventory.host import Host from ansible.executor.task_queue_manager import TaskQueueManager from ansible.plugins.callback import CallbackBase from ansible import context import ansible.constants as Cclass ResultCallback(CallbackBase):"""重寫callbackBase類的部分方法"""def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.task_ok = []self.task_unreachable = []self.task_failed = []self.task_skipped = []self.task_status = {}def v2_runner_on_ok(self, result, **kwargs):self.task_ok.append({result._host.get_name(): result})def v2_runner_on_failed(self, result, **kwargs):self.task_failed.append({result._host.get_name(): result})def v2_runner_on_skipped(self, result, **kwargs):self.task_skipped.append({result._host.get_name(): result})def v2_runner_on_unreachable(self, result, **kwargs):self.task_unreachable.append({result._host.get_name(): result})def v2_playbook_on_stats(self, stats):hosts = sorted(stats.processed.keys())for h in hosts:t = stats.summarize(h)self.task_status[h] = {"ok": t["ok"],"changed": t["changed"],"unreachable": t["unreachable"],"skipped": t["skipped"],"failed": t["failed"]}class MyInventory:def __init__(self, hostsresource):"""初始化函數:param hostsresource: 主機資源可以有2種形式列表形式: [{"ip": "172.16.48.171", "port": "22", "username": "root", "password": "123456"}]字典形式: {"Group1": {"hosts": [{"ip": "192.168.200.10", "port": "1314", "username": "root", "password": None}],"vars": {"var1": "ansible"}},"Group2": {}}"""self._hostsresource = hostsresourceself._loader = DataLoader()self._hostsfilelist = ["temphosts"]"""sources這個我們知道這里是設置hosts文件的地方,它可以是一個列表里面包含多個文件路徑且文件真實存在,在單純的執行ad-hoc的時候這里的文件里面必須具有有效的hosts配置,但是當通過動態生成的資產信息的時候這個文件必須存在但是它里面可以是空的,如果這里配置成None那么它不影響資產信息動態生成但是會有一個警告,所以還是要配置一個真實文件。"""self._inventory = InventoryManager(loader=self._loader, sources=self._hostsfilelist)self._variable_manager = VariableManager(loader=self._loader, inventory=self._inventory)self._dynamic_inventory()def _add_dynamic_group(self, hosts_list, groupname, groupvars=None):"""動態添加主機到指定的主機組完整的HOSTS文件格式[test1]hostname ansible_ssh_host=192.168.1.111 ansible_ssh_user="root" ansible_ssh_pass="123456"但通常我們都省略hostname,端口也省略因為默認是22,這個在ansible配置文件中有,除非有非22端口的才會配置[test1]192.168.100.10 ansible_ssh_user="root" ansible_ssh_pass="123456" ansible_python_interpreter="/PATH/python3/bin/python3":param hosts_list: 主機列表 [{"ip": "192.168.100.10", "port": "22", "username": "root", "password": None}, {}]:param groupname: 組名稱:param groupvars: 組變量,格式為字典:return:"""# 添加組my_group = Group(name=groupname)self._inventory.add_group(groupname)# 添加組變量if groupvars:for key, value in groupvars.items():my_group.set_variable(key, value)# 添加一個主機for host in hosts_list:hostname = host.get("hostname", None)hostip = host.get("ip", None)if hostip is None:print("IP地址為空,跳過該元素。")continuehostport = host.get("port", "22")username = host.get("username", "root")password = host.get("password", None)ssh_key = host.get("ssh_key", None)python_interpreter = host.get("python_interpreter", None)try:# hostname可以不寫,如果為空默認就是IP地址if hostname is None:hostname = hostip# 生成一個host對象my_host = Host(name=hostname, port=hostport)# 添加主機變量self._variable_manager.set_host_variable(host=my_host, varname="ansible_ssh_host", value=hostip)self._variable_manager.set_host_variable(host=my_host, varname="ansible_ssh_port", value=hostport)if password:self._variable_manager.set_host_variable(host=my_host, varname="ansible_ssh_pass", value=password)self._variable_manager.set_host_variable(host=my_host, varname="ansible_ssh_user", value=username)if ssh_key:self._variable_manager.set_host_variable(host=my_host, varname="ansible_ssh_private_key_file", value=ssh_key)if python_interpreter:self._variable_manager.set_host_variable(host=my_host, varname="ansible_python_interpreter", value=python_interpreter)# 添加其他變量for key, value in host.items():if key not in ["ip", "hostname", "port", "username", "password", "ssh_key", "python_interpreter"]:self._variable_manager.set_host_variable(host=my_host, varname=key, value=value)# 添加主機到組self._inventory.add_host(host=hostname, group=groupname, port=hostport)except Exception as err:print(err)def _dynamic_inventory(self):"""添加 hosts 到inventory:return:"""if isinstance(self._hostsresource, list):self._add_dynamic_group(self._hostsresource, "default_group")elif isinstance(self._hostsresource, dict):for groupname, hosts_and_vars in self._hostsresource.items():self._add_dynamic_group(hosts_and_vars.get("hosts"), groupname, hosts_and_vars.get("vars"))@propertydef INVENTORY(self):"""返回資產實例:return:"""return self._inventory@propertydef VARIABLE_MANAGER(self):"""返回變量管理器實例:return:"""return self._variable_managerclass MyAnsiable():def __init__(self,hostsresource=None,connection='smart', # 連接方式 local 本地方式,smart ssh方式remote_user=None, # ssh 用戶remote_password=None, # ssh 用戶的密碼,應該是一個字典, key 必須是 conn_passprivate_key_file=None, # 指定自定義的私鑰地址sudo=None, sudo_user=None, ask_sudo_pass=None,module_path=None, # 模塊路徑,可以指定一個自定義模塊的路徑become=None, # 是否提權become_method=None, # 提權方式 默認 sudo 可以是 subecome_user=None, # 提權后,要成為的用戶,并非登錄用戶check=False, diff=False,listhosts=None,listtasks=None,listtags=None,verbosity=3,forks=5,syntax=None,start_at_task=None,inventory=None,extra_vars=None):# 函數文檔注釋"""初始化函數,定義的默認的選項值,在初始化的時候可以傳參,以便覆蓋默認選項的值"""context.CLIARGS = ImmutableDict(connection=connection,remote_user=remote_user,private_key_file=private_key_file,sudo=sudo,sudo_user=sudo_user,ask_sudo_pass=ask_sudo_pass,module_path=module_path,become=become,forks=forks,become_method=become_method,become_user=become_user,verbosity=verbosity,listhosts=listhosts,listtasks=listtasks,listtags=listtags,syntax=syntax,start_at_task=start_at_task,check=False,extra_vars=[extra_vars] # 如果有附加參數,或者playbook中需要傳入參數,可以使用這個功能)# 三元表達式,假如沒有傳遞 inventory, 就使用 "localhost,"# 指定 inventory 文件# inventory 的值可以是一個 資產清單文件# 也可以是一個包含主機的元組,這個僅僅適用于測試# 比如 : 1.1.1.1, # 如果只有一個 IP 最后必須有英文的逗號# 或者: 1.1.1.1, 2.2.2.2my_inventory = MyInventory(hostsresource=hostsresource)# 實例化數據解析器self.loader = DataLoader()# 實例化 資產配置對象self.inv_obj = my_inventory.INVENTORY# 設置密碼self.passwords = remote_password# 實例化回調插件對象self.results_callback = ResultCallback()# 變量管理器self.variable_manager = my_inventory.VARIABLE_MANAGERdef run(self, hosts='localhost', gether_facts="no", module="ping", args='', task_time=0):"""參數說明:task_time -- 執行異步任務時等待的秒數,這個需要大于 0 ,等于 0 的時候不支持異步(默認值)。這個值應該等于執行任務實際耗時時間為好"""play_source = dict(name="Ad-hoc",hosts=hosts,gather_facts=gether_facts,tasks=[# 這里每個 task 就是這個列表中的一個元素,格式是嵌套的字典# 也可以作為參數傳遞過來,這里就簡單化了。{"action": {"module": module, "args": args}, "async": task_time, "poll": 0}])play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)tqm = Nonetry:tqm = TaskQueueManager(inventory=self.inv_obj,variable_manager=self.variable_manager,loader=self.loader,passwords=self.passwords,stdout_callback=self.results_callback)result = tqm.run(play)finally:if tqm is not None:tqm.cleanup()shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)def playbook(self, playbooks):"""Keyword arguments:playbooks -- 需要是一個列表類型"""try:playbook = PlaybookExecutor(playbooks=playbooks,inventory=self.inv_obj,variable_manager=self.variable_manager,loader=self.loader,passwords=self.passwords)# 使用回調函數playbook._tqm._stdout_callback = self.results_callback# 下面這些設置只有constants.HOST_KEY_CHECKING = False管用# constants.HOST_KEY_CHECKING = False# constants.CONNECT_TIMEOUT = 1200# constants.COMMAND_TIMEOUT = 1200# constants.ANSIBLE_PERSISTENT_CONNECT_TIMEOUT = 1200# constants.ANSIBLE_PERSISTENT_COMMAND_TIMEOUT = 1200# constants.TIMEOUT = 1200# constants.CONFIG_FILE = './888/ansible.cfg'result = playbook.run()except Exception:raisedef get_result(self):result_raw = dict()for i in self.results_callback.task_ok:for host, result in i.items():if result_raw.get(host):result_raw[host]["success"].update({result._task.get_name(): result._result})else:result_raw[host] = dict()result_raw[host]["success"] = {result._task.get_name(): result._result}for i in self.results_callback.task_failed:for host, result in i.items():if result_raw.get(host):result_raw[host]["failed"].update({result._task.get_name(): result._result})else:result_raw[host] = dict()result_raw[host]["failed"] = {result._task.get_name(): result._result}for i in self.results_callback.task_unreachable:for host, result in i.items():if result_raw.get(host):result_raw[host]["unreachable"].update({result._task.get_name(): result._result})else:result_raw[host] = dict()result_raw[host]["unreachable"] = {result._task.get_name(): result._result}for i in self.results_callback.task_skipped:for host, result in i.items():if result_raw.get(host):result_raw[host]["skipped"].update({result._task.get_name(): result._result})else:result_raw[host] = dict()result_raw[host]["skipped"] = {result._task.get_name(): result._result}for host, result in self.results_callback.task_status.items():result_raw[host]["status"] = result# 最終打印結果,并且使用 JSON 繼續格式化print(json.dumps(result_raw, indent=4))return json.dumps(result_raw)callback輸出:
{"666.666.666.666":{"success":{"task_name":result_result(這是一個字典)}} }相關參考資料
https://segmentfault.com/a/1190000023106094
https://www.jianshu.com/p/ec1e4d8438e9
https://www.cnblogs.com/rexcheny/category/1264272.html
總結
以上是生活随笔為你收集整理的在python中使用Ansible实现Devops的相关总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 快手,字节面试题,将IP地址转换成整数类
- 下一篇: 接上一篇Ansible和celery的结