Python Forum
Add Tagged Objects to a Palo Alto Firewall device Group
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Add Tagged Objects to a Palo Alto Firewall device Group
#1
I just thought I would share some code Ive written for interacting with Palo Alto firewalls which are managed by Panorama.
import requests
import pandevice
import json
import time
import sys
import ipaddress
from requests.exceptions import HTTPError
from pandevice import panorama
from pandevice import objects


#this code block will prevent ssl errors until i learn certificate handling
verify = False
if not verify:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        
# This defines the panorama device interface to connect to and the DeviceGroup
device = "your_panorama_ip_address_goes_here"
devicegroup = "Production"


# This will set the description of the node to the Release Ticket number for audit purposes
description = input('Please enter the NETENG ticket number(just the numbers): ')

#this is required to execute your script. you need a valid api key
auth_key = input('Please past in your palo alto api key with no extra spaces: ')

#this is the api call for the script to use to do the basic panorama commmit 
commit_panorama_api = "https://<your panorama name here>/api/?type=commit&cmd=<commit></commit>&key="+auth_key

#this is the api call for the script to use to do the device group commit
commit_devicegroup_api = 'https://<your panorama name here>/api/?type=commit&action=all&cmd=<commit-all><shared-policy><device-group><entry name="Production"/></device-group></shared-policy></commit-all>&key='+auth_key

# This defines how we will connect to panorama
pano = panorama.Panorama(device, api_key=auth_key)

# This defines the device group we wil be connecting to
palo_device_group = panorama.DeviceGroup(devicegroup)
pano.add(palo_device_group)

#this is the api call to get the object list from the production device group
getProdAddressObjectsApi = 'https://<your panorama name here>/restapi/9.0/Objects/Addresses?location=device-group&device-group=Production&key='+auth_key


#definitons of functions to be used below. this include validation of IP information entered... formatting the object names properly for palo alto. the function necessary to
#clean up the text file entries. and both the panorama commit as well as the device group commit.   
    
#this function helps to validate the ip address information entered manually or via txt file. this will be used by otehr functions that deal with IP addresses and IP CIDR notations.
def valid_ip_or_cidr(ip):
    try:
        ipaddress.IPv4Address(ip)
        #print('This is a valid IPs address')
        return True
    except:
        try:
            ipaddress.IPv4Network(ip)
            #print('This is a valid network CIDR')
            return True
        except:
            print(ip + ' This is neither a valid IPV4 address  or a valid IPV4 CIDR notation, please check your data and try again')
            #return False
            sys.exit()

#this function is what generates the object names that match our standard naming schemes and meets palo alot requirements. 
def palo_object_name(ip):
    try:
        ipaddress.IPv4Address(ip)
        object_name = ("node-"+str(ip))
        return object_name
    except:
        try:
            ipaddress.IPv4Network(ip)
            object_name = str(ip)
            temp_object_name = object_name.replace("/","-")
            object_name = ("net-"+ temp_object_name)
            return object_name
        except:
            print("Somthing went wrong creating the object name, check the contents of the cleanips.txt file")
            sys.exit() 

#this function basically cleans the txt file data and remove the trailing '\n' from the objects so that each entry is seen as a string of just the IP address or CIDR                   
def remove_lines_list (file):
    ip_list = open(file).readlines()
    return [address.rstrip('\n') for address in ip_list]

#this function does the basic palo alto panorama commmit
def palo_panorama_commit():
    # this next piece will commit the change to panorama
    try:
        panorama_commit_response = requests.get(commit_panorama_api, verify=False)
        # if the response was successful, no Exception will be raised
        panorama_commit_response.raise_for_status()
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        print(f'Other error occurred: {err}')
    else:
        print("Please wait while your panorama commit processes!")
        time.sleep(30)
        print('Your panorama commit was successful')

