1
0
mirror of https://github.com/vmware/vsphere-automation-sdk-python.git synced 2024-11-21 09:19:59 -05:00

Refresh vCenter bindings, samples, README and LICENSE files for 8.0U2 release

Signed-off-by: Ankit Agrawal <aagrawal3@vmware.com>
This commit is contained in:
Ankit Agrawal 2023-09-12 16:45:42 +05:30
parent 303d9be840
commit b106559c1a
12 changed files with 1683 additions and 2202 deletions

10
LICENSE
View File

@ -1,14 +1,6 @@
LICENSE
vsphere_automation_sdk_python 8.0U1
Copyright (c) 2016-2023 VMware, Inc. All rights reserved.
This product is licensed to you under the MIT License (the License). You may not use this product except in compliance with the License.
MIT License
Copyright (c) <year> <copyright holders>
Copyright (c) 2016-2023 VMware, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

View File

@ -21,14 +21,14 @@ samples require the vSphere Management SDK packages (pyVmomi) to be installed on
The samples have been developed to work with python 3.8+
## Supported OnPrem vCenter Releases
vCenter 7.0, 7.0U1, 7.0U2, 7.0U3 , 8.0 and 8.0U1
vCenter 7.0, 7.0U1, 7.0U2, 7.0U3 , 8.0, 8.0U1, 8.0U2
Please refer to the notes in each sample for detailed compatibility information.
## Supported NSX-T Releases
NSX-T 2.2 - 4.1.0 and VMC 1.7 - 1.22
NSX-T 2.2 - 4.1.0 and VMC 1.7 - 1.24
## Latest VMware Cloud on AWS Release:
VMC M22 (1.22) ([Release Notes](https://docs.vmware.com/en/VMware-Cloud-on-AWS/0/rn/vmc-on-aws-relnotes.html))
VMC M24 (1.24) ([Release Notes](https://docs.vmware.com/en/VMware-Cloud-on-AWS/0/rn/vmc-on-aws-relnotes.html))
## Quick Start Guide
@ -264,8 +264,9 @@ $ python samples/vsphere/vcenter/vm/list_vms.py -v
### vSphere API Documentation
* [VMware vSphere REST API Reference documentation](https://developer.vmware.com/docs/vsphere-automation/latest/)
* [vSphere 8.0.1.0 Python APIs (latest)](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/8.0.1.0/)
* Previous Releases: vSphere [8.0.0.1](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/8.0.0.1/)
* [vSphere 8.0 U2 Python APIs (latest)](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/8.0.2.0/)
* Previous Releases: vSphere [8.0 U1](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/8.0.1.0/)
[8.0 GA](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/8.0.0.1/)
[8.0.0.0](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/8.0.0.0/),
[7.0 U3](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/7.0.3.0/)
[7.0 U2](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/7.0.2.0/), [7.0 U1](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/7.0.1.0/), [7.0](https://vmware.github.io/vsphere-automation-sdk-python/vsphere/7.0.0.1/).

File diff suppressed because it is too large Load Diff

View File

@ -19,7 +19,8 @@ __vcenter_version__ = '7.0.2.0'
import requests
import sys, time
import sys
import time
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
@ -54,9 +55,11 @@ Sample Prerequisites:
- Python 3.9
"""
def get_kms_providers(client: VsphereClient) -> kms_client.Providers:
return vsphere_client.vcenter.crypto_manager.kms.Providers
def print_kms_configurations(kmsProviders: kms_client.Providers):
for provider in kmsProviders.list():
print(f"Native Key Provider summary: {provider}")
@ -182,4 +185,3 @@ print("Delete Native Key Provider")
kmsProviders.delete(provider=provider_name)
print("Done.")

View File

@ -0,0 +1,279 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2023. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__vcenter_version__ = '8.0u1+'
import urllib.request
import concurrent.futures
import asyncio
import requests
import logging
import sys
import ssl
from vmware.vapi.vsphere.client import create_vsphere_client
from com.vmware.vcenter.authorization_client import PrivilegeChecks
from pyVim.connect import SmartConnect
from pyVmomi import vim, VmomiSupport
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common.ssl_helper import get_unverified_session
from six.moves import http_client, urllib
from os import path
from time import sleep
"""
Creates a virtual machine from an ovf template
and then retrieves the minimal set of privileges
required for the performed workflow
The vm create workflow can be swapped
for any other desired vCenter workflow
like e.g. network/host configuration,
vCenter upgrade etc.
Sample Prerequisites:
- Existing vCenter environment with at least 1 host
- The user executing the script should have at least Sessions.CollectPrivilegeChecks
to invoke the PrivilegeChecks API.
However VM creation and other workflows will require different set of privileges.
The script is intended to be executed by an administrator or another privileged user.
"""
parser = sample_cli.build_arg_parser()
parser.add_argument("--host_moId",
help="Host's moId on which to deploy vm",
required=True)
parser.add_argument("--ovf_url",
help="Url to the ovf from which to create vm",
required=True)
parser.add_argument('--vm_name',
default='Sample_Default_VM_for_Simple_Testbed',
help='Name of the testing vm')
parser.add_argument("--dc_moId",
help="Datacenter's moId under which to create vm.")
parser.add_argument("--ds_moId",
help="Datastore's moId which to use for vm storage")
parser.add_argument("--rp_moId",
help="Resource pool's moId")
parser.add_argument("--opId",
help="opId which to use for vc invocations in order to track used privileges",
default="create-vm-opId")
class OpId:
def __init__(self, opId):
self.opId = opId
def __enter__(self, name=None):
reqCtx = VmomiSupport.GetRequestContext()
self.prevId = reqCtx[
'operationID'] if 'operationID' in reqCtx else None
reqCtx["operationID"] = self.opId
return self
def __exit__(self, *args):
reqCtx = VmomiSupport.GetRequestContext()
if self.prevId is not None:
reqCtx['operationID'] = self.prevId
else:
del reqCtx['operationID']
class OvfDeployer:
def __init__(self, args):
self.logger = logging.getLogger("ovf-deploy")
self.logger.addHandler(logging.StreamHandler(sys.stdout))
self.logger.setLevel(logging.DEBUG)
self.context = ssl._create_unverified_context() if args.skipverification else None
self.si = SmartConnect(host=args.server,
user=args.username,
pwd=args.password,
sslContext=self.context)
self.base_url = path.dirname(args.ovf_url)
self.ovf_content = self.get_ovf_content(args.ovf_url)
self.container_view = None
self.host = vim.HostSystem(args.host_moId, self.si._stub)
self.datacenter = self.get_datacenter(args)
self.datastore = self.get_datastore(args)
self.resource_pool = self.get_resource_pool(args)
self.vm_name = args.vm_name
# Set config options
self.set_vpxd_option(self.si, "config.vpxd.privCheck.bufferSize",
"5000000")
self.set_vpxd_option(self.si, "config.vpxd.privCheck.cleanupInterval",
"1000")
def __del__(self):
# Unset config options
self.set_vpxd_option(self.si, "config.vpxd.privCheck.bufferSize", "0")
self.set_vpxd_option(self.si, "config.vpxd.privCheck.cleanupInterval",
"0")
async def deploy(self):
import_spec = self.create_import_spec()
lease = self.import_vapp(import_spec)
await self.upload_files(lease, import_spec.fileItem)
lease.Progress(100)
lease.Complete()
def set_vpxd_option(self, si, key, val):
optionVal = vim.option.OptionValue()
optionVal.key = key
optionVal.value = val
optionVals = [optionVal]
om = si.content.setting
om.UpdateValues(optionVals)
def get_ovf_content(self, ovf_url):
return urllib.request.urlopen(ovf_url).read()
def create_import_spec(self):
importSpecParams = vim.OvfManager.CreateImportSpecParams(
ipAllocationPolicy='dhcpPolicy',
ipProtocol='IPv4',
diskProvisioning='thin',
entityName=self.vm_name)
spec = self.si.content.ovfManager.CreateImportSpec(
ovfDescriptor=self.ovf_content.decode(),
resourcePool=self.resource_pool,
datastore=self.datastore,
cisp=importSpecParams)
return spec
def import_vapp(self, import_spec):
lease = self.resource_pool.ImportVApp(import_spec.importSpec,
self.datacenter.vmFolder,
self.host)
while lease.state == vim.HttpNfcLease.State.initializing:
sleep(1)
assert lease.state == vim.HttpNfcLease.State.ready
return lease
async def upload_files(self, lease, file_items):
upload_map = {}
for device_url in lease.info.deviceUrl:
upload_map[device_url.importKey] = [device_url.key, device_url.url]
loop = asyncio.get_event_loop()
upload_tasks = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for file_item in file_items:
_, dst_url = upload_map[file_item.deviceId]
upload_tasks.append(
loop.run_in_executor(pool, self.upload_file,
*[file_item, dst_url]))
await asyncio.wait(upload_tasks, return_when=asyncio.ALL_COMPLETED,
timeout=2 * 60 * 60)
def upload_file(self, file_item, dst_url):
src_url = "{}/{}".format(self.base_url, file_item.path)
src_req = requests.get(src_url, verify=False, stream=True)
assert src_req.ok
parsed_url = urllib.parse.urlparse(dst_url)
dst_req = http_client.HTTPSConnection(self.host.name,
context=self.context)
request_type = "PUT" if file_item.create else "POST"
dst_req.putrequest(request_type, parsed_url.path)
dst_req.putheader('Content-Length', file_item.size)
dst_req.endheaders()
chunk = min(64 * 1024, file_item.size)
for content_chunk in src_req.iter_content(chunk_size=chunk):
dst_req.send(content_chunk)
dst_req.close()
src_req.close()
async def _wait_for_task(self, task):
state = task.info.state
error = task.info.error
while state not in (
vim.TaskInfo.State.success, vim.TaskInfo.State.error):
if state not in ['running', 'success']:
self.logger.debug(
'task state is not expected:\n %s' % task.info)
await asyncio.sleep(3)
self.logger.info('%s progress=%s' % (task, task.info.progress))
if state != vim.TaskInfo.State.success:
self.logger.debug(task.info)
raise error
self.logger.info('%s' % task.info)
def _get_container_view(self):
if self.container_view is not None:
return self.container_view
self.container_view = self.si.RetrieveContent().viewManager.CreateContainerView(
self.host, recursive=False)
return self.container_view
def get_datacenter(self, args):
if args.dc_moId:
return vim.Datacenter(args.dc_moId, self.si._stub)
container_view = self._get_container_view()
return self.get_parent(container_view.container, vim.Datacenter)
def get_resource_pool(self, args):
if args.rp_moId:
return vim.ResourcePool(args.rpId, self.si._stub)
container_view = self._get_container_view()
compute_resource = self.get_parent(container_view.container,
vim.ComputeResource) or \
self.get_parent(container_view.container,
vim.ClusterComputeResource)
return compute_resource.resourcePool
def get_datastore(self, args):
if args.ds_moId:
return vim.Datastore(args.ds_moId, self.si._stub)
container_view = self._get_container_view()
return container_view.container.datastore[0]
def get_parent(self, container, parent_type):
parent = container.parent
while type(parent) is not parent_type:
if not hasattr(parent, "parent"):
return None
parent = parent.parent
return parent
args = sample_util.process_cli_args(parser.parse_args())
deployer = OvfDeployer(args)
with OpId(args.opId) as opId:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([deployer.deploy()]))
session = get_unverified_session() if args.skipverification else None
client = create_vsphere_client(server=args.server,
username=args.username,
password=args.password,
session=session)
user, domain = args.username.split("@")
principal = PrivilegeChecks.Principal(name=user, domain=domain)
filterSpec = PrivilegeChecks.FilterSpec(op_ids={args.opId},
principals=[principal])
iterationSpec = PrivilegeChecks.IterationSpec(size=50)
checks = PrivilegeChecks(client._stub_config).list(filter=filterSpec,
iteration=iterationSpec)
for item in checks.items:
print(
f"Object: {item.object.type}:{item.object.id},"
f" Privilege: {item.privilege},"
f" Principal: {item.principal.name}@{item.principal.domain}")
del deployer

View File

@ -0,0 +1,176 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2023. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2023 VMware, Inc. All rights reserved.'
__vcenter_version__ = '8.0U2+'
import json
import requests
from vmware.vapi.data.serializers.cleanjson import DataValueConverter
from pyVmomi import vim
from pyVmomi.SoapAdapter import Serialize, Deserialize
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
"""
Demonstrates conversion of data objects from the JSON based
automation runtime to the SOAP based runtime (pyVmomi) and vice
versa through the transcoder API, introduced in version `8.0.2.0`.
The current sample utilizes a `vim.vm.ConfigSpec` managed object,
present in bindings of both runtimes.
Sample Prerequisites:
- vCenter
"""
class TranscoderStub(object):
"""
Stub utilized for communicating to the transcoder API.
"""
def __init__(self, server, session_id, version):
self.server = server
self.session_id = session_id
self.version = version
def transcode(self, body, to_json):
"""
Transcodes and validates the integrity of a JSON or XML
serialized data object.
Transcoding is available from JSON or XML to JSON or XML
for both cases.
Transcoding to different encoding types is useful when
utilizing the same data objects in a program involving
SOAP and JSON based stacks/bindings.
"""
resp = requests.post(url='https://{}/sdk/vim25/{}/transcoder'.format(self.server, self.version),
data=body,
headers={'Content-type': 'application/json' if not to_json else 'application/xml',
'Accept': 'application/json' if to_json else 'application/xml',
'vmware-api-session-id': self.session_id}, # alternatively use 'Cookie' header
# Skip server cert verification.
# This is not recommended in production code.
verify=False)
return resp.content.decode()
def negotiate_version(server, client_desired_versions):
"""
Invokes the System::Hello API, responsible for negotiating
common parameters for API communication. The implementation
selects mutually supported version from the choices passed
in the request body.
"""
resp = requests.post(url='https://{}/api/vcenter/system?action=hello'.format(server),
json={'api_releases': client_desired_versions},
# Skip server cert verification.
# This is not recommended in production code.
verify=False)
return json.loads(resp.content.decode())['api_release']
def get_session_id(server, username, password, version):
"""
Login through VI/JSON `SessionManager` API and acquire
the `vmware-api-session-id` header.
"""
resp = requests.post(url='https://{}/sdk/vim25/{}/SessionManager/SessionManager/Login'.format(server, version),
json=json.loads('{{"userName":"{}","password":"{}"}}'.format(username, password)),
# Skip server cert verification.
# This is not recommended in production code.
verify=False)
return resp.headers.get('vmware-api-session-id')
def create_config_spec(datastore_name='datastore1',
name='sample-vm',
memory=4,
guest='guest',
annotation='Sample',
cpus=1):
"""
Creates a pyVmomi `vim.vm.ConfigSpec` data object with
arbitrarily populated fields.
"""
config = vim.vm.ConfigSpec()
config.annotation = annotation
config.memoryMB = int(memory)
config.guestId = guest
config.name = name
config.numCPUs = cpus
files = vim.vm.FileInfo()
files.vmPathName = '[' + datastore_name + ']'
config.files = files
return config
def convert_pyvmomi_obj_to_automation_dynamic_struct(transcoder, pyvmomi_obj):
# Serialize pyVmomi object to XML
xml_vm_config = Serialize(pyvmomi_obj)
# Transcode XML to JSON
json_vm_config = transcoder.transcode(xml_vm_config, to_json=True)
print(json_vm_config)
# Deserialize JSON to automation DynamicStructure
struct_vm_config = DataValueConverter.convert_to_data_value(json_vm_config)
print(type(struct_vm_config))
return struct_vm_config
def convert_automation_dynamic_struct_to_pyvmomi_obj(transcoder, dynamic_struct):
# Serialize DynamicStructure into JSON
json_vm_config = DataValueConverter.convert_to_json(dynamic_struct)
# Transcode JSON to XML
xml_vm_config = transcoder.transcode(json_vm_config, to_json=False)
print(xml_vm_config)
# Deserialize XML to `vim.vm.ConfigSpec` data object
config_vm_xml = Deserialize(xml_vm_config)
print(type(config_vm_xml))
if __name__ == '__main__':
# Disabling warnings is not recommended in production code.
requests.packages.urllib3.disable_warnings()
parser = sample_cli.build_arg_parser()
args = sample_util.process_cli_args(parser.parse_args())
# Negotiating API release is necessary to use in APIs
# utilizing inheritance based polymorphism - such as transcoder API.
# Desired version is '8.0.2.0'
version = negotiate_version(args.server, ['8.0.2.0'])
session_id = get_session_id(args.server, args.username, args.password, version)
transcoder = TranscoderStub(args.server, session_id, version)
# Create SOAP vim.vm.ConfigSpec obj
pyvmomi_vm_config = create_config_spec()
dynamic_struct = convert_pyvmomi_obj_to_automation_dynamic_struct(transcoder, pyvmomi_vm_config)
# Demonstrate conversion in the other direction
convert_automation_dynamic_struct_to_pyvmomi_obj(transcoder, dynamic_struct)

View File

@ -0,0 +1,192 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2019. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__vcenter_version__ = '7.0+'
from os.path import dirname
from os.path import join as pjoin
from com.vmware.vcenter.vm_client import Power
from vmware.vapi.vsphere.client import create_vsphere_client
from com.vmware.vapi.std.errors_client import NotFound
from samples.vsphere.common.sample_cli import build_arg_parser
from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsphere.vcenter.helper.vm_helper import get_vm
from pyVim.connect import (SmartConnect, Disconnect)
from pyVmomi import vim
import atexit
import configparser
import time
import ssl
# import CustomizationSpecManager from existing test case
custSpecMgrPath = pjoin(dirname(dirname(dirname(__file__))), "guest")
import sys
sys.path.append(custSpecMgrPath)
from customizationSpecs import CustomizationSpecManager
# inherit existing CustomizationSpecMananger to create a default linux
# customizationSpec for the following test
class NewCustomizationSpecManager(CustomizationSpecManager):
def __init__(self, client):
global custSpecMgrPath
self.client = client
self.specs_svc = client.vcenter.guest.CustomizationSpecs
# get customization config
self.config = configparser.ConfigParser()
self.linCfgPath = pjoin(custSpecMgrPath, 'linSpec.cfg')
self.specsAdded = []
class CustomizeVM(object):
"""
Demo how to customize a virtual machine
Sample Prerequisites:
The sample needs an existing Linux VM with vmtools/open-vm-tools installed.
"""
def __init__(self):
parser = build_arg_parser()
parser.add_argument('-n', '--vm_name',
action='store',
help='Name of the testing vm')
args = parser.parse_args()
if args.vm_name:
self.vm_name = args.vm_name
else:
raise Exception('Must specifiy an existing Linux VM for test' +
' with "-n VM_NAME"')
self.cleardata = args.cleardata
session = get_unverified_session() if args.skipverification else None
self.client = create_vsphere_client(server=args.server,
username=args.username,
password=args.password,
session=session)
# get si
sslContext = ssl._create_unverified_context()
self.si = SmartConnect(host=args.server,
user=args.username,
pwd=args.password,
sslContext=sslContext)
atexit.register(Disconnect, self.si)
# init specs_svc and vmcust_svc
self.specs_svc = self.client.vcenter.guest.CustomizationSpecs
self.vmcust_svc = self.client.vcenter.vm.guest.Customization
self.custSpecMgr = NewCustomizationSpecManager(self.client)
self.vm = get_vm(self.client, self.vm_name)
if not self.vm:
raise Exception('Need an existing Linux vm with name ({}).'
'Please create the vm first.'.format(self.vm_name))
self.vmRef = self._getVmRef(vmName=self.vm_name)
def _getVmRef(self, vmName=None):
content = self.si.RetrieveContent()
for child in content.rootFolder.childEntity:
if hasattr(child, 'vmFolder'):
datacenter = child
vmFolder = datacenter.vmFolder
vmList = vmFolder.childEntity
for vm in vmList:
if vm.name == vmName:
return vm
raise Exception("Cannot find the vm with name: {0}".format(vmName))
def _findCustEvent(self, expectedEvent, tsBeforeQuery):
vm = self.vmRef
eventMgr = self.si.content.eventManager
recOpt = vim.event.EventFilterSpec.RecursionOption()
evtFilterEnt = \
vim.event.EventFilterSpec.ByEntity(entity=vm,
recursion=recOpt.self)
evtFilterTime = \
vim.event.EventFilterSpec.ByTime(beginTime=tsBeforeQuery)
eventFilterSpec = vim.event.EventFilterSpec(entity=evtFilterEnt,
disableFullMessage=False,
time=evtFilterTime)
eventList = eventMgr.QueryEvents(eventFilterSpec)
for event in eventList:
if type(event) == expectedEvent:
print('Find expected customization Event %s' % expectedEvent)
return True
print('Did not find expected customization event, waiting...')
def waitForCustEvent(self, expectedEvent, timeout):
print('Waiting for customization event %s in %d seconds' %
(expectedEvent, timeout))
tsBeforeQuery = self.si.CurrentTime()
timeout = time.time() + timeout
while time.time() < timeout:
if self._findCustEvent(expectedEvent, tsBeforeQuery):
return True
time.sleep(10)
raise Exception('Timeout to find expected customization event')
def setVM(self):
print("Test Step: Using VM '{}' ({}) for Customize test".
format(self.vm_name, self.vm))
# create a linux customizationSpec
self.custSpecMgr.parseLinuxCfg()
self.specName = self.custSpecMgr.specName
print("Test Step: Create a default Linux customizationSpec '{}'".
format(self.specName))
try:
self.specs_svc.get(self.specName)
print("Default customizationSpec '{}' exists. Skip creating".
format(self.specName))
except NotFound:
self.custSpecMgr.createLinuxSpec()
self.setSpec = self.vmcust_svc.SetSpec(name=self.specName, spec=None)
print('Test Step: Do VM customization by vAPI set method')
# customize VM
self.vmcust_svc.set(vm=self.vm, spec=self.setSpec)
def powerOnVerify(self):
print("Test Step: Power on VM '{}' and verify".format(self.vm_name))
self.client.vcenter.vm.Power.start(self.vm)
status = self.client.vcenter.vm.Power.get(self.vm)
if status == Power.Info(state=Power.State.POWERED_ON):
print('\nVM is powered on...')
if self.waitForCustEvent(vim.event.CustomizationSucceeded, 900):
print("Test PASS!")
def cleanUp(self):
vm = self.vm
# clear customizationSpec
self.custSpecMgr.deleteSpec()
# Power off the vm if it is on
status = self.client.vcenter.vm.Power.get(vm)
if status == Power.Info(state=Power.State.POWERED_ON):
print('\n#VM is powered on, power it off')
self.client.vcenter.vm.Power.stop(vm)
print('vm.Power.stop({})'.format(vm))
status = self.client.vcenter.vm.Power.get(vm)
if status == Power.Info(state=Power.State.POWERED_OFF):
self.client.vcenter.VM.delete(vm)
print("Deleted VM -- '{}-({})".format(self.vm_name, vm))
def main():
myCustomizeVM = CustomizeVM()
myCustomizeVM.setVM()
myCustomizeVM.powerOnVerify()
if myCustomizeVM.cleardata:
print("Clean up in progress...")
myCustomizeVM.cleanUp()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,169 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2021. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2021 VMware, Inc. All rights reserved.'
__vcenter_version__ = 'VCenter 7.0 U3'
import atexit
import os
import time
import ssl
from com.vmware.vcenter.guest_client import CustomizationSpec, \
CloudConfiguration, CloudinitConfiguration, ConfigurationSpec, \
GlobalDNSSettings
from samples.vsphere.common import sample_cli, sample_util
from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsphere.common.vim.helpers.vim_utils import get_obj
from samples.vsphere.vcenter.helper.vm_helper import get_vm
from pyVim.connect import (SmartConnect, Disconnect)
from pyVmomi import vim
from vmware.vapi.vsphere.client import create_vsphere_client
class CustomizeVMWithCloudinitData(object):
"""
Demo how to customize a virtual machine with cloud-init data
Sample Prerequisites:
This sample needs an existing Linux VM with both open-vm-tools
version 11.3.0+ and cloud-init version 21.1+ installed
"""
def __init__(self):
self.metadata = None
self.userdata = None
self.specName = 'cloudinitDataSpec'
self.specDesc =\
'cloud-init data customization spec with metadata and userdata'
self.parser = sample_cli.build_arg_parser()
self.parser.add_argument('-n', '--vm_name', action='store',
help='Name of the Linux vm')
self.args = sample_util.process_cli_args(self.parser.parse_args())
if self.args.vm_name is None:
raise Exception('Must specify an existing Linux VM for test with '
'"-n VM_NAME"')
self.session =\
get_unverified_session() if self.args.skipverification else None
self.client = create_vsphere_client(server=self.args.server,
username=self.args.username,
password=self.args.password,
session=self.session)
# get si
self.sslContext = ssl._create_unverified_context()
self.si = SmartConnect(host=self.args.server,
user=self.args.username,
pwd=self.args.password,
sslContext=self.sslContext)
atexit.register(Disconnect, self.si)
# init specs_svc and vmcust_svc
self.specs_svc = self.client.vcenter.guest.CustomizationSpecs
self.vmcust_svc = self.client.vcenter.vm.guest.Customization
self.vm = get_vm(self.client, self.args.vm_name)
if self.vm is None:
raise Exception('Need an existing Linux vm with name ({}). Please '
'create the vm first.'.format(self.args.vm_name))
self.vmRef = self._getVmRef(vmName=self.args.vm_name)
def _getVmRef(self, vmName=None):
content = self.si.RetrieveContent()
return get_obj(content, [vim.VirtualMachine], vmName)
def _findCustEvent(self, expectedEvent, tsBeforeQuery):
eventMgr = self.si.content.eventManager
recOpt = vim.event.EventFilterSpec.RecursionOption()
evtFilterEnt = \
vim.event.EventFilterSpec.ByEntity(entity=self.vmRef,
recursion=recOpt.self)
evtFilterTime = \
vim.event.EventFilterSpec.ByTime(beginTime=tsBeforeQuery)
eventFilterSpec = vim.event.EventFilterSpec(entity=evtFilterEnt,
disableFullMessage=False,
time=evtFilterTime)
eventList = eventMgr.QueryEvents(eventFilterSpec)
for event in eventList:
if type(event) == expectedEvent:
print('Find expected customization Event %s' % expectedEvent)
return True
print('Did not find expected customization event, waiting...')
def createCloudinitDataSpec(self):
"""
create a cloud-init data customizationSpec
"""
print('------create 1 linux cloud-init data CustomizationSpec-------')
metadataYamlFilePath = os.path.join(os.path.dirname(
os.path.realpath(__file__)),
'../../guest/sample_metadata.json')
userdataFilePath = os.path.join(os.path.dirname(
os.path.realpath(__file__)),
'../../guest/sample_userdata')
with open(metadataYamlFilePath, "r") as fp:
self.metadata = fp.read().rstrip('\n')
with open(userdataFilePath, "r") as fp:
self.userdata = fp.read().rstrip('\n')
cloudinitConfig = CloudinitConfiguration(metadata=self.metadata,
userdata=self.userdata)
cloudConfig =\
CloudConfiguration(cloudinit=cloudinitConfig,
type=CloudConfiguration.Type('CLOUDINIT'))
configSpec = ConfigurationSpec(cloud_config=cloudConfig)
globalDnsSettings = GlobalDNSSettings()
adapterMappingList = []
customizationSpec =\
CustomizationSpec(configuration_spec=configSpec,
global_dns_settings=globalDnsSettings,
interfaces=adapterMappingList)
createSpec = self.specs_svc.CreateSpec(name=self.specName,
description=self.specDesc,
spec=customizationSpec)
self.specs_svc.create(spec=createSpec)
print('Spec {} has been created'.format(self.specName))
def setVM(self):
print('---customize VM {} with cloud-init data CustomizationSpec---'.
format(self.args.vm_name))
# create a linux cloud-init data customizationSpec
self.createCloudinitDataSpec()
setSpec = self.vmcust_svc.SetSpec(name=self.specName, spec=None)
# customize VM with the created cloud-init data customizationSpec
self.vmcust_svc.set(vm=self.vm, spec=setSpec)
def waitForCustEvent(self, expectedEvent, timeout):
print('Waiting for customization event {} in {} seconds'.
format(expectedEvent, timeout))
currentTime = self.si.CurrentTime()
timeout = time.time() + timeout
while time.time() < timeout:
if self._findCustEvent(expectedEvent, currentTime):
return True
time.sleep(10)
raise Exception('Timeout to find expected customization event')
def powerOnAndVerifyCustomizationResult(self):
print('---power on VM {} and verify customization result---'.
format(self.args.vm_name))
self.client.vcenter.vm.Power.start(self.vm)
if self.waitForCustEvent(vim.event.CustomizationSucceeded, 900):
print("Test PASS!")
def main():
customizeVMWithCloudinitData = CustomizeVMWithCloudinitData()
customizeVMWithCloudinitData.setVM()
customizeVMWithCloudinitData.powerOnAndVerifyCustomizationResult()
if __name__ == '__main__':
main()