1
0
mirror of https://github.com/vmware/vsphere-automation-sdk-python.git synced 2024-11-24 02:19:58 -05:00
vsphere-automation-sdk-python/samples/vsan/snapservice/snapshot/delete_protection_group_snapshots.py

127 lines
5.1 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
"""
* *******************************************************
* Copyright (c) 2024 Broadcom. All Rights Reserved.
* SPDX-License-Identifier: MIT
* *******************************************************
*
* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN,
* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED
* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY,
* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE.
"""
__author__ = 'Broadcom, Inc.'
__vcenter_version__ = '8.0.3+'
from samples.vsphere.common import sample_cli
from samples.vsphere.common import sample_util
from samples.vsphere.common.ssl_helper import get_unverified_session
from samples.vsan.snapservice.vsan_snapservice_client import create_snapservice_client
from vmware.vapi.vsphere.client import create_vsphere_client
from com.vmware.vcenter_client import Cluster
from com.vmware.snapservice_client import *
from com.vmware.snapservice.clusters_client import *
class DeleteProtectionGroupSnapshots(object):
"""
Description: Demonstrates deleting protection group snapshots.
"""
def __init__(self):
parser = sample_cli.build_arg_parser()
required_args = parser.add_argument_group(
'snapservice required arguments')
required_args.add_argument('--snapservice',
action='store',
required=True,
help='Snapservice IP/hostname to connect to')
required_args.add_argument('--cluster',
action='store',
required=True,
help='Cluster where the protection group locates')
required_args.add_argument('--pgname',
action='store',
required=True,
help='Protection group to delete snapshot')
parser.add_argument_group('snapservice optional arguments')\
.add_argument('--remain',
action='store',
help='How many protection group snapshots to leave')
args = sample_util.process_cli_args(parser.parse_args())
self.cluster = args.cluster
self.pgname = args.pgname
self.remain = int(args.remain) if int(args.remain) > 0 else 0
skipverification = True if args.skipverification else False
vcSession = get_unverified_session() if skipverification else None
self.vcClient = create_vsphere_client(server=args.server,
username=args.username,
password=args.password,
session=vcSession)
ssSession = get_unverified_session() if skipverification else None
self.ssClient = create_snapservice_client(server=args.snapservice,
vc=args.server,
username=args.username,
password=args.password,
session=ssSession,
skip_verification=skipverification)
def run(self):
# Get cluster identifier
cluster_spec = set([self.cluster])
cluster_list = self.vcClient.vcenter.Cluster.list(
Cluster.FilterSpec(names=cluster_spec))
clusterId = cluster_list[0].cluster
print("\nGot cluster '{}' id: {}".format(self.cluster, clusterId))
# Get protection group identifier
pgs_info = self.ssClient.snapservice.clusters.ProtectionGroups.list(clusterId)
pgId = 0
for pg_info in pgs_info.items:
if pg_info.info.name == self.pgname:
pgId = pg_info.pg
print("\nProtection groups '{}': '{}'\n".format(self.pgname, pgId))
# Get protection group snapshots
snapshots_info = self.ssClient.snapservice.clusters.protection_groups\
.Snapshots.list(clusterId, pgId)
print("\n\nGet snapshots:\n{}".format(snapshots_info))
# Get snapshot to delete
snapshotTimeMap = {}
for snapshot in snapshots_info.snapshots:
snapshotTimeMap[snapshot.info.expires_at] = snapshot.snapshot
timeToDel = []
if self.remain == 0:
timeToDel = sorted(snapshotTimeMap)
elif self.remain < len(snapshotTimeMap):
timeToDel = sorted(snapshotTimeMap)[0:-self.remain]
else:
print("\n\n No snapshot to delete.\n")
for time in timeToDel:
print("\n\n##Deleting pg snapshot '{}' which will expire at '{}'"
.format(snapshotTimeMap[time], time))
task = self.ssClient.snapservice.clusters.protection_groups\
.Snapshots.delete(clusterId, pgId, snapshotTimeMap[time])
def main():
delete_pg_snapshot = DeleteProtectionGroupSnapshots()
delete_pg_snapshot.run()
if __name__ == '__main__':
main()