1
0
mirror of https://github.com/vmware/vsphere-automation-sdk-python.git synced 2024-11-22 01:39:58 -05:00

vSphere 6.7 Release

Signed-off-by: Tianhao He <het@vmware.com>
This commit is contained in:
Tianhao He 2018-04-17 11:01:27 -07:00
parent 1ea926ad45
commit f1032b4e43
17 changed files with 860 additions and 2 deletions

Binary file not shown.

Binary file not shown.

View File

@ -1 +1 @@
<a href='vapi_client_bindings-1.1.0-py2.py3-none-any.whl'>vapi_client_bindings-1.1.0-py2.py3-none-any.whl</a><br />
<a href='vapi_client_bindings-1.2.0-py2.py3-none-any.whl'>vapi_client_bindings-1.2.0-py2.py3-none-any.whl</a><br />

View File

@ -2,6 +2,7 @@ pyVmomi >= 6.5
suds ; python_version < '3'
suds-jurko ; python_version >= '3.0'
tabulate
vapi-client-bindings == 1.1.0
vapi-client-bindings == 1.2.0
vmc-client-bindings == 1.1.0
vapi-vmc-client

View File

@ -0,0 +1,16 @@
This directory contains samples for Backup Restore APIs:
* List all backup jobs
* CRUD operations on backup schedule
Running the samples
$ python <sample-dir>/<sample>.py --server <vCenter Server IP> --username <username> --password <password> <additional-sample-parameters>
The additional sample parameters are as follows (all parameters can be displayed for any sample using option --help)
* backup_schedule.py --location <location URL> --location_user <location-user> location_password <location-password>
* Testbed Requirement:
- 1 vCenter Server
- Backup server reachable through any of the supported protocols FTP/FTPS/SCP/HTTP/HTTPS

View File

