Python调用Ansible API执行任务

执行Ansible Task

#!/usr/bin/env python
# ansible verison is 4.10.0 python version 3.6
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import sys
import json

from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.vars.manager import VariableManager
from ansible import context
from ansible.utils.display import initialize_locale

initialize_locale()

from ansible.plugins.callback import CallbackBase


class ResultsCollector(CallbackBase):
def __init__(self, *args, **kwargs):
super(ResultsCollector, self).__init__(*args, **kwargs)
self.task_ok = {}
self.task_failed = {}
self.task_skipped = {}
self.task_status = {}
self.task_unreachable = {}

def v2_runner_on_ok(self, result, *args, **kwargs):
self.task_ok[result.task_name] = result._result

def v2_runner_on_failed(self, result, *args, **kwargs):
self.task_failed[result.task_name] = result._result

def v2_runner_on_skipped(self, result, *args, **kwargs):
self.task_skipped[result.task_name] = result._result

def v2_runner_on_status(self, result, *args, **kwargs):
self.task_status[result.task_name] = result._result

def v2_runner_on_unreachable(self, result, *args, **kwargs):
self.task_unreachable[result.task_name] = result._result


def main(ip,username,password):
# since the API is constructed for CLI it expects certain options to always be set in the context object
context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
become_method=None, become_user=None, check=False, diff=False, verbosity=0)

# initialize needed objects
loader = DataLoader() # Takes care of finding and reading yaml, json and ini files

# Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets

# create inventory, use path to host config file as source or hosts in a comma separated string
inventory = InventoryManager(loader=loader, sources='inventory/hosts')
# add host to inventory
inventory.add_host(ip)
# variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
variable_manager = VariableManager(loader=loader, inventory=inventory)
variable_manager.set_host_variable(ip, 'ansible_ssh_user', username)
variable_manager.set_host_variable(ip, 'ansible_ssh_pass', password)

host_list = [i for i in inventory._inventory.hosts]

# instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
# IMPORTANT: This also adds library dirs paths to the module loader
# IMPORTANT: and so it must be initialized before calling `Play.load()`.
result_callback = ResultsCollector()
tqm = TaskQueueManager(

inventory=inventory,
variable_manager=variable_manager,
loader=loader,
passwords={},
stdout_callback=result_callback
)

# create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
play_source = dict(
name="Ansible Play",
hosts=host_list,
gather_facts='yes',
tasks=[
{
'name': 'get ubuntu os version',
'shell': "lsb_release -a | grep Description | awk -F ':' '{gsub(/^[ \t]+/,\"\",$2); print $2}'",
'register': 'ubuntu_os_version',
'when': "ansible_distribution == 'Ubuntu'"
},
{
'name': 'get redhat os version',
'shell': "cat /etc/redhat-release",
'register': 'redhat_os_version',
'when': "ansible_distribution == 'RedHat'"
},
{
'name': 'get hostname',
'shell': 'hostname',
'register': 'os_hostname'
},
{
'name': 'debug info',
'debug': {
'msg': '{{ ubuntu_os_version.stdout | default(redhat_os_version.stdout) }},{{ os_hostname.stdout }}'
}
}
]
)
# Create play object, playbook objects use .load instead of init or new methods,
# this will also automatically create the task objects from the info provided in play_source
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

# Actually run it
try:
result = tqm.run(play) # most interesting data for a play is actually sent to the callback's methods
finally:
#we always need to cleanup child procs and the structures we use to communicate with them
tqm.cleanup()
if loader:
loader.cleanup_all_tmp_files()

task_ok = result_callback.task_ok
task_failed = result_callback.task_failed
task_unreachable = result_callback.task_unreachable
if task_ok:
del task_ok['Gathering Facts']
return {'result':'ok','msg': task_ok}
if task_failed:
return {'result':'failed','msg': task_failed}
if task_unreachable:
return {'result':'unreachable','msg': task_unreachable}

执行Ansible Playbook task

#!/usr/bin/env python
# ansible verison is 4.10.0 python version 3.6
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.vars.manager import VariableManager
from ansible import context
from ansible.utils.display import initialize_locale

initialize_locale()


def main(host_vars):
# since the API is constructed for CLI it expects certain options to always be set in the context object
context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
become_method=None, become_user=None, check=False, diff=False, verbosity=0)

# initialize needed objects
loader = DataLoader() # Takes care of finding and reading yaml, json and ini files

# Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets

# create inventory, use path to host config file as source or hosts in a comma separated string
inventory = InventoryManager(loader=loader, sources='inventory/hosts')
# add host to inventory
for host,key,value in host_vars:
inventory.add_host(host)
# variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
variable_manager = VariableManager(loader=loader, inventory=inventory)
variable_manager.set_host_variable(host, key, value)

host_list = [i for i in inventory._inventory.hosts]

# instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
# IMPORTANT: This also adds library dirs paths to the module loader
# IMPORTANT: and so it must be initialized before calling `Play.load()`.

tqm = TaskQueueManager(

inventory=inventory,
variable_manager=variable_manager,
loader=loader,
passwords={},
)

# create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
play_source = dict(
name="Ansible Play",
hosts=host_list,
gather_facts='no',
tasks=[
dict(action=dict(module='shell', args='ls'), register='shell_out'),
dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))),
dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))),
]
)

# Create play object, playbook objects use .load instead of init or new methods,
# this will also automatically create the task objects from the info provided in play_source
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

# Actually run it
try:
result = tqm.run(play) # most interesting data for a play is actually sent to the callback's methods
finally:
#we always need to cleanup child procs and the structures we use to communicate with them
tqm.cleanup()
if loader:
loader.cleanup_all_tmp_files()

# Remove ansible tmpdir
# shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

if __name__ == '__main__':
host_vars = [('192.168.1.10','ansible_ssh_pass','123456'),('192.168.1.11','ansible_ssh_pass','123456')]
main(host_vars)

执行Ansible Playbook调用yml

#!/usr/bin/env python
#coding=utf-8
#ansible version 4.10.0

import argparse
import json
import logging
from ansible.module_utils.common.collections import ImmutableDict
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible import context
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase

logging.basicConfig(level=logging.DEBUG,format="%(asctime)s %(levelname)s %(message)s")

class ResultsCallback(CallbackBase):
def __init__(self,*args,**kwargs):
super(ResultsCallback,self).__init__(*args,**kwargs)
self.task_ok = {}
self.task_failed = {}
self.task_skipped = {}
self.task_status = {}
self.task_unreachable = {}

def v2_runner_on_ok(self, result, *args, **kwargs):
self.task_ok[result.task_name] = result._result

def v2_runner_on_failed(self, result, *args, **kwargs):
self.task_failed[result.task_name] = result._result

def v2_runner_on_skipped(self, result, *args, **kwargs):
self.task_skipped[result.task_name] = result._result

def v2_runner_on_status(self, result, *args, **kwargs):
self.task_status[result.task_name] = result._result

def v2_runner_on_unreachable(self, result, *args, **kwargs):
self.task_unreachable[result.task_name] = result._result

class AnsibleApi:
def __init__(self):
self.parse_arguments()
self.init_context()


def init_context(self):
context.CLIARGS = ImmutableDict(
connection='smart',
module_path=['library'],
forks=10,
become=None,
become_method=None,
become_user=None,
check=False,
diff=False,
verbosity=0
)

def parse_arguments(self):
parser = argparse.ArgumentParser(description='Ansible API')
parser.add_argument('--host', type=str, help='')
parser.add_argument('--username', type=str, help='')
parser.add_argument('--password', type=str, help='')
parser.add_argument('--jdk_version', type=str, help='')
parser.add_argument('--system_name', type=str, help='')
parser.add_argument('--env_type', type=str, help='')
parser.add_argument('--deploy_type', type=str, help='')
parser.add_argument('--action', type=str, help='')
self.args = vars(parser.parse_args())

def get_variable(self):
loader = DataLoader()
inventory = InventoryManager(loader=loader, sources='inventory/hosts')
variable_manager = VariableManager(loader=loader, inventory=inventory)
host = self.args['host']
inventory.add_host(host, group='all')
variable_manager.set_host_variable(host, 'ansible_ssh_user', self.args['username'])
variable_manager.set_host_variable(host, 'ansible_ssh_pass', self.args['password'])
variable_manager.set_host_variable(host, varname='user', value={'username': self.args['username'],
'home': '/home/{}'.format(self.args['username'])})
for key,value in self.args.items():
variable_manager.set_host_variable(host,key,value)
return {"loader": loader, "inventory": inventory, "variable_manager": variable_manager}

def run_playbook(self):
data = self.get_variable()
loader, inventory, variable_manager = data['loader'], data['inventory'], data['variable_manager']
pbex = PlaybookExecutor(
playbooks=['{}.yml'.format(self.args['action'])],
inventory=inventory,
variable_manager=variable_manager,
loader=loader,
passwords=None
)
return pbex

def get_result(self):
pbex = self.run_playbook()
result_callback = ResultsCallback()
pbex._tqm._callback_plugins.append(result_callback)
pbex.run()
task_failed = result_callback.task_failed
task_unreachable = result_callback.task_unreachable
if task_unreachable:
raise Exception('task unreachable: {}'.format(json.dump(task_unreachable,indent=4)))
elif task_failed:
raise Exception('task failed: {}'.format(json.dump(task_failed, indent=4)))

if __name__ == '__main__':
api = AnsibleApi()
api.get_result()
文章作者: 慕容峻才
文章链接: https://www.acaiblog.top/Python调用Ansible API执行任务/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 阿才的博客
微信打赏
支付宝打赏