Module glasswall.libraries.archive_manager.archive_manager
Expand source code
import ctypes as ct
import functools
import io
import os
from typing import Optional, Union
import glasswall
from glasswall import utils
from glasswall.config.logging import log
from glasswall.libraries.archive_manager import errors, successes
from glasswall.libraries.library import Library
class ArchiveManager(Library):
""" A high level Python wrapper for Glasswall Archive Manager. """
def __init__(self, library_path):
    """ Loads the Glasswall Archive Manager library found at library_path and logs the loaded version.

    Args:
        library_path (str): The path passed to the Library base class and, once made absolute, to load_library.
    """
    super().__init__(library_path)

    # The library itself is always loaded from an absolute path
    absolute_library_path = os.path.abspath(library_path)
    self.library = self.load_library(absolute_library_path)

    log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
def version(self):
    """ Returns the Glasswall library version.

    Returns:
        version (str): The Glasswall library version.
    """
    # API function declaration: the library returns a char pointer
    get_version = self.library.GwArchiveVersion
    get_version.restype = ct.c_char_p

    # API call
    raw_version = get_version()

    # Read the C string and decode it to a Python string
    return ct.string_at(raw_version).decode()
def release(self):
    """ Releases any resources held by the Glasswall Archive Manager library.

    Calls the library's GwArchiveDone function; its return value is discarded.
    """
    archive_done = self.library.GwArchiveDone
    archive_done()
@functools.lru_cache()
def is_supported_archive(self, archive_type: str):
    """ Returns True if the archive type (e.g. `7z`) is supported.

    The answer comes from the library's GwIsSupportedArchiveType function and is memoised by
    functools.lru_cache, so a repeated query with the same arguments does not call the library again.

    Args:
        archive_type (str): The archive type to query, e.g. `7z`.

    Returns:
        result (bool): The value returned by GwIsSupportedArchiveType.
    """
    # API function declaration
    is_supported = self.library.GwIsSupportedArchiveType
    is_supported.argtypes = [ct.c_char_p]
    is_supported.restype = ct.c_bool

    # const char* type
    encoded_archive_type = ct.c_char_p(archive_type.encode())

    # API call
    return is_supported(encoded_archive_type)
def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
    """ Returns a list of file paths of supported archives in a directory.

    Args:
        directory (str): The directory to search.
        recursive (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.
        absolute (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.
        followlinks (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.

    Returns:
        archive_paths (list): The listed file paths whose file type is accepted by is_supported_archive.
    """
    archive_paths = []
    candidate_paths = glasswall.utils.list_file_paths(
        directory=directory,
        recursive=recursive,
        absolute=absolute,
        followlinks=followlinks,
    )
    for candidate_path in candidate_paths:
        # Keep only files whose type the library reports as a supported archive
        candidate_type = utils.get_file_type(candidate_path)
        if self.is_supported_archive(candidate_type):
            archive_paths.append(candidate_path)

    return archive_paths
def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Analyses the input_file archive by calling the library's GwFileAnalysisArchive function, optionally writing the returned archive and analysis report to the paths specified by output_file and output_report.

    Library resources are released once the library call has returned, including when an exception is raised afterwards.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. A str that is an existing file path is read from disk. None loads a default ArchiveManager policy.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped) is raised.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif isinstance(content_management_policy, type(None)):
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    # NOTE(review): validate_xml is assumed to return the policy as a str, since it is encoded below
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileAnalysisArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # from_buffer needs a writable buffer, so the input is copied into a bytearray
    input_buffer_bytearray = bytearray(input_file_bytes)
    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    gw_return_object = glasswall.GwReturnObj()

    with utils.CwdHandler(new_cwd=self.library_path):
        # API call
        gw_return_object.status = self.library.GwFileAnalysisArchive(
            ct.byref(ct_input_buffer),
            ct_input_buffer_length,
            ct.byref(ct_output_buffer),
            ct.byref(ct_output_buffer_length),
            ct.byref(ct_output_report_buffer),
            ct.byref(ct_output_report_buffer_length),
            ct_content_management_policy
        )

    try:
        # Short description of the input for log messages
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the output buffers into Python bytes before the library is released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file and isinstance(output_file, str):
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "wb") as f:
                f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report and isinstance(output_report, str):
            os.makedirs(os.path.dirname(output_report), exist_ok=True)
            with open(output_report, "wb") as f:
                f.write(gw_return_object.output_report)
    finally:
        # Release library resources even when raising for an error status or a failed write
        self.release()

    return gw_return_object
def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory and output_report_directory, maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing archives to analyse.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archive returned for each file will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. Each report path is the relative input path with ".xml" appended.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    results_by_relative_path = {}

    for archive_path in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(archive_path, input_directory)

        # Mirror the input layout beneath each requested output root
        if output_directory is None:
            archive_destination = None
        else:
            archive_destination = os.path.join(os.path.abspath(output_directory), relative_path)

        if output_report_directory is None:
            report_destination = None
        else:
            report_destination = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

        results_by_relative_path[relative_path] = self.analyse_archive(
            input_file=archive_path,
            output_file=archive_destination,
            output_report=report_destination,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results_by_relative_path
def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

    The work is done by the library's GwFileProtectAndReportArchive function. Library resources are released once the library call has returned, including when an exception is raised afterwards.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. A str that is an existing file path is read from disk. None loads a default ArchiveManager policy.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped) is raised.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif isinstance(content_management_policy, type(None)):
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    # NOTE(review): validate_xml is assumed to return the policy as a str, since it is encoded below
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileProtectAndReportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # from_buffer needs a writable buffer, so the input is copied into a bytearray
    input_buffer_bytearray = bytearray(input_file_bytes)
    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    gw_return_object = glasswall.GwReturnObj()

    with utils.CwdHandler(new_cwd=self.library_path):
        # API call
        gw_return_object.status = self.library.GwFileProtectAndReportArchive(
            ct.byref(ct_input_buffer),
            ct_input_buffer_length,
            ct.byref(ct_output_buffer),
            ct.byref(ct_output_buffer_length),
            ct.byref(ct_output_report_buffer),
            ct.byref(ct_output_report_buffer_length),
            ct_content_management_policy
        )

    try:
        # Short description of the input for log messages
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the output buffers into Python bytes before the library is released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file and isinstance(output_file, str):
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "wb") as f:
                f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report and isinstance(output_report, str):
            os.makedirs(os.path.dirname(output_report), exist_ok=True)
            with open(output_report, "wb") as f:
                f.write(gw_return_object.output_report)
    finally:
        # Release library resources even when raising for an error status or a failed write
        self.release()

    return gw_return_object
