Module glasswall.libraries.rebuild.rebuild
Expand source code
import ctypes as ct
import io
import os
from typing import Union
import glasswall
from glasswall import determine_file_type as dft
from glasswall import utils
from glasswall.config.logging import log
from glasswall.libraries.library import Library
from glasswall.libraries.rebuild import errors, successes
class Rebuild(Library):
""" A high level Python wrapper for Glasswall Rebuild / Classic. """
def __init__(self, library_path: str):
    """ Loads the Glasswall library found at library_path and prepares it for use.

    Args:
        library_path (str): The path to the library. It is handed to the Library base class, and its absolute form is handed to load_library.

    Raises:
        RebuildError: If validate_license could not validate the license.
    """
    super().__init__(library_path=library_path)

    absolute_library_path = os.path.abspath(library_path)
    self.library = self.load_library(absolute_library_path)

    # Start from the default content management configuration
    self.set_content_management_policy(input_file=None)

    # Make sure the killswitch has not activated before reporting a successful load
    self.validate_license()

    log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
def validate_license(self):
    """ Checks the library license by running protect_file on a known supported file.

    Raises:
        RebuildError: If the license could not be validated.
    """
    # A known good bitmap; protecting it fails with a RebuildError once the license has expired
    known_good_bitmap = b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00"

    try:
        self.protect_file(input_file=known_good_bitmap, raise_unsupported=True)
    except errors.RebuildError:
        # Record the failure, then let the caller see the original exception
        log.error(f"{self.__class__.__name__} license validation failed.")
        raise
    else:
        log.debug(f"{self.__class__.__name__} license validated successfully.")
def version(self):
    """ Asks the loaded Glasswall library for its version.

    Returns:
        version (str): The Glasswall library version.
    """
    get_version = self.library.GWFileVersion

    # The return type must be declared before the call so ctypes converts the result
    get_version.restype = ct.c_wchar_p

    return get_version()
def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False):
    """ Returns an int representing the file type / file format of a file.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path.
        as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.

    Returns:
        file_type (Union[int, str]): The file format.

    Raises:
        TypeError: If input_file is not a str, bytes, bytearray or io.BytesIO.
        FileNotFoundError: If input_file is a str that is not the path of an existing file.
        Exception: The class that dft.int_class_map holds for the returned code (dft.errors.UnknownErrorCode when the code is not mapped), raised when the returned file type is not a success code.
    """
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)

        # Declare argument and return types
        self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p]
        self.library.GWDetermineFileTypeFromFile.restype = ct.c_int

        # ctypes conversion
        ct_input_file = ct.c_wchar_p(input_file)

        # API call
        file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file)

        # Paths are logged as they are
        input_file_repr = input_file

    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        # Declare argument and return types
        self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t]
        self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int

        # Every in-memory input is handed to the library as bytes
        bytes_input_file = utils.as_bytes(input_file)

        # ctypes conversion
        ct_input_buffer = ct.c_char_p(bytes_input_file)
        ct_buffer_length = ct.c_size_t(len(bytes_input_file))

        # API call
        file_type = self.library.GWDetermineFileTypeFromFileInMem(
            ct_input_buffer,
            ct_buffer_length
        )

        # In-memory inputs are logged as their type and length rather than their content
        input_file_repr = f"{type(input_file)} length {len(bytes_input_file)}"

    else:
        # Without this branch an unsupported type would surface as an UnboundLocalError on file_type
        raise TypeError(input_file)

    file_type_as_string = dft.file_type_int_to_str(file_type)

    if not dft.is_success(file_type):
        log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
        raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)

    log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")

    if as_string:
        return file_type_as_string

    return file_type
def get_content_management_policy(self):
    """ Reads the content management configuration currently held by the library.

    Returns:
        xml_string (str): The XML string of the current content management configuration.

    Raises:
        Exception: The class that errors.error_codes holds for the returned status (errors.UnknownErrorCode when the status is not mapped), raised when the status is not a success code.
    """
    config_get = self.library.GWFileConfigGet
    config_get.argtypes = [
        ct.POINTER(ct.POINTER(ct.c_wchar)),
        ct.POINTER(ct.c_size_t)
    ]

    # Out parameters filled in by the library
    xml_buffer = ct.POINTER(ct.c_wchar)()
    xml_size = ct.c_size_t(0)

    status = config_get(ct.byref(xml_buffer), ct.byref(xml_size))

    if status not in successes.success_codes:
        log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)

    log.debug(f"\n\tstatus: {status}")

    # Read the wide character buffer as a string and make sure it is parsable XML
    return utils.validate_xml(ct.wstring_at(xml_buffer))