@ -0,0 +1,26 @@
"""
* *******************************************************
* Copyright VMware, Inc. 2017. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2017 VMware, Inc. All rights reserved.'
# Required to distribute different parts of this
# package as multiple distribution
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__) # @ReservedAssignment

View File

@ -0,0 +1,77 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2017. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2017 VMware, Inc. All rights reserved.'
__vcenter_version__ = '6.7+'
from tabulate import tabulate
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common import vapiconnect
from com.vmware.appliance.recovery.backup.job_client import Details
class BackupJobList(object):
"""
Demonstrates backup job list operation
Retrieves backup job details from vCenter and prints the data in
tabular format
Prerequisites:
- vCenter
- Backup operation is performed on the vCenter either manually or
by scheduled backups
"""
def __init__(self):
self.stub_config = None
def setup(self):
parser = sample_cli.build_arg_parser()
args = sample_util.process_cli_args(parser.parse_args())
# Connect to vAPI services
self.stub_config = vapiconnect.connect(
host=args.server,
user=args.username,
pwd=args.password,
skip_verification=args.skipverification)
def run(self):
details_client = Details(self.stub_config)
job_list = details_client.list()
table = []
for info in job_list.itervalues():
row = [info.start_time.strftime("%b %d %Y %H:%M"),
info.duration,
info.type,
info.status,
info.location]
table.append(row)
headers = ["Start time", "Duration", "Type", "Status", "Location"]
print(tabulate(table, headers))
def main():
backup_job_list = BackupJobList()
backup_job_list.setup()
backup_job_list.run()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,158 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2017. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2017 VMware, Inc. All rights reserved.'
__vcenter_version__ = '6.7+'
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common import vapiconnect
from tabulate import tabulate
from com.vmware.appliance.recovery.backup_client import Schedules
class BackupSchedule(object):
"""
Demonstrates backup schedule operations
Prerequisites:
- vCenter
- Backup server (ftp/ftps/http/https/scp)
"""
def __init__(self):
self.stub_config = None
# Scheudle backup to run on weekdays at 10:30 pm
self.days = ["MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY"]
self.hour = 22
self.minute = 30
# Retain last 30 backups
self.max_count = 30
self._schedule_id = 'test_schedule'
def setup(self):
parser = sample_cli.build_arg_parser()
parser.add_argument('-location', '--location',
required=True,
action='store',
help='URL of the backup location')
parser.add_argument('--location_user',
required=True,
action='store',
help='Username for the given location')
parser.add_argument('--location_password',
required=True,
action='store',
help='Password for the given location')
args = sample_util.process_cli_args(parser.parse_args())
self.location = args.location
self.location_user = args.location_user
self.location_password = args.location_password
# Connect to vAPI services
self.stub_config = vapiconnect.connect(
host=args.server,
user=args.username,
pwd=args.password,
skip_verification=args.skipverification)
self.schedule_client = Schedules(self.stub_config)
def run(self):
# Create a backup schedule
self.create_schedule()
# Update the backup schedule to take backup only on weekends
self.days = ["SATURDAY", "SUNDAY"]
self.update_schedule()
# Get the updated backup schedule
self.get_schedule()
# Run backup operation using the scheduled configuration
self.run_backup()
# Delete the backup schedule
self.delete_schedule()
def create_schedule(self):
retention_info = Schedules.RetentionInfo(self.max_count)
recurrence_info = Schedules.RecurrenceInfo(
days=self.days,
hour=self.hour,
minute=self.minute)
create_spec = Schedules.CreateSpec(
location=self.location,
location_user=self.location_user,
location_password=self.location_password,
recurrence_info=recurrence_info,
retention_info=retention_info)
self.schedule_client.create(self._schedule_id, create_spec)
def update_schedule(self):
retention_info = Schedules.RetentionInfo(self.max_count)
recurrence_info = Schedules.RecurrenceInfo(
days=self.days,
hour=self.hour,
minute=self.minute)
update_spec = Schedules.UpdateSpec(
location=self.location,
location_user=self.location_user,
location_password=self.location_password,
recurrence_info=recurrence_info,
retention_info=retention_info)
self.schedule_client.update(self._schedule_id, update_spec)
def get_schedule(self):
self.schedule_client = Schedules(self.stub_config)
schedule_spec = self.schedule_client.get(self._schedule_id)
recurrence_info = schedule_spec.recurrence_info
retention_info = schedule_spec.retention_info
table = []
data = [self._schedule_id,
"{}:{}".format(recurrence_info.hour, recurrence_info.minute),
" ".join(recurrence_info.days),
retention_info.max_count]
table.append(data)
headers = ["Schedule ID", "Time", "Days", "Retention"]
print(tabulate(table, headers))
def run_backup(self):
schedule_spec = self.schedule_client.run(self._schedule_id)
def delete_schedule(self):
self.schedule_client.delete(self._schedule_id)
def main():
schedule = BackupSchedule()
schedule.setup()
schedule.run()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,45 @@
This directory contains samples for the Defer History Data Import APIs. Defer History Data Import is a new
feature for the upgrade that allows historical data and performance metrics data to be imported in the background
after the upgrade is done, thus allowing shorter downtime for the whole upgrade process. The feature is only applicable
for upgrades or migrations from 6.0 and 6.5 with external database. For more information, please review the official
release notes.
The operations on the API are as follow:
* status of defer history data import
* pause defer history data import
* resume defer history data import
* cancel defer history data import
Overview of the directory code samples:
* vc_import_history_sample.py - running a simple workflow to pause and resume
Defer History Data Import that is still not completed.
* vc_import_history_cli.py - allowing to trigger different parts of the API,
showing example code structure.
* vc_import_history_common.py - common functionality between the main files.
To view the available command-line options:
$ python vc_import_history_sample.py --help
$ python vc_import_history_cli.py --help
Running the samples:
$ python vc_import_history_sample.py --server <vCenter Server IP> --username <username> --password <password>
Running the cli:
$ python vc_import_history_cli.py --server <vCenter Server IP> --username <username> --password <password> --operation <operation>
The operation choice is as follows (information is also available using --help)
* status
* pause
* resume
* cancel
Testbed Requirement:
* 1 vCenter Server appliance version 6.7 or above successfully upgraded using the option for transferring historical data after upgrade.

View File

@ -0,0 +1,26 @@
"""
* *******************************************************
* Copyright VMware, Inc. 2017. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2017 VMware, Inc. All rights reserved.'
# Required to distribute different parts of this
# package as multiple distribution
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__) # @ReservedAssignment