def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing archives to protect.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. Each report path is the relative input path with ".xml" appended.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    results_by_relative_path = {}

    for archive_path in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(archive_path, input_directory)

        # Mirror the input layout beneath each requested output root
        if output_directory is None:
            archive_destination = None
        else:
            archive_destination = os.path.join(os.path.abspath(output_directory), relative_path)

        if output_report_directory is None:
            report_destination = None
        else:
            report_destination = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

        results_by_relative_path[relative_path] = self.protect_archive(
            input_file=archive_path,
            output_file=archive_destination,
            output_report=report_destination,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results_by_relative_path
def file_to_file_unpack(self, input_file: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
    """ Unpacks the archive at input_file into output_directory by calling the library's GwFileToFileUnpack function.

    Args:
        input_file (str): The archive file path.
        output_directory (str): The output directory path passed to the library.
        file_type (Optional[str], optional): Default None. The archive file type. When falsy it is derived from input_file with utils.get_file_type.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): Carries "status" and the ctypes arguments passed to the library. NOTE: when the library call raises OSError and raise_unsupported is False, the bare int 0 is returned instead and release is not called.

    Raises:
        TypeError: If input_file or output_directory is not a str.
        FileNotFoundError: If input_file is not an existing file.
        OSError: Re-raised from the library call when raise_unsupported is True.
        Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped) is raised.
    """
    # Validate arg types
    if not isinstance(input_file, str):
        raise TypeError(input_file)
    if not os.path.isfile(input_file):
        raise FileNotFoundError(input_file)
    if not isinstance(output_directory, str):
        raise TypeError(output_directory)

    # Fall back to the file type reported for the input path
    file_type = file_type or utils.get_file_type(input_file)

    # API function declaration: three const char* arguments
    self.library.GwFileToFileUnpack.argtypes = [ct.c_char_p] * 3

    # Variable initialisation
    gw_return_object = glasswall.GwReturnObj()
    gw_return_object.ct_input_file = ct.c_char_p(input_file.encode())  # const char* inputFilePath
    gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
    gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

    with utils.CwdHandler(new_cwd=self.library_path):
        try:
            # API call
            gw_return_object.status = self.library.GwFileToFileUnpack(
                gw_return_object.ct_input_file,
                gw_return_object.ct_output_directory,
                gw_return_object.ct_file_type,
            )
        except OSError:
            # bz2, gz currently OSError on unpack, not supported unpacking
            # OSError: exception: access violation reading 0x0000000000000000
            if not raise_unsupported:
                return 0
            raise

    succeeded = gw_return_object.status in successes.success_codes
    if succeeded:
        log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
    else:
        log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
        if raise_unsupported:
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)

    self.release()

    return gw_return_object
def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
    """ Packs input_directory by calling the library's GwFileToFilePack function, creating output_directory first if needed.

    Args:
        input_directory (str): The directory to pack.
        output_directory (str): The output directory path passed to the library. It is created if it does not exist.
        file_type (Optional[str], optional): Default None. The archive file type. When falsy it is derived from input_directory with utils.get_file_type.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): Carries "status" and the ctypes arguments passed to the library.

    Raises:
        TypeError: If input_directory or output_directory is not a str.
        NotADirectoryError: If input_directory is not an existing directory.
        Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped) is raised.
    """
    # Validate arg types
    if not isinstance(input_directory, str):
        raise TypeError(input_directory)
    if not os.path.isdir(input_directory):
        raise NotADirectoryError(input_directory)
    if not isinstance(output_directory, str):
        raise TypeError(output_directory)

    # Fall back to the file type reported for the input path
    file_type = file_type or utils.get_file_type(input_directory)

    # Ensure output_directory exists
    os.makedirs(output_directory, exist_ok=True)

    # API function declaration: three const char* arguments
    self.library.GwFileToFilePack.argtypes = [ct.c_char_p] * 3

    # Variable initialisation
    gw_return_object = glasswall.GwReturnObj()
    gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode())  # const char* inputDirPath
    gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
    gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

    with utils.CwdHandler(new_cwd=self.library_path):
        # API call
        gw_return_object.status = self.library.GwFileToFilePack(
            gw_return_object.ct_input_directory,
            gw_return_object.ct_output_directory,
            gw_return_object.ct_file_type,
        )

    succeeded = gw_return_object.status in successes.success_codes
    if succeeded:
        log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
    else:
        log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
        if raise_unsupported:
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)

    self.release()

    return gw_return_object
def unpack(self, input_file: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

    The archive is unpacked to a new directory inside output_directory, named after the archive.

    Args:
        input_file (str): The archive file path
        output_directory (str): The output directory where the archive will be unpacked to a new directory.
        recursive (bool, optional): Default True. Recursively unpack all nested archives. Nested archives are deleted once they have been unpacked successfully.
        file_type (str, optional): Default None (use extension). The archive file type. Not forwarded to nested archives.
        include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete input_file after it has been unpacked successfully to output_directory. The file is kept when unpacking failed.

    Returns:
        status (int): The status of unpacking input_file.

    Raises:
        Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped) is raised.
    """
    # Convert to absolute paths
    input_file = os.path.abspath(input_file)
    output_directory = os.path.abspath(output_directory)

    if include_file_type:
        archive_name = os.path.basename(input_file)
    else:
        archive_name = os.path.splitext(os.path.basename(input_file))[0]
    archive_output_directory = os.path.join(output_directory, archive_name)

    # Unpack
    log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}")
    unpack_result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, file_type=file_type, raise_unsupported=raise_unsupported)

    # file_to_file_unpack returns a bare int rather than a GwReturnObj when the library call
    # raised OSError and raise_unsupported is False, so the status cannot be read unconditionally.
    status = getattr(unpack_result, "status", unpack_result)
    unpacked = status in successes.success_codes

    if unpacked:
        log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
    else:
        log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
        if raise_unsupported:
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)

    # Never delete an archive whose contents were not unpacked
    if delete_origin and unpacked:
        os.remove(input_file)

    # A failed unpack may not have created the output directory, in which case there is nothing to recurse into
    if recursive and os.path.isdir(archive_output_directory):
        # Unpack sub archives
        for subarchive in self.list_archive_paths(archive_output_directory):
            self.unpack(
                input_file=subarchive,
                output_directory=archive_output_directory,
                recursive=recursive,
                raise_unsupported=raise_unsupported,
                delete_origin=True
            )

    return status
def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Unpack a directory of archives, maintaining directory structure.

    Every supported archive listed in input_directory is passed to unpack, with an output directory that mirrors the archive's location relative to input_directory.

    Args:
        input_directory (str): The input directory containing archives to unpack.
        output_directory (str): The output directory where archives will be unpacked to a new directory.
        recursive (bool, optional): Default True. Recursively unpack all nested archives.
        file_type (str, optional): Default None (use extension). The archive file type of all archives within the directory.
        include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete each input archive after unpacking it to output_directory.
    """
    # Convert to absolute paths
    source_root = os.path.abspath(input_directory)
    destination_root = os.path.abspath(output_directory)

    for archive_path in self.list_archive_paths(source_root):
        # Directory of the archive, re-rooted beneath the output directory
        archive_relative_path = os.path.relpath(archive_path, source_root)
        mirrored_archive_path = os.path.join(destination_root, archive_relative_path)
        archive_destination = os.path.dirname(mirrored_archive_path)

        self.unpack(
            input_file=archive_path,
            output_directory=archive_destination,
            recursive=recursive,
            file_type=file_type,
            include_file_type=include_file_type,
            raise_unsupported=raise_unsupported,
            delete_origin=delete_origin
        )