def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None):
    """ Applies a content management policy to the library. The default policy (sanitise) is applied when input_file is None.

    Args:
        input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.

    Returns:
        status (int): The result of the Glasswall API call.

    Raises:
        TypeError: If input_file is of an unsupported type.
        Exception: The class that errors.error_codes holds for the returned status (errors.UnknownErrorCode when the status is not mapped), raised when the status is not a success code.
    """
    accepted_types = (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)
    if not isinstance(input_file, accepted_types):
        raise TypeError(input_file)

    # self.library.GWFileConfigRevertToDefaults doesn't work, so the default policy is loaded explicitly
    policy = input_file if input_file is not None else glasswall.content_management.policies.Rebuild(default="sanitise")

    # Only parsable XML is passed on to the library
    xml_string = utils.validate_xml(policy)

    self.library.GWFileConfigXML.argtypes = [ct.c_wchar_p]
    status = self.library.GWFileConfigXML(ct.c_wchar_p(xml_string))

    if status in successes.success_codes:
        log.debug(f"\n\tstatus: {status}")
        return status

    log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Protects a file with the given content management policy and returns the protected file bytes. The protected file is also written to output_file when it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the protected file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (Union[bytes, None]): The protected file bytes. None when processing failed and raise_unsupported is False, or when a file to file call reported success but no output file was found.

    Raises:
        TypeError: If an argument is of an unsupported type.
        FileNotFoundError: If input_file is a str that is not the path of an existing file.
    """
    # Reject arguments of the wrong type before doing any work
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    policy_types = (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)
    if not isinstance(content_management_policy, policy_types):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Path arguments are made absolute
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # The directory that will hold the output file must exist
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Every in-memory input is handled as bytes from here on
    if not isinstance(input_file, str):
        input_file = utils.as_bytes(input_file)

    # Unsupported file types either raise or end the call early with None
    try:
        file_type = self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if not raise_unsupported:
            return None
        raise

    with utils.CwdHandler(self.library_path):
        self.set_content_management_policy(content_management_policy)

        input_is_path = isinstance(input_file, str)
        output_is_path = isinstance(output_file, str)

        if input_is_path:
            if output_is_path:
                # file to file
                self.library.GWFileToFileProtect.argtypes = [
                    ct.c_wchar_p,
                    ct.c_wchar_p,
                    ct.c_wchar_p
                ]
                status = self.library.GWFileToFileProtect(
                    ct.c_wchar_p(input_file),
                    ct.c_wchar_p(dft.file_type_int_to_str(file_type)),
                    ct.c_wchar_p(output_file)
                )
            elif output_file is None:
                # file to memory
                self.library.GWFileProtect.argtypes = [
                    ct.c_wchar_p,
                    ct.c_wchar_p,
                    ct.POINTER(ct.c_void_p),
                    ct.POINTER(ct.c_size_t)
                ]
                source_path = ct.c_wchar_p(input_file)
                type_name = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
                result_buffer = ct.c_void_p(0)
                result_size = ct.c_size_t(0)
                status = self.library.GWFileProtect(
                    source_path,
                    type_name,
                    ct.byref(result_buffer),
                    ct.byref(result_size)
                )
        elif isinstance(input_file, bytes):
            # memory to memory and memory to file
            self.library.GWMemoryToMemoryProtect.argtypes = [
                ct.c_void_p,
                ct.c_size_t,
                ct.c_wchar_p,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]
            # from_buffer needs a writable object, so the bytes are copied into a bytearray
            writable_input = bytearray(input_file)
            source_buffer = (ct.c_ubyte * len(writable_input)).from_buffer(writable_input)
            source_size = ct.c_size_t(len(input_file))
            type_name = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
            result_buffer = ct.c_void_p(0)
            result_size = ct.c_size_t(0)
            status = self.library.GWMemoryToMemoryProtect(
                source_buffer,
                source_size,
                type_name,
                ct.byref(result_buffer),
                ct.byref(result_size)
            )

        # In-memory inputs are logged as their type and length rather than their content
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
            return None

        log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")

        if input_is_path and output_is_path:
            # file to file, Rebuild has already written the file so its bytes are read back
            if not os.path.isfile(output_file):
                log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
                return None
            with open(output_file, "rb") as f:
                return f.read()

        # file to memory, memory to memory
        file_bytes = utils.buffer_to_bytes(
            result_buffer,
            result_size
        )

        if output_is_path:
            # memory to file, no Rebuild function exists for this so the bytes are written here
            with open(output_file, "wb") as f:
                f.write(file_bytes)

        return file_bytes
def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Calls protect_file on every file path that utils.list_file_paths yields for input_directory.
    Protected files are written below output_directory at the same relative path they have below input_directory.

    Args:
        input_directory (str): The input directory containing files to protect.
        output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
    """
    results = {}

    for source_path in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(source_path, input_directory)

        # Mirror the relative path below the output directory, when there is one
        if output_directory is None:
            destination_path = None
        else:
            destination_path = os.path.join(os.path.abspath(output_directory), relative_path)

        results[relative_path] = self.protect_file(
            input_file=source_path,
            output_file=destination_path,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results
def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Analyses a file with the given content management policy and returns the analysis bytes. The analysis is also written to output_file when it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the analysis file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (Union[bytes, None]): The analysis file bytes. None when processing failed and raise_unsupported is False, or when a file to file call reported success but no output file was found.

    Raises:
        TypeError: If an argument is of an unsupported type.
        FileNotFoundError: If input_file is a str that is not the path of an existing file.
    """
    # Reject arguments of the wrong type before doing any work
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    policy_types = (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)
    if not isinstance(content_management_policy, policy_types):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Path arguments are made absolute
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # The directory that will hold the output file must exist
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Every in-memory input is handled as bytes from here on
    if not isinstance(input_file, str):
        input_file = utils.as_bytes(input_file)

    # Unsupported file types either raise or end the call early with None
    try:
        file_type = self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if not raise_unsupported:
            return None
        raise

    with utils.CwdHandler(self.library_path):
        self.set_content_management_policy(content_management_policy)

        input_is_path = isinstance(input_file, str)
        output_is_path = isinstance(output_file, str)

        if input_is_path:
            if output_is_path:
                # file to file
                self.library.GWFileToFileAnalysisAudit.argtypes = [
                    ct.c_wchar_p,
                    ct.c_wchar_p,
                    ct.c_wchar_p
                ]
                status = self.library.GWFileToFileAnalysisAudit(
                    ct.c_wchar_p(input_file),
                    ct.c_wchar_p(dft.file_type_int_to_str(file_type)),
                    ct.c_wchar_p(output_file)
                )
            elif output_file is None:
                # file to memory
                self.library.GWFileAnalysisAudit.argtypes = [
                    ct.c_wchar_p,
                    ct.c_wchar_p,
                    ct.POINTER(ct.c_void_p),
                    ct.POINTER(ct.c_size_t)
                ]
                source_path = ct.c_wchar_p(input_file)
                type_name = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
                result_buffer = ct.c_void_p(0)
                result_size = ct.c_size_t(0)
                status = self.library.GWFileAnalysisAudit(
                    source_path,
                    type_name,
                    ct.byref(result_buffer),
                    ct.byref(result_size)
                )
        elif isinstance(input_file, bytes):
            # memory to memory and memory to file
            self.library.GWMemoryToMemoryAnalysisAudit.argtypes = [
                ct.c_void_p,
                ct.c_size_t,
                ct.c_wchar_p,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]
            # from_buffer needs a writable object, so the bytes are copied into a bytearray
            writable_input = bytearray(input_file)
            source_buffer = (ct.c_ubyte * len(writable_input)).from_buffer(writable_input)
            source_size = ct.c_size_t(len(input_file))
            type_name = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
            result_buffer = ct.c_void_p(0)
            result_size = ct.c_size_t(0)
            status = self.library.GWMemoryToMemoryAnalysisAudit(
                ct.byref(source_buffer),
                source_size,
                type_name,
                ct.byref(result_buffer),
                ct.byref(result_size)
            )

        # In-memory inputs are logged as their type and length rather than their content
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
            return None

        log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")

        if input_is_path and output_is_path:
            # file to file, Rebuild has already written the file so its bytes are read back
            if not os.path.isfile(output_file):
                log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
                return None
            with open(output_file, "rb") as f:
                return f.read()

        # file to memory, memory to memory
        file_bytes = utils.buffer_to_bytes(
            result_buffer,
            result_size
        )

        if output_is_path:
            # memory to file, no Rebuild function exists for this so the bytes are written here
            with open(output_file, "wb") as f:
                f.write(file_bytes)

        return file_bytes
def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Calls analyse_file on every file path that utils.list_file_paths yields for input_directory.
    Analysis files are written below output_directory at the same relative path they have below input_directory, with ".xml" appended.

    Args:
        input_directory (str): The input directory containing files to analyse.
        output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
    """
    results = {}

    for source_path in utils.list_file_paths(input_directory):
        # The analysis of "a/b.doc" is stored under "a/b.doc.xml"
        relative_path = os.path.relpath(source_path, input_directory) + ".xml"

        # Mirror the relative path below the output directory, when there is one
        if output_directory is None:
            destination_path = None
        else:
            destination_path = os.path.join(os.path.abspath(output_directory), relative_path)

        results[relative_path] = self.analyse_file(
            input_file=source_path,
            output_file=destination_path,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results
def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Exports a file with the given content management policy and returns the .zip file bytes. The .zip file is also written to output_file when it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the .zip file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (Union[bytes, None]): The exported .zip file. None when processing failed and raise_unsupported is False, or when a file to file call reported success but no output file was found.

    Raises:
        TypeError: If an argument is of an unsupported type.
        FileNotFoundError: If input_file is a str that is not the path of an existing file.
    """
    # Reject arguments of the wrong type before doing any work
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    policy_types = (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)
    if not isinstance(content_management_policy, policy_types):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Path arguments are made absolute
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # The directory that will hold the output file must exist
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Every in-memory input is handled as bytes from here on
    if not isinstance(input_file, str):
        input_file = utils.as_bytes(input_file)

    # Only the supported / unsupported outcome matters here, the export calls take no file type
    try:
        self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if not raise_unsupported:
            return None
        raise

    with utils.CwdHandler(self.library_path):
        self.set_content_management_policy(content_management_policy)

        input_is_path = isinstance(input_file, str)
        output_is_path = isinstance(output_file, str)

        if input_is_path:
            if output_is_path:
                # file to file
                self.library.GWFileToFileAnalysisProtectAndExport.argtypes = [
                    ct.c_wchar_p,
                    ct.c_wchar_p
                ]
                status = self.library.GWFileToFileAnalysisProtectAndExport(
                    ct.c_wchar_p(input_file),
                    ct.c_wchar_p(output_file)
                )
            elif output_file is None:
                # file to memory
                self.library.GWFileToMemoryAnalysisProtectAndExport.argtypes = [
                    ct.c_wchar_p,
                    ct.POINTER(ct.c_void_p),
                    ct.POINTER(ct.c_size_t)
                ]
                source_path = ct.c_wchar_p(input_file)
                result_buffer = ct.c_void_p(0)
                result_size = ct.c_size_t(0)
                status = self.library.GWFileToMemoryAnalysisProtectAndExport(
                    source_path,
                    ct.byref(result_buffer),
                    ct.byref(result_size)
                )
        elif isinstance(input_file, bytes):
            # memory to memory and memory to file
            self.library.GWMemoryToMemoryAnalysisProtectAndExport.argtypes = [
                ct.c_void_p,
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]
            # from_buffer needs a writable object, so the bytes are copied into a bytearray
            writable_input = bytearray(input_file)
            source_buffer = (ct.c_ubyte * len(writable_input)).from_buffer(writable_input)
            source_size = ct.c_size_t(len(input_file))
            result_buffer = ct.c_void_p(0)
            result_size = ct.c_size_t(0)
            status = self.library.GWMemoryToMemoryAnalysisProtectAndExport(
                source_buffer,
                source_size,
                ct.byref(result_buffer),
                ct.byref(result_size)
            )

        # In-memory inputs are logged as their type and length rather than their content
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
            return None

        log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")

        if input_is_path and output_is_path:
            # file to file, Rebuild has already written the file so its bytes are read back
            if not os.path.isfile(output_file):
                log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
                return None
            with open(output_file, "rb") as f:
                return f.read()

        # file to memory, memory to memory
        file_bytes = utils.buffer_to_bytes(
            result_buffer,
            result_size
        )

        if output_is_path:
            # memory to file, no Rebuild function exists for this so the bytes are written here
            with open(output_file, "wb") as f:
                f.write(file_bytes)

        return file_bytes
def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Calls export_file on every file path that utils.list_file_paths yields for input_directory.
    Export files are written below output_directory at the same relative path they have below input_directory, with ".zip" appended.

    Args:
        input_directory (str): The input directory containing files to export.
        output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
    """
    results = {}

    for source_path in utils.list_file_paths(input_directory):
        # The export of "a/b.doc" is stored under "a/b.doc.zip"
        relative_path = os.path.relpath(source_path, input_directory) + ".zip"

        # Mirror the relative path below the output directory, when there is one
        if output_directory is None:
            destination_path = None
        else:
            destination_path = os.path.join(os.path.abspath(output_directory), relative_path)

        results[relative_path] = self.export_file(
            input_file=source_path,
            output_file=destination_path,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results
def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Imports a .zip file, constructing a file from it, and returns the constructed file bytes. The file is also written to output_file when it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the constructed file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (Union[bytes, None]): The imported file bytes. None when processing failed and raise_unsupported is False, or when a file to file call reported success but no output file was found.

    Raises:
        TypeError: If an argument is of an unsupported type.
        FileNotFoundError: If input_file is a str that is not the path of an existing file.
    """
    # Reject arguments of the wrong type before doing any work
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    policy_types = (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)
    if not isinstance(content_management_policy, policy_types):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Path arguments are made absolute
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # The directory that will hold the output file must exist
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Every in-memory input is handled as bytes from here on
    if not isinstance(input_file, str):
        input_file = utils.as_bytes(input_file)

    # Only the supported / unsupported outcome matters here, the import calls take no file type
    try:
        self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if not raise_unsupported:
            return None
        raise

    with utils.CwdHandler(self.library_path):
        self.set_content_management_policy(content_management_policy)

        input_is_path = isinstance(input_file, str)
        output_is_path = isinstance(output_file, str)

        if input_is_path:
            if output_is_path:
                # file to file
                self.library.GWFileToFileProtectAndImport.argtypes = [
                    ct.c_wchar_p,
                    ct.c_wchar_p
                ]
                status = self.library.GWFileToFileProtectAndImport(
                    ct.c_wchar_p(input_file),
                    ct.c_wchar_p(output_file)
                )
            elif output_file is None:
                # file to memory
                self.library.GWFileToMemoryProtectAndImport.argtypes = [
                    ct.c_wchar_p,
                    ct.POINTER(ct.c_void_p),
                    ct.POINTER(ct.c_size_t)
                ]
                source_path = ct.c_wchar_p(input_file)
                result_buffer = ct.c_void_p(0)
                result_size = ct.c_size_t(0)
                status = self.library.GWFileToMemoryProtectAndImport(
                    source_path,
                    ct.byref(result_buffer),
                    ct.byref(result_size)
                )
        elif isinstance(input_file, bytes):
            # memory to memory and memory to file
            self.library.GWMemoryToMemoryProtectAndImport.argtypes = [
                ct.c_void_p,
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]
            # from_buffer needs a writable object, so the bytes are copied into a bytearray
            writable_input = bytearray(input_file)
            source_buffer = (ct.c_ubyte * len(writable_input)).from_buffer(writable_input)
            source_size = ct.c_size_t(len(input_file))
            result_buffer = ct.c_void_p(0)
            result_size = ct.c_size_t(0)
            status = self.library.GWMemoryToMemoryProtectAndImport(
                source_buffer,
                source_size,
                ct.byref(result_buffer),
                ct.byref(result_size)
            )

        # In-memory inputs are logged as their type and length rather than their content
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
            return None

        log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")

        if input_is_path and output_is_path:
            # file to file, Rebuild has already written the file so its bytes are read back
            if not os.path.isfile(output_file):
                log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
                return None
            with open(output_file, "rb") as f:
                return f.read()

        # file to memory, memory to memory
        file_bytes = utils.buffer_to_bytes(
            result_buffer,
            result_size
        )

        if output_is_path:
            # memory to file, no Rebuild function exists for this so the bytes are written here
            with open(output_file, "wb") as f:
                f.write(file_bytes)

        return file_bytes
def import_directory(
    self,
    input_directory: str,
    output_directory: Union[None, str],
    content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None,
    raise_unsupported: bool = True,
):
    """ Runs import_file on every path returned by utils.list_file_paths(input_directory).

    Each result is keyed by the file's path relative to input_directory with its final extension
    removed. Inputs are expected to be .zip files, but the final extension is dropped whatever it is.
    When output_directory is given, each constructed file is written beneath it at that same relative path.

    Args:
        input_directory (str): The directory containing the files to import.
        output_directory (Union[None, str]): The directory the constructed files are written to, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None. Passed unchanged to import_file for every file.
        raise_unsupported (bool, optional): Default True. Passed unchanged to import_file for every file.

    Returns:
        import_files_dict (dict): Maps each extension-less relative path to the value import_file returned for it.
    """
    results = {}

    for source_path in utils.list_file_paths(input_directory):
        # The extension-less relative path is both the dictionary key and the output location
        stem = os.path.splitext(os.path.relpath(source_path, input_directory))[0]

        if output_directory is None:
            destination = None
        else:
            destination = os.path.join(os.path.abspath(output_directory), stem)

        results[stem] = self.import_file(
            input_file=source_path,
            output_file=destination,
            raise_unsupported=raise_unsupported,
            content_management_policy=content_management_policy,
        )

    return results
def GWFileErrorMsg(self):
    """ Calls the loaded library's GWFileErrorMsg function and returns its result.

    Returns:
        error_message (str): The message returned by GWFileErrorMsg, read as a wide character string.
    """
    api_function = self.library.GWFileErrorMsg
    # ctypes assumes an int return value unless restype is declared
    api_function.restype = ct.c_wchar_p
    return api_function()
Classes
class Rebuild (library_path: str)
-
A high level Python wrapper for Glasswall Rebuild / Classic.
Expand source code
class Rebuild(Library): """ A high level Python wrapper for Glasswall Rebuild / Classic. """ def __init__(self, library_path: str): super().__init__(library_path=library_path) self.library = self.load_library(os.path.abspath(library_path)) # Set content management configuration to default self.set_content_management_policy(input_file=None) # Validate killswitch has not activated self.validate_license() log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") def validate_license(self): """ Validates the license of the library by attempting to call protect_file on a known supported file. Raises: RebuildError: If the license could not be validated. """ # Call protect file on a known good bitmap to see if license has expired try: self.protect_file( input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00", raise_unsupported=True ) log.debug(f"{self.__class__.__name__} license validated successfully.") except errors.RebuildError: log.error(f"{self.__class__.__name__} license validation failed.") raise def version(self): """ Returns the Glasswall library version. Returns: version (str): The Glasswall library version. """ # Declare the return type self.library.GWFileVersion.restype = ct.c_wchar_p # API call version = self.library.GWFileVersion() return version def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False): """ Returns an int representing the file type / file format of a file. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path. as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False. Returns: file_type (Union[int, str]): The file format. 
""" if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p] self.library.GWDetermineFileTypeFromFile.restype = ct.c_int # convert to ct.c_wchar_p ct_input_file = ct.c_wchar_p(input_file) # API call file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file) elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t] self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int # convert to bytes bytes_input_file = utils.as_bytes(input_file) # ctypes conversion ct_input_buffer = ct.c_char_p(bytes_input_file) ct_butter_length = ct.c_size_t(len(bytes_input_file)) # API call file_type = self.library.GWDetermineFileTypeFromFileInMem( ct_input_buffer, ct_butter_length ) file_type_as_string = dft.file_type_int_to_str(file_type) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if not dft.is_success(file_type): log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type) else: log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}") if as_string: return file_type_as_string return file_type def get_content_management_policy(self): """ Gets the current content management configuration. Returns: xml_string (str): The XML string of the current content management configuration. 
""" # Declare argument types self.library.GWFileConfigGet.argtypes = [ ct.POINTER(ct.POINTER(ct.c_wchar)), ct.POINTER(ct.c_size_t) ] # Variable initialisation ct_input_buffer = ct.POINTER(ct.c_wchar)() ct_input_size = ct.c_size_t(0) # API call status = self.library.GWFileConfigGet( ct.byref(ct_input_buffer), ct.byref(ct_input_size) ) if status not in successes.success_codes: log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: log.debug(f"\n\tstatus: {status}") # As string xml_string = utils.validate_xml(ct.wstring_at(ct_input_buffer)) return xml_string def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None): """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied. Args: input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. Returns: status (int): The result of the Glasswall API call. 
""" # Validate type if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(input_file) # self.library.GWFileConfigRevertToDefaults doesn't work, load default instead # Set input_file to default if input_file is None if input_file is None: input_file = glasswall.content_management.policies.Rebuild(default="sanitise") # Validate xml content is parsable xml_string = utils.validate_xml(input_file) # Declare argument types self.library.GWFileConfigXML.argtypes = [ct.c_wchar_p] # API call status = self.library.GWFileConfigXML( ct.c_wchar_p(xml_string) ) if status not in successes.success_codes: log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: log.debug(f"\n\tstatus: {status}") return status def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the protected file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: file_bytes (bytes): The protected file bytes. 
""" # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) # Convert string path arguments to absolute paths if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) input_file = os.path.abspath(input_file) if isinstance(output_file, str): output_file = os.path.abspath(output_file) # make directories that do not exist os.makedirs(os.path.dirname(output_file), exist_ok=True) if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): content_management_policy = os.path.abspath(content_management_policy) # Convert memory inputs to bytes if isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file = utils.as_bytes(input_file) # Check that file type is supported try: file_type = self.determine_file_type(input_file=input_file) except dft.errors.FileTypeEnumError: if raise_unsupported: raise else: return None with utils.CwdHandler(self.library_path): # Set content management policy self.set_content_management_policy(content_management_policy) # file to file if isinstance(input_file, str) and isinstance(output_file, str): # API function declaration self.library.GWFileToFileProtect.argtypes = [ ct.c_wchar_p, ct.c_wchar_p, ct.c_wchar_p ] # Variable initialisation ct_input_file = ct.c_wchar_p(input_file) ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) ct_output_file = ct.c_wchar_p(output_file) # API call status = self.library.GWFileToFileProtect( ct_input_file, ct_file_type, ct_output_file ) # file to memory elif isinstance(input_file, str) and output_file is None: # 
API function declaration self.library.GWFileProtect.argtypes = [ ct.c_wchar_p, ct.c_wchar_p, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t) ] # Variable initialisation ct_input_file = ct.c_wchar_p(input_file) ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) ct_output_buffer = ct.c_void_p(0) ct_output_size = ct.c_size_t(0) # API call status = self.library.GWFileProtect( ct_input_file, ct_file_type, ct.byref(ct_output_buffer), ct.byref(ct_output_size) ) # memory to memory and memory to file elif isinstance(input_file, bytes): # API function declaration self.library.GWMemoryToMemoryProtect.argtypes = [ ct.c_void_p, ct.c_size_t, ct.c_wchar_p, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t) ] # Variable initialization bytearray_buffer = bytearray(input_file) ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) ct_input_size = ct.c_size_t(len(input_file)) ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) ct_output_buffer = ct.c_void_p(0) ct_output_size = ct.c_size_t(0) status = self.library.GWMemoryToMemoryProtect( ct_input_buffer, ct_input_size, ct_file_type, ct.byref(ct_output_buffer), ct.byref(ct_output_size) ) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: file_bytes = None else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") if isinstance(input_file, str) and isinstance(output_file, str): # file to file, read the bytes of the file that Rebuild has already written if not os.path.isfile(output_file): log.error(f"Rebuild returned 
success code: {status} but no output file was found: {output_file}") file_bytes = None else: with open(output_file, "rb") as f: file_bytes = f.read() else: # file to memory, memory to memory file_bytes = utils.buffer_to_bytes( ct_output_buffer, ct_output_size ) if isinstance(output_file, str): # memory to file # no Rebuild function exists for memory to file, write the memory to file ourselves with open(output_file, "wb") as f: f.write(file_bytes) return file_bytes def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): """ Recursively processes all files in a directory in protect mode using the given content management policy. The protected files are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing files to protect. output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 
""" protected_files_dict = {} # Call protect_file on each file in input_directory to output_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) protected_bytes = self.protect_file( input_file=input_file, output_file=output_file, raise_unsupported=raise_unsupported, content_management_policy=content_management_policy, ) protected_files_dict[relative_path] = protected_bytes return protected_files_dict def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the analysis file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: file_bytes (bytes): The analysis file bytes. 
""" # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) # Convert string path arguments to absolute paths if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) input_file = os.path.abspath(input_file) if isinstance(output_file, str): output_file = os.path.abspath(output_file) # make directories that do not exist os.makedirs(os.path.dirname(output_file), exist_ok=True) if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): content_management_policy = os.path.abspath(content_management_policy) # Convert memory inputs to bytes if isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file = utils.as_bytes(input_file) # Check that file type is supported try: file_type = self.determine_file_type(input_file=input_file) except dft.errors.FileTypeEnumError: if raise_unsupported: raise else: return None with utils.CwdHandler(self.library_path): # Set content management policy self.set_content_management_policy(content_management_policy) # file to file if isinstance(input_file, str) and isinstance(output_file, str): # API function declaration self.library.GWFileToFileAnalysisAudit.argtypes = [ ct.c_wchar_p, ct.c_wchar_p, ct.c_wchar_p ] # Variable initialisation ct_input_file = ct.c_wchar_p(input_file) ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) ct_output_file = ct.c_wchar_p(output_file) # API call status = self.library.GWFileToFileAnalysisAudit( ct_input_file, ct_file_type, ct_output_file ) # file to memory elif isinstance(input_file, str) and output_file 
is None: # API function declaration self.library.GWFileAnalysisAudit.argtypes = [ ct.c_wchar_p, ct.c_wchar_p, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t) ] # Variable initialisation ct_input_file = ct.c_wchar_p(input_file) ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) ct_output_buffer = ct.c_void_p(0) ct_output_size = ct.c_size_t(0) # API call status = self.library.GWFileAnalysisAudit( ct_input_file, ct_file_type, ct.byref(ct_output_buffer), ct.byref(ct_output_size) ) # memory to memory and memory to file elif isinstance(input_file, bytes): # API function declaration self.library.GWMemoryToMemoryAnalysisAudit.argtypes = [ ct.c_void_p, ct.c_size_t, ct.c_wchar_p, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t) ] # Variable initialization bytearray_buffer = bytearray(input_file) ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) ct_input_size = ct.c_size_t(len(input_file)) ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type)) ct_output_buffer = ct.c_void_p(0) ct_output_size = ct.c_size_t(0) status = self.library.GWMemoryToMemoryAnalysisAudit( ct.byref(ct_input_buffer), ct_input_size, ct_file_type, ct.byref(ct_output_buffer), ct.byref(ct_output_size) ) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: file_bytes = None else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") if isinstance(input_file, str) and isinstance(output_file, str): # file to file, read the bytes of the file that Rebuild has already written if not 
os.path.isfile(output_file): log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") file_bytes = None else: with open(output_file, "rb") as f: file_bytes = f.read() else: # file to memory, memory to memory file_bytes = utils.buffer_to_bytes( ct_output_buffer, ct_output_size ) if isinstance(output_file, str): # memory to file # no Rebuild function exists for memory to file, write the memory to file ourselves with open(output_file, "wb") as f: f.write(file_bytes) return file_bytes def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing files to analyse. output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 
""" analysis_files_dict = {} # Call analyse_file on each file in input_directory to output_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) + ".xml" output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) analysis_bytes = self.analyse_file( input_file=input_file, output_file=output_file, raise_unsupported=raise_unsupported, content_management_policy=content_management_policy, ) analysis_files_dict[relative_path] = analysis_bytes return analysis_files_dict def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): """ Export a file, returning the .zip file bytes. The .zip file is written to output_file. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. output_file (Union[None, str], optional): The output file path where the .zip file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: file_bytes (bytes): The exported .zip file. 
""" # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) # Convert string path arguments to absolute paths if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) input_file = os.path.abspath(input_file) if isinstance(output_file, str): output_file = os.path.abspath(output_file) # make directories that do not exist os.makedirs(os.path.dirname(output_file), exist_ok=True) if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): content_management_policy = os.path.abspath(content_management_policy) # Convert memory inputs to bytes if isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file = utils.as_bytes(input_file) # Check that file type is supported try: self.determine_file_type(input_file=input_file) except dft.errors.FileTypeEnumError: if raise_unsupported: raise else: return None with utils.CwdHandler(self.library_path): # Set content management policy self.set_content_management_policy(content_management_policy) # file to file if isinstance(input_file, str) and isinstance(output_file, str): # API function declaration self.library.GWFileToFileAnalysisProtectAndExport.argtypes = [ ct.c_wchar_p, ct.c_wchar_p ] # Variable initialisation ct_input_file = ct.c_wchar_p(input_file) ct_output_file = ct.c_wchar_p(output_file) # API call status = self.library.GWFileToFileAnalysisProtectAndExport( ct_input_file, ct_output_file ) # file to memory elif isinstance(input_file, str) and output_file is None: # API function declaration 
self.library.GWFileToMemoryAnalysisProtectAndExport.argtypes = [ ct.c_wchar_p, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t) ] # Variable initialisation ct_input_file = ct.c_wchar_p(input_file) ct_output_buffer = ct.c_void_p(0) ct_output_size = ct.c_size_t(0) # API call status = self.library.GWFileToMemoryAnalysisProtectAndExport( ct_input_file, ct.byref(ct_output_buffer), ct.byref(ct_output_size) ) # memory to memory and memory to file elif isinstance(input_file, bytes): # API function declaration self.library.GWMemoryToMemoryAnalysisProtectAndExport.argtypes = [ ct.c_void_p, ct.c_size_t, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t) ] # Variable initialization bytearray_buffer = bytearray(input_file) ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) ct_input_size = ct.c_size_t(len(input_file)) ct_output_buffer = ct.c_void_p(0) ct_output_size = ct.c_size_t(0) status = self.library.GWMemoryToMemoryAnalysisProtectAndExport( ct_input_buffer, ct_input_size, ct.byref(ct_output_buffer), ct.byref(ct_output_size) ) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: file_bytes = None else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") if isinstance(input_file, str) and isinstance(output_file, str): # file to file, read the bytes of the file that Rebuild has already written if not os.path.isfile(output_file): log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") file_bytes = None else: with open(output_file, "rb") as f: 
file_bytes = f.read() else: # file to memory, memory to memory file_bytes = utils.buffer_to_bytes( ct_output_buffer, ct_output_size ) if isinstance(output_file, str): # memory to file # no Rebuild function exists for memory to file, write the memory to file ourselves with open(output_file, "wb") as f: f.write(file_bytes) return file_bytes def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing files to export. output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 
""" export_files_dict = {} # Call export_file on each file in input_directory to output_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) + ".zip" output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) export_bytes = self.export_file( input_file=input_file, output_file=output_file, raise_unsupported=raise_unsupported, content_management_policy=content_management_policy, ) export_files_dict[relative_path] = export_bytes return export_files_dict def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes. output_file (Union[None, str], optional): The output file path where the constructed file will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: file_bytes (bytes): The imported file bytes. 
""" # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) if not isinstance(raise_unsupported, bool): raise TypeError(raise_unsupported) # Convert string path arguments to absolute paths if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) input_file = os.path.abspath(input_file) if isinstance(output_file, str): output_file = os.path.abspath(output_file) # make directories that do not exist os.makedirs(os.path.dirname(output_file), exist_ok=True) if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): content_management_policy = os.path.abspath(content_management_policy) # Convert memory inputs to bytes if isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file = utils.as_bytes(input_file) # Check that file type is supported try: self.determine_file_type(input_file=input_file) except dft.errors.FileTypeEnumError: if raise_unsupported: raise else: return None with utils.CwdHandler(self.library_path): # Set content management policy self.set_content_management_policy(content_management_policy) # file to file if isinstance(input_file, str) and isinstance(output_file, str): # API function declaration self.library.GWFileToFileProtectAndImport.argtypes = [ ct.c_wchar_p, ct.c_wchar_p ] # Variable initialisation ct_input_file = ct.c_wchar_p(input_file) ct_output_file = ct.c_wchar_p(output_file) # API call status = self.library.GWFileToFileProtectAndImport( ct_input_file, ct_output_file ) # file to memory elif isinstance(input_file, str) and output_file is None: # API function declaration self.library.GWFileToMemoryProtectAndImport.argtypes = [ 
ct.c_wchar_p, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t) ] # Variable initialisation ct_input_file = ct.c_wchar_p(input_file) ct_output_buffer = ct.c_void_p(0) ct_output_size = ct.c_size_t(0) # API call status = self.library.GWFileToMemoryProtectAndImport( ct_input_file, ct.byref(ct_output_buffer), ct.byref(ct_output_size) ) # memory to memory and memory to file elif isinstance(input_file, bytes): # API function declaration self.library.GWMemoryToMemoryProtectAndImport.argtypes = [ ct.c_void_p, ct.c_size_t, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t) ] # Variable initialization bytearray_buffer = bytearray(input_file) ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer) ct_input_size = ct.c_size_t(len(input_file)) ct_output_buffer = ct.c_void_p(0) ct_output_size = ct.c_size_t(0) status = self.library.GWMemoryToMemoryProtectAndImport( ct_input_buffer, ct_input_size, ct.byref(ct_output_buffer), ct.byref(ct_output_size) ) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: file_bytes = None else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}") if isinstance(input_file, str) and isinstance(output_file, str): # file to file, read the bytes of the file that Rebuild has already written if not os.path.isfile(output_file): log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}") file_bytes = None else: with open(output_file, "rb") as f: file_bytes = f.read() else: # file to memory, memory to memory file_bytes = 
utils.buffer_to_bytes( ct_output_buffer, ct_output_size ) if isinstance(output_file, str): # memory to file # no Rebuild function exists for memory to file, write the memory to file ourselves with open(output_file, "wb") as f: f.write(file_bytes) return file_bytes def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True): """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. The constructed files are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing files to import. output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes. 
""" import_files_dict = {} # Call import_file on each file in input_directory to output_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) # Remove .zip extension from relative_path relative_path = os.path.splitext(relative_path)[0] output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) import_bytes = self.import_file( input_file=input_file, output_file=output_file, raise_unsupported=raise_unsupported, content_management_policy=content_management_policy, ) import_files_dict[relative_path] = import_bytes return import_files_dict def GWFileErrorMsg(self): """ Retrieve the Glasswall Process error message. Returns: error_message (str): The Glasswall Process error message. """ # Declare the return type self.library.GWFileErrorMsg.restype = ct.c_wchar_p # API call error_message = self.library.GWFileErrorMsg() return error_message
Ancestors
Methods
def GWFileErrorMsg(self)
-
Retrieve the Glasswall Process error message.
Returns
error_message (str): The Glasswall Process error message.
Expand source code
def GWFileErrorMsg(self):
    """ Fetch the error message reported by the Glasswall library.

    Returns:
        error_message (str): The value returned by the library's GWFileErrorMsg call.
    """
    api_function = self.library.GWFileErrorMsg
    # The return type is declared as a wide-character string before calling
    api_function.restype = ct.c_wchar_p
    return api_function()
def analyse_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)
-
Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing files to analyse.
output_directory
:Union[None, str]
- The output directory where the analysis files will be written, or None to not write files.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- Default None (sanitise). The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
Expand source code
def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Runs analyse_file on every file path listed for input_directory.

    Each result is keyed by the file's path relative to input_directory with ".xml" appended. When output_directory is given, the same relative path is used beneath it as the output file.

    Args:
        input_directory (str): The input directory containing files to analyse.
        output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None. The content management policy passed through to analyse_file.
        raise_unsupported (bool, optional): Default True. Passed through to analyse_file.

    Returns:
        analysis_files_dict (dict): A dictionary of file paths relative to input_directory (with ".xml" appended), and the value returned by analyse_file for each.
    """
    results = {}

    for source_path in utils.list_file_paths(input_directory):
        report_key = os.path.relpath(source_path, input_directory) + ".xml"

        if output_directory is None:
            report_path = None
        else:
            report_path = os.path.join(os.path.abspath(output_directory), report_key)

        results[report_key] = self.analyse_file(
            input_file=source_path,
            output_file=report_path,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results
def analyse_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)
-
Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The input file path or bytes.
output_file
:Union[None, str]
, optional- The output file path where the analysis file will be written.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
file_bytes (bytes): The analysis file bytes.
Expand source code
def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Runs the Glasswall analysis audit on a file and returns the analysis bytes. The analysis is also written to output_file when one is given.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the analysis file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (Union[bytes, None]): The analysis file bytes. None when raise_unsupported is False and either the file type check fails or the library returns a non-success status, or when a file to file call succeeds but no output file is found.

    Raises:
        TypeError: If an argument is not one of its accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Normalise path arguments to absolute paths
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # The output file's parent directories are created up front
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # In-memory inputs are passed through utils.as_bytes
    if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file = utils.as_bytes(input_file)

    # The detected file type is needed by every analysis API call below
    try:
        file_type = self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if raise_unsupported:
            raise
        return None

    with utils.CwdHandler(self.library_path):
        self.set_content_management_policy(content_management_policy)

        path_in = isinstance(input_file, str)
        path_out = isinstance(output_file, str)

        if path_in and path_out:
            # file to file
            audit = self.library.GWFileToFileAnalysisAudit
            audit.argtypes = [
                ct.c_wchar_p,
                ct.c_wchar_p,
                ct.c_wchar_p
            ]
            status = audit(
                ct.c_wchar_p(input_file),
                ct.c_wchar_p(dft.file_type_int_to_str(file_type)),
                ct.c_wchar_p(output_file)
            )

        elif path_in and output_file is None:
            # file to memory
            audit = self.library.GWFileAnalysisAudit
            audit.argtypes = [
                ct.c_wchar_p,
                ct.c_wchar_p,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]
            ct_source_path = ct.c_wchar_p(input_file)
            ct_type_name = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
            ct_output_buffer = ct.c_void_p(0)
            ct_output_size = ct.c_size_t(0)
            status = audit(
                ct_source_path,
                ct_type_name,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_size)
            )

        elif isinstance(input_file, bytes):
            # memory to memory and memory to file
            audit = self.library.GWMemoryToMemoryAnalysisAudit
            audit.argtypes = [
                ct.c_void_p,
                ct.c_size_t,
                ct.c_wchar_p,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]
            # from_buffer needs a writable object, hence the bytearray copy
            mutable_copy = bytearray(input_file)
            ct_source_buffer = (ct.c_ubyte * len(mutable_copy)).from_buffer(mutable_copy)
            ct_source_size = ct.c_size_t(len(input_file))
            ct_type_name = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
            ct_output_buffer = ct.c_void_p(0)
            ct_output_size = ct.c_size_t(0)
            status = audit(
                ct.byref(ct_source_buffer),
                ct_source_size,
                ct_type_name,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_size)
            )

        # Short description of the input for the log messages
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
            return None

        log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")

        if isinstance(input_file, str) and isinstance(output_file, str):
            # file to file: the library wrote the output, so read it back
            if not os.path.isfile(output_file):
                log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
                return None
            with open(output_file, "rb") as written:
                return written.read()

        # file to memory, memory to memory
        file_bytes = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_size
        )
        if isinstance(output_file, str):
            # memory to file: no API call is made for this case, so write the bytes here
            with open(output_file, "wb") as target:
                target.write(file_bytes)

        return file_bytes