View File

@ -0,0 +1,117 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2017. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2017 VMware, Inc. All rights reserved.'
__vcenter_version__ = '6.7+'
import atexit
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common.service_manager import ServiceManager
from com.vmware.vcenter.deployment_client import ImportHistory
from com.vmware.vapi.std.errors_client import NotAllowedInCurrentState, \
Error, Unauthenticated, AlreadyInDesiredState, Unauthorized
from samples.vsphere.deferhistoryimport.vc_import_history_common import \
get_defer_history_import_status, get_message_as_text
class ImportHistorySampleCli(object):
"""
Sample demonstrating how the API for the upgrade's Defer History Data
Import feature can be used. To use this feature you need to have an
appliance upgrade or migrated to the 6.7 or later version, using the
option for transferring historical data after upgrade.
"""
def __init__(self):
self.service_manager = None
self.operation = None
def setup(self):
# Create argument parser for standard inputs:
# server, username, password, cleanup and skipverification
parser = sample_cli.build_arg_parser()
parser.add_argument('-o', '--operation',
action='store',
default='status',
choices=['status',
'start',
'pause',
'resume',
'cancel'],
help='Operation to execute')
args = sample_util.process_cli_args(parser.parse_args())
self.operation = args.operation
self.service_manager = ServiceManager(args.server,
args.username,
args.password,
args.skipverification)
self.service_manager.connect()
atexit.register(self.service_manager.disconnect)
def run(self):
"""
Runs the requested operation
"""
# Using REST API service
import_history = ImportHistory(self.service_manager.stub_config)
if self.operation == 'status':
get_defer_history_import_status(import_history)
return
try:
operations = {
'start': import_history.start,
'pause': import_history.pause,
'resume': import_history.resume,
'cancel': import_history.cancel
}
print('Executing operation "{0}"'.format(self.operation))
if self.operation in operations:
operations[self.operation]()
print('Executing operation "{0}" was successful'.format(
self.operation))
else:
print('Unknown operation {0}'.format(self.operation))
except AlreadyInDesiredState:
print('The Defer History Data Import is already in the '
'desired state.')
except Error as error:
print('Request "{0}" returned error.'.format(self.operation))
for err in error.messages:
print('Error: {0}'.format(get_message_as_text(err)))
def cleanup(self):
# Nothing to clean up
pass
def main():
import_history_sample_cli = ImportHistorySampleCli()
import_history_sample_cli.setup()
import_history_sample_cli.run()
import_history_sample_cli.cleanup()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,123 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2017. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2017 VMware, Inc. All rights reserved.'
__vcenter_version__ = '6.7+'
class Status(object):
""" Constant used to indicate what is the current status
"""
RUNNING = 'Running'
SUCCEEDED = 'Succeeded'
CANCELED = 'Canceled'
PAUSED = 'Paused'
NOT_STARTED = 'Not started'
UNKNOWN = 'Unknown'
@staticmethod
def parse(apiStatus):
""" Parses an API status and returns Status constant based on it
"""
return STATUS_TRANSLATION_MATRIX.get(apiStatus, Status.UNKNOWN)
STATUS_TRANSLATION_MATRIX = {
'RUNNING': Status.RUNNING,
'SUCCEEDED': Status.SUCCEEDED,
'FAILED': Status.CANCELED,
'BLOCKED': Status.PAUSED,
'PENDING': Status.NOT_STARTED
}
def get_message_as_text(msg):
"""
Creates displayable message in correct form from a message of
the API. There will be no translations.
@param msg: Message returned by the API
@type msg: LocalizableMessage
"""
if not msg:
return None
return msg.default_message % msg.args
def get_defer_history_import_status(import_history):
"""
Gets the status of the Defer History Data Import and print it to
the stdout. It does not do exception handling.
Stdout example output for running status:
--------------------
Defer History Data Import Status: Running
Description: vCenter Server history import
Started: 2017-10-24 14:30:50.752000
Progress: 10%
Last progress message: Importing historical data...
--------------------
@param import_history: object representing the vAPI Endpoint
@type import_history: deployment_client.ImportHistory
@return: status of the Defer History Data Import
@rtype: Status
"""
result = import_history.get()
if not result:
print('Could not acquire status of Defer History Data Import. '
'Aborting.')
return Status.UNKNOWN
delimitar = '-' * 20
print(delimitar)
status = Status.parse(result.status)
print('Defer History Data Import Status: {0}'.format(status))
description = get_message_as_text(result.description)
print('Description: {0}'.format(description))
if result.start_time:
print('Started: {0}'.format(result.start_time))
if result.end_time:
print('Finished: {0}'.format(result.end_time))
# Progress is reported as completed steps out of total and
# need to be calculated at the clients side if want to be reported
# as percentages
progress = result.progress
if progress:
print('Progress: {0}%'.format(progress.completed))
progress_message = get_message_as_text(progress.message)
print('Last progress message: {0}'.format(progress_message))
if result.error:
print('Error: {0}'.format(get_message_as_text(result.error)))
for msg in result.result.errors:
print('Error: {0}'.format(get_message_as_text(msg)))
for msg in result.result.warnings:
print('Warning: {0}'.format(get_message_as_text(msg)))
for msg in result.result.info:
print('Message: {0}'.format(get_message_as_text(msg)))
print(delimitar)
return status