def pack_directory(self, input_directory: str, output_directory: str, file_type: str, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

    Args:
        input_directory (str): The directory to pack.
        output_directory (str): The output directory passed to file_to_file_pack for the packed archive.
        file_type (str): The archive file type.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete input_directory after it has been packed successfully. The directory is kept when packing failed.

    Raises:
        Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped) is raised.
    """
    # Convert to absolute paths
    input_directory = os.path.abspath(input_directory)
    output_directory = os.path.abspath(output_directory)

    # Pack
    log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}")
    status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, raise_unsupported=raise_unsupported).status
    packed = status in successes.success_codes

    if packed:
        log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
    else:
        log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
        if raise_unsupported:
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)

    # Never delete a directory that was not packed successfully
    if delete_origin and packed:
        utils.delete_directory(input_directory)
def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Exports an archive using the Glasswall engine.

    The work is done by the library's GwFileExportArchive function. Library resources are released once the library call has returned, including when an exception is raised afterwards.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. A str that is an existing file path is read from disk. None loads a default ArchiveManager policy.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped) is raised.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif isinstance(content_management_policy, type(None)):
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    # NOTE(review): validate_xml is assumed to return the policy as a str, since it is encoded below
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileExportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # from_buffer needs a writable buffer, so the input is copied into a bytearray
    input_buffer_bytearray = bytearray(input_file_bytes)
    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    gw_return_object = glasswall.GwReturnObj()

    with utils.CwdHandler(new_cwd=self.library_path):
        # API call
        gw_return_object.status = self.library.GwFileExportArchive(
            ct.byref(ct_input_buffer),
            ct_input_buffer_length,
            ct.byref(ct_output_buffer),
            ct.byref(ct_output_buffer_length),
            ct.byref(ct_output_report_buffer),
            ct.byref(ct_output_report_buffer_length),
            ct_content_management_policy
        )

    try:
        # Short description of the input for log messages
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the output buffers into Python bytes before the library is released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file and isinstance(output_file, str):
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "wb") as f:
                f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report and isinstance(output_report, str):
            os.makedirs(os.path.dirname(output_report), exist_ok=True)
            with open(output_report, "wb") as f:
                f.write(gw_return_object.output_report)
    finally:
        # Release library resources even when raising for an error status or a failed write
        self.release()

    return gw_return_object
def export_directory(
    self,
    input_directory: str,
    output_directory: Optional[str] = None,
    output_report_directory: Optional[str] = None,
    content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None,
    raise_unsupported: bool = True,
):
    """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing archives to export.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    # The output roots are the same for every file, so resolve them once up front.
    output_root = None if output_directory is None else os.path.abspath(output_directory)
    report_root = None if output_report_directory is None else os.path.abspath(output_report_directory)

    exported_archives_dict = {}

    # Call export_archive on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        # The path relative to input_directory is both the dictionary key and
        # the location of the outputs beneath their respective roots.
        relative_path = os.path.relpath(input_file, input_directory)

        # Construct paths for output file and output report
        output_file = None if output_root is None else os.path.join(output_root, relative_path)
        output_report = None if report_root is None else os.path.join(report_root, relative_path + ".xml")

        exported_archives_dict[relative_path] = self.export_archive(
            input_file=input_file,
            output_file=output_file,
            output_report=output_report,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return exported_archives_dict
def import_archive(
    self,
    input_file: Union[str, bytes, bytearray, io.BytesIO],
    output_file: Optional[str] = None,
    output_report: Optional[str] = None,
    content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None,
    include_analysis_report: Optional[int] = 1,
    raise_unsupported: bool = True,
):
    """ Imports an archive using the Glasswall engine.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        include_analysis_report (Optional[int], optional): Default 1. If 1, write the analysis report into imported archive.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If input_file, output_file, output_report or content_management_policy is of an unsupported type.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: If raise_unsupported is True and the returned status is not a success code, the exception class mapped to that status in errors.error_codes (errors.UnknownErrorCode when unmapped) is raised.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    else:
        # bytes, bytearray or io.BytesIO (guaranteed by the type validation above)
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        # A str that names an existing file is treated as a path to the policy file
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif content_management_policy is None:
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileImportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p,
        ct.c_int
    ]

    # Variable initialisation
    # ctypes from_buffer requires a writable buffer, hence the bytearray copy of the input
    input_buffer_bytearray = bytearray(input_file_bytes)
    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    ct_include_analysis_report = ct.c_int(include_analysis_report)  # int

    gw_return_object = glasswall.GwReturnObj()

    # The call is made with the working directory taken from self.library_path
    with utils.CwdHandler(new_cwd=self.library_path):
        # API call
        gw_return_object.status = self.library.GwFileImportArchive(
            ct.byref(ct_input_buffer),
            ct_input_buffer_length,
            ct.byref(ct_output_buffer),
            ct.byref(ct_output_buffer_length),
            ct.byref(ct_output_report_buffer),
            ct.byref(ct_output_report_buffer_length),
            ct_content_management_policy,
            ct_include_analysis_report
        )

    # Once the API call has returned, the library must always be released, even
    # when a failure status is raised or writing the outputs fails.
    try:
        # Short description of the input for log messages: the path for str
        # input, otherwise the type and the number of bytes.
        if isinstance(input_file, (bytes, bytearray)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = f"{type(input_file)} length {len(input_file_bytes)}"
        else:
            input_file_repr = input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the buffers populated by the API call onto the return object
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file and isinstance(output_file, str):
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "wb") as f:
                f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report and isinstance(output_report, str):
            os.makedirs(os.path.dirname(output_report), exist_ok=True)
            with open(output_report, "wb") as f:
                f.write(gw_return_object.output_report)
    finally:
        self.release()

    return gw_return_object
def import_directory(
    self,
    input_directory: str,
    output_directory: Optional[str] = None,
    output_report_directory: Optional[str] = None,
    content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None,
    raise_unsupported: bool = True,
    include_analysis_report: Optional[int] = 1,
):
    """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing archives to import.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        include_analysis_report (Optional[int], optional): Default 1. Passed through to import_archive for every file. If 1, write the analysis report into imported archive.

    Returns:
        imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    # The output roots are the same for every file, so resolve them once up front.
    output_root = None if output_directory is None else os.path.abspath(output_directory)
    report_root = None if output_report_directory is None else os.path.abspath(output_report_directory)

    imported_archives_dict = {}

    # Call import_archive on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        # The path relative to input_directory is both the dictionary key and
        # the location of the outputs beneath their respective roots.
        relative_path = os.path.relpath(input_file, input_directory)

        # Construct paths for output file and output report
        output_file = None if output_root is None else os.path.join(output_root, relative_path)
        output_report = None if report_root is None else os.path.join(report_root, relative_path + ".xml")

        imported_archives_dict[relative_path] = self.import_archive(
            input_file=input_file,
            output_file=output_file,
            output_report=output_report,
            content_management_policy=content_management_policy,
            include_analysis_report=include_analysis_report,
            raise_unsupported=raise_unsupported,
        )

    return imported_archives_dict
Classes
class ArchiveManager (library_path)
-
A high level Python wrapper for Glasswall Archive Manager.
Expand source code
class ArchiveManager(Library): """ A high level Python wrapper for Glasswall Archive Manager. """ def __init__(self, library_path): super().__init__(library_path) self.library = self.load_library(os.path.abspath(library_path)) log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") def version(self): """ Returns the Glasswall library version. Returns: version (str): The Glasswall library version. """ # API function declaration self.library.GwArchiveVersion.restype = ct.c_char_p # API call version = self.library.GwArchiveVersion() # Convert to Python string version = ct.string_at(version).decode() return version def release(self): """ Releases any resources held by the Glasswall Archive Manager library. """ self.library.GwArchiveDone() @functools.lru_cache() def is_supported_archive(self, archive_type: str): """ Returns True if the archive type (e.g. `7z`) is supported. """ # API function declaration self.library.GwIsSupportedArchiveType.argtypes = [ ct.c_char_p ] self.library.GwIsSupportedArchiveType.restype = ct.c_bool ct_archive_type = ct.c_char_p(archive_type.encode()) # const char* type result = self.library.GwIsSupportedArchiveType(ct_archive_type) return result def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True): """ Returns a list of file paths of supported archives in a directory and all of its subdirectories. 
""" return [ file_path for file_path in glasswall.utils.list_file_paths( directory=directory, recursive=recursive, absolute=absolute, followlinks=followlinks, ) if self.is_supported_archive(utils.get_file_type(file_path)) ] def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 
Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(output_report, (type(None), str)): raise TypeError(output_report) if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) # Convert string path arguments to absolute paths if isinstance(output_file, str): output_file = os.path.abspath(output_file) if isinstance(output_report, str): output_report = os.path.abspath(output_report) # Convert inputs to bytes if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) with open(input_file, "rb") as f: input_file_bytes = f.read() elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file_bytes = utils.as_bytes(input_file) if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): with open(content_management_policy, "rb") as f: content_management_policy = f.read() elif isinstance(content_management_policy, type(None)): # Load default content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") content_management_policy = utils.validate_xml(content_management_policy) # API function declaration self.library.GwFileAnalysisArchive.argtypes = [ ct.c_void_p, ct.c_size_t, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t), ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t), ct.c_char_p ] # Variable initialisation input_buffer_bytearray = bytearray(input_file_bytes) ct_input_buffer = (ct.c_ubyte * 
len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray) # void *inputBuffer ct_input_buffer_length = ct.c_size_t(len(input_file_bytes)) # size_t inputBufferLength ct_output_buffer = ct.c_void_p() # void **outputFileBuffer ct_output_buffer_length = ct.c_size_t() # size_t *outputFileBufferLength ct_output_report_buffer = ct.c_void_p() # void **outputAnalysisReportBuffer ct_output_report_buffer_length = ct.c_size_t() # size_t *outputAnalysisReportBufferLength ct_content_management_policy = ct.c_char_p(content_management_policy.encode()) # const char *xmlConfigString gw_return_object = glasswall.GwReturnObj() with utils.CwdHandler(new_cwd=self.library_path): # API call gw_return_object.status = self.library.GwFileAnalysisArchive( ct.byref(ct_input_buffer), ct_input_buffer_length, ct.byref(ct_output_buffer), ct.byref(ct_output_buffer_length), ct.byref(ct_output_report_buffer), ct.byref(ct_output_report_buffer_length), ct_content_management_policy ) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if gw_return_object.status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") if raise_unsupported: raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") gw_return_object.output_file = utils.buffer_to_bytes( ct_output_buffer, ct_output_buffer_length ) gw_return_object.output_report = utils.buffer_to_bytes( ct_output_report_buffer, ct_output_report_buffer_length ) # Write output file if gw_return_object.output_file: if isinstance(output_file, str): os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, "wb") as f: 
f.write(gw_return_object.output_file) # Write output report if gw_return_object.output_report: if isinstance(output_report, str): os.makedirs(os.path.dirname(output_report), exist_ok=True) with open(output_report, "wb") as f: f.write(gw_return_object.output_report) self.release() return gw_return_object def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): """ Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing archives to analyse. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives containing analysis reports of each file will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 
Returns: analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ analysed_archives_dict = {} # Call analyse_archive on each file in input_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) # Construct paths for output file and output report output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") result = self.analyse_archive( input_file=input_file, output_file=output_file, output_report=output_report, content_management_policy=content_management_policy, raise_unsupported=raise_unsupported, ) analysed_archives_dict[relative_path] = result return analysed_archives_dict def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. 
content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(output_report, (type(None), str)): raise TypeError(output_report) if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) # Convert string path arguments to absolute paths if isinstance(output_file, str): output_file = os.path.abspath(output_file) if isinstance(output_report, str): output_report = os.path.abspath(output_report) # Convert inputs to bytes if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) with open(input_file, "rb") as f: input_file_bytes = f.read() elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file_bytes = utils.as_bytes(input_file) if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): with open(content_management_policy, "rb") as f: content_management_policy = f.read() elif isinstance(content_management_policy, type(None)): # Load default content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") content_management_policy = utils.validate_xml(content_management_policy) # API function declaration 
self.library.GwFileProtectAndReportArchive.argtypes = [ ct.c_void_p, ct.c_size_t, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t), ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t), ct.c_char_p ] # Variable initialisation input_buffer_bytearray = bytearray(input_file_bytes) ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray) # void *inputBuffer ct_input_buffer_length = ct.c_size_t(len(input_file_bytes)) # size_t inputBufferLength ct_output_buffer = ct.c_void_p() # void **outputFileBuffer ct_output_buffer_length = ct.c_size_t() # size_t *outputFileBufferLength ct_output_report_buffer = ct.c_void_p() # void **outputAnalysisReportBuffer ct_output_report_buffer_length = ct.c_size_t() # size_t *outputAnalysisReportBufferLength ct_content_management_policy = ct.c_char_p(content_management_policy.encode()) # const char *xmlConfigString gw_return_object = glasswall.GwReturnObj() with utils.CwdHandler(new_cwd=self.library_path): # API call gw_return_object.status = self.library.GwFileProtectAndReportArchive( ct.byref(ct_input_buffer), ct_input_buffer_length, ct.byref(ct_output_buffer), ct.byref(ct_output_buffer_length), ct.byref(ct_output_report_buffer), ct.byref(ct_output_report_buffer_length), ct_content_management_policy ) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if gw_return_object.status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") if raise_unsupported: raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") gw_return_object.output_file = utils.buffer_to_bytes( ct_output_buffer, ct_output_buffer_length ) 
gw_return_object.output_report = utils.buffer_to_bytes( ct_output_report_buffer, ct_output_report_buffer_length ) # Write output file if gw_return_object.output_file: if isinstance(output_file, str): os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, "wb") as f: f.write(gw_return_object.output_file) # Write output report if gw_return_object.output_report: if isinstance(output_report, str): os.makedirs(os.path.dirname(output_report), exist_ok=True) with open(output_report, "wb") as f: f.write(gw_return_object.output_report) self.release() return gw_return_object def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): """ Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing archives to protect. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 
Returns: protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ protected_archives_dict = {} # Call protect_archive on each file in input_directory to output_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) # Construct paths for output file and output report output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") result = self.protect_archive( input_file=input_file, output_file=output_file, output_report=output_report, content_management_policy=content_management_policy, raise_unsupported=raise_unsupported, ) protected_archives_dict[relative_path] = result return protected_archives_dict def file_to_file_unpack(self, input_file: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True): # Validate arg types if not isinstance(input_file, str): raise TypeError(input_file) elif not os.path.isfile(input_file): raise FileNotFoundError(input_file) if not isinstance(output_directory, str): raise TypeError(output_directory) if not file_type: file_type = utils.get_file_type(input_file) # API function declaration self.library.GwFileToFileUnpack.argtypes = [ ct.c_char_p, ct.c_char_p, ct.c_char_p, ] # Variable initialisation gw_return_object = glasswall.GwReturnObj() gw_return_object.ct_input_file = ct.c_char_p(input_file.encode()) # const char* inputFilePath gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode()) # const char* outputDirPath gw_return_object.ct_file_type = ct.c_char_p(file_type.encode()) # const char *fileType with utils.CwdHandler(new_cwd=self.library_path): try: # API call 
gw_return_object.status = self.library.GwFileToFileUnpack( gw_return_object.ct_input_file, gw_return_object.ct_output_directory, gw_return_object.ct_file_type, ) except OSError: # bz2, gz currently OSError on unpack, not supported unpacking # OSError: exception: access violation reading 0x0000000000000000 if raise_unsupported: raise return 0 if gw_return_object.status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}") if raise_unsupported: raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) else: log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}") self.release() return gw_return_object def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True): # Validate arg types if not isinstance(input_directory, str): raise TypeError(input_directory) elif not os.path.isdir(input_directory): raise NotADirectoryError(input_directory) if not isinstance(output_directory, str): raise TypeError(output_directory) if not file_type: file_type = utils.get_file_type(input_directory) # Ensure output_directory exists os.makedirs(output_directory, exist_ok=True) # API function declaration self.library.GwFileToFilePack.argtypes = [ ct.c_char_p, ct.c_char_p, ct.c_char_p, ] # Variable initialisation gw_return_object = glasswall.GwReturnObj() gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode()) # const char* inputDirPath gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode()) # const char* outputDirPath gw_return_object.ct_file_type = ct.c_char_p(file_type.encode()) # const char *fileType with utils.CwdHandler(new_cwd=self.library_path): # API call gw_return_object.status = self.library.GwFileToFilePack( gw_return_object.ct_input_directory, gw_return_object.ct_output_directory, gw_return_object.ct_file_type, ) if 
gw_return_object.status not in successes.success_codes: log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}") if raise_unsupported: raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) else: log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}") self.release() return gw_return_object def unpack(self, input_file: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False): """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip". Args: input_file (str): The archive file path output_directory (str): The output directory where the archive will be unpacked to a new directory. recursive (bool, optional): Default True. Recursively unpack all nested archives. file_type (str, optional): Default None (use extension). The archive file type. include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. 
""" # Convert to absolute paths input_file = os.path.abspath(input_file) output_directory = os.path.abspath(output_directory) if include_file_type: archive_name = os.path.basename(input_file) else: archive_name = os.path.splitext(os.path.basename(input_file))[0] archive_output_directory = os.path.join(output_directory, archive_name) # Unpack log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}") status = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, file_type=file_type, raise_unsupported=raise_unsupported).status if status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}") if delete_origin: os.remove(input_file) if recursive: # Unpack sub archives for subarchive in self.list_archive_paths(archive_output_directory): self.unpack( input_file=subarchive, output_directory=archive_output_directory, recursive=recursive, raise_unsupported=raise_unsupported, delete_origin=True ) return status def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False): """ Unpack a directory of archives, maintaining directory structure. Args: input_directory (str): The input directory containing archives to unpack. output_directory (str): The output directory where archives will be unpacked to a new directory. recursive (bool, optional): Default True. Recursively unpack all nested archives. file_type (str, optional): Default None (use extension). The archive file type of all archives within the directory. include_file_type (bool, optional): Default False. Include the archive format in the directory name. 
Useful when there are multiple same-named archives of different formats. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. """ # Convert to absolute paths input_directory = os.path.abspath(input_directory) output_directory = os.path.abspath(output_directory) for archive_input_file in self.list_archive_paths(input_directory): relative_path = os.path.relpath(archive_input_file, input_directory) archive_output_file = os.path.dirname(os.path.join(output_directory, relative_path)) self.unpack( input_file=archive_input_file, output_directory=archive_output_file, recursive=recursive, file_type=file_type, include_file_type=include_file_type, raise_unsupported=raise_unsupported, delete_origin=delete_origin ) def pack_directory(self, input_directory: str, output_directory: str, file_type: str, raise_unsupported: bool = True, delete_origin: bool = False): """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip". Args: input_file (str): The archive file path output_directory (str): The output directory where the archive will be unpacked to a new directory. recursive (bool, optional): Default True. Recursively unpack all nested archives. file_type (str): The archive file type. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. 
""" # Convert to absolute paths input_directory = os.path.abspath(input_directory) output_directory = os.path.abspath(output_directory) # Pack log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}") status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, raise_unsupported=raise_unsupported).status if status not in successes.success_codes: log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}") if raise_unsupported: raise errors.error_codes.get(status, errors.UnknownErrorCode)(status) else: log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}") if delete_origin: utils.delete_directory(input_directory) def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): """ Exports an archive using the Glasswall engine. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 
Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(output_report, (type(None), str)): raise TypeError(output_report) if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) # Convert string path arguments to absolute paths if isinstance(output_file, str): output_file = os.path.abspath(output_file) if isinstance(output_report, str): output_report = os.path.abspath(output_report) # Convert inputs to bytes if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) with open(input_file, "rb") as f: input_file_bytes = f.read() elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file_bytes = utils.as_bytes(input_file) if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): with open(content_management_policy, "rb") as f: content_management_policy = f.read() elif isinstance(content_management_policy, type(None)): # Load default content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") content_management_policy = utils.validate_xml(content_management_policy) # API function declaration self.library.GwFileExportArchive.argtypes = [ ct.c_void_p, ct.c_size_t, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t), ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t), ct.c_char_p ] # Variable initialisation input_buffer_bytearray = bytearray(input_file_bytes) ct_input_buffer = (ct.c_ubyte * 
len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray) # void *inputBuffer ct_input_buffer_length = ct.c_size_t(len(input_file_bytes)) # size_t inputBufferLength ct_output_buffer = ct.c_void_p() # void **outputFileBuffer ct_output_buffer_length = ct.c_size_t() # size_t *outputFileBufferLength ct_output_report_buffer = ct.c_void_p() # void **outputReportBuffer ct_output_report_buffer_length = ct.c_size_t() # size_t *outputReportBufferLength ct_content_management_policy = ct.c_char_p(content_management_policy.encode()) # const char *xmlConfigString gw_return_object = glasswall.GwReturnObj() with utils.CwdHandler(new_cwd=self.library_path): # API call gw_return_object.status = self.library.GwFileExportArchive( ct.byref(ct_input_buffer), ct_input_buffer_length, ct.byref(ct_output_buffer), ct.byref(ct_output_buffer_length), ct.byref(ct_output_report_buffer), ct.byref(ct_output_report_buffer_length), ct_content_management_policy ) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if gw_return_object.status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") if raise_unsupported: raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") gw_return_object.output_file = utils.buffer_to_bytes( ct_output_buffer, ct_output_buffer_length ) gw_return_object.output_report = utils.buffer_to_bytes( ct_output_report_buffer, ct_output_report_buffer_length ) # Write output file if gw_return_object.output_file: if isinstance(output_file, str): os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, "wb") as f: f.write(gw_return_object.output_file) # 
Write output report if gw_return_object.output_report: if isinstance(output_report, str): os.makedirs(os.path.dirname(output_report), exist_ok=True) with open(output_report, "wb") as f: f.write(gw_return_object.output_report) self.release() return gw_return_object def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing archives to export. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 
Returns: exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ exported_archives_dict = {} # Call export_archive on each file in input_directory to output_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) # Construct paths for output file and output report output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") result = self.export_archive( input_file=input_file, output_file=output_file, output_report=output_report, content_management_policy=content_management_policy, raise_unsupported=raise_unsupported, ) exported_archives_dict[relative_path] = result return exported_archives_dict def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[int] = 1, raise_unsupported: bool = True): """ Imports an archive using the Glasswall engine. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes. output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path. output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. include_analysis_report (Optional[int], optional): Default 1. 
If 1, write the analysis report into imported archive. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(output_report, (type(None), str)): raise TypeError(output_report) if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) # Convert string path arguments to absolute paths if isinstance(output_file, str): output_file = os.path.abspath(output_file) if isinstance(output_report, str): output_report = os.path.abspath(output_report) # Convert inputs to bytes if isinstance(input_file, str): if not os.path.isfile(input_file): raise FileNotFoundError(input_file) with open(input_file, "rb") as f: input_file_bytes = f.read() elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file_bytes = utils.as_bytes(input_file) if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): with open(content_management_policy, "rb") as f: content_management_policy = f.read() elif isinstance(content_management_policy, type(None)): # Load default content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process") content_management_policy = utils.validate_xml(content_management_policy) # API function declaration self.library.GwFileImportArchive.argtypes = [ ct.c_void_p, ct.c_size_t, ct.POINTER(ct.c_void_p), ct.POINTER(ct.c_size_t), ct.POINTER(ct.c_void_p), 
ct.POINTER(ct.c_size_t), ct.c_char_p, ct.c_int ] # Variable initialisation input_buffer_bytearray = bytearray(input_file_bytes) ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray) # void *inputBuffer ct_input_buffer_length = ct.c_size_t(len(input_file_bytes)) # size_t inputBufferLength ct_output_buffer = ct.c_void_p() # void **outputFileBuffer ct_output_buffer_length = ct.c_size_t() # size_t *outputFileBufferLength ct_output_report_buffer = ct.c_void_p() # void **outputReportBuffer ct_output_report_buffer_length = ct.c_size_t() # size_t *outputReportBufferLength ct_content_management_policy = ct.c_char_p(content_management_policy.encode()) # const char *xmlConfigString ct_include_analysis_report = ct.c_int(include_analysis_report) # int gw_return_object = glasswall.GwReturnObj() with utils.CwdHandler(new_cwd=self.library_path): # API call gw_return_object.status = self.library.GwFileImportArchive( ct.byref(ct_input_buffer), ct_input_buffer_length, ct.byref(ct_output_buffer), ct.byref(ct_output_buffer_length), ct.byref(ct_output_report_buffer), ct.byref(ct_output_report_buffer_length), ct_content_management_policy, ct_include_analysis_report ) input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file if gw_return_object.status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") if raise_unsupported: raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}") gw_return_object.output_file = utils.buffer_to_bytes( ct_output_buffer, ct_output_buffer_length ) gw_return_object.output_report = utils.buffer_to_bytes( 
ct_output_report_buffer, ct_output_report_buffer_length ) # Write output file if gw_return_object.output_file: if isinstance(output_file, str): os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, "wb") as f: f.write(gw_return_object.output_file) # Write output report if gw_return_object.output_report: if isinstance(output_report, str): os.makedirs(os.path.dirname(output_report), exist_ok=True) with open(output_report, "wb") as f: f.write(gw_return_object.output_report) self.release() return gw_return_object def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True): """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing archives to import. output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. 
Returns: imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ imported_archives_dict = {} # Call import_archive on each file in input_directory to output_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) # Construct paths for output file and output report output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") result = self.import_archive( input_file=input_file, output_file=output_file, output_report=output_report, content_management_policy=content_management_policy, raise_unsupported=raise_unsupported, ) imported_archives_dict[relative_path] = result return imported_archives_dict
Ancestors
Methods
def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True)
-
Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The archive file path or bytes.
output_file
:Optional[str]
, optional- Default None. If str, write the archive to the output_file path.
output_report
:Optional[str]
, optional- Default None. If str, write the analysis report to the output_report path.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

    The library resources are released via self.release() once the API call has been made, whether this method returns normally or raises.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not a path to an existing file.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped), when the status is not a success code and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    # (done before the working directory is switched for the API call)
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    # A str policy that is an existing file path is read from disk; any other value is handed to validate_xml as-is
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif isinstance(content_management_policy, type(None)):
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileAnalysisArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # from_buffer requires a writable buffer, hence the bytearray copy
    input_buffer_bytearray = bytearray(input_file_bytes)
    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString

    gw_return_object = glasswall.GwReturnObj()

    # Everything from the API call onwards is guarded so that the library is
    # always released, including when an error status is raised below.
    try:
        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileAnalysisArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        # Short description of the input for logging, avoids dumping raw bytes into the log
        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the output buffers into Python bytes before the library is released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file:
            if isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report:
            if isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)
    finally:
        self.release()

    return gw_return_object
