Module glasswall.libraries.security_tagging.security_tagging
Expand source code
import ctypes as ct
import os
import sys
from glasswall import utils
from glasswall.config.logging import log
from glasswall.libraries.library import Library
from glasswall.libraries.security_tagging import errors, successes
class SecurityTagging(Library):
""" A high level Python wrapper for Glasswall Security Tagging. """
def __init__(self, library_path: str):
super().__init__(library_path=library_path)
self.library = self.load_library(os.path.abspath(library_path))
log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
def version(self):
""" Returns the Glasswall library version.
Returns:
version (str): The Glasswall library version.
"""
# TODO security tagging currently has no version function
return "NOT_IMPLEMENTED"
def tag_file(self, tags_path: str, input_file: str, output_file: str, raise_unsupported: bool = True):
""" Tags the input_file with xml loaded from tags_path, writing to output_file.
Args:
tags_path (str): The path to the .xml file containing tags to add.
input_file (str): The path to the input file.
output_file (str): The path to the output file where the tagged input_file will be written to.
raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns:
status (int): An integer indicating the file process status.
Raises:
TypeError: If any arguments are of incorrect type.
NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
"""
# Validate arg types
if not isinstance(tags_path, str):
raise TypeError(tags_path)
elif not os.path.isfile(tags_path):
raise FileNotFoundError(tags_path)
if not isinstance(input_file, str):
raise TypeError(input_file)
elif not os.path.isfile(input_file):
raise FileNotFoundError(input_file)
if not isinstance(output_file, str):
raise TypeError(output_file)
if not isinstance(raise_unsupported, bool):
raise TypeError(raise_unsupported)
# Convert paths to absolute paths
tags_path = os.path.abspath(tags_path)
input_file = os.path.abspath(input_file)
output_file = os.path.abspath(output_file)
# Make output directory if it does not exist
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with utils.CwdHandler(self.library_path):
# API function declaration
self.library.GWSecuTag_TagFile.argtypes = [ct.c_char_p]
# Variable initialisation
ct_tags_path = ct.c_char_p(tags_path.encode("utf-8"))
ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
# API call
status = self.library.GWSecuTag_TagFile(
ct_input_file,
ct_tags_path,
ct_output_file,
)
if status not in successes.success_codes:
log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}")
if raise_unsupported:
raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
else:
log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}")
# TODO remove, temp fix - check if tags are retrievable, if not delete the file just created by glasswall
# if status not in successes.success_codes: # status is currently incorrect in the library
if not os.path.isfile(output_file):
log.error(f"\n\toutput file does not exist: {output_file}")
status = "OUTPUT_NOT_CREATED"
elif os.path.isfile(output_file):
with utils.TempFilePath() as temp_file:
self.retrieve_tags(
input_file=output_file,
output_file=temp_file,
raise_unsupported=False
)
try:
dict_ = utils.xml_as_dict(temp_file)
except ValueError:
dict_ = {}
if not dict_:
os.remove(output_file)
log.debug(f"\n\tunable to retrieve tags, deleted output file\n\toutput_file: {output_file}\n\t")
status = "OUTPUT_NOT_RETRIEVABLE"
return status
def tag_directory(self, tags_path: str, input_directory: str, output_directory: str, raise_unsupported: bool = True):
""" Tags all files in input_directory with the xml loaded from tags_path, writing to output_directory and maintaining the same directory structure.
Args:
tags_path (str): The path to the .xml file containing tags to add.
input_directory (str): The path to the input directory.
output_directory (str): The path to the output directory where the tagged files will be written to.
raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns:
status (int): An integer indicating the file process status.
Raises:
TypeError: If any arguments are of incorrect type.
NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
"""
# Validate arg types
if not isinstance(tags_path, str):
raise TypeError(tags_path)
elif not os.path.isfile(tags_path):
raise FileNotFoundError(tags_path)
if not isinstance(input_directory, str):
raise TypeError(input_directory)
elif not os.path.isdir(input_directory):
raise NotADirectoryError(input_directory)
if not isinstance(output_directory, str):
raise TypeError(output_directory)
if not isinstance(raise_unsupported, bool):
raise TypeError(raise_unsupported)
for relative_path in utils.list_file_paths(input_directory, absolute=False):
# construct absolute paths
input_file = os.path.abspath(os.path.join(input_directory, relative_path))
output_file = os.path.abspath(os.path.join(output_directory, relative_path))
# call tag_file on each file in input to output
self.tag_file(
tags_path=tags_path,
input_file=input_file,
output_file=output_file,
raise_unsupported=raise_unsupported,
)
utils.delete_empty_subdirectories(output_directory)
def retrieve_tags(self, input_file: str, output_file: str, raise_unsupported=True):
""" Retrieves the xml tags of the input_file and writes it to output_file.
Args:
input_file (str): The path to the input file.
output_file (str): The path to the output file where the xml tags will be written to.
raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns:
status (int): An integer indicating the file process status.
Raises:
TypeError: If any arguments are of incorrect type.
NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
"""
if not isinstance(input_file, str):
raise TypeError(input_file)
elif not os.path.isfile(input_file):
raise FileNotFoundError(input_file)
if not isinstance(output_file, str):
raise TypeError(output_file)
# Convert paths to absolute paths
input_file = os.path.abspath(input_file)
output_file = os.path.abspath(output_file)
# Make output directory if it does not exist
os.makedirs(os.path.dirname(output_file), exist_ok=True)
log.debug(f"Attempting {sys._getframe().f_code.co_name}:\n\tinput_file: {input_file}\n\toutput_file: {output_file}")
with utils.CwdHandler(self.library_path):
# API function declaration
self.library.GWSecuTag_RetrieveTagFile.argtypes = [ct.c_char_p]
# Variable initialisation
ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
# API call
status = self.library.GWSecuTag_RetrieveTagFile(
ct_input_file,
ct_output_file,
)
if status not in successes.success_codes:
log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}")
if raise_unsupported:
raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
else:
log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}")
return status
def retrieve_tags_directory(self, input_directory: str, output_directory: str, raise_unsupported=True):
""" Retrieves all tags from files in input_directory, writing XML to output_directory and maintaining the same directory structure.
Args:
input_directory (str): The path to the input directory.
output_directory (str): The path to the output directory where the tagged files will be written to.
raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns:
status (int): The status of the function call.
Raises:
TypeError: If any arguments are of incorrect type.
NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
"""
# Validate arg types
if not isinstance(input_directory, str):
raise TypeError(input_directory)
elif not os.path.isdir(input_directory):
raise NotADirectoryError(input_directory)
if not isinstance(output_directory, str):
raise TypeError(output_directory)
if not isinstance(raise_unsupported, bool):
raise TypeError(raise_unsupported)
for relative_path in utils.list_file_paths(input_directory, absolute=False):
# construct absolute paths
input_file = os.path.abspath(os.path.join(input_directory, relative_path))
output_file = os.path.abspath(os.path.join(output_directory, relative_path + ".xml"))
# call retrieve_tags on each file in input to output
self.retrieve_tags(
input_file=input_file,
output_file=output_file,
raise_unsupported=raise_unsupported,
)
utils.delete_empty_subdirectories(output_directory)
Classes
class SecurityTagging (library_path: str)
-
A high level Python wrapper for Glasswall Security Tagging.
Expand source code
class SecurityTagging(Library): """ A high level Python wrapper for Glasswall Security Tagging. """ def __init__(self, library_path: str): super().__init__(library_path=library_path) self.library = self.load_library(os.path.abspath(library_path)) log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") def version(self): """ Returns the Glasswall library version. Returns: version (str): The Glasswall library version. """ # TODO security tagging currently has no version function return "NOT_IMPLEMENTED" def tag_file(self, tags_path: str, input_file: str, output_file: str, raise_unsupported: bool = True): """ Tags the input_file with xml loaded from tags_path, writing to output_file. Args: tags_path (str): The path to the .xml file containing tags to add. input_file (str): The path to the input file. output_file (str): The path to the output file where the tagged input_file will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: status (int): An integer indicating the file process status. Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success) """ # Validate arg types if not isinstance(tags_path, str): raise TypeError(tags_path) elif not os.path.isfile(tags_path): raise FileNotFoundError(tags_path) if not isinstance(input_file, str): raise TypeError(input_file) elif not os.path.isfile(input_file): raise FileNotFoundError(input_file) if not isinstance(output_file, str): raise TypeError(output_file) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) # Convert paths to absolute paths tags_path = os.path.abspath(tags_path) input_file = os.path.abspath(input_file) output_file = os.path.abspath(output_file) # Make output directory if it does not exist os.makedirs(os.path.dirname(output_file), exist_ok=True) with utils.CwdHandler(self.library_path): # API function declaration self.library.GWSecuTag_TagFile.argtypes = [ct.c_char_p] # Variable initialisation ct_tags_path = ct.c_char_p(tags_path.encode("utf-8")) ct_input_file = ct.c_char_p(input_file.encode("utf-8")) ct_output_file = ct.c_char_p(output_file.encode("utf-8")) # API call status = self.library.GWSecuTag_TagFile( ct_input_file, ct_tags_path, ct_output_file, ) if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}") # TODO remove, temp fix - check if tags are retrievable, if not delete the file just created by glasswall # if status not in successes.success_codes: # status is currently incorrect in the library if not os.path.isfile(output_file): log.error(f"\n\toutput file does not exist: {output_file}") status = "OUTPUT_NOT_CREATED" elif os.path.isfile(output_file): with utils.TempFilePath() as temp_file: self.retrieve_tags( input_file=output_file, output_file=temp_file, raise_unsupported=False ) try: dict_ = utils.xml_as_dict(temp_file) except ValueError: dict_ = {} if not dict_: os.remove(output_file) log.debug(f"\n\tunable to retrieve tags, deleted output file\n\toutput_file: {output_file}\n\t") status = "OUTPUT_NOT_RETRIEVABLE" return status def tag_directory(self, tags_path: str, input_directory: str, output_directory: str, raise_unsupported: bool = True): """ Tags all files in input_directory with the xml loaded from tags_path, writing to output_directory and maintaining the same directory structure. Args: tags_path (str): The path to the .xml file containing tags to add. input_directory (str): The path to the input directory. output_directory (str): The path to the output directory where the tagged files will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: status (int): An integer indicating the file process status. Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success) """ # Validate arg types if not isinstance(tags_path, str): raise TypeError(tags_path) elif not os.path.isfile(tags_path): raise FileNotFoundError(tags_path) if not isinstance(input_directory, str): raise TypeError(input_directory) elif not os.path.isdir(input_directory): raise NotADirectoryError(input_directory) if not isinstance(output_directory, str): raise TypeError(output_directory) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) for relative_path in utils.list_file_paths(input_directory, absolute=False): # construct absolute paths input_file = os.path.abspath(os.path.join(input_directory, relative_path)) output_file = os.path.abspath(os.path.join(output_directory, relative_path)) # call tag_file on each file in input to output self.tag_file( tags_path=tags_path, input_file=input_file, output_file=output_file, raise_unsupported=raise_unsupported, ) utils.delete_empty_subdirectories(output_directory) def retrieve_tags(self, input_file: str, output_file: str, raise_unsupported=True): """ Retrieves the xml tags of the input_file and writes it to output_file. Args: input_file (str): The path to the input file. output_file (str): The path to the output file where the xml tags will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: status (int): An integer indicating the file process status. Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success) """ if not isinstance(input_file, str): raise TypeError(input_file) elif not os.path.isfile(input_file): raise FileNotFoundError(input_file) if not isinstance(output_file, str): raise TypeError(output_file) # Convert paths to absolute paths input_file = os.path.abspath(input_file) output_file = os.path.abspath(output_file) # Make output directory if it does not exist os.makedirs(os.path.dirname(output_file), exist_ok=True) log.debug(f"Attempting {sys._getframe().f_code.co_name}:\n\tinput_file: {input_file}\n\toutput_file: {output_file}") with utils.CwdHandler(self.library_path): # API function declaration self.library.GWSecuTag_RetrieveTagFile.argtypes = [ct.c_char_p] # Variable initialisation ct_input_file = ct.c_char_p(input_file.encode("utf-8")) ct_output_file = ct.c_char_p(output_file.encode("utf-8")) # API call status = self.library.GWSecuTag_RetrieveTagFile( ct_input_file, ct_output_file, ) if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}") return status def retrieve_tags_directory(self, input_directory: str, output_directory: str, raise_unsupported=True): """ Retrieves all tags from files in input_directory, writing XML to output_directory and maintaining the same directory structure. Args: input_directory (str): The path to the input directory. output_directory (str): The path to the output directory where the tagged files will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: status (int): The status of the function call. Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success) """ # Validate arg types if not isinstance(input_directory, str): raise TypeError(input_directory) elif not os.path.isdir(input_directory): raise NotADirectoryError(input_directory) if not isinstance(output_directory, str): raise TypeError(output_directory) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) for relative_path in utils.list_file_paths(input_directory, absolute=False): # construct absolute paths input_file = os.path.abspath(os.path.join(input_directory, relative_path)) output_file = os.path.abspath(os.path.join(output_directory, relative_path + ".xml")) # call retrieve_tags on each file in input to output self.retrieve_tags( input_file=input_file, output_file=output_file, raise_unsupported=raise_unsupported, ) utils.delete_empty_subdirectories(output_directory)
Ancestors
Methods
-
Retrieves the xml tags of the input_file and writes it to output_file.
Args
input_file
:str
- The path to the input file.
output_file
:str
- The path to the output file where the xml tags will be written to.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
status (int): An integer indicating the file process status.
Raises
TypeError
- If any arguments are of incorrect type.
NotImplementedError
- If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
Expand source code
def retrieve_tags(self, input_file: str, output_file: str, raise_unsupported=True): """ Retrieves the xml tags of the input_file and writes it to output_file. Args: input_file (str): The path to the input file. output_file (str): The path to the output file where the xml tags will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: status (int): An integer indicating the file process status. Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success) """ if not isinstance(input_file, str): raise TypeError(input_file) elif not os.path.isfile(input_file): raise FileNotFoundError(input_file) if not isinstance(output_file, str): raise TypeError(output_file) # Convert paths to absolute paths input_file = os.path.abspath(input_file) output_file = os.path.abspath(output_file) # Make output directory if it does not exist os.makedirs(os.path.dirname(output_file), exist_ok=True) log.debug(f"Attempting {sys._getframe().f_code.co_name}:\n\tinput_file: {input_file}\n\toutput_file: {output_file}") with utils.CwdHandler(self.library_path): # API function declaration self.library.GWSecuTag_RetrieveTagFile.argtypes = [ct.c_char_p] # Variable initialisation ct_input_file = ct.c_char_p(input_file.encode("utf-8")) ct_output_file = ct.c_char_p(output_file.encode("utf-8")) # API call status = self.library.GWSecuTag_RetrieveTagFile( ct_input_file, ct_output_file, ) if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}") return status
-
Retrieves all tags from files in input_directory, writing XML to output_directory and maintaining the same directory structure.
Args
input_directory
:str
- The path to the input directory.
output_directory
:str
- The path to the output directory where the tagged files will be written to.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
status (int): The status of the function call.
Raises
TypeError
- If any arguments are of incorrect type.
NotImplementedError
- If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success)
Expand source code
def retrieve_tags_directory(self, input_directory: str, output_directory: str, raise_unsupported=True): """ Retrieves all tags from files in input_directory, writing XML to output_directory and maintaining the same directory structure. Args: input_directory (str): The path to the input directory. output_directory (str): The path to the output directory where the tagged files will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: status (int): The status of the function call. Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_RetrieveTagFile is not 1 (success) """ # Validate arg types if not isinstance(input_directory, str): raise TypeError(input_directory) elif not os.path.isdir(input_directory): raise NotADirectoryError(input_directory) if not isinstance(output_directory, str): raise TypeError(output_directory) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) for relative_path in utils.list_file_paths(input_directory, absolute=False): # construct absolute paths input_file = os.path.abspath(os.path.join(input_directory, relative_path)) output_file = os.path.abspath(os.path.join(output_directory, relative_path + ".xml")) # call retrieve_tags on each file in input to output self.retrieve_tags( input_file=input_file, output_file=output_file, raise_unsupported=raise_unsupported, ) utils.delete_empty_subdirectories(output_directory)
def tag_directory(self, tags_path: str, input_directory: str, output_directory: str, raise_unsupported: bool = True)
-
Tags all files in input_directory with the xml loaded from tags_path, writing to output_directory and maintaining the same directory structure.
Args
tags_path
:str
- The path to the .xml file containing tags to add.
input_directory
:str
- The path to the input directory.
output_directory
:str
- The path to the output directory where the tagged files will be written to.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
status (int): An integer indicating the file process status.
Raises
TypeError
- If any arguments are of incorrect type.
NotImplementedError
- If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
Expand source code
def tag_directory(self, tags_path: str, input_directory: str, output_directory: str, raise_unsupported: bool = True): """ Tags all files in input_directory with the xml loaded from tags_path, writing to output_directory and maintaining the same directory structure. Args: tags_path (str): The path to the .xml file containing tags to add. input_directory (str): The path to the input directory. output_directory (str): The path to the output directory where the tagged files will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: status (int): An integer indicating the file process status. Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success) """ # Validate arg types if not isinstance(tags_path, str): raise TypeError(tags_path) elif not os.path.isfile(tags_path): raise FileNotFoundError(tags_path) if not isinstance(input_directory, str): raise TypeError(input_directory) elif not os.path.isdir(input_directory): raise NotADirectoryError(input_directory) if not isinstance(output_directory, str): raise TypeError(output_directory) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) for relative_path in utils.list_file_paths(input_directory, absolute=False): # construct absolute paths input_file = os.path.abspath(os.path.join(input_directory, relative_path)) output_file = os.path.abspath(os.path.join(output_directory, relative_path)) # call tag_file on each file in input to output self.tag_file( tags_path=tags_path, input_file=input_file, output_file=output_file, raise_unsupported=raise_unsupported, ) utils.delete_empty_subdirectories(output_directory)
def tag_file(self, tags_path: str, input_file: str, output_file: str, raise_unsupported: bool = True)
-
Tags the input_file with xml loaded from tags_path, writing to output_file.
Args
tags_path
:str
- The path to the .xml file containing tags to add.
input_file
:str
- The path to the input file.
output_file
:str
- The path to the output file where the tagged input_file will be written to.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
status (int): An integer indicating the file process status.
Raises
TypeError
- If any arguments are of incorrect type.
NotImplementedError
- If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success)
Expand source code
def tag_file(self, tags_path: str, input_file: str, output_file: str, raise_unsupported: bool = True): """ Tags the input_file with xml loaded from tags_path, writing to output_file. Args: tags_path (str): The path to the .xml file containing tags to add. input_file (str): The path to the input file. output_file (str): The path to the output file where the tagged input_file will be written to. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: status (int): An integer indicating the file process status. Raises: TypeError: If any arguments are of incorrect type. NotImplementedError: If raise_unsupported is True and the status of calling GWSecuTag_TagFile is not 1 (success) """ # Validate arg types if not isinstance(tags_path, str): raise TypeError(tags_path) elif not os.path.isfile(tags_path): raise FileNotFoundError(tags_path) if not isinstance(input_file, str): raise TypeError(input_file) elif not os.path.isfile(input_file): raise FileNotFoundError(input_file) if not isinstance(output_file, str): raise TypeError(output_file) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) # Convert paths to absolute paths tags_path = os.path.abspath(tags_path) input_file = os.path.abspath(input_file) output_file = os.path.abspath(output_file) # Make output directory if it does not exist os.makedirs(os.path.dirname(output_file), exist_ok=True) with utils.CwdHandler(self.library_path): # API function declaration self.library.GWSecuTag_TagFile.argtypes = [ct.c_char_p] # Variable initialisation ct_tags_path = ct.c_char_p(tags_path.encode("utf-8")) ct_input_file = ct.c_char_p(input_file.encode("utf-8")) ct_output_file = ct.c_char_p(output_file.encode("utf-8")) # API call status = self.library.GWSecuTag_TagFile( ct_input_file, ct_tags_path, ct_output_file, ) if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: log.debug(f"\n\tinput_file: {input_file}\n\toutput_file: {output_file}\n\tstatus: {status}\n\ttags_path: {tags_path}") # TODO remove, temp fix - check if tags are retrievable, if not delete the file just created by glasswall # if status not in successes.success_codes: # status is currently incorrect in the library if not os.path.isfile(output_file): log.error(f"\n\toutput file does not exist: {output_file}") status = "OUTPUT_NOT_CREATED" elif os.path.isfile(output_file): with utils.TempFilePath() as temp_file: self.retrieve_tags( input_file=output_file, output_file=temp_file, raise_unsupported=False ) try: dict_ = utils.xml_as_dict(temp_file) except ValueError: dict_ = {} if not dict_: os.remove(output_file) log.debug(f"\n\tunable to retrieve tags, deleted output file\n\toutput_file: {output_file}\n\t") status = "OUTPUT_NOT_RETRIEVABLE" return status
def version(self)
-
Returns the Glasswall library version.
Returns
version (str): The Glasswall library version.
Expand source code
def version(self): """ Returns the Glasswall library version. Returns: version (str): The Glasswall library version. """ # TODO security tagging currently has no version function return "NOT_IMPLEMENTED"
-