View File

@ -0,0 +1,119 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2017. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__copyright__ = 'Copyright 2017 VMware, Inc. All rights reserved.'
__vcenter_version__ = '6.7+'
import atexit
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common.service_manager import ServiceManager
from com.vmware.vcenter.deployment_client import ImportHistory
from com.vmware.vapi.std.errors_client import NotAllowedInCurrentState, \
Error, Unauthenticated, AlreadyInDesiredState, Unauthorized
from samples.vsphere.deferhistoryimport.vc_import_history_common import \
Status, get_defer_history_import_status, get_message_as_text
class ImportHistorySample(object):
"""
Sample demonstrating how one can change the state of the Defer History Data
Import using its vAPI. To use this feature you need to have an appliance
upgrade or migrated to the 6.7 or later version, using the option for
transferring historical data after upgrade.
"""
def __init__(self):
self.service_manager = None
def setup(self):
# Create argument parser for standard inputs:
# server, username, password, cleanup and skipverification
parser = sample_cli.build_arg_parser()
args = sample_util.process_cli_args(parser.parse_args())
self.service_manager = ServiceManager(args.server,
args.username,
args.password,
args.skipverification)
self.service_manager.connect()
atexit.register(self.service_manager.disconnect)
def run(self):
"""
Runs the sample's operations
"""
try:
# Using REST API service
import_history = ImportHistory(self.service_manager.stub_config)
# Change the status - either pause or resume it
start_status = get_defer_history_import_status(import_history)
if start_status == Status.RUNNING:
print('Pausing Defer History Data Import.')
import_history.pause()
expected_status = Status.PAUSED
revert_operation = import_history.resume
elif start_status == Status.PAUSED:
print('Resuming Defer History Data Import.')
import_history.resume()
expected_status = Status.RUNNING
revert_operation = import_history.pause
else:
print('Sample can only work if the status of Defer History '
'Data Import is paused or running, current status '
'is: {0}'.format(start_status))
return
after_ops_status = get_defer_history_import_status(import_history)
if after_ops_status == expected_status:
print('Operation finished successfully.')
else:
print('Executed operation did not bring the process in '
'desired state. Current status is "{0}". '
'Aborting'.format(after_ops_status))
return
# revert to the original status
print('Reverting to original state.')
revert_operation()
get_defer_history_import_status(import_history)
except AlreadyInDesiredState:
print('The Defer History Data Import is already in the '
'desired state.')
except Error as error:
for err in error.messages:
print('Error: {0}'.format(get_message_as_text(err)))
def cleanup(self):
# Nothing to clean up
pass
def main():
import_history_sample = ImportHistorySample()
import_history_sample.setup()
import_history_sample.run()
import_history_sample.cleanup()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,18 @@
This directory contains samples for log forwarding APIs:
* Create log forwarding configurations
* View log forwarding configurations
* Update log forwarding configurations
* Test log forwarding configurations
To view the available command-line options:
$ python logforwarding/logforwarding.py --help
Running the samples:
$ python logforwarding/logforwarding.py --server <vCenter Server IP> --username <username> --password <password> --loghost <log host> --port <log host port> --protocol <log protocol>
* Testbed Requirement:
- 1 vCenter Server
- Log host listening to syslog packets over any of the supported protocols UDP/TCP/TLS