def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True)
-
Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing archives to analyse.
output_directory
:Optional[str]
, optional- Default None. If str, the output directory where the archives containing analysis reports of each file will be written.
output_report_directory
:Optional[str]
, optional- Default None. If str, the output directory where xml reports for each archive will be written.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Runs analyse_archive over every file found in input_directory, mirroring the input directory layout under the output directories.

    Args:
        input_directory (str): The directory whose files are passed to analyse_archive.
        output_directory (Optional[str], optional): Default None. If str, root directory for the archives produced by analyse_archive.
        output_report_directory (Optional[str], optional): Default None. If str, root directory for the xml reports; each report path is the input's relative path with ".xml" appended.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy forwarded to every analyse_archive call.
        raise_unsupported (bool, optional): Default True. Forwarded to every analyse_archive call.

    Returns:
        analysed_archives_dict (dict): Maps each file path relative to input_directory to the glasswall.GwReturnObj returned by analyse_archive for that file.
    """
    results_by_relative_path = {}

    for archive_path in utils.list_file_paths(input_directory):
        # The relative path doubles as the dictionary key and as the location beneath each output root
        path_in_tree = os.path.relpath(archive_path, input_directory)

        if output_directory is None:
            archive_destination = None
        else:
            archive_destination = os.path.join(os.path.abspath(output_directory), path_in_tree)

        if output_report_directory is None:
            report_destination = None
        else:
            report_destination = os.path.join(os.path.abspath(output_report_directory), path_in_tree + ".xml")

        results_by_relative_path[path_in_tree] = self.analyse_archive(
            input_file=archive_path,
            output_file=archive_destination,
            output_report=report_destination,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results_by_relative_path
def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True)
-
Exports an archive using the Glasswall engine.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The archive file path or bytes.
output_file
:Optional[str]
, optional- Default None. If str, write the archive to the output_file path.
output_report
:Optional[str]
, optional- Default None. If str, write the analysis report to the output_report path.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Exports an archive using the Glasswall engine.

    The library resources are released via self.release() once the API call has been made, whether this method returns normally or raises.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not a path to an existing file.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped), when the status is not a success code and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    # (done before the working directory is switched for the API call)
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    # A str policy that is an existing file path is read from disk; any other value is handed to validate_xml as-is
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif isinstance(content_management_policy, type(None)):
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileExportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # from_buffer requires a writable buffer, hence the bytearray copy
    input_buffer_bytearray = bytearray(input_file_bytes)
    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString

    gw_return_object = glasswall.GwReturnObj()

    # Everything from the API call onwards is guarded so that the library is
    # always released, including when an error status is raised below.
    try:
        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileExportArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        # Short description of the input for logging, avoids dumping raw bytes into the log
        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the output buffers into Python bytes before the library is released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file:
            if isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report:
            if isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)
    finally:
        self.release()

    return gw_return_object
def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)
-
Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing archives to export.
output_directory
:Optional[str]
, optional- Default None. If str, the output directory where the archives will be written.
output_report_directory
:Optional[str]
, optional- Default None. If str, the output directory where xml reports for each archive will be written.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
def export_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing archives to export.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    exported_archives_dict = {}

    # Call export_archive on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        # The relative path is both the dictionary key and the layout used below the output roots
        relative_path = os.path.relpath(input_file, input_directory)

        # Construct paths for output file and output report; None means "do not write"
        if output_directory is None:
            output_file = None
        else:
            output_file = os.path.join(os.path.abspath(output_directory), relative_path)

        if output_report_directory is None:
            output_report = None
        else:
            output_report = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

        exported_archives_dict[relative_path] = self.export_archive(
            input_file=input_file,
            output_file=output_file,
            output_report=output_report,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return exported_archives_dict
def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True)
-
Expand source code
def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
    """ Packs input_directory by calling the library function GwFileToFilePack.

    Args:
        input_directory (str): An existing directory to pack.
        output_directory (str): The output directory path passed to the library. Created if it does not exist.
        file_type (Optional[str], optional): Default None. The archive file type. If falsy, it is derived with utils.get_file_type(input_directory).
        raise_unsupported (bool, optional): Default True. Raise the exception mapped to the returned status when it is not a success code. If False the failure is only logged.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status", "ct_input_directory", "ct_output_directory", "ct_file_type"

    Raises:
        TypeError: If input_directory or output_directory is not a str.
        NotADirectoryError: If input_directory is not an existing directory.
    """
    # Validate arg types
    if not isinstance(input_directory, str):
        raise TypeError(input_directory)
    elif not os.path.isdir(input_directory):
        raise NotADirectoryError(input_directory)
    if not isinstance(output_directory, str):
        raise TypeError(output_directory)

    if not file_type:
        # NOTE(review): derives the type from the directory path itself - confirm this is the intended fallback
        file_type = utils.get_file_type(input_directory)

    # Ensure output_directory exists
    os.makedirs(output_directory, exist_ok=True)

    # API function declaration
    self.library.GwFileToFilePack.argtypes = [
        ct.c_char_p,
        ct.c_char_p,
        ct.c_char_p,
    ]

    # Variable initialisation. The ctypes arguments are stored on the return object.
    gw_return_object = glasswall.GwReturnObj()
    gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode())  # const char* inputDirPath
    gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
    gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

    with utils.CwdHandler(new_cwd=self.library_path):
        # API call
        gw_return_object.status = self.library.GwFileToFilePack(
            gw_return_object.ct_input_directory,
            gw_return_object.ct_output_directory,
            gw_return_object.ct_file_type,
        )

    try:
        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
    finally:
        # Release library resources on the raising path too, not only on normal return
        self.release()

    return gw_return_object
def file_to_file_unpack(self, input_file: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True)
-
Expand source code
def file_to_file_unpack(self, input_file: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
    """ Unpacks input_file by calling the library function GwFileToFileUnpack.

    Args:
        input_file (str): The path of an existing archive file.
        output_directory (str): The output directory path passed to the library.
        file_type (Optional[str], optional): Default None. The archive file type. If falsy, it is derived with utils.get_file_type(input_file).
        raise_unsupported (bool, optional): Default True. Raise exceptions when the library call fails or returns a status that is not a success code. If False the failure is only logged.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status", "ct_input_file", "ct_output_directory", "ct_file_type". If the library call raised OSError and raise_unsupported is False, "status" is 0.

    Raises:
        TypeError: If input_file or output_directory is not a str.
        FileNotFoundError: If input_file is not an existing file.
        OSError: If the library call raises OSError and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, str):
        raise TypeError(input_file)
    elif not os.path.isfile(input_file):
        raise FileNotFoundError(input_file)
    if not isinstance(output_directory, str):
        raise TypeError(output_directory)

    if not file_type:
        file_type = utils.get_file_type(input_file)

    # API function declaration
    self.library.GwFileToFileUnpack.argtypes = [
        ct.c_char_p,
        ct.c_char_p,
        ct.c_char_p,
    ]

    # Variable initialisation. The ctypes arguments are stored on the return object.
    gw_return_object = glasswall.GwReturnObj()
    gw_return_object.ct_input_file = ct.c_char_p(input_file.encode())  # const char* inputFilePath
    gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
    gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

    with utils.CwdHandler(new_cwd=self.library_path):
        try:
            # API call
            gw_return_object.status = self.library.GwFileToFileUnpack(
                gw_return_object.ct_input_file,
                gw_return_object.ct_output_directory,
                gw_return_object.ct_file_type,
            )
        except OSError:
            # bz2, gz currently OSError on unpack, not supported unpacking
            # OSError: exception: access violation reading 0x0000000000000000
            if raise_unsupported:
                raise
            # Best effort: report the failure and keep the return type consistent with every
            # other path, so callers can read `.status` instead of receiving a bare int.
            log.error(f"\n\tinput_file: {input_file}\n\tstatus: OSError raised by GwFileToFileUnpack")
            gw_return_object.status = 0
            return gw_return_object

    try:
        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
    finally:
        # Release library resources on the raising path too, not only on normal return
        self.release()

    return gw_return_object
