1
0
mirror of https://github.com/vmware/vsphere-automation-sdk-python.git synced 2024-11-21 17:29:59 -05:00

adding oauth samples

This commit is contained in:
anushah 2020-10-12 12:01:02 +05:30
parent 009074c8fa
commit d7d2a1d364
8 changed files with 718 additions and 0 deletions

View File

@ -0,0 +1,54 @@
## Grant types available
| sample | grant_type |
| ------ | ------ |
| list_vms_authotization_code.py | authorization_code |
| list_vms_client_credentials.py | client_credentials |
| list_vms_refresh_token.py | refresh_token |
| list_vms_password.py | password |
## Login Steps
1. From a given VC IP/hostname, find the Identity Provider (sample available at [list_external_identity_providers.py](https://github.com/vmware/vsphere-automation-sdk-python/blob/master/samples/vsphere/oauth/list_external_identity_providers.py))
2. Make a note of the auth/discovery/token endpoints from the identity provider object
3. Get access token by making the call to endpoints based on parameters relevant to different grant types
4. Convert access token to saml token (sample avaialble at [exchange_access_id_token_for_saml.py](https://github.com/vmware/vsphere-automation-sdk-python/blob/master/samples/vsphere/oauth/exchange_access_id_token_for_saml.py))
5. Use this saml assertion to login to vCenter as a bearer token
## Executing the samples
vCenter needs to be registered with an Identity Provider. Applicable for VC 7.0+
### list_vms_authorization_code.py
Create an OAuth app and make a note of the *app_id*, *app_secret* and *redirect_uri*
First start the webserver code at [webserver.py](https://github.com/vmware/vsphere-automation-sdk-python/blob/master/samples/vsphere/oauth/grant_types/webserver.py). Note, this server is not recommended in a production setting, this is only to demonstarte the sample workflow
`$ python3 webserver.py`
Run the sample,
`$ python list_vms_authorization_code.py --server <VC_IP> --client_id <client_id> --client_secret <client_secret> --org_id <org_id> --skipverification`
### list_vms_client_credentials.py
Create an OAuth app and make a note of the *client_id* and *client_secret*
Run the sample,
`$ python list_vms_client_credentials.py --server <VC_IP> -- client_id <client_id> --client_secret <client_secret> --skipverification`
### list_vms_refresh_token.py
Use the *refresh_token* that was returned along with the access token in authorization_code workflow
Run the sample,
`$ python list_vms_refresh_token.py --server <VC_IP> --client_id <client_id> --client_secret <client_secret> --refresh_token <refresh_token> --skipverification`
### list_vms_password.py
Obtain access token using *username* and *password*
Run the sample,
`$ python list_vms_password --server <VC_IP> --username <username> --password <password> --skipverification`
## References
[Understanding vCenter Server Identity Provider Federation](https://docs.vmware.com/en/VMware-vSphere/7.0/com.vmware.vsphere.authentication.doc/GUID-0A3A19E6-150A-493B-8B57-37E19AB420F2.html)

View File

@ -0,0 +1,25 @@
"""
* *******************************************************
* Copyright VMware, Inc. 2020. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
# Required to distribute different parts of this
# package as multiple distribution
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__) # @ReservedAssignment

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2020. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
from vmware.vapi.vsphere.client import create_vsphere_client
from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsphere.oauth.grant_types.oauth_utility \
import login_using_authorization_code
from urllib.parse import parse_qs
import webbrowser
import urllib.parse as urlparse
import requests
import uuid
import argparse
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2020 VMware, Inc. All rights reserved.'
__vcenter_version__ = '7.0+'
"""
To run this sample,
In a different tab, keep the webserver running,
$ python webserver.py
Then execute the following
$ python list_vms_authorization_code.py --server <VC_IP> \
--client_id <client_id> --client_secret <client_secret> \
--org_id <org_id> --skipverification
"""
parser = argparse.ArgumentParser()
parser.add_argument("--server",
help="VC IP or hostname")
parser.add_argument("--client_id",
help="Client/Application ID of the webapp")
parser.add_argument("--client_secret",
help="Client/Application secret \
of the webapp")
parser.add_argument("--redirect_uri",
help="Redirect uri \
given at the time of client registration")
parser.add_argument('--skipverification',
action='store_true',
help='Verify server certificate when connecting to vc.')
args = parser.parse_args()
def get_auth_code_and_state(url):
openbrowser(url)
parsed = urlparse.urlparse(url)
redirect_uri = parse_qs(parsed.query)['redirect_uri']
get_code_uri = redirect_uri[0].rsplit('/', 1)[0]
get_code_uri = get_code_uri + "/getcode"
response = get_response(get_code_uri)
while "code" not in response or response == '':
response = get_response(get_code_uri)
res = response.split(':')
code = res[1]
state = res[3]
return [code, state]
def openbrowser(url):
webbrowser.open(url)
pass
def get_response(url):
response = requests.get(url)
return response.text
session = get_unverified_session() if args.skipverification else None
saml_assertion = login_using_authorization_code(
args.server,
session,
args.client_id,
args.client_secret,
args.redirect_uri,
get_auth_code_and_state)
client = create_vsphere_client(
server=args.server,
bearer_token=saml_assertion,
session=session)
vms = client.vcenter.VM.list()
print(vms)

View File

@ -0,0 +1,60 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2020. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
from vmware.vapi.vsphere.client import create_vsphere_client
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsphere.oauth.grant_types.oauth_utility \
import login_using_client_credentials
import argparse
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2020 VMware, Inc. All rights reserved.'
__vcenter_version__ = '7.0+'
"""
To run this sample,
$ python list_vms_client_credentials.py --server <VC_IP> \
-- client_id <client_id> --client_secret <client_secret> --skipverification
"""
parser = argparse.ArgumentParser()
parser.add_argument("--server",
help="VC IP or hostname")
parser.add_argument("--client_id",
help="Client/Application ID of the server to server app")
parser.add_argument("--client_secret",
help="Client/Application secret \
of the server to server app")
parser.add_argument('--skipverification',
action='store_true',
help='Verify server certificate when connecting to vc.')
args = parser.parse_args()
session = get_unverified_session() if args.skipverification else None
saml_assertion = login_using_client_credentials(
args.server,
session,
args.client_id,
args.client_secret)
client = create_vsphere_client(
server=args.server,
bearer_token=saml_assertion,
session=session)
vms = client.vcenter.VM.list()
print(vms)

View File

@ -0,0 +1,61 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2020. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
from vmware.vapi.vsphere.client import create_vsphere_client
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsphere.oauth.grant_types.oauth_utility \
import login_using_password
import argparse
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2020 VMware, Inc. All rights reserved.'
__vcenter_version__ = '7.0+'
"""
To run this sample,
$ python list_vms_password --server <VC_IP> \
--username <username> --password <password> --skipverification
"""
parser = argparse.ArgumentParser()
parser.add_argument("--server",
help="VC IP or hostname")
parser.add_argument("--username",
help="username to login \
to vCenter")
parser.add_argument("--password",
help="password to login \
to vCenter")
parser.add_argument('--skipverification',
action='store_true',
help='Verify server certificate when connecting to vc.')
args = parser.parse_args()
session = get_unverified_session() if args.skipverification else None
saml_assertion = login_using_password(
args.server,
session,
args.username,
args.password)
client = create_vsphere_client(
server=args.server,
bearer_token=saml_assertion,
session=session)
vms = client.vcenter.VM.list()
print(vms)

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2020. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
from vmware.vapi.vsphere.client import create_vsphere_client
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsphere.oauth.grant_types.oauth_utility \
import login_using_refresh_token
import argparse
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2020 VMware, Inc. All rights reserved.'
__vcenter_version__ = '7.0+'
"""
To run this sample,
$ python list_vms_refresh_token.py \
--server <VC_IP> --client_id <client_id> --client_secret <client_secret>\
--refresh_token <refresh_token> --skipverification
"""
parser = argparse.ArgumentParser()
parser.add_argument("--server",
help="VC IP or hostname")
parser.add_argument("--client_id",
help="Client/Application ID of the server to server app")
parser.add_argument("--client_secret",
help="Client/Application secret \
of the server to server app")
parser.add_argument("--refresh_token",
help="Refresh token used to refresh the access token")
parser.add_argument('--skipverification',
action='store_true',
help='Verify server certificate when connecting to vc.')
args = parser.parse_args()
session = get_unverified_session() if args.skipverification else None
saml_assertion = login_using_refresh_token(
args.server,
session,
args.client_id,
args.client_secret,
args.refresh_token)
client = create_vsphere_client(
server=args.server,
bearer_token=saml_assertion,
session=session)
vms = client.vcenter.VM.list()
print(vms)

View File

@ -0,0 +1,218 @@
from vmware.vapi.stdlib.client.factories import StubConfigurationFactory
from vmware.vapi.lib.connect import get_requests_connector
from com.vmware.vcenter.identity_client import Providers
from com.vmware.vcenter.tokenservice_client import TokenExchange
from vmware.vapi.security.oauth import create_oauth_security_context
import base64
from lxml import etree
import uuid
# Constants
HTTP_ENDPOINT = "https://{}/api"
AUTHORIZATION_CODE = "authorization_code"
CLIENT_CREDENTIALS = "client_credentials"
REFRESH_TOKEN = "refresh_token"
PASSWORD = "password"
OAUTH2_CONFIG_TYPE = "oauth2"
OIDC_CONFIG_TYPE = "oidc"
def get_identity_provider(server, session):
'''
Get the identity provider for the given vc/server
Sample can be found at
https://github.com/vmware/vsphere-automation-sdk-python/blob/master/samples/vsphere/oauth/list_external_identity_providers.py
'''
stub_config = StubConfigurationFactory.new_std_configuration(
get_requests_connector(
session=session,
url=HTTP_ENDPOINT.format(
server)))
id_client = Providers(stub_config)
providers = id_client.list()
identity_provider = ""
for provider in providers:
if provider.is_default:
identity_provider = provider
break
return identity_provider
def get_saml_assertion(server, session, access_token, id_token=None):
"""
Exchange access token to saml token to connect to VC
Sample can be found at
https://github.com/vmware/vsphere-automation-sdk-python/blob/master/samples/vsphere/oauth/exchange_access_id_token_for_saml.py
"""
stub_config = StubConfigurationFactory.new_std_configuration(
get_requests_connector(
session=session,
url=HTTP_ENDPOINT.format(server)
)
)
oauth_security_context = create_oauth_security_context(access_token)
stub_config.connector.set_security_context(oauth_security_context)
token_exchange = TokenExchange(stub_config)
exchange_spec = token_exchange.ExchangeSpec(
grant_type=token_exchange.TOKEN_EXCHANGE_GRANT,
subject_token_type=token_exchange.ACCESS_TOKEN_TYPE,
actor_token_type=token_exchange.ID_TOKEN_TYPE,
requested_token_type=token_exchange.SAML2_TOKEN_TYPE,
actor_token=id_token, subject_token=access_token)
response = token_exchange.exchange(exchange_spec)
saml_token = response.access_token
# convert saml token to saml assertion
samlAssertion = etree.tostring(
etree.XML(base64.decodebytes(
bytes(saml_token, 'utf-8')
))
).decode('utf-8')
return samlAssertion
def get_endpoints(identity_provider):
"""
Extract different ednpoints from the identity provider object
Note that the endpoint naming convention might vary for different providers
Currently, support is provided for
oauth2 -> Cloud Service Provider (CSP)
oidc -> Microssoft ADFS
"""
if identity_provider.auth_query_params is not None:
auth_query_params = identity_provider.auth_query_params
else:
auth_query_params = {}
if identity_provider.config_tag.lower() == OAUTH2_CONFIG_TYPE:
auth_endpoint = identity_provider.oauth2.auth_endpoint
token_endpoint = identity_provider.oauth2.token_endpoint
auth_query_params.update(identity_provider.oauth2.auth_query_params)
if identity_provider.config_tag.lower() == OIDC_CONFIG_TYPE:
auth_endpoint = identity_provider.oidc.discovery_endpoint
token_endpoint = identity_provider.oidc.auth_endpoint
auth_query_params.update(identity_provider.oidc.auth_query_params)
return [auth_endpoint, token_endpoint, auth_query_params]
def get_basic_auth_string(id, secret):
"""
Return authorization string
"""
auth_string = id + ":" + secret
auth_string = "Basic " + base64.b64encode(auth_string.encode()).decode()
return auth_string
def login_using_client_credentials(server, session, client_id, client_secret):
"""
Get access token when grant_type is client_credentials
"""
identity_provider = get_identity_provider(server, session)
[discovery_endpoint, token_endpoint, auth_query_params] = \
get_endpoints(identity_provider)
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': get_basic_auth_string(client_id, client_secret),
'Accept': 'application/json'
}
data = {
'grant_type': CLIENT_CREDENTIALS
}
response = session.post(token_endpoint, headers=headers, data=data).json()
access_token = response['access_token']
return get_saml_assertion(server, session, access_token)
def login_using_authorization_code(
server,
session,
client_id,
client_secret,
redirect_uri,
callback):
"""
Get access token when grant_type is authorization_code
"""
identity_provider = get_identity_provider(server, session)
[auth_endpoint, token_endpoint, auth_query_params] = \
get_endpoints(identity_provider)
state = uuid.uuid1()
auth_endpoint += "?client_id=" + client_id + "&redirect_uri=" + \
redirect_uri + "&state=" + str(state)
for key, value in auth_query_params.items():
auth_endpoint += "&" + key + "="
if isinstance(value, list):
auth_endpoint += value[0]
[code, state] = callback(auth_endpoint)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": get_basic_auth_string(client_id, client_secret),
"Accept": "application/json"
}
data = {
"grant_type": AUTHORIZATION_CODE,
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
"code": code,
"state": state
}
response = session.post(token_endpoint, headers=headers, data=data).json()
access_token = response['access_token']
return get_saml_assertion(server, session, access_token)
def login_using_refresh_token(
server,
session,
client_id,
client_secret,
refresh_token):
"""
Get access token when grant_type is refresh_token
"""
identity_provider = get_identity_provider(server, session)
[auth_endpoint, token_endpoint, auth_query_params] = \
get_endpoints(identity_provider)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": get_basic_auth_string(client_id, client_secret),
"Accept": "application/json"
}
data = {
"grant_type": REFRESH_TOKEN,
"refresh_token": refresh_token
}
response = session.post(token_endpoint, headers=headers, data=data).json()
access_token = response['access_token']
return get_saml_assertion(server, session, access_token)
def login_using_password(server, session, username, password):
"""
Get access token when grant_type is password
"""
identity_provider = get_identity_provider(server, session)
[auth_endpoint, token_endpoint, auth_query_params] = \
get_endpoints(identity_provider)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": get_basic_auth_string(username, password),
"Accept": "application/json"
}
data = {
"grant_type": PASSWORD,
"username": username,
"password": password
}
response = session.post(token_endpoint, headers=headers, data=data).json()
print(response)
access_token = response['access_token']
return get_saml_assertion(server, session, access_token)

View File

@ -0,0 +1,133 @@
'''
This is a lightweight webserver
****Not recommended in a production setting****
Before you run the oauth samples, this server needs to be up.
Make sure to start it before trying out the samples
Or start it as a daemon process
We define listeners for two endpoints,
1. /getcode -> Endpoint to fetch the 'code' and 'state' variable
It is a GET request
Once the response is returned,
the variables need to be reassigned with ''
or None, to avoid inconsistent values
2. /authcode -> Redirect endpoint which will be called by the CSP server
It is a GET request
code and state are the request params
e.g., /authcode?code=xxxx&state=xxxxx
In case, you want to change the names of these endpoints in your client,
make sure to change in the below server code as well
'''
try:
# these imports are specific to Python 2.x
from BaseHTTPServer import BaseHTTPRequestHandler
import SocketServer
from urlparse import urlparse
except ImportError:
# these imports are specific to Python 3.x
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse
import argparse
import socket
import json
PORT = ''
AUTHCODE = '/authcode'
code = state = ''
hostname = socket.gethostname()
IPAddr = "127.0.0.1"
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
'''
This class defines the handlers for the incoming GET requests
'''
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
def do_GET(self):
global code, state, AUTHCODE
print(self.path)
'''
defining multiple GET endpoints is not very elegant,
the very reason why you shouldn't use this in a production setting!
below are the listeners for /getcode and /authcode
'''
if self.path == '/getcode':
self._set_headers()
print('call to getcode')
print(code)
print(state)
if code != '' and state != '':
'''
the response is defined to be in this format
code:xxxx:state:xxxx
the client sample assumes the response to be in this format,
any change to the response format,
will need changes in the client code response parser
'''
res = "code:" + code + ":state:" + state
self.wfile.write(res.encode('utf-8'))
# code and state variables need to be reset
code = state = ''
elif self.path.startswith(AUTHCODE, 0):
print("call to authcode")
global IPAddr, PORT
redirect_url = "http://" + IPAddr + ":" + str(PORT) + "/authcode"
print("Redirect URL: " + redirect_url)
self._set_headers()
query = urlparse(self.path).query
# CSP always sends request in this format
# /authcode?code=xxxx&state=xxxxx
query = query.split('&')
param_code = query[0].split('=')
code = param_code[1]
param_state = query[1].split('=')
state = param_state[1]
print("code: ", code)
print("state: ", state)
self.wfile.write(b'Code and state variables are set,\
you may now close the browser tab')
else:
pass
def parse_args():
parser = argparse.ArgumentParser(description='Input port and pathname')
# port number by default will be 8080
parser.add_argument(
'--port',
dest='port',
default=8086,
help='webserver port')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
PORT = int(args.port)
try:
httpd = SocketServer.TCPServer(("", PORT), SimpleHTTPRequestHandler)
except Exception as e:
httpd = HTTPServer(("", PORT), SimpleHTTPRequestHandler)
httpd.serve_forever()