View File

@ -0,0 +1,132 @@
#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) VMware, Inc. 2017, 2018. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'VMware, Inc.'
__vcenter_version__ = '6.7+'
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common import vapiconnect
from tabulate import tabulate
from com.vmware.appliance.logging_client import Forwarding
class LogForwarding(object):
"""
Demonstrates log forwarding API operations
Prerequisites:
- vCenter
- Log host listening to syslog packets over any of the supported
protocols UDP/TCP/TLS
"""
def __init__(self):
self.loghost = None
self.protocol = None
self.port = None
self.stub_config = None
self.log_forwarding_client = None
def setup(self):
parser = sample_cli.build_arg_parser()
parser.add_argument('--loghost',
required=True,
action='store',
help='The log host')
parser.add_argument('--port',
required=True,
action='store',
help='The log host port number')
parser.add_argument('--protocol',
required=True,
action='store',
help='The log host protocol (TCP/UDP/TLS)')
args = sample_util.process_cli_args(parser.parse_args())
self.loghost = args.loghost
self.protocol = args.protocol
self.port = int(args.port)
# Connect to vAPI services
self.stub_config = vapiconnect.connect(
host=args.server,
user=args.username,
pwd=args.password,
skip_verification=args.skipverification)
self.log_forwarding_client = Forwarding(self.stub_config)
def run(self):
# Set log forwarding configuration
self.set_log_forwarding()
# Get log forwarding configuration
self.get_log_forwarding()
# Test log forwarding configuration
self.test_log_forwarding()
# Update log forwarding configuration
self.update_log_forwarding()
def set_log_forwarding(self):
log_forwarding_config = [Forwarding.Config(hostname=self.loghost,
port=self.port,
protocol=self.protocol)]
self.log_forwarding_client.set(log_forwarding_config)
def get_log_forwarding(self):
configs = self.log_forwarding_client.get()
print("\nLog forwarding configurations:")
table = [[cfg.hostname, cfg.port, cfg.protocol] for cfg in configs]
headers = ["Loghost", "Port", "Protocol"]
print(tabulate(table, headers))
def test_log_forwarding(self):
test_response = self.log_forwarding_client.test(True)
print("\nLog forwarding test response:")
table = [[resp.hostname,
resp.state,
resp.message.default_message if resp.message else None]
for resp in test_response]
headers = ["Loghost", "State", "Message"]
print(tabulate(table, headers))
def update_log_forwarding(self):
# Read log forwarding configuration
log_forwarding_config = self.log_forwarding_client.get()
# Delete the newly added configuration
log_forwarding_config = list(filter(
lambda cfg: cfg.hostname != self.loghost,
log_forwarding_config))
# Apply the modified log forwarding configuration
self.log_forwarding_client.set(log_forwarding_config)
def main():
log_forwarding = LogForwarding()
log_forwarding.setup()
log_forwarding.run()
if __name__ == '__main__':
main()