1
0
mirror of https://github.com/vmware/vsphere-automation-sdk-python.git synced 2024-11-21 17:29:59 -05:00

Add tests to make sure vsphere client stubs are present

Add unittests for vsphere client
Fixed some format issues
Run the tests as part of travis ci
This commit is contained in:
het 2018-09-21 13:38:40 -07:00
parent 5ba4450b3b
commit cfa9acdb88
16 changed files with 190 additions and 49 deletions

View File

@ -6,6 +6,7 @@ python:
install: install:
- pip install -r requirements.txt --extra-index-url file://$PWD/lib --upgrade --ignore-installed six - pip install -r requirements.txt --extra-index-url file://$PWD/lib --upgrade --ignore-installed six
- pip install -r test-requirements.txt - pip install -r test-requirements.txt
- pip install pycodestyle
# command to run tests # command to run tests
script: pycodestyle samples/*.py script: pycodestyle samples tests
script: pytest

View File

@ -48,7 +48,6 @@ class BackupSchedule(object):
self._schedule_id = 'test_schedule' self._schedule_id = 'test_schedule'
def setup(self): def setup(self):
parser = sample_cli.build_arg_parser() parser = sample_cli.build_arg_parser()
@ -154,5 +153,6 @@ def main():
schedule.setup() schedule.setup()
schedule.run() schedule.run()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -42,6 +42,7 @@ UTF_8 = 'utf-8'
SHA256 = 'sha256' SHA256 = 'sha256'
SHA512 = 'sha512' SHA512 = 'sha512'
def _extract_certificate(cert): def _extract_certificate(cert):
''' '''
Extract DER certificate/private key from DER/base64-ed DER/PEM string. Extract DER certificate/private key from DER/base64-ed DER/PEM string.
@ -72,6 +73,7 @@ class SoapException(Exception):
''' '''
Exception raised in case of STS request failure. Exception raised in case of STS request failure.
''' '''
def __init__(self, soap_msg, fault_code, fault_string): def __init__(self, soap_msg, fault_code, fault_string):
''' '''
Initializer for SoapException. Initializer for SoapException.
@ -104,6 +106,7 @@ class SSOHTTPSConnection(six.moves.http_client.HTTPSConnection):
''' '''
An HTTPS class that verifies server's certificate on connect. An HTTPS class that verifies server's certificate on connect.
''' '''
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
''' '''
Initializer. See httplib.HTTPSConnection for other arguments Initializer. See httplib.HTTPSConnection for other arguments
@ -407,7 +410,8 @@ class SsoAuthenticator(object):
@rtype: C{str} @rtype: C{str}
@return: The SAML assertion. @return: The SAML assertion.
''' '''
import sspi, win32api import sspi
import win32api
spn = "sts/%s.com" % win32api.GetDomainName() spn = "sts/%s.com" % win32api.GetDomainName()
sspiclient = sspi.ClientAuth("Kerberos", targetspn=spn) sspiclient = sspi.ClientAuth("Kerberos", targetspn=spn)
in_buf = None in_buf = None
@ -467,7 +471,8 @@ class SsoAuthenticator(object):
@rtype: C{str} @rtype: C{str}
@return: The SAML assertion in Unicode. @return: The SAML assertion in Unicode.
''' '''
import kerberos, platform import kerberos
import platform
service = 'host@%s' % platform.node() service = 'host@%s' % platform.node()
_, context = kerberos.authGSSClientInit(service, 0) _, context = kerberos.authGSSClientInit(service, 0)
challenge = '' challenge = ''
@ -533,7 +538,7 @@ class SsoAuthenticator(object):
token_duration, delegatable, renewable) token_duration, delegatable, renewable)
else: else:
raise Exception("Currently, not supported on this platform") raise Exception("Currently, not supported on this platform")
## TODO Remove this exception once SSO supports validation of tickets # TODO Remove this exception once SSO supports validation of tickets
# generated against host machines # generated against host machines
# saml_token = self._get_bearer_saml_assertion_lin(request_duration, token_duration, delegatable) # saml_token = self._get_bearer_saml_assertion_lin(request_duration, token_duration, delegatable)
return saml_token return saml_token
@ -647,6 +652,7 @@ class SsoAuthenticator(object):
{'saml2': "urn:oasis:names:tc:SAML:2.0:assertion"}), {'saml2': "urn:oasis:names:tc:SAML:2.0:assertion"}),
pretty_print=False).decode(UTF_8) pretty_print=False).decode(UTF_8)
class SecurityTokenRequest(object): class SecurityTokenRequest(object):
''' '''
SecurityTokenRequest class handles the serialization of request to the STS SecurityTokenRequest class handles the serialization of request to the STS
@ -699,8 +705,7 @@ class SecurityTokenRequest(object):
self._expires = time.strftime(TIME_FORMAT, self._expires = time.strftime(TIME_FORMAT,
time.gmtime(current + token_duration)) time.gmtime(current + token_duration))
self._request_expires = time.strftime(TIME_FORMAT, self._request_expires = time.strftime(TIME_FORMAT,
time.gmtime(current + time.gmtime(current + request_duration))
request_duration))
self._timestamp = TIMESTAMP_TEMPLATE % self.__dict__ self._timestamp = TIMESTAMP_TEMPLATE % self.__dict__
self._username = escape(username) if username else username self._username = escape(username) if username else username
self._password = escape(password) if password else password self._password = escape(password) if password else password
@ -942,15 +947,17 @@ def _load_private_key(der_key):
for key_type in ('PRIVATE KEY', 'RSA PRIVATE KEY'): for key_type in ('PRIVATE KEY', 'RSA PRIVATE KEY'):
try: try:
return crypto.load_privatekey(crypto.FILETYPE_PEM, return crypto.load_privatekey(crypto.FILETYPE_PEM,
'-----BEGIN ' + key_type + '-----\n' + '-----BEGIN {}-----\n{}-----END {}-----\n'.format(
base64.encodestring(der_key).decode(UTF_8) + key_type,
'-----END ' + key_type + '-----\n', base64.encodestring(der_key).decode(UTF_8),
key_type),
b'') b'')
except (crypto.Error, ValueError): except (crypto.Error, ValueError):
pass pass
# We could try 'ENCRYPTED PRIVATE KEY' here - but we do not know passphrase. # We could try 'ENCRYPTED PRIVATE KEY' here - but we do not know passphrase.
raise raise
def _sign(private_key, data, digest=SHA256): def _sign(private_key, data, digest=SHA256):
''' '''
An internal helper method to sign the 'data' with the 'private_key'. An internal helper method to sign the 'data' with the 'private_key'.
@ -972,6 +979,7 @@ def _sign(private_key, data, digest=SHA256):
pkey = _load_private_key(_extract_certificate(private_key)) pkey = _load_private_key(_extract_certificate(private_key))
return base64.b64encode(crypto.sign(pkey, data, digest)) return base64.b64encode(crypto.sign(pkey, data, digest))
def _canonicalize(xml_string): def _canonicalize(xml_string):
''' '''
Given an xml string, canonicalize the string per Given an xml string, canonicalize the string per
@ -989,6 +997,7 @@ def _canonicalize(xml_string):
tree.write_c14n(string, exclusive=True, with_comments=False) tree.write_c14n(string, exclusive=True, with_comments=False)
return string.getvalue().decode(UTF_8) return string.getvalue().decode(UTF_8)
def _extract_element(xml, element_name, namespace): def _extract_element(xml, element_name, namespace):
''' '''
An internal method provided to extract an element from the given XML. An internal method provided to extract an element from the given XML.

View File

@ -19,7 +19,7 @@ from samples.vsphere.common.vim.inventory import get_datastore_mo
from samples.vsphere.common.vim import datastore_file from samples.vsphere.common.vim import datastore_file
datastore_path_regex = re.compile('\[(.+)\]\s?(.*)') datastore_path_regex = re.compile(br'\[(.+)\]\s?(.*)')
def parse_datastore_path(datastore_path): def parse_datastore_path(datastore_path):

View File

@ -128,5 +128,6 @@ def main():
log_forwarding.setup() log_forwarding.setup()
log_forwarding.run() log_forwarding.run()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -25,6 +25,7 @@ from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsphere.common import sample_cli from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util from samples.vsphere.common import sample_util
class ListServices(object): class ListServices(object):
""" """
Demonstrates the details of vCenter Services Demonstrates the details of vCenter Services
@ -35,7 +36,6 @@ class ListServices(object):
- vCenter Server - vCenter Server
""" """
def __init__(self): def __init__(self):
# Create argument parser for standard inputs: # Create argument parser for standard inputs:
# server, username, password and skipverification # server, username, password and skipverification
@ -51,6 +51,7 @@ class ListServices(object):
username=args.username, username=args.username,
password=args.password, password=args.password,
session=session) session=session)
def run(self): def run(self):
services_list = self.client.vcenter.services.Service.list_details() services_list = self.client.vcenter.services.Service.list_details()
table = [] table = []
@ -64,9 +65,11 @@ class ListServices(object):
headers = ["Service Name", "Service Name Key", "Service Health", "Service Status", "Service Startup Type"] headers = ["Service Name", "Service Name Key", "Service Health", "Service Status", "Service Startup Type"]
print(tabulate(table, headers)) print(tabulate(table, headers))
def main(): def main():
list_services = ListServices() list_services = ListServices()
list_services.run() list_services.run()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -62,6 +62,7 @@ def setup(context=None):
password=password, password=password,
session=session) session=session)
def run(): def run():
global vm global vm
vm = get_vm(client, vm_name) vm = get_vm(client, vm_name)

View File

@ -63,6 +63,7 @@ def setup(context=None):
password=password, password=password,
session=session) session=session)
def run(): def run():
global vm global vm
vm = get_vm(client, vm_name) vm = get_vm(client, vm_name)

View File

@ -58,6 +58,7 @@ def setup(context=None):
password=password, password=password,
session=session) session=session)
def run(): def run():
global vm global vm
vm = get_vm(client, vm_name) vm = get_vm(client, vm_name)

View File

@ -59,6 +59,7 @@ def setup(context=None):
password=password, password=password,
session=session) session=session)
def run(): def run():
global vm global vm
vm = get_vm(client, vm_name) vm = get_vm(client, vm_name)

View File

@ -59,6 +59,7 @@ def setup(context=None):
password=password, password=password,
session=session) session=session)
def run(): def run():
# * Floppy images must be pre-existing. This API does not expose # * Floppy images must be pre-existing. This API does not expose
# a way to create new floppy images. # a way to create new floppy images.

View File

@ -58,6 +58,7 @@ def setup(context=None):
password=password, password=password,
session=session) session=session)
def run(): def run():
global vm global vm
vm = get_vm(client, vm_name) vm = get_vm(client, vm_name)

View File

@ -1,2 +1,2 @@
[pycodestyle] [pycodestyle]
ignore = E402, E501, E122, E126, E127, E128, E129, E131 ignore = E402, E501, E122, E126, E127, E128, E129, E131, W503, W504

View File

@ -1 +1,3 @@
testtools>=0.9.34 pytest
pycodestyle

View File

@ -0,0 +1,119 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2018. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
import requests
from vmware.vapi.bindings.stub import ApiClient, StubFactoryBase
from vmware.vapi.lib.connect import get_requests_connector
from vmware.vapi.stdlib.client.factories import StubConfigurationFactory
from vmware.vapi.vsphere.client import StubFactory
stub_config = StubConfigurationFactory.new_std_configuration(
get_requests_connector(session=requests.session(), url='https://localhost/vapi'))
stub_factory = StubFactory(stub_config)
client = ApiClient(stub_factory)
def test_vcenter_client():
assert hasattr(client, 'vcenter')
assert isinstance(client.vcenter, StubFactoryBase)
def test_cluster_client():
assert hasattr(client.vcenter, 'Cluster')
def test_datacenter_client():
assert hasattr(client.vcenter, 'Datacenter')
def test_datastore_client():
assert hasattr(client.vcenter, 'Datastore')
def test_deployment_client():
assert hasattr(client.vcenter, 'Deployment')
def test_configuration_client():
assert hasattr(client.content, 'Configuration')
def test_appliance_client():
assert hasattr(client, 'appliance')
assert isinstance(client.appliance, StubFactoryBase)
def test_content_client():
assert hasattr(client, 'content')
assert isinstance(client.content, StubFactoryBase)
def test_tagging_client():
assert hasattr(client, 'tagging')
assert isinstance(client.tagging, StubFactoryBase)
def test_ovf_client():
assert hasattr(client.vcenter, 'ovf')
assert isinstance(client.vcenter.ovf, StubFactoryBase)
def test_hvc_client():
assert hasattr(client.vcenter, 'hvc')
assert isinstance(client.vcenter.hvc, StubFactoryBase)
def test_inventory_client():
assert hasattr(client.vcenter, 'inventory')
assert isinstance(client.vcenter.inventory, StubFactoryBase)
def test_iso_client():
assert hasattr(client.vcenter, 'iso')
assert isinstance(client.vcenter.iso, StubFactoryBase)
def test_ovf_client():
assert hasattr(client.vcenter, 'ovf')
assert isinstance(client.vcenter.ovf, StubFactoryBase)
def test_vm_template_client():
assert hasattr(client.vcenter, 'vm_template')
assert isinstance(client.vcenter.vm_template, StubFactoryBase)
def test_appliance_update_client():
assert hasattr(client.appliance, 'recovery')
assert isinstance(client.appliance.recovery, StubFactoryBase)
def test_appliance_vmon_client():
assert hasattr(client.appliance, 'vmon')
assert isinstance(client.appliance.vmon, StubFactoryBase)
def test_compute_policy_client():
assert hasattr(client.vcenter, 'compute')
assert isinstance(client.vcenter.compute, StubFactoryBase)
def test_vm_compute_policy_client():
assert hasattr(client.vcenter.vm, 'compute')
assert isinstance(client.vcenter.vm.compute, StubFactoryBase)