def determine_file_type(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], as_string: bool = False)
-
Returns an int representing the file type / file format of a file.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The input file, can be a local path.
as_string
:bool
, optional- Return file type as string, eg: "bmp" instead of: 29. Defaults to False.
Returns
file_type (Union[int, str]): The file format.
Expand source code
def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False):
    """ Returns an int representing the file type / file format of a file.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path.
        as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.

    Returns:
        file_type (Union[int, str]): The file format.

    Raises:
        TypeError: If input_file is not a str, bytes, bytearray or io.BytesIO.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: When dft.is_success rejects the returned code, the class mapped to that code in dft.int_class_map is raised (dft.errors.UnknownErrorCode when unmapped).
    """
    # Reject unsupported argument types up front. Without this check neither
    # branch below runs and the method fails on an unassigned file_type.
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)

    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)

        self.library.GWDetermineFileTypeFromFile.argtypes = [ct.c_wchar_p]
        self.library.GWDetermineFileTypeFromFile.restype = ct.c_int

        # convert to ct.c_wchar_p
        ct_input_file = ct.c_wchar_p(input_file)

        # API call
        file_type = self.library.GWDetermineFileTypeFromFile(ct_input_file)

    else:
        # bytes, bytearray or io.BytesIO
        self.library.GWDetermineFileTypeFromFileInMem.argtypes = [ct.c_char_p, ct.c_size_t]
        self.library.GWDetermineFileTypeFromFileInMem.restype = ct.c_int

        # convert to bytes
        bytes_input_file = utils.as_bytes(input_file)

        # ctypes conversion
        ct_input_buffer = ct.c_char_p(bytes_input_file)
        ct_buffer_length = ct.c_size_t(len(bytes_input_file))

        # API call
        file_type = self.library.GWDetermineFileTypeFromFileInMem(
            ct_input_buffer,
            ct_buffer_length
        )

    file_type_as_string = dft.file_type_int_to_str(file_type)

    # Short description of the input for the log messages
    input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file

    if not dft.is_success(file_type):
        log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
        raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
    else:
        log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")

    if as_string:
        return file_type_as_string

    return file_type