def import_archive(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, include_analysis_report: Optional[int] = 1, raise_unsupported: bool = True)
-
Imports an archive using the Glasswall engine.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The archive file path or bytes.
output_file
:Optional[str]
, optional- Default None. If str, write the archive to the output_file path.
output_report
:Optional[str]
, optional- Default None. If str, write the analysis report to the output_report path.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager]
, optional- The content management policy to apply.
include_analysis_report
:Optional[int]
, optional- Default 1. If 1, write the analysis report into imported archive.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[int] = 1, raise_unsupported: bool = True):
    """ Imports an archive using the Glasswall engine.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        include_analysis_report (Optional[int], optional): Default 1. If 1, write the analysis report into imported archive.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif isinstance(content_management_policy, type(None)):
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileImportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p,
        ct.c_int
    ]

    # Variable initialisation
    # from_buffer needs a writable buffer, hence the bytearray copy of the input
    input_buffer_bytearray = bytearray(input_file_bytes)
    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    ct_include_analysis_report = ct.c_int(include_analysis_report)  # int
    gw_return_object = glasswall.GwReturnObj()

    with utils.CwdHandler(new_cwd=self.library_path):
        # API call
        gw_return_object.status = self.library.GwFileImportArchive(
            ct.byref(ct_input_buffer),
            ct_input_buffer_length,
            ct.byref(ct_output_buffer),
            ct.byref(ct_output_buffer_length),
            ct.byref(ct_output_report_buffer),
            ct.byref(ct_output_report_buffer_length),
            ct_content_management_policy,
            ct_include_analysis_report
        )

    # Short description of the input for log messages (avoids dumping raw bytes)
    if isinstance(input_file, (bytes, bytearray,)):
        input_file_repr = f"{type(input_file)} length {len(input_file)}"
    elif isinstance(input_file, io.BytesIO):
        input_file_repr = input_file.__sizeof__()
    else:
        input_file_repr = input_file

    try:
        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the output buffers into Python bytes before the library is released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file:
            if isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report:
            if isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)
    finally:
        # Release library resources even when raising or when writing the outputs fails
        self.release()

    return gw_return_object
def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)
-
Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing archives to import.
output_directory
:Optional[str]
, optional- Default None. If str, the output directory where the archives will be written.
output_report_directory
:Optional[str]
, optional- Default None. If str, the output directory where xml reports for each archive will be written.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
def import_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True, include_analysis_report: Optional[int] = 1):
    """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing archives to import.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        include_analysis_report (Optional[int], optional): Default 1. Forwarded unchanged to import_archive for every file. If 1, write the analysis report into imported archive.

    Returns:
        imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    imported_archives_dict = {}

    # Call import_archive on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        # The relative path is both the dictionary key and the layout used below the output roots
        relative_path = os.path.relpath(input_file, input_directory)

        # Construct paths for output file and output report; None means "do not write"
        if output_directory is None:
            output_file = None
        else:
            output_file = os.path.join(os.path.abspath(output_directory), relative_path)

        if output_report_directory is None:
            output_report = None
        else:
            output_report = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

        imported_archives_dict[relative_path] = self.import_archive(
            input_file=input_file,
            output_file=output_file,
            output_report=output_report,
            content_management_policy=content_management_policy,
            include_analysis_report=include_analysis_report,
            raise_unsupported=raise_unsupported,
        )

    return imported_archives_dict