#this function does the palo alto device group commit    
def palo_devicegroup_commit():
    #this should commit the device 
    try:
        device_commit_response = requests.get(commit_devicegroup_api, verify=False)
        # if the response was successful, no Exception will be raised
        device_commit_response.raise_for_status()
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        print(f'Other error occurred: {err}')
    else:
        print("Please wait while the Production Device group commits!")
        time.sleep(60)
        print('Your SFTP allow list update was successfully commmit')

#validate ip address info is actually real valid IPV4 address or real valid IPV4 CIDR notation. 
def ipv4address_ipv4cidr_validation():
    for newIpAddress in addressList:
        if valid_ip_or_cidr(newIpAddress):
            continue
        else:
            print(newIpAddress + " is not a valid IPV4 address nor IPV4 CIDR, check your data and try again.")
            #this will generally happen if your cleanips.txt list is blank or has invalid characters in it.
            quit()

def isPrivate_validaton():
    for newIpAddress in addressList:
        if ipaddress.ip_address(newIpAddress).is_private:
            print(newIpAddress + " this is a private address and cannot be used to access our Public SFTP service.")
            quit()
        else:
            continue
            
            
            
#this code uses the above function defined to clean the text file data
addressList = remove_lines_list('myips.txt')

#this api call creates the dictionary of address ojbects that already exist in the firewall so we can compare it to the list of addresses being submit for addition
try:
    current_address_objects_response = requests.get(getProdAddressObjectsApi, verify=False)
    # if the response was successful, no Exception will be raised
    current_address_objects_response.raise_for_status()
except HTTPError as http_err:
    print(f'HTTP error occurred: {http_err}')
except Exception as err:
    print(f'Other error occurred: {err}')
else:
    print("Please wait while we prepare your query")
    time.sleep(5)
    productionAddressDict = json.loads(current_address_objects_response.text)


#the below function will iterate through the addressList and compare it the data pulled from the palo alto api call ( a dictionary) and determine if the IPV4 address or IPV4 CIDR 
#already exists. if it does it will skip over it. if not it will add it to a new txt file "cleanips.txt" and then it cleans up that file to a list and validates its not empty. when not
#empty it will create an address ojbect in the devicegroup specified at the top of this file with the SFTP tag so that it gets automaticaly added to the Dynamic Address Group which permits 
#inbound SFTP access.

def create_sftp_object():
    for newIpAddress in addressList:
        my_var = 0
        for entry in productionAddressDict['result']['entry']:
            if 'ip-netmask' in entry:
                temp_ip = entry['ip-netmask']
                if temp_ip != newIpAddress:
                    my_var = my_var + 1
                    continue
                elif temp_ip == newIpAddress:
                    print("Skipping "+ newIpAddress +" because it already exists")
                    my_var = 0
                    break
            else:
                continue
        if my_var >= 1:
            with open ("cleanips.txt", 'a') as file_object:
                file_object.write(newIpAddress)
                file_object.write("\n")
                my_var = 0
                        
    cleanAddressList = remove_lines_list('cleanips.txt') 
    #print(cleanAddressList)
    #print(type(cleanAddressList))
    #print(len(cleanAddressList))
    
    if (len(cleanAddressList)) == 0:
        print("All the addresses you submit already exist in the SFTP allow list.")
        sys.exit()
    else:
        for newIpAddress in cleanAddressList:
            name = palo_object_name(newIpAddress)
            full_description = ("NETENG-"+ description)
            sftp_server = pandevice.objects.AddressObject(name=name, value=newIpAddress, description=full_description, tag="SFTP")
            palo_device_group.add(sftp_server)
            sftp_server.create()
        cleanAddressList.clear()
                
        
ipv4address_ipv4cidr_validation()
isPrivate_validaton()
create_sftp_object()
palo_panorama_commit()
palo_devicegroup_commit()
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  A remote firewall Python library that I'm working on Ripolak 0 2,093 Sep-24-2018, 10:33 AM
Last Post: Ripolak

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020