diff --git a/samples/vmc/networks_nsxt/dfw_firewall_crud.py b/samples/vmc/networks_nsxt/dfw_firewall_crud.py new file mode 100644 index 00000000..97ef4c72 --- /dev/null +++ b/samples/vmc/networks_nsxt/dfw_firewall_crud.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python + +""" +* ******************************************************* +* Copyright (c) VMware, Inc. 2019. All Rights Reserved. +* SPDX-License-Identifier: MIT +* ******************************************************* +* +* DISCLAIMER. THIS PROGRAM IS PROVIDED TO YOU "AS IS" WITHOUT +* WARRANTIES OR CONDITIONS OF ANY KIND, WHETHER ORAL OR WRITTEN, +* EXPRESS OR IMPLIED. THE AUTHOR SPECIFICALLY DISCLAIMS ANY IMPLIED +* WARRANTIES OR CONDITIONS OF MERCHANTABILITY, SATISFACTORY QUALITY, +* NON-INFRINGEMENT AND FITNESS FOR A PARTICULAR PURPOSE. +""" + +__author__ = 'VMware, Inc.' +__vcenter_version__ = '6.8.0+' + +import argparse +import requests +from com.vmware.nsx_policy_client_for_vmc import create_nsx_policy_client_for_vmc +from com.vmware.nsx_policy.model_client import IPAddressExpression +from com.vmware.nsx_policy.model_client import Group +from com.vmware.nsx_policy.model_client import CommunicationMap +from com.vmware.nsx_policy.model_client import CommunicationEntry +from com.vmware.nsx_policy.infra.domains.communication_maps_client import CommunicationEntries +from vmware.vapi.bindings.struct import PrettyPrinter as NsxPrettyPrinter +from com.vmware.nsx_policy.model_client import ApiError + +# format NSXT objects for readability +nsx_pp = NsxPrettyPrinter() + + +class NSXPolicyDFWFirewall(object): + """ + e.g. Demonstrate access to NSX Policy Manager + and show access to DFW firewall CRUD operations + """ + + def __init__(self): + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument('--refresh_token', + required=True, + help='Refresh token obtained from CSP') + + parser.add_argument('--org_id', + required=True, + help='Orgization ID') + + parser.add_argument('--sddc_id', + required=True, + help='Sddc ID') + + args = parser.parse_args() + + self.nsx_client = create_nsx_policy_client_for_vmc( + refresh_token=args.refresh_token, + org_id=args.org_id, + sddc_id=args.sddc_id) + + def get_group(self, group_id): + print(' Get Group '.center(70, '=')) + self.cgw_group = self.nsx_client.infra.domains.Groups.get('cgw', group_id) + nsx_pp.pprint(self.cgw_group) + return self.cgw_group + + def patch_group(self, group_id, ip_address): + print(' Create Group '.center(70, '=')) + try: + ip_address_expression_obj = IPAddressExpression(ip_addresses=[ip_address]) # IP Address of a VM + group_obj = Group(display_name=group_id, expression=[ip_address_expression_obj]) + self.nsx_client.infra.domains.Groups.patch('cgw', group_id, group_obj) + except Exception as ex: + print(ex) + self.log_error(ex) + + def delete_group(self, group_id): + print(' Delete Group '.center(70, '=')) + try: + self.nsx_client.infra.domains.Groups.delete('cgw', group_id) + except Exception as ex: + print(ex) + self.log_error(ex) + + def get_communication_map(self): + print(' Communication Map '.center(70, '=')) + self.communication_map = self.nsx_client.infra.domains.CommunicationMaps.get('cgw', 'application-1') + nsx_pp.pprint(self.communication_map) + return self.communication_map + + def patch_communication_map(self): + print(' Create Communication Map '.center(70, '=')) + try: + communication_map_obj = CommunicationMap(category='Application', display_name='application-1') + self.nsx_client.infra.domains.CommunicationMaps.patch('cgw', 'application-1', communication_map_obj) + except Exception as ex: + print(ex) + self.log_error(ex) + + def delete_communication_map(self): + print(' Delete Communication Map '.center(70, '=')) + try: + self.nsx_client.infra.domains.CommunicationMaps.delete('cgw', 'application-1') + except Exception as ex: + print(ex) + self.log_error(ex) + + def get_dfw_firewall_rules(self): + print(' DFW Firewall Rules '.center(70, '=')) + self.dfw_rules = self.nsx_client.infra.domains.communication_maps.CommunicationEntries.get('cgw', + 'application-1', 'AllowPingFromVM1ToVM2') + nsx_pp.pprint(self.dfw_rules) + return self.dfw_rules + + def patch_dfw_firewall_rule(self): + print(' Create DFW Firewall Rule '.center(70, '=')) + self.patch_group('VM1_Group', '172.16.1.2') + cgw_group1 = self.get_group('VM1_Group') + self.patch_group('VM2_Group', '172.16.2.2') + cgw_group2 = self.get_group('VM2_Group') + try: + ce_obj = CommunicationEntry(action='ALLOW', + scope=['ANY'], + services=['/infra/services/ICMP-ALL'], + source_groups=[cgw_group1.path], + destination_groups=[cgw_group2.path], + display_name='AllowPingFromVM1ToVM2', sequence_number=10) + + self.nsx_client.infra.domains.communication_maps.CommunicationEntries.patch('cgw', 'application-1', + 'AllowPingFromVM1ToVM2', ce_obj) + except Exception as ex: + print(ex) + self.log_error(ex) + + def delete_dfw_firewall_rule(self): + print(' Delete DFW Firewall Rule '.center(70, '=')) + try: + self.nsx_client.infra.domains.communication_maps.CommunicationEntries.delete('cgw', 'application-1', + 'AllowPingFromVM1ToVM2') + except Exception as ex: + print(ex) + self.log_error(ex) + + def log_error(self, ex): + """ + Generic error logger that will use NSXT API Error message decoders for + more descriptive information on errors + """ + api_error = ex.data.convert_to(ApiError) + print("Error configuring {}".format(api_error.error_message)) + print("{}".format(api_error.__dict__)) + print("{}".format(api_error.details)) + + def run(self): + self.patch_communication_map() + self.get_communication_map() + self.patch_dfw_firewall_rule() + self.get_dfw_firewall_rules() + + def cleanup(self): + self.delete_dfw_firewall_rule() + self.delete_communication_map() + self.delete_group('VM1_Group') + self.delete_group('VM2_Group') + + +def main(): + nsx = NSXPolicyDFWFirewall() + nsx.run() + nsx.cleanup() + + +if __name__ == '__main__': + main()