def is_supported_archive(self, archive_type: str)
-
Returns True if the archive type (e.g. `7z`) is supported.
Expand source code
@functools.lru_cache()
def is_supported_archive(self, archive_type: str):
    """ Reports whether the library supports the given archive type (e.g. `7z`).

    Results are memoised by functools.lru_cache, keyed on (self, archive_type).
    """
    api_function = self.library.GwIsSupportedArchiveType

    # API function declaration
    api_function.argtypes = [ct.c_char_p]
    api_function.restype = ct.c_bool

    encoded_type = archive_type.encode()
    return api_function(ct.c_char_p(encoded_type))  # const char* type
def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True)
-
Returns a list of file paths of supported archives in a directory and all of its subdirectories.
Expand source code
def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
    """ Lists the files found under directory whose file type is a supported archive type.

    The directory, recursive, absolute and followlinks arguments are forwarded unchanged to glasswall.utils.list_file_paths.
    """
    candidate_paths = glasswall.utils.list_file_paths(
        directory=directory,
        recursive=recursive,
        absolute=absolute,
        followlinks=followlinks,
    )

    archive_paths = []
    for candidate in candidate_paths:
        # Keep only the paths whose detected file type the library reports as supported
        if self.is_supported_archive(utils.get_file_type(candidate)):
            archive_paths.append(candidate)

    return archive_paths
