1
0
mirror of https://github.com/vmware/vsphere-automation-sdk-python.git synced 2024-11-22 01:39:58 -05:00

Merge pull request #117 from tianhao64/master

Rewrite tagging sample to use vsphere client
This commit is contained in:
Tianhao He 2018-11-21 05:35:38 -08:00 committed by GitHub
commit a6ebd9bc1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 148 additions and 95 deletions

View File

@ -24,8 +24,8 @@ def get_obj(content, vimtype, name):
Get the vsphere managed object associated with a given text name
"""
obj = None
container = content.viewManager.CreateContainerView(content.rootFolder,
vimtype, True)
container = content.viewManager.CreateContainerView(
content.rootFolder, vimtype, True)
_views.append(container)
for c in container.view:
if c.name == name:
@ -39,8 +39,8 @@ def get_obj_by_moId(content, vimtype, moid):
Get the vsphere managed object by moid value
"""
obj = None
container = content.viewManager.CreateContainerView(content.rootFolder,
vimtype, True)
container = content.viewManager.CreateContainerView(
content.rootFolder, vimtype, True)
_views.append(container)
for c in container.view:
if c._GetMoId() == moid:
@ -76,7 +76,8 @@ def poweron_vm(content, mo):
wait_for_tasks(content, [mo.PowerOn()])
print('{0} powered on successfully'.format(mo._GetMoId()))
except Exception:
print('Unexpected error while powering on vm {0}'.format(mo._GetMoId()))
print('Unexpected error while powering on vm {0}'.format(
mo._GetMoId()))
return False
return True
@ -93,8 +94,8 @@ def poweroff_vm(content, mo):
wait_for_tasks(content, [mo.PowerOff()])
print('{0} powered off successfully'.format(mo._GetMoId()))
except Exception:
print(
'Unexpected error while powering off vm {0}'.format(mo._GetMoId()))
print('Unexpected error while powering off vm {0}'.format(
mo._GetMoId()))
return False
return True
@ -106,10 +107,11 @@ def wait_for_tasks(content, tasks):
taskList = [str(task) for task in tasks]
# Create filter
objSpecs = [vmodl.query.PropertyCollector.ObjectSpec(obj=task) for task in
tasks]
propSpec = vmodl.query.PropertyCollector.PropertySpec(type=vim.Task,
pathSet=[], all=True)
objSpecs = [
vmodl.query.PropertyCollector.ObjectSpec(obj=task) for task in tasks
]
propSpec = vmodl.query.PropertyCollector.PropertySpec(
type=vim.Task, pathSet=[], all=True)
filterSpec = vmodl.query.PropertyCollector.FilterSpec()
filterSpec.objectSet = objSpecs
filterSpec.propSet = [propSpec]
@ -147,6 +149,15 @@ def wait_for_tasks(content, tasks):
task_filter.Destroy()
def get_cluster_name_by_id(content, name):
cluster_obj = get_obj(content, [vim.ClusterComputeResource], name)
if cluster_obj is not None:
self.mo_id = cluster_obj._GetMoId()
print('Cluster MoId: {0}'.format(self.mo_id))
else:
print('Cluster: {0} not found'.format(self.cluster_name))
def __destroy_container_views():
for view in _views:
try:

View File