def export_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)
-
Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing files to export.
output_directory
:Union[None, str]
- The output directory where the export files will be written, or None to not write files.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- Default None (sanitise). The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
Expand source code
def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Runs export_file on every file path listed for input_directory.

    Each result is keyed by the file's path relative to input_directory with ".zip" appended. When output_directory is given, the same relative path is used beneath it as the output file.

    Args:
        input_directory (str): The input directory containing files to export.
        output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None. The content management policy passed through to export_file.
        raise_unsupported (bool, optional): Default True. Passed through to export_file.

    Returns:
        export_files_dict (dict): A dictionary of file paths relative to input_directory (with ".zip" appended), and the value returned by export_file for each.
    """
    archives = {}

    for source_path in utils.list_file_paths(input_directory):
        archive_key = os.path.relpath(source_path, input_directory) + ".zip"

        if output_directory is None:
            archive_path = None
        else:
            archive_path = os.path.join(os.path.abspath(output_directory), archive_key)

        archives[archive_key] = self.export_file(
            input_file=source_path,
            output_file=archive_path,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return archives
def export_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)
-
Exports a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The input file path or bytes.
output_file
:Union[None, str]
, optional- The output file path where the .zip file will be written.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
file_bytes (bytes): The exported .zip file.
Expand source code
def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Exports a file and returns the .zip file bytes. The .zip file is also written to output_file when one is given.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the .zip file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (Union[bytes, None]): The exported .zip file bytes. None when raise_unsupported is False and either the file type check fails or the library returns a non-success status, or when a file to file call succeeds but no output file is found.

    Raises:
        TypeError: If an argument is not one of its accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Normalise path arguments to absolute paths
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # The output file's parent directories are created up front
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # In-memory inputs are passed through utils.as_bytes
    if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file = utils.as_bytes(input_file)

    # Only used as a supported-file-type check; the result is discarded
    try:
        self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if raise_unsupported:
            raise
        return None

    with utils.CwdHandler(self.library_path):
        self.set_content_management_policy(content_management_policy)

        path_in = isinstance(input_file, str)
        path_out = isinstance(output_file, str)

        if path_in and path_out:
            # file to file
            exporter = self.library.GWFileToFileAnalysisProtectAndExport
            exporter.argtypes = [
                ct.c_wchar_p,
                ct.c_wchar_p
            ]
            status = exporter(
                ct.c_wchar_p(input_file),
                ct.c_wchar_p(output_file)
            )

        elif path_in and output_file is None:
            # file to memory
            exporter = self.library.GWFileToMemoryAnalysisProtectAndExport
            exporter.argtypes = [
                ct.c_wchar_p,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]
            ct_source_path = ct.c_wchar_p(input_file)
            ct_output_buffer = ct.c_void_p(0)
            ct_output_size = ct.c_size_t(0)
            status = exporter(
                ct_source_path,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_size)
            )

        elif isinstance(input_file, bytes):
            # memory to memory and memory to file
            exporter = self.library.GWMemoryToMemoryAnalysisProtectAndExport
            exporter.argtypes = [
                ct.c_void_p,
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]
            # from_buffer needs a writable object, hence the bytearray copy
            mutable_copy = bytearray(input_file)
            ct_source_buffer = (ct.c_ubyte * len(mutable_copy)).from_buffer(mutable_copy)
            ct_source_size = ct.c_size_t(len(input_file))
            ct_output_buffer = ct.c_void_p(0)
            ct_output_size = ct.c_size_t(0)
            status = exporter(
                ct_source_buffer,
                ct_source_size,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_size)
            )

        # Short description of the input for the log messages
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
            return None

        log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")

        if isinstance(input_file, str) and isinstance(output_file, str):
            # file to file: the library wrote the output, so read it back
            if not os.path.isfile(output_file):
                log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
                return None
            with open(output_file, "rb") as written:
                return written.read()

        # file to memory, memory to memory
        file_bytes = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_size
        )
        if isinstance(output_file, str):
            # memory to file: no API call is made for this case, so write the bytes here
            with open(output_file, "wb") as target:
                target.write(file_bytes)

        return file_bytes
def get_content_management_policy(self)
-
Gets the current content management configuration.
Returns
xml_string (str): The XML string of the current content management configuration.
Expand source code
def get_content_management_policy(self):
    """ Reads the content management configuration currently held by the library.

    Returns:
        xml_string (str): The configuration XML string, as returned by utils.validate_xml.

    Raises:
        Exception: When the status is not a success code, the class mapped to that status in errors.error_codes is raised (errors.UnknownErrorCode when unmapped).
    """
    config_get = self.library.GWFileConfigGet

    # The call receives references to a wide-character pointer and a size
    config_get.argtypes = [
        ct.POINTER(ct.POINTER(ct.c_wchar)),
        ct.POINTER(ct.c_size_t)
    ]

    xml_buffer = ct.POINTER(ct.c_wchar)()
    xml_size = ct.c_size_t(0)

    status = config_get(
        ct.byref(xml_buffer),
        ct.byref(xml_size)
    )

    if status not in successes.success_codes:
        log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)

    log.debug(f"\n\tstatus: {status}")

    # wstring_at is called without a length, so the size value is not used here
    return utils.validate_xml(ct.wstring_at(xml_buffer))
def import_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)
-
Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. The constructed files are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing files to import.
output_directory
:Union[None, str]
- The output directory where the constructed files will be written, or None to not write files.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- Default None (sanitise). The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
Expand source code
def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Runs import_file on every file path listed for input_directory. Files are expected as .zip but this is not forced.

    Each result is keyed by the file's path relative to input_directory with its final extension removed. When output_directory is given, the same relative path is used beneath it as the output file.

    Args:
        input_directory (str): The input directory containing files to import.
        output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None. The content management policy passed through to import_file.
        raise_unsupported (bool, optional): Default True. Passed through to import_file.

    Returns:
        import_files_dict (dict): A dictionary of file paths relative to input_directory (final extension removed), and the value returned by import_file for each.
    """
    constructed = {}

    for archive_path in utils.list_file_paths(input_directory):
        # splitext drops whatever the final extension is (expected to be .zip)
        stem_path, _extension = os.path.splitext(os.path.relpath(archive_path, input_directory))

        if output_directory is None:
            target_path = None
        else:
            target_path = os.path.join(os.path.abspath(output_directory), stem_path)

        constructed[stem_path] = self.import_file(
            input_file=archive_path,
            output_file=target_path,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return constructed
def import_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)
-
Imports a .zip file, constructs a file from it and returns the file bytes. The constructed file is written to output_file if it is provided.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The .zip input file path or bytes.
output_file
:Union[None, str]
, optional- The output file path where the constructed file will be written.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- The content management policy to apply to the session.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
file_bytes (bytes): The imported file bytes.
Expand source code
def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Imports a .zip file, constructs a file from it and returns the file bytes. The constructed file is written to output_file if it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the constructed file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (bytes): The imported file bytes. None is returned when raise_unsupported is False and the
            file type could not be determined or the library returned a non-success status, and also when a
            file to file call reported success but no output file was found.

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        dft.errors.FileTypeEnumError: If the file type is unsupported and raise_unsupported is True.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped),
            if the library returns a non-success status and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Convert string path arguments to absolute paths; this happens before the working directory is
    # switched by utils.CwdHandler below
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # Create any missing parent directories of the output file
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

    # A str policy is only treated as a path when it names an existing file; otherwise it is passed through unchanged
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Convert memory inputs to bytes
    if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file = utils.as_bytes(input_file)

    # Check that file type is supported; the detected type itself is not used by the import calls
    try:
        self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if raise_unsupported:
            raise
        else:
            return None

    with utils.CwdHandler(self.library_path):
        # Set content management policy
        self.set_content_management_policy(content_management_policy)

        # file to file
        if isinstance(input_file, str) and isinstance(output_file, str):
            # API function declaration
            self.library.GWFileToFileProtectAndImport.argtypes = [
                ct.c_wchar_p,
                ct.c_wchar_p
            ]

            # Variable initialisation
            ct_input_file = ct.c_wchar_p(input_file)
            ct_output_file = ct.c_wchar_p(output_file)

            # API call
            status = self.library.GWFileToFileProtectAndImport(
                ct_input_file,
                ct_output_file
            )

        # file to memory
        elif isinstance(input_file, str) and output_file is None:
            # API function declaration
            self.library.GWFileToMemoryProtectAndImport.argtypes = [
                ct.c_wchar_p,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation; the output buffer pointer and size are passed by reference to the library
            ct_input_file = ct.c_wchar_p(input_file)
            ct_output_buffer = ct.c_void_p(0)
            ct_output_size = ct.c_size_t(0)

            # API call
            status = self.library.GWFileToMemoryProtectAndImport(
                ct_input_file,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_size)
            )

        # memory to memory and memory to file
        elif isinstance(input_file, bytes):
            # API function declaration
            self.library.GWMemoryToMemoryProtectAndImport.argtypes = [
                ct.c_void_p,
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation
            # from_buffer requires a writable buffer, so the immutable bytes are copied into a bytearray first
            bytearray_buffer = bytearray(input_file)
            ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
            ct_input_size = ct.c_size_t(len(input_file))
            ct_output_buffer = ct.c_void_p(0)
            ct_output_size = ct.c_size_t(0)

            # API call
            status = self.library.GWMemoryToMemoryProtectAndImport(
                ct_input_buffer,
                ct_input_size,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_size)
            )

        # Short description of the input for logging: type and length for in-memory input, the path otherwise
        # NOTE(review): memory inputs were already passed through utils.as_bytes above, so the io.BytesIO branch looks unreachable - confirm
        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file

        if status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
            if raise_unsupported:
                # Raise the exception class mapped to this status, or UnknownErrorCode if it is unmapped
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
            else:
                file_bytes = None
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
            if isinstance(input_file, str) and isinstance(output_file, str):
                # file to file, read the bytes of the file that Rebuild has already written
                if not os.path.isfile(output_file):
                    log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
                    file_bytes = None
                else:
                    with open(output_file, "rb") as f:
                        file_bytes = f.read()
            else:
                # file to memory, memory to memory
                file_bytes = utils.buffer_to_bytes(
                    ct_output_buffer,
                    ct_output_size
                )
                if isinstance(output_file, str):
                    # memory to file
                    # no Rebuild function exists for memory to file, write the memory to file ourselves
                    with open(output_file, "wb") as f:
                        f.write(file_bytes)

        return file_bytes