def pack_directory(self, input_directory: str, output_directory: str, file_type: str, raise_unsupported: bool = True, delete_origin: bool = False)
-
Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".
Args
input_directory
:str
- The directory to pack.
output_directory
:str
- The output directory where the packed archive will be written.
recursive
:bool
, optional- Default True. Recursively unpack all nested archives.
file_type
:str
- The archive file type.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
delete_origin
:bool
, optional- Default False. Delete input_directory after packing to output_directory.
Expand source code
def pack_directory(self, input_directory: str, output_directory: str, file_type: str, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

    Args:
        input_directory (str): The directory to pack.
        output_directory (str): The output directory passed to file_to_file_pack.
        file_type (str): The archive file type.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete input_directory after it has been packed successfully. The directory is kept when packing fails.
    """
    # Convert to absolute paths
    input_directory = os.path.abspath(input_directory)
    output_directory = os.path.abspath(output_directory)

    # Pack
    log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}")
    status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, raise_unsupported=raise_unsupported).status

    if status not in successes.success_codes:
        log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
        if raise_unsupported:
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        # Packing failed: never delete the origin, otherwise the data would be lost
        return

    log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")

    if delete_origin:
        utils.delete_directory(input_directory)
def protect_archive(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)
-
Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The archive file path or bytes.
output_file
:Optional[str]
, optional- Default None. If str, write the archive to the output_file path.
output_report
:Optional[str]
, optional- Default None. If str, write the analysis report to the output_report path.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif isinstance(content_management_policy, type(None)):
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileProtectAndReportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # from_buffer needs a writable buffer, hence the bytearray copy of the input
    input_buffer_bytearray = bytearray(input_file_bytes)
    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    gw_return_object = glasswall.GwReturnObj()

    with utils.CwdHandler(new_cwd=self.library_path):
        # API call
        gw_return_object.status = self.library.GwFileProtectAndReportArchive(
            ct.byref(ct_input_buffer),
            ct_input_buffer_length,
            ct.byref(ct_output_buffer),
            ct.byref(ct_output_buffer_length),
            ct.byref(ct_output_report_buffer),
            ct.byref(ct_output_report_buffer_length),
            ct_content_management_policy
        )

    # Short description of the input for log messages (avoids dumping raw bytes)
    if isinstance(input_file, (bytes, bytearray,)):
        input_file_repr = f"{type(input_file)} length {len(input_file)}"
    elif isinstance(input_file, io.BytesIO):
        input_file_repr = input_file.__sizeof__()
    else:
        input_file_repr = input_file

    try:
        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the output buffers into Python bytes before the library is released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file:
            if isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report:
            if isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)
    finally:
        # Release library resources even when raising or when writing the outputs fails
        self.release()

    return gw_return_object
def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)
-
Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing archives to protect.
output_directory
:Optional[str]
, optional- Default None. If str, the output directory where the archives will be written.
output_report_directory
:Optional[str]
, optional- Default None. If str, the output directory where xml reports for each archive will be written.
content_management_policy
:Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager]
, optional- The content management policy to apply.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Runs protect_archive over every file found in input_directory with the given content management policy, mirroring the input directory layout below output_directory.

    Args:
        input_directory (str): The input directory containing archives to protect.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    results = {}

    for archive_path in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(archive_path, input_directory)

        # Destination of the protected archive, or None when nothing should be written
        if output_directory is None:
            archive_destination = None
        else:
            archive_destination = os.path.join(os.path.abspath(output_directory), relative_path)

        # Destination of the xml report, or None when nothing should be written
        if output_report_directory is None:
            report_destination = None
        else:
            report_destination = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

        results[relative_path] = self.protect_archive(
            input_file=archive_path,
            output_file=archive_destination,
            output_report=report_destination,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results
def release(self)
-
Releases any resources held by the Glasswall Archive Manager library.
Expand source code
def release(self):
    """ Asks the Glasswall Archive Manager library to release the resources it holds, by calling GwArchiveDone. """
    archive_done = self.library.GwArchiveDone
    archive_done()
def unpack(self, input_file: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False)
-
Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".
Args
input_file
:str
- The archive file path
output_directory
:str
- The output directory in which a new directory, named after the archive, is created to hold the unpacked contents.
recursive
:bool
, optional- Default True. Recursively unpack all nested archives.
file_type
:str
, optional- Default None (use extension). The archive file type.
include_file_type
:bool
, optional- Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
delete_origin
:bool
, optional- Default False. Delete input_file after unpacking to output_directory.
Expand source code
def unpack(self, input_file: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

    The archive is unpacked into a new directory inside output_directory that is named after the archive.
    If unpacking fails and raise_unsupported is False, the failure status is returned immediately: input_file is
    not deleted and nested archives are not processed.

    Args:
        input_file (str): The archive file path
        output_directory (str): The output directory where the archive will be unpacked to a new directory.
        recursive (bool, optional): Default True. Recursively unpack all nested archives.
        file_type (str, optional): Default None (use extension). The archive file type.
        include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete input_file after it has been successfully unpacked to output_directory.

    Returns:
        status: The status reported by file_to_file_unpack for input_file.

    Raises:
        The exception class registered for the status in errors.error_codes (errors.UnknownErrorCode when the status has no entry), if unpacking fails and raise_unsupported is True.
    """
    # Convert to absolute paths
    input_file = os.path.abspath(input_file)
    output_directory = os.path.abspath(output_directory)

    # The new directory is named after the archive, with or without its extension.
    if include_file_type:
        archive_name = os.path.basename(input_file)
    else:
        archive_name = os.path.splitext(os.path.basename(input_file))[0]
    archive_output_directory = os.path.join(output_directory, archive_name)

    # Unpack
    log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}")
    status = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, file_type=file_type, raise_unsupported=raise_unsupported).status

    if status not in successes.success_codes:
        log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
        if raise_unsupported:
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        # Failing silently: stop here so that an archive that was never unpacked is not deleted,
        # and so that the output of a failed unpack is not searched for nested archives.
        return status

    log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}")

    if delete_origin:
        os.remove(input_file)

    if recursive:
        # Unpack sub archives. Each nested archive is removed once it has been unpacked (delete_origin=True).
        # file_type and include_file_type are not forwarded to nested archives.
        for subarchive in self.list_archive_paths(archive_output_directory):
            self.unpack(
                input_file=subarchive,
                output_directory=archive_output_directory,
                recursive=recursive,
                raise_unsupported=raise_unsupported,
                delete_origin=True
            )

    return status