@ -1,8 +1,7 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2014, 2016. All Rights Reserved.
* Copyright (c) VMware, Inc. 2014, 2016, 2018 All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
@ -18,16 +17,26 @@ __copyright__ = 'Copyright 2014, 2016 VMware, Inc. All rights reserved.'
__vcenter_version__ = '6.0+'
import time
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim, vmodl
from com.vmware.cis.tagging_client import (
Category, CategoryModel, Tag, TagAssociation)
from com.vmware.cis.tagging_client import (Category, CategoryModel, Tag,
TagAssociation)
from com.vmware.vcenter_client import Cluster
from com.vmware.vapi.std_client import DynamicID
from samples.vsphere.common.sample_base import SampleBase
from vmware.vapi.vsphere.client import create_vsphere_client
from samples.vsphere.common import vapiconnect
from samples.vsphere.common.vim.helpers import vim_utils
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsphere.common.ssl_helper import get_unverified_context
from samples.vsphere.common.vim.helpers.get_cluster_by_name import get_cluster_id
class TaggingWorkflow(SampleBase):
class TaggingWorkflow:
"""
Demonstrates tagging CRUD operations
Step 1: Create a Tag category.
@ -42,75 +51,87 @@ class TaggingWorkflow(SampleBase):
"""
def __init__(self):
SampleBase.__init__(self, self.__doc__)
self.servicemanager = None
self.category_svc = None
self.tag_svc = None
self.tag_association = None
parser = sample_cli.build_arg_parser()
self.category_name = None
self.category_desc = None
self.tag_name = None
self.tag_desc = None
parser.add_argument(
'--clustername',
action='store',
required=True,
help='Name of the cluster to be tagged')
self.cluster_name = None
self.cluster_moid = None
self.category_id = None
self.tag_id = None
self.tag_attached = False
self.dynamic_id = None
parser.add_argument(
'--categoryname',
action='store',
required=True,
help='Name of the Category to be created')
def _options(self):
self.argparser.add_argument('-clustername', '--clustername', help='Name of the cluster to be tagged')
self.argparser.add_argument('-categoryname', '--categoryname', help='Name of the Category to be created')
self.argparser.add_argument('-categorydesc', '--categorydesc', help='Description of the Category to be created')
self.argparser.add_argument('-tagname', '--tagname', help='Name of the tag to be created')
self.argparser.add_argument('-tagdesc', '--tagdesc', help='Description of the tag to be created')
parser.add_argument(
'--categorydesc',
action='store',
default='Sample category description',
help='Description of the Category to be created')
def _setup(self):
if self.cluster_name is None: # for testing
self.cluster_name = self.args.clustername
assert self.cluster_name is not None
print('Cluster Name: {0}'.format(self.cluster_name))
parser.add_argument(
'--tagname',
action='store',
required=True,
help='Name of the tag to be created')
if self.category_name is None:
self.category_name = self.args.categoryname
assert self.category_name is not None
print('Category Name: {0}'.format(self.category_name))
parser.add_argument(
'--tagdesc',
action='store',
default='Sample tag description',
help='Description of the tag to be created')
if self.category_desc is None:
self.category_desc = self.args.categorydesc
assert self.category_desc is not None
print('Category Description: {0}'.format(self.category_desc))
args = sample_util.process_cli_args(parser.parse_args())
self.cleardata = args.cleardata
self.cluster_name = args.clustername
self.category_name = args.categoryname
self.category_desc = args.categorydesc
self.tag_name = args.tagname
self.tag_desc = args.tagdesc
self.skip_verification = args.skipverification
if self.tag_name is None:
self.tag_name = self.args.tagname
assert self.tag_name is not None
print('Tag Name: {0}'.format(self.tag_name))
session = get_unverified_session() if args.skipverification else None
if self.tag_desc is None:
self.tag_desc = self.args.tagdesc
assert self.tag_desc is not None
print('Tag Description: {0}'.format(self.tag_desc))
# Connect to vSphere client
self.client = create_vsphere_client(
server=args.server,
username=args.username,
password=args.password,
session=session)
if self.servicemanager is None:
self.servicemanager = self.get_service_manager()
# For vSphere 6.0 users, use pyVmomi to get the cluster ID
context = get_unverified_context() if self.skip_verification else None
si = SmartConnect(
host=args.server,
user=args.username,
pwd=args.password,
sslContext=context)
# Sample is not failing if Clustername passed is not valid
# Validating if Cluster Name passed is Valid
print('finding the cluster {0}'.format(self.cluster_name))
self.cluster_moid = get_cluster_id(service_manager=self.servicemanager, cluster_name=self.cluster_name)
assert self.cluster_moid is not None
print('Found cluster:{0} mo_id:{1}'.format(self.cluster_name, self.cluster_moid))
cluster_obj = vim_utils.get_obj(
content=si.RetrieveContent(),
vimtype=[vim.ClusterComputeResource],
name=args.clustername)
self.category_svc = Category(self.servicemanager.stub_config)
self.tag_svc = Tag(self.servicemanager.stub_config)
self.tag_association = TagAssociation(self.servicemanager.stub_config)
if cluster_obj:
self.cluster_moid = cluster_obj._GetMoId()
print('Found cluster:{} mo_id:{}'.format(self.cluster_name,
self.cluster_moid))
else:
raise ValueError('Cluster with name "{}" not found'.format(
self.cluster_name))
def _execute(self):
# Note: for vSphere 6.5+ users, use vSphere Automation APIs to get the cluster ID
# filter_spec = Cluster.FilterSpec(names=set([self.cluster_name]))
# cluster_summaries = self.client.vcenter.Cluster.list(filter_spec)
# if len(cluster_summaries) > 0:
# self.cluster_moid = cluster_summaries[0].cluster
def run(self):
print('List all the existing categories user has access to...')
categories = self.category_svc.list()
categories = self.client.tagging.Category.list()
if len(categories) > 0:
for category in categories:
print('Found Category: {0}'.format(category))
@ -118,7 +139,7 @@ class TaggingWorkflow(SampleBase):
print('No Tag Category Found...')
print('List all the existing tags user has access to...')
tags = self.tag_svc.list()
tags = self.client.tagging.Tag.list()
if len(tags) > 0:
for tag in tags:
print('Found Tag: {0}'.format(tag))
@ -126,36 +147,59 @@ class TaggingWorkflow(SampleBase):
print('No Tag Found...')
print('creating a new tag category...')
self.category_id = self.create_tag_category(self.category_name, self.category_desc,
self.category_id = self.create_tag_category(
self.category_name, self.category_desc,
CategoryModel.Cardinality.MULTIPLE)
assert self.category_id is not None
print('Tag category created; Id: {0}'.format(self.category_id))
print("Get category name and description...")
category_ids = self.client.tagging.Category.list()
for category_id in category_ids:
category_model = self.client.tagging.Category.get(category_id)
print("Category ID '{}', name '{}', description '{}'".format(
category_model.id, category_model.name, category_model.description
))
print("creating a new Tag...")
self.tag_id = self.create_tag(self.tag_name, self.tag_desc, self.category_id)
self.tag_id = self.create_tag(self.tag_name, self.tag_desc,
self.category_id)
assert self.tag_id is not None
print('Tag created; Id: {0}'.format(self.tag_id))
print("Get tag name and description...")
tag_ids = self.client.tagging.Tag.list()
for tag_id in tag_ids:
tag_model = self.client.tagging.Tag.get(tag_id)
print("Tag ID '{}', name '{}', description '{}'".format(
tag_model.id, tag_model.name, tag_model.description
))
print('updating the tag...')
date_time = time.strftime('%d/%m/%Y %H:%M:%S')
self.update_tag(self.tag_id, 'Server Tag updated at ' + date_time)
print('Tag updated; Id: {0}'.format(self.tag_id))
print('Tagging the cluster {0}...'.format(self.cluster_name))
self.dynamic_id = DynamicID(type='ClusterComputeResource', id=self.cluster_moid)
self.tag_association.attach(tag_id=self.tag_id, object_id=self.dynamic_id)
for tag_id in self.tag_association.list_attached_tags(self.dynamic_id):
self.dynamic_id = DynamicID(
type='ClusterComputeResource', id=self.cluster_moid)
self.client.tagging.TagAssociation.attach(
tag_id=self.tag_id, object_id=self.dynamic_id)
for tag_id in self.client.tagging.TagAssociation.list_attached_tags(
self.dynamic_id):
if tag_id == self.tag_id:
self.tag_attached = True
break
assert self.tag_attached
print('Tagged cluster: {0}'.format(self.cluster_moid))
def _cleanup(self):
try:
def cleanup(self):
if self.cleardata:
if self.tag_attached:
self.tag_association.detach(self.tag_id, self.dynamic_id)
print('Removed tag from cluster: {0}'.format(self.cluster_moid))
self.client.tagging.TagAssociation.detach(
self.tag_id, self.dynamic_id)
print('Removed tag from cluster: {0}'.format(
self.cluster_moid))
if self.tag_id is not None:
self.delete_tag(self.tag_id)
@ -164,52 +208,50 @@ class TaggingWorkflow(SampleBase):
if self.category_id is not None:
self.delete_tag_category(self.category_id)
print('Tag category deleted; Id: {0}'.format(self.category_id))
except Exception as e:
raise Exception(e)
def create_tag_category(self, name, description, cardinality):
"""create a category. User who invokes this needs create category privilege."""
create_spec = self.category_svc.CreateSpec()
create_spec = self.client.tagging.Category.CreateSpec()
create_spec.name = name
create_spec.description = description
create_spec.cardinality = cardinality
associableTypes = set()
create_spec.associable_types = associableTypes
return self.category_svc.create(create_spec)
return self.client.tagging.Category.create(create_spec)
def delete_tag_category(self, category_id):
"""Deletes an existing tag category; User who invokes this API needs
delete privilege on the tag category.
"""
self.category_svc.delete(category_id)
self.client.tagging.Category.delete(category_id)
def create_tag(self, name, description, category_id):
"""Creates a Tag"""
create_spec = self.tag_svc.CreateSpec()
create_spec = self.client.tagging.Tag.CreateSpec()
create_spec.name = name
create_spec.description = description
create_spec.category_id = category_id
return self.tag_svc.create(create_spec)
return self.client.tagging.Tag.create(create_spec)
def update_tag(self, tag_id, description):
"""Update the description of an existing tag.
User who invokes this API needs edit privilege on the tag.
"""
update_spec = self.tag_svc.UpdateSpec()
update_spec = self.client.tagging.Tag.UpdateSpec()
update_spec.setDescription = description
self.tag_svc.update(tag_id, update_spec)
self.client.tagging.Tag.update(tag_id, update_spec)
def delete_tag(self, tag_id):
"""Delete an existing tag.
User who invokes this API needs delete privilege on the tag."""
self.tag_svc.delete(tag_id)
self.client.tagging.Tag.delete(tag_id)
def main():
tagging_workflow = TaggingWorkflow()
tagging_workflow.main()
tagging_workflow.run()
tagging_workflow.cleanup()
# Start program
if __name__ == '__main__':
main()