def protect_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)
-
Recursively processes all files in a directory in protect mode using the given content management policy. The protected files are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing files to protect.
output_directory
:Union[None, str]
- The output directory where the protected files will be written, or None to not write files.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- Default None (sanitise). The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
Expand source code
def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Protects every file found under input_directory, including subdirectories, with the given content management policy.

    Each protected file is written beneath output_directory at the same relative location as its input.

    Args:
        input_directory (str): The input directory containing files to protect.
        output_directory (Union[None, str]): The output directory where the protected files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
    """
    results = {}

    for source_path in utils.list_file_paths(input_directory):
        # Dictionary key: the path of the file relative to input_directory
        key = os.path.relpath(source_path, input_directory)

        if output_directory is None:
            destination = None
        else:
            destination = os.path.join(os.path.abspath(output_directory), key)

        results[key] = self.protect_file(
            input_file=source_path,
            output_file=destination,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results
def protect_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)
-
Protects a file using the given content management policy (default sanitise when None), returning the file bytes. The protected file is written to output_file if it is provided.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The input file path or bytes.
output_file
:Union[None, str]
, optional- The output file path where the protected file will be written.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
file_bytes (bytes): The protected file bytes.
Expand source code
def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Protects a file using the given content management policy, returning the file bytes. The protected file is written to output_file if it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the protected file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (bytes): The protected file bytes. None is returned when raise_unsupported is False and the
            file type could not be determined or the library returned a non-success status, and also when a
            file to file call reported success but no output file was found.

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        dft.errors.FileTypeEnumError: If the file type is unsupported and raise_unsupported is True.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped),
            if the library returns a non-success status and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Convert string path arguments to absolute paths; this happens before the working directory is
    # switched by utils.CwdHandler below
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # Create any missing parent directories of the output file
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

    # A str policy is only treated as a path when it names an existing file; otherwise it is passed through unchanged
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Convert memory inputs to bytes
    if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file = utils.as_bytes(input_file)

    # Check that file type is supported; the detected type is passed to the protect calls below
    try:
        file_type = self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if raise_unsupported:
            raise
        else:
            return None

    with utils.CwdHandler(self.library_path):
        # Set content management policy
        self.set_content_management_policy(content_management_policy)

        # file to file
        if isinstance(input_file, str) and isinstance(output_file, str):
            # API function declaration
            self.library.GWFileToFileProtect.argtypes = [
                ct.c_wchar_p,
                ct.c_wchar_p,
                ct.c_wchar_p
            ]

            # Variable initialisation; the file type is passed to the library in its string form
            ct_input_file = ct.c_wchar_p(input_file)
            ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
            ct_output_file = ct.c_wchar_p(output_file)

            # API call
            status = self.library.GWFileToFileProtect(
                ct_input_file,
                ct_file_type,
                ct_output_file
            )

        # file to memory
        elif isinstance(input_file, str) and output_file is None:
            # API function declaration
            self.library.GWFileProtect.argtypes = [
                ct.c_wchar_p,
                ct.c_wchar_p,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation; the output buffer pointer and size are passed by reference to the library
            ct_input_file = ct.c_wchar_p(input_file)
            ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
            ct_output_buffer = ct.c_void_p(0)
            ct_output_size = ct.c_size_t(0)

            # API call
            status = self.library.GWFileProtect(
                ct_input_file,
                ct_file_type,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_size)
            )

        # memory to memory and memory to file
        elif isinstance(input_file, bytes):
            # API function declaration
            self.library.GWMemoryToMemoryProtect.argtypes = [
                ct.c_void_p,
                ct.c_size_t,
                ct.c_wchar_p,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation
            # from_buffer requires a writable buffer, so the immutable bytes are copied into a bytearray first
            bytearray_buffer = bytearray(input_file)
            ct_input_buffer = (ct.c_ubyte * len(bytearray_buffer)).from_buffer(bytearray_buffer)
            ct_input_size = ct.c_size_t(len(input_file))
            ct_file_type = ct.c_wchar_p(dft.file_type_int_to_str(file_type))
            ct_output_buffer = ct.c_void_p(0)
            ct_output_size = ct.c_size_t(0)

            # API call
            status = self.library.GWMemoryToMemoryProtect(
                ct_input_buffer,
                ct_input_size,
                ct_file_type,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_size)
            )

        # Short description of the input for logging: type and length for in-memory input, the path otherwise
        # NOTE(review): memory inputs were already passed through utils.as_bytes above, so the io.BytesIO branch looks unreachable - confirm
        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file

        if status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
            if raise_unsupported:
                # Raise the exception class mapped to this status, or UnknownErrorCode if it is unmapped
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
            else:
                file_bytes = None
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {status}")
            if isinstance(input_file, str) and isinstance(output_file, str):
                # file to file, read the bytes of the file that Rebuild has already written
                if not os.path.isfile(output_file):
                    log.error(f"Rebuild returned success code: {status} but no output file was found: {output_file}")
                    file_bytes = None
                else:
                    with open(output_file, "rb") as f:
                        file_bytes = f.read()
            else:
                # file to memory, memory to memory
                file_bytes = utils.buffer_to_bytes(
                    ct_output_buffer,
                    ct_output_size
                )
                if isinstance(output_file, str):
                    # memory to file
                    # no Rebuild function exists for memory to file, write the memory to file ourselves
                    with open(output_file, "wb") as f:
                        f.write(file_bytes)

        return file_bytes
def set_content_management_policy(self, input_file: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None)
-
Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.
Args
input_file
:Union[None, str, bytes, bytearray, io.BytesIO, Policy]
, optional- Default None (sanitise). The content management policy to apply.
Returns
status (int): The result of the Glasswall API call.
Expand source code
def set_content_management_policy(self, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None):
    """ Applies a content management policy to the library. When input_file is None the default (sanitise) policy is applied.

    Args:
        input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.

    Returns:
        status (int): The result of the Glasswall API call.

    Raises:
        TypeError: If input_file is not one of the accepted types.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped),
            if the library returns a non-success status.
    """
    accepted_types = (
        type(None),
        str,
        bytes,
        bytearray,
        io.BytesIO,
        glasswall.content_management.policies.policy.Policy,
    )
    if not isinstance(input_file, accepted_types):
        raise TypeError(input_file)

    # self.library.GWFileConfigRevertToDefaults doesn't work, so an explicit default policy is loaded instead
    policy = input_file
    if policy is None:
        policy = glasswall.content_management.policies.Rebuild(default="sanitise")

    # Make sure the policy is parsable xml before handing it to the library
    xml_string = utils.validate_xml(policy)

    # Declare the argument types, then make the API call
    gw_file_config_xml = self.library.GWFileConfigXML
    gw_file_config_xml.argtypes = [ct.c_wchar_p]
    status = gw_file_config_xml(ct.c_wchar_p(xml_string))

    if status in successes.success_codes:
        log.debug(f"\n\tstatus: {status}")
        return status

    log.error(f"\n\tstatus: {status}\n\tGWFileErrorMsg: {self.GWFileErrorMsg()}")
    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
def validate_license(self)
-
Validates the license of the library by attempting to call protect_file on a known supported file.
Raises
RebuildError
- If the license could not be validated.
Expand source code
def validate_license(self):
    """ Checks that the library license is valid by calling protect_file on a small, known supported file.

    Raises:
        RebuildError: If the license could not be validated.
    """
    # A known good bitmap; protecting it fails with a RebuildError if the license has expired
    known_good_bitmap = b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00"

    try:
        self.protect_file(
            input_file=known_good_bitmap,
            raise_unsupported=True
        )
    except errors.RebuildError:
        log.error(f"{self.__class__.__name__} license validation failed.")
        raise
    else:
        log.debug(f"{self.__class__.__name__} license validated successfully.")
def version(self)
-
Returns the Glasswall library version.
Returns
version (str): The Glasswall library version.
Expand source code
def version(self):
    """ Queries the loaded Glasswall library for its version.

    Returns:
        version (str): The Glasswall library version.
    """
    gw_file_version = self.library.GWFileVersion

    # The return type must be declared before the call so the result is converted to a str
    gw_file_version.restype = ct.c_wchar_p

    return gw_file_version()