def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False)
-
Unpack a directory of archives, maintaining directory structure.
Args
input_directory
:str
- The input directory containing archives to unpack.
output_directory
:str
- The output directory under which each archive is unpacked into a new directory named after that archive.
recursive
:bool
, optional- Default True. Recursively unpack all nested archives.
file_type
:str
, optional- Default None (use extension). The archive file type of all archives within the directory.
include_file_type
:bool
, optional- Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
delete_origin
:bool
, optional- Default False. Delete each archive in input_directory after unpacking it to output_directory.
Expand source code
def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Unpacks every supported archive found in input_directory, mirroring its directory layout under output_directory.

    Args:
        input_directory (str): The input directory containing archives to unpack.
        output_directory (str): The output directory where archives will be unpacked to a new directory.
        recursive (bool, optional): Default True. Recursively unpack all nested archives.
        file_type (str, optional): Default None (use extension). The archive file type of all archives within the directory.
        include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete each input archive after unpacking it to output_directory.
    """
    # Work with absolute paths throughout
    input_directory = os.path.abspath(input_directory)
    output_directory = os.path.abspath(output_directory)

    # Options handed unchanged to every unpack call
    shared_options = {
        "recursive": recursive,
        "file_type": file_type,
        "include_file_type": include_file_type,
        "raise_unsupported": raise_unsupported,
        "delete_origin": delete_origin,
    }

    for archive_path in self.list_archive_paths(input_directory):
        # Each archive is unpacked beneath the output counterpart of the directory that contains it.
        path_in_tree = os.path.relpath(archive_path, input_directory)
        mirrored_parent = os.path.dirname(os.path.join(output_directory, path_in_tree))
        self.unpack(input_file=archive_path, output_directory=mirrored_parent, **shared_options)
def version(self)
-
Returns the Glasswall library version.
Returns
version (str): The Glasswall library version.
Expand source code
def version(self):
    """ Queries the loaded library for its version string.

    Returns:
        version (str): The Glasswall library version.
    """
    get_version = self.library.GwArchiveVersion

    # Declare the return type of the API function before calling it
    get_version.restype = ct.c_char_p

    raw_version = get_version()

    # Decode the returned C string into a Python string
    return ct.string_at(raw_version).decode()