Module glasswall.libraries.archive_manager.archive_manager

Expand source code
import ctypes as ct
import functools
import io
import os
from typing import Optional, Union

import glasswall
from glasswall import utils
from glasswall.config.logging import log
from glasswall.libraries.archive_manager import errors, successes
from glasswall.libraries.library import Library


class ArchiveManager(Library):
    """ A high level Python wrapper for Glasswall Archive Manager. """

    def __init__(self, library_path):
        """ Initialises the base Library, loads the Archive Manager library and logs the loaded version.

        Args:
            library_path (str): The library path. It is passed unchanged to the base class, and its absolute form is passed to load_library.
        """
        super().__init__(library_path)

        absolute_library_path = os.path.abspath(library_path)
        self.library = self.load_library(absolute_library_path)

        class_name = self.__class__.__name__
        log.info(f"Loaded Glasswall {class_name} version {self.version()} from {self.library_path}")

    def version(self):
        """ Queries the loaded library for its version via GwArchiveVersion.

        Returns:
            version (str): The Glasswall library version.
        """
        get_version = self.library.GwArchiveVersion

        # The API returns a C string (char *)
        get_version.restype = ct.c_char_p

        raw_version = get_version()

        # Decode the C string into a Python str
        return ct.string_at(raw_version).decode()

    def release(self):
        """ Calls GwArchiveDone so that the Glasswall Archive Manager library releases any resources it holds. """
        archive_done = self.library.GwArchiveDone
        archive_done()

    @functools.lru_cache()
    def is_supported_archive(self, archive_type: str):
        """ Asks the library whether an archive type (e.g. `7z`) is supported.

        Results are memoised by functools.lru_cache, keyed on the instance and archive_type.

        Args:
            archive_type (str): The archive type, e.g. `7z`.

        Returns:
            result (bool): The value returned by GwIsSupportedArchiveType for archive_type.
        """
        is_supported = self.library.GwIsSupportedArchiveType

        # API function declaration
        is_supported.argtypes = [ct.c_char_p]
        is_supported.restype = ct.c_bool

        encoded_archive_type = archive_type.encode()

        # API call
        return is_supported(ct.c_char_p(encoded_archive_type))  # const char* type

    def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
        """ Returns the file paths found in a directory whose file type is a supported archive type.

        Args:
            directory (str): The directory to search.
            recursive (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.
            absolute (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.
            followlinks (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.

        Returns:
            archive_paths (list): The file paths for which is_supported_archive returned a truthy value.
        """
        candidate_paths = glasswall.utils.list_file_paths(
            directory=directory,
            recursive=recursive,
            absolute=absolute,
            followlinks=followlinks,
        )

        archive_paths = []
        for candidate_path in candidate_paths:
            candidate_type = utils.get_file_type(candidate_path)
            if self.is_supported_archive(candidate_type):
                archive_paths.append(candidate_path)

        return archive_paths

    def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. A str that is an existing file path is read from disk. None loads a default ArchiveManager policy (default="sanitise", default_archive_manager="process").
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If input_file, output_file, output_report or content_management_policy is not one of the accepted types.
            FileNotFoundError: If input_file is a str that is not the path of an existing file.
            Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) is raised with the status.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        else:
            # Type validation above guarantees bytes, bytearray or io.BytesIO here
            input_file_bytes = utils.as_bytes(input_file)

        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        elif content_management_policy is None:
            # Load default
            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
        content_management_policy = utils.validate_xml(content_management_policy)

        # API function declaration
        self.library.GwFileAnalysisArchive.argtypes = [
            ct.c_void_p,
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.c_char_p
        ]

        # Variable initialisation
        # from_buffer requires a writable buffer, so the input is copied into a bytearray
        input_buffer_bytearray = bytearray(input_file_bytes)

        ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
        ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
        ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
        ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
        ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
        gw_return_object = glasswall.GwReturnObj()

        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileAnalysisArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        try:
            # Describe the input for logging without dumping raw bytes into the log
            if isinstance(input_file, (bytes, bytearray,)):
                input_file_repr = f"{type(input_file)} length {len(input_file)}"
            elif isinstance(input_file, io.BytesIO):
                input_file_repr = input_file.__sizeof__()
            else:
                input_file_repr = input_file

            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

            # Copy the output buffers into Python bytes before the library is released
            gw_return_object.output_file = utils.buffer_to_bytes(
                ct_output_buffer,
                ct_output_buffer_length
            )
            gw_return_object.output_report = utils.buffer_to_bytes(
                ct_output_report_buffer,
                ct_output_report_buffer_length
            )

            # Write output file
            if gw_return_object.output_file and isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

            # Write output report
            if gw_return_object.output_report and isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)
        finally:
            # Release library resources on every path, including when an error status is raised or writing output fails
            self.release()

        return gw_return_object

    def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Runs analyse_archive on every file listed in input_directory, mirroring the input directory structure under output_directory and output_report_directory.

        Args:
            input_directory (str): The input directory containing archives to analyse.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the output file of each archive will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where reports will be written. Each report path is the archive's relative path with ".xml" appended.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        results_by_relative_path = {}

        for archive_path in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(archive_path, input_directory)

            # Output archive path: same relative location beneath output_directory
            if output_directory is None:
                output_file = None
            else:
                output_file = os.path.join(os.path.abspath(output_directory), relative_path)

            # Output report path: same relative location beneath output_report_directory, with ".xml" appended
            if output_report_directory is None:
                output_report = None
            else:
                output_report = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

            results_by_relative_path[relative_path] = self.analyse_archive(
                input_file=archive_path,
                output_file=output_file,
                output_report=output_report,
                content_management_policy=content_management_policy,
                raise_unsupported=raise_unsupported,
            )

        return results_by_relative_path

    def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. A str that is an existing file path is read from disk. None loads a default ArchiveManager policy (default="sanitise", default_archive_manager="process").
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If input_file, output_file, output_report or content_management_policy is not one of the accepted types.
            FileNotFoundError: If input_file is a str that is not the path of an existing file.
            Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) is raised with the status.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        else:
            # Type validation above guarantees bytes, bytearray or io.BytesIO here
            input_file_bytes = utils.as_bytes(input_file)

        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        elif content_management_policy is None:
            # Load default
            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
        content_management_policy = utils.validate_xml(content_management_policy)

        # API function declaration
        self.library.GwFileProtectAndReportArchive.argtypes = [
            ct.c_void_p,
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.c_char_p
        ]

        # Variable initialisation
        # from_buffer requires a writable buffer, so the input is copied into a bytearray
        input_buffer_bytearray = bytearray(input_file_bytes)

        ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
        ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
        ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
        ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
        ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
        gw_return_object = glasswall.GwReturnObj()

        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileProtectAndReportArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        try:
            # Describe the input for logging without dumping raw bytes into the log
            if isinstance(input_file, (bytes, bytearray,)):
                input_file_repr = f"{type(input_file)} length {len(input_file)}"
            elif isinstance(input_file, io.BytesIO):
                input_file_repr = input_file.__sizeof__()
            else:
                input_file_repr = input_file

            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

            # Copy the output buffers into Python bytes before the library is released
            gw_return_object.output_file = utils.buffer_to_bytes(
                ct_output_buffer,
                ct_output_buffer_length
            )
            gw_return_object.output_report = utils.buffer_to_bytes(
                ct_output_report_buffer,
                ct_output_report_buffer_length
            )

            # Write output file
            if gw_return_object.output_file and isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

            # Write output report
            if gw_return_object.output_report and isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)
        finally:
            # Release library resources on every path, including when an error status is raised or writing output fails
            self.release()

        return gw_return_object

    def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Runs protect_archive on every file listed in input_directory, mirroring the input directory structure under output_directory and output_report_directory.

        Args:
            input_directory (str): The input directory containing archives to protect.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where reports will be written. Each report path is the archive's relative path with ".xml" appended.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        results_by_relative_path = {}

        for archive_path in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(archive_path, input_directory)

            # Output archive path: same relative location beneath output_directory
            if output_directory is None:
                output_file = None
            else:
                output_file = os.path.join(os.path.abspath(output_directory), relative_path)

            # Output report path: same relative location beneath output_report_directory, with ".xml" appended
            if output_report_directory is None:
                output_report = None
            else:
                output_report = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

            results_by_relative_path[relative_path] = self.protect_archive(
                input_file=archive_path,
                output_file=output_file,
                output_report=output_report,
                content_management_policy=content_management_policy,
                raise_unsupported=raise_unsupported,
            )

        return results_by_relative_path

    def file_to_file_unpack(self, input_file: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
        """ Unpacks the input_file archive by calling the library function GwFileToFileUnpack with the input file path, output directory path and file type.

        Args:
            input_file (str): The archive file path.
            output_directory (str): The output directory path passed to the library.
            file_type (Optional[str], optional): Default None. The archive file type. If not provided, utils.get_file_type(input_file) is used.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "ct_input_file", "ct_output_directory", "ct_file_type". NOTE: the int 0 is returned instead when the library call raised OSError and raise_unsupported is False.

        Raises:
            TypeError: If input_file or output_directory is not a str.
            FileNotFoundError: If input_file is not the path of an existing file.
            OSError: If the library call raised OSError and raise_unsupported is True.
            Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) is raised with the status.
        """
        # Validate arg types
        if not isinstance(input_file, str):
            raise TypeError(input_file)
        elif not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        if not isinstance(output_directory, str):
            raise TypeError(output_directory)
        if not file_type:
            file_type = utils.get_file_type(input_file)

        # API function declaration
        self.library.GwFileToFileUnpack.argtypes = [
            ct.c_char_p,
            ct.c_char_p,
            ct.c_char_p,
        ]

        # Variable initialisation
        gw_return_object = glasswall.GwReturnObj()
        gw_return_object.ct_input_file = ct.c_char_p(input_file.encode())  # const char* inputFilePath
        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
        gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

        with utils.CwdHandler(new_cwd=self.library_path):
            try:
                # API call
                gw_return_object.status = self.library.GwFileToFileUnpack(
                    gw_return_object.ct_input_file,
                    gw_return_object.ct_output_directory,
                    gw_return_object.ct_file_type,
                )
            except OSError:
                # bz2, gz currently OSError on unpack, not supported unpacking
                # OSError: exception: access violation reading 0x0000000000000000
                # NOTE(review): release() is not called on this path, as before — confirm whether it is safe to call into the library after an access violation
                if raise_unsupported:
                    raise

                # Kept as a bare int for backward compatibility; callers must not assume a GwReturnObj here
                return 0

        try:
            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
        finally:
            # Release library resources whether or not an error status is raised
            self.release()

        return gw_return_object

    def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
        """ Packs input_directory by calling the library function GwFileToFilePack with the input directory path, output directory path and file type. The output directory is created if it does not exist.

        Args:
            input_directory (str): The directory to pack.
            output_directory (str): The output directory path passed to the library.
            file_type (Optional[str], optional): Default None. The archive file type. If not provided, utils.get_file_type(input_directory) is used.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "ct_input_directory", "ct_output_directory", "ct_file_type".

        Raises:
            TypeError: If input_directory or output_directory is not a str.
            NotADirectoryError: If input_directory is not an existing directory.
            Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) is raised with the status.
        """
        # Validate arg types
        if not isinstance(input_directory, str):
            raise TypeError(input_directory)
        elif not os.path.isdir(input_directory):
            raise NotADirectoryError(input_directory)
        if not isinstance(output_directory, str):
            raise TypeError(output_directory)
        if not file_type:
            file_type = utils.get_file_type(input_directory)

        # Ensure output_directory exists
        os.makedirs(output_directory, exist_ok=True)

        # API function declaration
        self.library.GwFileToFilePack.argtypes = [
            ct.c_char_p,
            ct.c_char_p,
            ct.c_char_p,
        ]

        # Variable initialisation
        gw_return_object = glasswall.GwReturnObj()
        gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode())  # const char* inputDirPath
        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
        gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileToFilePack(
                gw_return_object.ct_input_directory,
                gw_return_object.ct_output_directory,
                gw_return_object.ct_file_type,
            )

        try:
            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
        finally:
            # Release library resources whether or not an error status is raised
            self.release()

        return gw_return_object

    def unpack(self, input_file: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
        """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

        Args:
            input_file (str): The archive file path
            output_directory (str): The output directory where the archive will be unpacked to a new directory.
            recursive (bool, optional): Default True. Recursively unpack all nested archives. Nested archives are unpacked with delete_origin=True.
            file_type (str, optional): Default None (use extension). The archive file type.
            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
            delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory. The file is kept if unpacking did not succeed.

        Returns:
            status (int): The status of unpacking input_file. Statuses of nested archives are not returned.

        Raises:
            Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) is raised with the status.
        """
        # Convert to absolute paths
        input_file = os.path.abspath(input_file)
        output_directory = os.path.abspath(output_directory)

        # The archive is unpacked into a new directory named after the archive
        if include_file_type:
            archive_name = os.path.basename(input_file)
        else:
            archive_name = os.path.splitext(os.path.basename(input_file))[0]
        archive_output_directory = os.path.join(output_directory, archive_name)

        # Unpack
        log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}")
        unpack_result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, file_type=file_type, raise_unsupported=raise_unsupported)

        # file_to_file_unpack returns a bare int rather than a GwReturnObj when the library call raised OSError
        # and raise_unsupported is False, so fall back to the returned value itself when there is no status attribute
        status = getattr(unpack_result, "status", unpack_result)

        unpacked = status in successes.success_codes
        if not unpacked:
            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}")

        # Never delete the source archive when unpacking failed, otherwise its contents would be lost
        if delete_origin and unpacked:
            os.remove(input_file)

        # The output directory may not exist when unpacking failed; there is nothing to recurse into in that case
        if recursive and os.path.isdir(archive_output_directory):
            # Unpack sub archives
            for subarchive in self.list_archive_paths(archive_output_directory):
                self.unpack(
                    input_file=subarchive,
                    output_directory=archive_output_directory,
                    recursive=recursive,
                    raise_unsupported=raise_unsupported,
                    delete_origin=True
                )

        return status

    def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
        """ Calls unpack on every supported archive found in input_directory, mirroring the input directory structure under output_directory.

        Args:
            input_directory (str): The input directory containing archives to unpack.
            output_directory (str): The output directory where archives will be unpacked to a new directory.
            recursive (bool, optional): Default True. Recursively unpack all nested archives.
            file_type (str, optional): Default None (use extension). The archive file type of all archives within the directory.
            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
            delete_origin (bool, optional): Default False. Delete each archive after unpacking to output_directory.
        """
        # Work with absolute paths throughout
        source_root = os.path.abspath(input_directory)
        destination_root = os.path.abspath(output_directory)

        for archive_path in self.list_archive_paths(source_root):
            # Directory beneath destination_root matching the archive's parent directory beneath source_root
            mirrored_archive_path = os.path.join(destination_root, os.path.relpath(archive_path, source_root))
            destination_directory = os.path.dirname(mirrored_archive_path)

            self.unpack(
                input_file=archive_path,
                output_directory=destination_directory,
                recursive=recursive,
                file_type=file_type,
                include_file_type=include_file_type,
                raise_unsupported=raise_unsupported,
                delete_origin=delete_origin
            )

    def pack_directory(self, input_directory: str, output_directory: str, file_type: str, raise_unsupported: bool = True, delete_origin: bool = False):
        """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

        Args:
            input_directory (str): The directory to pack.
            output_directory (str): The output directory passed to file_to_file_pack.
            file_type (str): The archive file type.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
            delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory. The directory is kept if packing did not succeed.

        Raises:
            Exception: If the status is not in successes.success_codes and raise_unsupported is True, the class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) is raised with the status.
        """
        # Convert to absolute paths
        input_directory = os.path.abspath(input_directory)
        output_directory = os.path.abspath(output_directory)

        # Pack
        log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}")
        status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, raise_unsupported=raise_unsupported).status

        packed = status in successes.success_codes
        if not packed:
            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")

        # Never delete the source directory when packing failed, otherwise its contents would be lost
        if delete_origin and packed:
            utils.delete_directory(input_directory)

    def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Exports an archive using the Glasswall engine.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. If None, a default ArchiveManager policy (default="sanitise", default_archive_manager="process") is used.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If an argument is not an instance of one of its accepted types.
            FileNotFoundError: If input_file is a str that is not the path of an existing file.
            Exception: If raise_unsupported is True and the status is not in successes.success_codes, the class mapped to the status in errors.error_codes is raised (errors.UnknownErrorCode if the status is not mapped).
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file_bytes = utils.as_bytes(input_file)

        # A str policy that is an existing file path is read from disk, any other value is handed to validate_xml as is
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        elif isinstance(content_management_policy, type(None)):
            # Load default
            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
        content_management_policy = utils.validate_xml(content_management_policy)

        # API function declaration
        self.library.GwFileExportArchive.argtypes = [
            ct.c_void_p,
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.c_char_p
        ]

        # Variable initialisation
        # from_buffer needs a writable buffer, so the input is copied into a bytearray
        input_buffer_bytearray = bytearray(input_file_bytes)

        ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
        ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
        ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
        ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
        ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
        gw_return_object = glasswall.GwReturnObj()

        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileExportArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        try:
            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

            # Copy the output buffers into Python bytes before the library resources are released
            gw_return_object.output_file = utils.buffer_to_bytes(
                ct_output_buffer,
                ct_output_buffer_length
            )
            gw_return_object.output_report = utils.buffer_to_bytes(
                ct_output_report_buffer,
                ct_output_report_buffer_length
            )

            # Write output file
            if gw_return_object.output_file:
                if isinstance(output_file, str):
                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
                    with open(output_file, "wb") as f:
                        f.write(gw_return_object.output_file)

            # Write output report
            if gw_return_object.output_report:
                if isinstance(output_report, str):
                    os.makedirs(os.path.dirname(output_report), exist_ok=True)
                    with open(output_report, "wb") as f:
                        f.write(gw_return_object.output_report)
        finally:
            # Always release the resources held by the library, including when the status check raises or writing the outputs fails
            self.release()

        return gw_return_object

    def export_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing archives to export.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. Each report is named after its input file with ".xml" appended.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        # The output roots are the same for every file, resolve them once
        output_root = None if output_directory is None else os.path.abspath(output_directory)
        output_report_root = None if output_report_directory is None else os.path.abspath(output_report_directory)

        exported_archives_dict = {}
        # Call export_archive on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            # Construct paths for output file and output report
            output_file = None if output_root is None else os.path.join(output_root, relative_path)
            output_report = None if output_report_root is None else os.path.join(output_report_root, relative_path + ".xml")

            exported_archives_dict[relative_path] = self.export_archive(
                input_file=input_file,
                output_file=output_file,
                output_report=output_report,
                content_management_policy=content_management_policy,
                raise_unsupported=raise_unsupported,
            )

        return exported_archives_dict

    def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[int] = 1, raise_unsupported: bool = True):
        """ Imports an archive using the Glasswall engine.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. If None, a default ArchiveManager policy (default="sanitise", default_archive_manager="process") is used.
            include_analysis_report (Optional[int], optional): Default 1. If 1, write the analysis report into imported archive. The value is passed to the library as a C int.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If an argument is not an instance of one of its accepted types.
            FileNotFoundError: If input_file is a str that is not the path of an existing file.
            Exception: If raise_unsupported is True and the status is not in successes.success_codes, the class mapped to the status in errors.error_codes is raised (errors.UnknownErrorCode if the status is not mapped).
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file_bytes = utils.as_bytes(input_file)

        # A str policy that is an existing file path is read from disk, any other value is handed to validate_xml as is
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        elif isinstance(content_management_policy, type(None)):
            # Load default
            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
        content_management_policy = utils.validate_xml(content_management_policy)

        # API function declaration
        self.library.GwFileImportArchive.argtypes = [
            ct.c_void_p,
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.c_char_p,
            ct.c_int
        ]

        # Variable initialisation
        # from_buffer needs a writable buffer, so the input is copied into a bytearray
        input_buffer_bytearray = bytearray(input_file_bytes)

        ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
        ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
        ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
        ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
        ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
        ct_include_analysis_report = ct.c_int(include_analysis_report)  # int
        gw_return_object = glasswall.GwReturnObj()

        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileImportArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy,
                ct_include_analysis_report
            )

        try:
            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

            # Copy the output buffers into Python bytes before the library resources are released
            gw_return_object.output_file = utils.buffer_to_bytes(
                ct_output_buffer,
                ct_output_buffer_length
            )
            gw_return_object.output_report = utils.buffer_to_bytes(
                ct_output_report_buffer,
                ct_output_report_buffer_length
            )

            # Write output file
            if gw_return_object.output_file:
                if isinstance(output_file, str):
                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
                    with open(output_file, "wb") as f:
                        f.write(gw_return_object.output_file)

            # Write output report
            if gw_return_object.output_report:
                if isinstance(output_report, str):
                    os.makedirs(os.path.dirname(output_report), exist_ok=True)
                    with open(output_report, "wb") as f:
                        f.write(gw_return_object.output_report)
        finally:
            # Always release the resources held by the library, including when the status check raises or writing the outputs fails
            self.release()

        return gw_return_object

    def import_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True, include_analysis_report: Optional[int] = 1):
        """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing archives to import.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. Each report is named after its input file with ".xml" appended.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
            include_analysis_report (Optional[int], optional): Default 1. Forwarded to import_archive for every file. If 1, write the analysis report into imported archive.

        Returns:
            imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        # The output roots are the same for every file, resolve them once
        output_root = None if output_directory is None else os.path.abspath(output_directory)
        output_report_root = None if output_report_directory is None else os.path.abspath(output_report_directory)

        imported_archives_dict = {}
        # Call import_archive on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            # Construct paths for output file and output report
            output_file = None if output_root is None else os.path.join(output_root, relative_path)
            output_report = None if output_report_root is None else os.path.join(output_report_root, relative_path + ".xml")

            imported_archives_dict[relative_path] = self.import_archive(
                input_file=input_file,
                output_file=output_file,
                output_report=output_report,
                content_management_policy=content_management_policy,
                include_analysis_report=include_analysis_report,
                raise_unsupported=raise_unsupported,
            )

        return imported_archives_dict

Classes

class ArchiveManager (library_path)

A high level Python wrapper for Glasswall Archive Manager.

Expand source code
class ArchiveManager(Library):
    """ A high level Python wrapper for Glasswall Archive Manager. """

    def __init__(self, library_path):
        """ Initialises the base Library with library_path, loads the library from its absolute path and logs the loaded version. """
        super().__init__(library_path)

        absolute_library_path = os.path.abspath(library_path)
        self.library = self.load_library(absolute_library_path)

        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")

    def version(self):
        """ Queries the loaded library for its version.

        Returns:
            version (str): The Glasswall library version.
        """
        api_function = self.library.GwArchiveVersion

        # API function declaration, the result is read back as a C string
        api_function.restype = ct.c_char_p

        # API call
        raw_version = api_function()

        # Convert to Python string
        return ct.string_at(raw_version).decode()

    def release(self):
        """ Calls GwArchiveDone to release any resources held by the Glasswall Archive Manager library. """
        archive_done = self.library.GwArchiveDone
        archive_done()

    @functools.lru_cache()
    def is_supported_archive(self, archive_type: str):
        """ Asks the library whether an archive type (e.g. `7z`) is supported. Results are cached by functools.lru_cache.

        Args:
            archive_type (str): The archive type, e.g. `7z`.

        Returns:
            result: The value returned by GwIsSupportedArchiveType, declared as a C bool.
        """
        api_function = self.library.GwIsSupportedArchiveType

        # API function declaration
        api_function.argtypes = [ct.c_char_p]
        api_function.restype = ct.c_bool

        encoded_archive_type = archive_type.encode()

        # API call
        return api_function(ct.c_char_p(encoded_archive_type))  # const char* type

    def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
        """ Returns a list of the file paths found in directory whose file type is a supported archive type.

        Args:
            directory (str): The directory to search, passed to glasswall.utils.list_file_paths.
            recursive (bool, optional): Default True. Passed to glasswall.utils.list_file_paths.
            absolute (bool, optional): Default True. Passed to glasswall.utils.list_file_paths.
            followlinks (bool, optional): Default True. Passed to glasswall.utils.list_file_paths.

        Returns:
            archive_paths (list): The file paths for which is_supported_archive is truthy.
        """
        candidate_paths = glasswall.utils.list_file_paths(
            directory=directory,
            recursive=recursive,
            absolute=absolute,
            followlinks=followlinks,
        )

        archive_paths = []
        for candidate_path in candidate_paths:
            candidate_type = utils.get_file_type(candidate_path)
            if self.is_supported_archive(candidate_type):
                archive_paths.append(candidate_path)

        return archive_paths

    def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. If None, a default ArchiveManager policy (default="sanitise", default_archive_manager="process") is used.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If an argument is not an instance of one of its accepted types.
            FileNotFoundError: If input_file is a str that is not the path of an existing file.
            Exception: If raise_unsupported is True and the status is not in successes.success_codes, the class mapped to the status in errors.error_codes is raised (errors.UnknownErrorCode if the status is not mapped).
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file_bytes = utils.as_bytes(input_file)

        # A str policy that is an existing file path is read from disk, any other value is handed to validate_xml as is
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        elif isinstance(content_management_policy, type(None)):
            # Load default
            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
        content_management_policy = utils.validate_xml(content_management_policy)

        # API function declaration
        self.library.GwFileAnalysisArchive.argtypes = [
            ct.c_void_p,
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.c_char_p
        ]

        # Variable initialisation
        # from_buffer needs a writable buffer, so the input is copied into a bytearray
        input_buffer_bytearray = bytearray(input_file_bytes)

        ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
        ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
        ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
        ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
        ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
        gw_return_object = glasswall.GwReturnObj()

        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileAnalysisArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        try:
            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

            # Copy the output buffers into Python bytes before the library resources are released
            gw_return_object.output_file = utils.buffer_to_bytes(
                ct_output_buffer,
                ct_output_buffer_length
            )
            gw_return_object.output_report = utils.buffer_to_bytes(
                ct_output_report_buffer,
                ct_output_report_buffer_length
            )

            # Write output file
            if gw_return_object.output_file:
                if isinstance(output_file, str):
                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
                    with open(output_file, "wb") as f:
                        f.write(gw_return_object.output_file)

            # Write output report
            if gw_return_object.output_report:
                if isinstance(output_report, str):
                    os.makedirs(os.path.dirname(output_report), exist_ok=True)
                    with open(output_report, "wb") as f:
                        f.write(gw_return_object.output_report)
        finally:
            # Always release the resources held by the library, including when the status check raises or writing the outputs fails
            self.release()

        return gw_return_object

    def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Runs analyse_archive over every file in input_directory with the given content management policy. Outputs mirror the directory structure of input_directory beneath output_directory and output_report_directory.

        Args:
            input_directory (str): The input directory containing archives to analyse.
            output_directory (Optional[str], optional): Default None. If str, the directory that the archive returned for each file is written to.
            output_report_directory (Optional[str], optional): Default None. If str, the directory that the xml report of each archive is written to, named after the input file with ".xml" appended.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            analysed_archives_dict (dict): Maps each file path relative to input_directory to the glasswall.GwReturnObj returned by analyse_archive, with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        results_by_relative_path = {}

        for archive_path in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(archive_path, input_directory)

            # Destination of the analysed archive, only when an output directory was given
            if output_directory is None:
                archive_destination = None
            else:
                archive_destination = os.path.join(os.path.abspath(output_directory), relative_path)

            # Destination of the xml report, only when a report directory was given
            if output_report_directory is None:
                report_destination = None
            else:
                report_destination = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

            results_by_relative_path[relative_path] = self.analyse_archive(
                input_file=archive_path,
                output_file=archive_destination,
                output_report=report_destination,
                content_management_policy=content_management_policy,
                raise_unsupported=raise_unsupported,
            )

        return results_by_relative_path

    def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file_bytes = utils.as_bytes(input_file)

        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        elif isinstance(content_management_policy, type(None)):
            # Load default
            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
        content_management_policy = utils.validate_xml(content_management_policy)

        # API function declaration
        self.library.GwFileProtectAndReportArchive.argtypes = [
            ct.c_void_p,
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.c_char_p
        ]

        # Variable initialisation
        input_buffer_bytearray = bytearray(input_file_bytes)

        ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
        ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
        ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
        ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
        ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
        gw_return_object = glasswall.GwReturnObj()

        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileProtectAndReportArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file:
            if isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report:
            if isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)

        self.release()

        return gw_return_object

    def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Runs protect_archive over every file listed in input_directory, mirroring the input directory layout under output_directory (archives) and output_report_directory (xml reports).

        Args:
            input_directory (str): The input directory containing archives to protect.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written. Each report is named after its archive with ".xml" appended.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        results = {}

        for archive_path in utils.list_file_paths(input_directory):
            # The path relative to input_directory is both the dictionary key and the layout used for the outputs
            key = os.path.relpath(archive_path, input_directory)

            # Destination of the protected archive, if requested
            if output_directory is None:
                archive_destination = None
            else:
                archive_destination = os.path.join(os.path.abspath(output_directory), key)

            # Destination of the xml report, if requested
            if output_report_directory is None:
                report_destination = None
            else:
                report_destination = os.path.join(os.path.abspath(output_report_directory), key + ".xml")

            results[key] = self.protect_archive(
                input_file=archive_path,
                output_file=archive_destination,
                output_report=report_destination,
                content_management_policy=content_management_policy,
                raise_unsupported=raise_unsupported,
            )

        return results

    def file_to_file_unpack(self, input_file: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
        """ Unpacks the archive at input_file to output_directory by calling the library function GwFileToFileUnpack.

        Args:
            input_file (str): The archive file path.
            output_directory (str): The directory path passed to the library as the unpack destination.
            file_type (Optional[str], optional): Default None. The archive file type. If not provided, utils.get_file_type(input_file) is used.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), and the ctypes call arguments "ct_input_file", "ct_output_directory", "ct_file_type". If the library call raised OSError and raise_unsupported is False, "status" is 0.

        Raises:
            TypeError: If input_file or output_directory is not a str.
            FileNotFoundError: If input_file is not an existing file.
            OSError: If the library call raised OSError and raise_unsupported is True.
            Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) when the status is not a success code and raise_unsupported is True.
        """
        # Validate arg types
        if not isinstance(input_file, str):
            raise TypeError(input_file)
        elif not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        if not isinstance(output_directory, str):
            raise TypeError(output_directory)
        if not file_type:
            file_type = utils.get_file_type(input_file)

        # API function declaration
        self.library.GwFileToFileUnpack.argtypes = [
            ct.c_char_p,
            ct.c_char_p,
            ct.c_char_p,
        ]

        # Variable initialisation
        gw_return_object = glasswall.GwReturnObj()
        gw_return_object.ct_input_file = ct.c_char_p(input_file.encode())  # const char* inputFilePath
        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
        gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

        with utils.CwdHandler(new_cwd=self.library_path):
            try:
                # API call
                gw_return_object.status = self.library.GwFileToFileUnpack(
                    gw_return_object.ct_input_file,
                    gw_return_object.ct_output_directory,
                    gw_return_object.ct_file_type,
                )
            except OSError:
                # bz2, gz currently OSError on unpack, not supported unpacking
                # OSError: exception: access violation reading 0x0000000000000000
                if raise_unsupported:
                    raise

                # Report the failure through the same return type as every other path: callers such
                # as unpack read ".status" from the result, which failed on the bare int 0 that was
                # returned here before. The status value 0 itself is kept.
                gw_return_object.status = 0
                log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
                return gw_return_object

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")

        self.release()

        return gw_return_object

    def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
        """ Packs input_directory into an archive in output_directory by calling the library function GwFileToFilePack.

        Args:
            input_directory (str): The directory to pack.
            output_directory (str): The directory path passed to the library as the pack destination. Created if it does not exist.
            file_type (Optional[str], optional): Default None. The archive file type. If not provided, utils.get_file_type(input_directory) is used.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), and the ctypes call arguments "ct_input_directory", "ct_output_directory", "ct_file_type".
        """
        # Guard clauses: argument types and input directory existence
        if not isinstance(input_directory, str):
            raise TypeError(input_directory)
        if not os.path.isdir(input_directory):
            raise NotADirectoryError(input_directory)
        if not isinstance(output_directory, str):
            raise TypeError(output_directory)

        # Fall back to the detected type when none was given
        file_type = file_type or utils.get_file_type(input_directory)

        # The destination directory must exist before the library is called
        os.makedirs(output_directory, exist_ok=True)

        # API function declaration: three const char* arguments
        self.library.GwFileToFilePack.argtypes = [ct.c_char_p] * 3

        # The ctypes arguments are stored on the return object
        gw_return_object = glasswall.GwReturnObj()
        gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode())  # const char* inputDirPath
        gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
        gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

        # API call, made with the library path as the working directory
        with utils.CwdHandler(new_cwd=self.library_path):
            gw_return_object.status = self.library.GwFileToFilePack(
                gw_return_object.ct_input_directory,
                gw_return_object.ct_output_directory,
                gw_return_object.ct_file_type,
            )

        outcome = f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}"
        if gw_return_object.status in successes.success_codes:
            log.debug(outcome)
        else:
            log.error(outcome)
            if raise_unsupported:
                error_class = errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)
                raise error_class(gw_return_object.status)

        self.release()

        return gw_return_object

    def unpack(self, input_file: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
        """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

        Args:
            input_file (str): The archive file path
            output_directory (str): The output directory where the archive will be unpacked to a new directory.
            recursive (bool, optional): Default True. Recursively unpack all nested archives.
            file_type (str, optional): Default None (use extension). The archive file type.
            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
            delete_origin (bool, optional): Default False. Delete input_file after it was successfully unpacked to output_directory. The file is kept if unpacking failed.

        Returns:
            status (int): The status of unpacking input_file. Statuses of nested archives are not returned.
        """
        # Convert to absolute paths
        input_file = os.path.abspath(input_file)
        output_directory = os.path.abspath(output_directory)

        # The archive is unpacked to a new directory named after the archive
        if include_file_type:
            archive_name = os.path.basename(input_file)
        else:
            archive_name = os.path.splitext(os.path.basename(input_file))[0]
        archive_output_directory = os.path.join(output_directory, archive_name)

        # Unpack
        log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}")
        result = self.file_to_file_unpack(input_file=input_file, output_directory=archive_output_directory, file_type=file_type, raise_unsupported=raise_unsupported)
        # file_to_file_unpack may hand back a bare int rather than a GwReturnObj when the library
        # call raised OSError and raise_unsupported is False; accept both instead of failing on ".status".
        status = getattr(result, "status", result)

        unpacked = status in successes.success_codes
        if not unpacked:
            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}")

        # Only remove the origin once its contents were unpacked, otherwise a failed unpack
        # with raise_unsupported=False would silently destroy the only copy of the data.
        if delete_origin and unpacked:
            os.remove(input_file)

        # There is nothing to recurse into if the output directory was never created
        if recursive and os.path.isdir(archive_output_directory):
            # Unpack sub archives
            for subarchive in self.list_archive_paths(archive_output_directory):
                self.unpack(
                    input_file=subarchive,
                    output_directory=archive_output_directory,
                    recursive=recursive,
                    raise_unsupported=raise_unsupported,
                    delete_origin=True
                )

        return status

    def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
        """ Calls unpack on every archive listed by list_archive_paths(input_directory), mirroring the input directory layout under output_directory.

        Args:
            input_directory (str): The input directory containing archives to unpack.
            output_directory (str): The output directory where archives will be unpacked to a new directory.
            recursive (bool, optional): Default True. Recursively unpack all nested archives.
            file_type (str, optional): Default None (use extension). The archive file type of all archives within the directory.
            include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
            delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.
        """
        # Work with absolute paths throughout
        source_root = os.path.abspath(input_directory)
        destination_root = os.path.abspath(output_directory)

        # Options forwarded unchanged to every unpack call
        shared_options = dict(
            recursive=recursive,
            file_type=file_type,
            include_file_type=include_file_type,
            raise_unsupported=raise_unsupported,
            delete_origin=delete_origin,
        )

        for archive_path in self.list_archive_paths(source_root):
            # The archive is unpacked into the directory that mirrors its own parent directory
            mirrored_archive_path = os.path.join(destination_root, os.path.relpath(archive_path, source_root))
            mirrored_parent = os.path.dirname(mirrored_archive_path)

            self.unpack(
                input_file=archive_path,
                output_directory=mirrored_parent,
                **shared_options
            )

    def pack_directory(self, input_directory: str, output_directory: str, file_type: str, raise_unsupported: bool = True, delete_origin: bool = False):
        """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

        Args:
            input_directory (str): The directory to pack.
            output_directory (str): The output directory where the archive will be written.
            file_type (str): The archive file type.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
            delete_origin (bool, optional): Default False. Delete input_directory after it was successfully packed to output_directory. The directory is kept if packing failed.

        Returns:
            status (int): The status attribute of the glasswall.GwReturnObj returned by file_to_file_pack.
        """
        # Convert to absolute paths
        input_directory = os.path.abspath(input_directory)
        output_directory = os.path.abspath(output_directory)

        # Pack
        log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}")
        status = self.file_to_file_pack(input_directory=input_directory, output_directory=output_directory, file_type=file_type, raise_unsupported=raise_unsupported).status

        packed = status in successes.success_codes
        if not packed:
            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
            if raise_unsupported:
                raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")

        # Only remove the origin once it was packed, otherwise a failed pack with
        # raise_unsupported=False would delete the directory without an archive replacing it.
        if delete_origin and packed:
            utils.delete_directory(input_directory)

        # Returned for consistency with unpack, which also returns its status
        return status

    def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Exports an archive using the Glasswall engine.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If an argument is not one of its accepted types.
            FileNotFoundError: If input_file is a str that is not an existing file.
            Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) when the status is not a success code and raise_unsupported is True.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        else:
            # bytes, bytearray or io.BytesIO, as validated above
            input_file_bytes = utils.as_bytes(input_file)

        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        elif isinstance(content_management_policy, type(None)):
            # Load default
            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
        content_management_policy = utils.validate_xml(content_management_policy)

        # API function declaration
        self.library.GwFileExportArchive.argtypes = [
            ct.c_void_p,
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.c_char_p
        ]

        # Variable initialisation
        # from_buffer requires a writable buffer, so the input is copied into a bytearray
        input_buffer_bytearray = bytearray(input_file_bytes)

        ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
        ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
        ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
        ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
        ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
        gw_return_object = glasswall.GwReturnObj()

        # Description of the input used in log messages
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        try:
            with utils.CwdHandler(new_cwd=self.library_path):
                # API call
                gw_return_object.status = self.library.GwFileExportArchive(
                    ct.byref(ct_input_buffer),
                    ct_input_buffer_length,
                    ct.byref(ct_output_buffer),
                    ct.byref(ct_output_buffer_length),
                    ct.byref(ct_output_report_buffer),
                    ct.byref(ct_output_report_buffer_length),
                    ct_content_management_policy
                )

            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

            # Copy the library output buffers into Python bytes before the library is released
            gw_return_object.output_file = utils.buffer_to_bytes(
                ct_output_buffer,
                ct_output_buffer_length
            )
            gw_return_object.output_report = utils.buffer_to_bytes(
                ct_output_report_buffer,
                ct_output_report_buffer_length
            )

            # Write output file
            if gw_return_object.output_file:
                if isinstance(output_file, str):
                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
                    with open(output_file, "wb") as f:
                        f.write(gw_return_object.output_file)

            # Write output report
            if gw_return_object.output_report:
                if isinstance(output_report, str):
                    os.makedirs(os.path.dirname(output_report), exist_ok=True)
                    with open(output_report, "wb") as f:
                        f.write(gw_return_object.output_report)
        finally:
            # Release library resources on every path; previously a raised error status or a
            # failed write skipped the release.
            self.release()

        return gw_return_object

    def export_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
        """ Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing archives to export.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        # NOTE: output_directory now defaults to None, as documented above and as protect_directory
        # already does; existing calls that pass it explicitly are unaffected.
        exported_archives_dict = {}
        # Call export_archive on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            # Construct paths for output file and output report
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

            result = self.export_archive(
                input_file=input_file,
                output_file=output_file,
                output_report=output_report,
                content_management_policy=content_management_policy,
                raise_unsupported=raise_unsupported,
            )

            exported_archives_dict[relative_path] = result

        return exported_archives_dict

    def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[int] = 1, raise_unsupported: bool = True):
        """ Imports an archive using the Glasswall engine.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
            output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
            output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            include_analysis_report (Optional[int], optional): Default 1. If 1, write the analysis report into imported archive.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

        Raises:
            TypeError: If an argument is not one of its accepted types.
            FileNotFoundError: If input_file is a str that is not an existing file.
            Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode if unmapped) when the status is not a success code and raise_unsupported is True.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(output_report, (type(None), str)):
            raise TypeError(output_report)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)

        # Convert string path arguments to absolute paths
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

        if isinstance(output_report, str):
            output_report = os.path.abspath(output_report)

        # Convert inputs to bytes
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            with open(input_file, "rb") as f:
                input_file_bytes = f.read()
        else:
            # bytes, bytearray or io.BytesIO, as validated above
            input_file_bytes = utils.as_bytes(input_file)

        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            with open(content_management_policy, "rb") as f:
                content_management_policy = f.read()
        elif isinstance(content_management_policy, type(None)):
            # Load default
            content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
        content_management_policy = utils.validate_xml(content_management_policy)

        # API function declaration
        self.library.GwFileImportArchive.argtypes = [
            ct.c_void_p,
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t),
            ct.c_char_p,
            ct.c_int
        ]

        # Variable initialisation
        # from_buffer requires a writable buffer, so the input is copied into a bytearray
        input_buffer_bytearray = bytearray(input_file_bytes)

        ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
        ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
        ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
        ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
        ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
        ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
        ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
        ct_include_analysis_report = ct.c_int(include_analysis_report)  # int
        gw_return_object = glasswall.GwReturnObj()

        # Description of the input used in log messages
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        try:
            with utils.CwdHandler(new_cwd=self.library_path):
                # API call
                gw_return_object.status = self.library.GwFileImportArchive(
                    ct.byref(ct_input_buffer),
                    ct_input_buffer_length,
                    ct.byref(ct_output_buffer),
                    ct.byref(ct_output_buffer_length),
                    ct.byref(ct_output_report_buffer),
                    ct.byref(ct_output_report_buffer_length),
                    ct_content_management_policy,
                    ct_include_analysis_report
                )

            if gw_return_object.status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
                if raise_unsupported:
                    raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

            # Copy the library output buffers into Python bytes before the library is released
            gw_return_object.output_file = utils.buffer_to_bytes(
                ct_output_buffer,
                ct_output_buffer_length
            )
            gw_return_object.output_report = utils.buffer_to_bytes(
                ct_output_report_buffer,
                ct_output_report_buffer_length
            )

            # Write output file
            if gw_return_object.output_file:
                if isinstance(output_file, str):
                    os.makedirs(os.path.dirname(output_file), exist_ok=True)
                    with open(output_file, "wb") as f:
                        f.write(gw_return_object.output_file)

            # Write output report
            if gw_return_object.output_report:
                if isinstance(output_report, str):
                    os.makedirs(os.path.dirname(output_report), exist_ok=True)
                    with open(output_report, "wb") as f:
                        f.write(gw_return_object.output_report)
        finally:
            # Release library resources on every path; previously a raised error status or a
            # failed write skipped the release.
            self.release()

        return gw_return_object

    def import_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True, include_analysis_report: Optional[int] = 1):
        """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing archives to import.
            output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
            output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
            include_analysis_report (Optional[int], optional): Default 1. Forwarded to import_archive for every file. If 1, write the analysis report into imported archive.

        Returns:
            imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
        """
        # NOTE: output_directory now defaults to None, as documented above and as protect_directory
        # already does. include_analysis_report is appended after the existing parameters and
        # defaults to import_archive's own default, so existing calls behave as before.
        imported_archives_dict = {}
        # Call import_archive on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            # Construct paths for output file and output report
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
            output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

            result = self.import_archive(
                input_file=input_file,
                output_file=output_file,
                output_report=output_report,
                content_management_policy=content_management_policy,
                include_analysis_report=include_analysis_report,
                raise_unsupported=raise_unsupported,
            )

            imported_archives_dict[relative_path] = result

        return imported_archives_dict

Ancestors

Methods

def analyse_archive(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)

Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The archive file path or bytes.
output_file : Optional[str], optional
Default None. If str, write the archive to the output_file path.
output_report : Optional[str], optional
Default None. If str, write the analysis report to the output_report path.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional
The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

Expand source code
def analyse_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. A str that is an existing file path is read from disk. None loads a default ArchiveManager policy (default="sanitise", default_archive_manager="process").
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If input_file, output_file, output_report or content_management_policy is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped), if the status is not in successes.success_codes and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif content_management_policy is None:
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileAnalysisArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # from_buffer needs a writable buffer, hence the bytearray copy of the input
    input_buffer_bytearray = bytearray(input_file_bytes)

    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    gw_return_object = glasswall.GwReturnObj()

    try:
        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileAnalysisArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        # Describe the input for the log lines without dumping raw bytes
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the library-owned buffers into Python bytes before they are released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file and isinstance(output_file, str):
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "wb") as f:
                f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report and isinstance(output_report, str):
            os.makedirs(os.path.dirname(output_report), exist_ok=True)
            with open(output_report, "wb") as f:
                f.write(gw_return_object.output_report)
    finally:
        # Release library resources on every exit path; previously a raised
        # error status skipped this call.
        self.release()

    return gw_return_object
def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)

Calls analyse_archive on each file in input_directory using the given content management configuration. The resulting archives and analysis reports are written to output_directory maintaining the same directory structure as input_directory.

Args

input_directory : str
The input directory containing archives to analyse.
output_directory : Optional[str], optional
Default None. If str, the output directory where the archives containing analysis reports of each file will be written.
output_report_directory : Optional[str], optional
Default None. If str, the output directory where xml reports for each archive will be written.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional
The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

Expand source code
def analyse_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Runs analyse_archive on every file path returned by utils.list_file_paths(input_directory). Output archives and reports are placed under output_directory and output_report_directory at the same path relative to input_directory.

    Args:
        input_directory (str): The input directory containing archives to analyse.
        output_directory (Optional[str], optional): Default None. If str, the directory the resulting archives are written to. If None, no archive path is passed to analyse_archive.
        output_report_directory (Optional[str], optional): Default None. If str, the directory the reports are written to, each named after its archive with ".xml" appended. If None, no report path is passed to analyse_archive.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply, forwarded unchanged to analyse_archive.
        raise_unsupported (bool, optional): Default True. Forwarded unchanged to analyse_archive.

    Returns:
        analysed_archives_dict (dict): A dictionary of file paths relative to input_directory, and the glasswall.GwReturnObj returned by analyse_archive for each.
    """
    results = {}

    for archive_path in utils.list_file_paths(input_directory):
        # The relative path is both the result key and the mirrored output location
        relative_path = os.path.relpath(archive_path, input_directory)

        if output_directory is None:
            archive_destination = None
        else:
            archive_destination = os.path.join(os.path.abspath(output_directory), relative_path)

        if output_report_directory is None:
            report_destination = None
        else:
            report_destination = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

        results[relative_path] = self.analyse_archive(
            input_file=archive_path,
            output_file=archive_destination,
            output_report=report_destination,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results
def export_archive(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)

Exports an archive using the Glasswall engine.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The archive file path or bytes.
output_file : Optional[str], optional
Default None. If str, write the archive to the output_file path.
output_report : Optional[str], optional
Default None. If str, write the analysis report to the output_report path.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional
The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

Expand source code
def export_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Exports an archive using the Glasswall engine.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. A str that is an existing file path is read from disk. None loads a default ArchiveManager policy (default="sanitise", default_archive_manager="process").
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If input_file, output_file, output_report or content_management_policy is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped), if the status is not in successes.success_codes and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif content_management_policy is None:
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileExportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # from_buffer needs a writable buffer, hence the bytearray copy of the input
    input_buffer_bytearray = bytearray(input_file_bytes)

    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    gw_return_object = glasswall.GwReturnObj()

    try:
        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileExportArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        # Describe the input for the log lines without dumping raw bytes
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the library-owned buffers into Python bytes before they are released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file and isinstance(output_file, str):
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "wb") as f:
                f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report and isinstance(output_report, str):
            os.makedirs(os.path.dirname(output_report), exist_ok=True)
            with open(output_report, "wb") as f:
                f.write(gw_return_object.output_report)
    finally:
        # Release library resources on every exit path; previously a raised
        # error status skipped this call.
        self.release()

    return gw_return_object
def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)

Calls export_archive on each file in input_directory. The exported archives are written to output_directory maintaining the same directory structure as input_directory.

Args

input_directory : str
The input directory containing archives to export.
output_directory : Optional[str]
Required (the signature declares no default). If str, the output directory where the archives will be written. If None, the archives are not written to disk.
output_report_directory : Optional[str], optional
Default None. If str, the output directory where xml reports for each archive will be written.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional
The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

Expand source code
def export_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Runs export_archive on every file path returned by utils.list_file_paths(input_directory). Exported archives and reports are placed under output_directory and output_report_directory at the same path relative to input_directory.

    Args:
        input_directory (str): The input directory containing archives to export.
        output_directory (Optional[str]): No default in the signature. If str, the directory the exported archives are written to. If None, no archive path is passed to export_archive.
        output_report_directory (Optional[str], optional): Default None. If str, the directory the reports are written to, each named after its archive with ".xml" appended. If None, no report path is passed to export_archive.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply, forwarded unchanged to export_archive.
        raise_unsupported (bool, optional): Default True. Forwarded unchanged to export_archive.

    Returns:
        exported_archives_dict (dict): A dictionary of file paths relative to input_directory, and the glasswall.GwReturnObj returned by export_archive for each.
    """
    results = {}

    for archive_path in utils.list_file_paths(input_directory):
        # The relative path is both the result key and the mirrored output location
        relative_path = os.path.relpath(archive_path, input_directory)

        if output_directory is None:
            archive_destination = None
        else:
            archive_destination = os.path.join(os.path.abspath(output_directory), relative_path)

        if output_report_directory is None:
            report_destination = None
        else:
            report_destination = os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

        results[relative_path] = self.export_archive(
            input_file=archive_path,
            output_file=archive_destination,
            output_report=report_destination,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results
def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True)
Expand source code
def file_to_file_pack(self, input_directory: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
    """ Calls the Glasswall Archive Manager GwFileToFilePack API with input_directory, output_directory and file_type.

    Args:
        input_directory (str): Path of an existing directory, passed to the API as inputDirPath.
        output_directory (str): Output directory path, passed to the API as outputDirPath. Created if it does not exist.
        file_type (Optional[str], optional): Default None. Passed to the API as fileType. If falsy, the value of utils.get_file_type(input_directory) is used.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "ct_input_directory", "ct_output_directory", "ct_file_type" (the ctypes arguments passed to the API)

    Raises:
        TypeError: If input_directory or output_directory is not a str.
        NotADirectoryError: If input_directory is not an existing directory.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped), if the status is not in successes.success_codes and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_directory, str):
        raise TypeError(input_directory)
    elif not os.path.isdir(input_directory):
        raise NotADirectoryError(input_directory)
    if not isinstance(output_directory, str):
        raise TypeError(output_directory)
    if not file_type:
        file_type = utils.get_file_type(input_directory)

    # Ensure output_directory exists
    os.makedirs(output_directory, exist_ok=True)

    # API function declaration
    self.library.GwFileToFilePack.argtypes = [
        ct.c_char_p,
        ct.c_char_p,
        ct.c_char_p,
    ]

    # Variable initialisation
    gw_return_object = glasswall.GwReturnObj()
    gw_return_object.ct_input_directory = ct.c_char_p(input_directory.encode())  # const char* inputDirPath
    gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
    gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

    try:
        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileToFilePack(
                gw_return_object.ct_input_directory,
                gw_return_object.ct_output_directory,
                gw_return_object.ct_file_type,
            )

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {gw_return_object.status}")
    finally:
        # Release library resources on every exit path; previously a raised
        # error status skipped this call.
        self.release()

    return gw_return_object
def file_to_file_unpack(self, input_file: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True)
Expand source code
def file_to_file_unpack(self, input_file: str, output_directory: str, file_type: Optional[str] = None, raise_unsupported: bool = True):
    """ Calls the Glasswall Archive Manager GwFileToFileUnpack API with input_file, output_directory and file_type.

    Args:
        input_file (str): Path of an existing file, passed to the API as inputFilePath.
        output_directory (str): Output directory path, passed to the API as outputDirPath.
        file_type (Optional[str], optional): Default None. Passed to the API as fileType. If falsy, the value of utils.get_file_type(input_file) is used.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "ct_input_file", "ct_output_directory", "ct_file_type" (the ctypes arguments passed to the API). NOTE: if the API call raises OSError and raise_unsupported is False, the int 0 is returned instead of a GwReturnObj.

    Raises:
        TypeError: If input_file or output_directory is not a str.
        FileNotFoundError: If input_file is not an existing file.
        OSError: Re-raised from the API call when raise_unsupported is True.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped), if the status is not in successes.success_codes and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, str):
        raise TypeError(input_file)
    elif not os.path.isfile(input_file):
        raise FileNotFoundError(input_file)
    if not isinstance(output_directory, str):
        raise TypeError(output_directory)
    if not file_type:
        file_type = utils.get_file_type(input_file)

    # API function declaration
    self.library.GwFileToFileUnpack.argtypes = [
        ct.c_char_p,
        ct.c_char_p,
        ct.c_char_p,
    ]

    # Variable initialisation
    gw_return_object = glasswall.GwReturnObj()
    gw_return_object.ct_input_file = ct.c_char_p(input_file.encode())  # const char* inputFilePath
    gw_return_object.ct_output_directory = ct.c_char_p(output_directory.encode())  # const char* outputDirPath
    gw_return_object.ct_file_type = ct.c_char_p(file_type.encode())  # const char *fileType

    with utils.CwdHandler(new_cwd=self.library_path):
        try:
            # API call
            gw_return_object.status = self.library.GwFileToFileUnpack(
                gw_return_object.ct_input_file,
                gw_return_object.ct_output_directory,
                gw_return_object.ct_file_type,
            )
        except OSError as e:
            # bz2, gz currently OSError on unpack, not supported unpacking
            # OSError: exception: access violation reading 0x0000000000000000
            # Log the failure so it is visible even when it is not re-raised.
            log.error(f"\n\tinput_file: {input_file}\n\tOSError: {e}")
            if raise_unsupported:
                raise

            # release() is deliberately not called on this path (unchanged
            # from the previous behaviour): the library has just faulted.
            # The int return value is kept for backward compatibility.
            return 0

    try:
        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {gw_return_object.status}")
    finally:
        # Release library resources whether or not an error status is raised;
        # previously the raise skipped this call.
        self.release()

    return gw_return_object
def import_archive(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, include_analysis_report: Optional[int] = 1, raise_unsupported: bool = True)

Imports an archive using the Glasswall engine.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The archive file path or bytes.
output_file : Optional[str], optional
Default None. If str, write the archive to the output_file path.
output_report : Optional[str], optional
Default None. If str, write the analysis report to the output_report path.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional
The content management policy to apply.
include_analysis_report : Optional[int], optional
Default 1. If 1, write the analysis report into imported archive.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

Expand source code
def import_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, include_analysis_report: Optional[int] = 1, raise_unsupported: bool = True):
    """ Imports an archive using the Glasswall engine.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply. A str that is an existing file path is read from disk. None loads a default ArchiveManager policy (default="sanitise", default_archive_manager="process").
        include_analysis_report (Optional[int], optional): Default 1. If 1, write the analysis report into imported archive. The value is converted with ct.c_int before being passed to the API.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If input_file, output_file, output_report or content_management_policy is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped), if the status is not in successes.success_codes and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif content_management_policy is None:
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileImportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p,
        ct.c_int
    ]

    # Variable initialisation
    # from_buffer needs a writable buffer, hence the bytearray copy of the input
    input_buffer_bytearray = bytearray(input_file_bytes)

    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    ct_include_analysis_report = ct.c_int(include_analysis_report)  # int
    gw_return_object = glasswall.GwReturnObj()

    try:
        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileImportArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy,
                ct_include_analysis_report
            )

        # Describe the input for the log lines without dumping raw bytes
        if isinstance(input_file, (bytes, bytearray,)):
            input_file_repr = f"{type(input_file)} length {len(input_file)}"
        elif isinstance(input_file, io.BytesIO):
            input_file_repr = input_file.__sizeof__()
        else:
            input_file_repr = input_file

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the library-owned buffers into Python bytes before they are released
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file and isinstance(output_file, str):
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            with open(output_file, "wb") as f:
                f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report and isinstance(output_report, str):
            os.makedirs(os.path.dirname(output_report), exist_ok=True)
            with open(output_report, "wb") as f:
                f.write(gw_return_object.output_report)
    finally:
        # Release library resources on every exit path; previously a raised
        # error status skipped this call.
        self.release()

    return gw_return_object
def import_directory(self, input_directory: str, output_directory: Optional[str], output_report_directory: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)

Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.

Args

input_directory : str
The input directory containing archives to import.
output_directory : Optional[str], optional
Default None. If str, the output directory where the archives will be written.
output_report_directory : Optional[str], optional
Default None. If str, the output directory where xml reports for each archive will be written.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional
The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

Expand source code
def import_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Calls import_archive on each file in input_directory. The imported archives are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing archives to import.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        imported_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    imported_archives_dict = {}
    # Call import_archive on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        # The relative path is both the dictionary key and the suffix used to
        # mirror the input directory structure under the output directories.
        relative_path = os.path.relpath(input_file, input_directory)

        # Construct paths for output file and output report. A directory of
        # None means "do not write", which import_archive receives as None.
        output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
        # Reports keep the archive's full name and gain an ".xml" suffix.
        output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")

        result = self.import_archive(
            input_file=input_file,
            output_file=output_file,
            output_report=output_report,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

        imported_archives_dict[relative_path] = result

    return imported_archives_dict
def is_supported_archive(self, archive_type: str)

Returns True if the archive type (e.g. 7z) is supported.

Expand source code
@functools.lru_cache()
def is_supported_archive(self, archive_type: str):
    """ Returns True if the archive type (e.g. `7z`) is supported.

    Args:
        archive_type (str): The archive type to query, e.g. `7z`.

    Returns:
        The value returned by the library function GwIsSupportedArchiveType, declared here as a C bool.
    """
    # Results are memoised by functools.lru_cache, keyed on (self, archive_type).
    api_function = self.library.GwIsSupportedArchiveType

    # API function declaration: bool GwIsSupportedArchiveType(const char* type)
    api_function.restype = ct.c_bool
    api_function.argtypes = [ct.c_char_p]

    # The C API takes a byte string, so encode the Python str first.
    encoded_type = archive_type.encode()

    return api_function(ct.c_char_p(encoded_type))
def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True)

Returns a list of file paths of supported archives in a directory and all of its subdirectories.

Expand source code
def list_archive_paths(self, directory: str, recursive: bool = True, absolute: bool = True, followlinks: bool = True):
    """ Returns a list of the file paths found in a directory whose file type is a supported archive type.

    Args:
        directory (str): The directory to search.
        recursive (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.
        absolute (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.
        followlinks (bool, optional): Default True. Forwarded to glasswall.utils.list_file_paths.

    Returns:
        archive_paths (list): The paths for which is_supported_archive returned a truthy value, in listing order.
    """
    candidate_paths = glasswall.utils.list_file_paths(
        directory=directory,
        recursive=recursive,
        absolute=absolute,
        followlinks=followlinks,
    )

    archive_paths = []
    for candidate in candidate_paths:
        # Keep only files whose detected type is a supported archive format
        if self.is_supported_archive(utils.get_file_type(candidate)):
            archive_paths.append(candidate)

    return archive_paths
def pack_directory(self, input_directory: str, output_directory: str, file_type: str, raise_unsupported: bool = True, delete_origin: bool = False)

Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

Args

input_directory : str
The path of the directory to pack.
output_directory : str
The destination directory for the packed archive.
file_type : str
The archive file type.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
delete_origin : bool, optional
Default False. Delete input_directory after packing to output_directory.
Expand source code
def pack_directory(self, input_directory: str, output_directory: str, file_type: str, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Pack a directory. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

    Args:
        input_directory (str): The path of the directory to pack.
        output_directory (str): The destination directory, forwarded to file_to_file_pack.
        file_type (str): The archive file type.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete input_directory after packing to output_directory.
    """
    # Convert to absolute paths
    input_directory = os.path.abspath(input_directory)
    output_directory = os.path.abspath(output_directory)

    # Pack
    log.debug(f"Packing\n\tsrc: {input_directory}\n\tdst: {output_directory}")
    pack_result = self.file_to_file_pack(
        input_directory=input_directory,
        output_directory=output_directory,
        file_type=file_type,
        raise_unsupported=raise_unsupported,
    )
    status = pack_result.status

    if status in successes.success_codes:
        log.debug(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
    else:
        log.error(f"\n\tinput_directory: {input_directory}\n\tstatus: {status}")
        if raise_unsupported:
            # Map the status to its exception class, falling back to UnknownErrorCode
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)

    # Reached on success, and also on failure when raise_unsupported is False
    if delete_origin:
        utils.delete_directory(input_directory)
def protect_archive(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)

Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The archive file path or bytes.
output_file : Optional[str], optional
Default None. If str, write the archive to the output_file path.
output_report : Optional[str], optional
Default None. If str, write the analysis report to the output_report path.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional
The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

Expand source code
def protect_archive(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Extracts the input_file archive and processes each file within the archive using the Glasswall engine. Repackages all files regenerated by the Glasswall engine into a new archive, optionally writing the new archive and report to the paths specified by output_file and output_report.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The archive file path or bytes.
        output_file (Optional[str], optional): Default None. If str, write the archive to the output_file path.
        output_report (Optional[str], optional): Default None. If str, write the analysis report to the output_report path.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

    Raises:
        TypeError: If an argument is not one of the accepted types.
        FileNotFoundError: If input_file is a str that is not an existing file.
        Exception: The class mapped to the status in errors.error_codes (errors.UnknownErrorCode when unmapped), raised when the status is not in successes.success_codes and raise_unsupported is True.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(output_report, (type(None), str)):
        raise TypeError(output_report)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)

    # Convert string path arguments to absolute paths
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

    if isinstance(output_report, str):
        output_report = os.path.abspath(output_report)

    # Convert inputs to bytes
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        with open(input_file, "rb") as f:
            input_file_bytes = f.read()
    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file_bytes = utils.as_bytes(input_file)

    # A str policy naming an existing file is read from disk; any other str is
    # passed to utils.validate_xml unchanged.
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        with open(content_management_policy, "rb") as f:
            content_management_policy = f.read()
    elif isinstance(content_management_policy, type(None)):
        # Load default
        content_management_policy = glasswall.content_management.policies.ArchiveManager(default="sanitise", default_archive_manager="process")
    content_management_policy = utils.validate_xml(content_management_policy)

    # API function declaration
    self.library.GwFileProtectAndReportArchive.argtypes = [
        ct.c_void_p,
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t),
        ct.c_char_p
    ]

    # Variable initialisation
    # ctypes from_buffer needs a writable buffer, hence the bytearray copy.
    input_buffer_bytearray = bytearray(input_file_bytes)

    ct_input_buffer = (ct.c_ubyte * len(input_buffer_bytearray)).from_buffer(input_buffer_bytearray)  # void *inputBuffer
    ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))  # size_t inputBufferLength
    ct_output_buffer = ct.c_void_p()  # void **outputFileBuffer
    ct_output_buffer_length = ct.c_size_t()  # size_t *outputFileBufferLength
    ct_output_report_buffer = ct.c_void_p()  # void **outputAnalysisReportBuffer
    ct_output_report_buffer_length = ct.c_size_t()  # size_t *outputAnalysisReportBufferLength
    ct_content_management_policy = ct.c_char_p(content_management_policy.encode())  # const char *xmlConfigString
    gw_return_object = glasswall.GwReturnObj()

    # Everything from the API call onwards is guarded so that release() runs
    # even when an error is raised (a failing status with raise_unsupported,
    # or a failure while converting or writing the outputs).
    try:
        with utils.CwdHandler(new_cwd=self.library_path):
            # API call
            gw_return_object.status = self.library.GwFileProtectAndReportArchive(
                ct.byref(ct_input_buffer),
                ct_input_buffer_length,
                ct.byref(ct_output_buffer),
                ct.byref(ct_output_buffer_length),
                ct.byref(ct_output_report_buffer),
                ct.byref(ct_output_report_buffer_length),
                ct_content_management_policy
            )

        # Short description of the input for log messages; avoids logging raw bytes.
        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")
            if raise_unsupported:
                raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tstatus: {gw_return_object.status}")

        # Copy the output buffers into Python bytes before release() is called.
        gw_return_object.output_file = utils.buffer_to_bytes(
            ct_output_buffer,
            ct_output_buffer_length
        )
        gw_return_object.output_report = utils.buffer_to_bytes(
            ct_output_report_buffer,
            ct_output_report_buffer_length
        )

        # Write output file
        if gw_return_object.output_file:
            if isinstance(output_file, str):
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                with open(output_file, "wb") as f:
                    f.write(gw_return_object.output_file)

        # Write output report
        if gw_return_object.output_report:
            if isinstance(output_report, str):
                os.makedirs(os.path.dirname(output_report), exist_ok=True)
                with open(output_report, "wb") as f:
                    f.write(gw_return_object.output_report)
    finally:
        self.release()

    return gw_return_object
def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ArchiveManager] = None, raise_unsupported: bool = True)

Calls protect_archive on each file in input_directory using the given content management configuration. The resulting archives are written to output_directory maintaining the same directory structure as input_directory.

Args

input_directory : str
The input directory containing archives to protect.
output_directory : Optional[str], optional
Default None. If str, the output directory where the archives will be written.
output_report_directory : Optional[str], optional
Default None. If str, the output directory where xml reports for each archive will be written.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional
The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)

Expand source code
def protect_directory(self, input_directory: str, output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager] = None, raise_unsupported: bool = True):
    """ Runs protect_archive over every file found in input_directory, mirroring the input directory structure under output_directory and output_report_directory.

    Args:
        input_directory (str): The input directory containing archives to protect.
        output_directory (Optional[str], optional): Default None. If str, the output directory where the archives will be written.
        output_report_directory (Optional[str], optional): Default None. If str, the output directory where xml reports for each archive will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.ArchiveManager], optional): The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        protected_archives_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
    """
    results_by_relative_path = {}

    for archive_path in utils.list_file_paths(input_directory):
        # The relative path is the dictionary key and the mirrored suffix
        rel_path = os.path.relpath(archive_path, input_directory)

        # Destination for the protected archive; stays None when no output directory was given
        archive_destination = None
        if output_directory is not None:
            archive_destination = os.path.join(os.path.abspath(output_directory), rel_path)

        # Destination for the report: the archive's relative path plus ".xml"
        report_destination = None
        if output_report_directory is not None:
            report_destination = os.path.join(os.path.abspath(output_report_directory), rel_path + ".xml")

        results_by_relative_path[rel_path] = self.protect_archive(
            input_file=archive_path,
            output_file=archive_destination,
            output_report=report_destination,
            content_management_policy=content_management_policy,
            raise_unsupported=raise_unsupported,
        )

    return results_by_relative_path
def release(self)

Releases any resources held by the Glasswall Archive Manager library.

Expand source code
def release(self):
    """ Releases any resources held by the Glasswall Archive Manager library by calling its GwArchiveDone function. """
    archive_done = self.library.GwArchiveDone
    archive_done()
def unpack(self, input_file: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False)

Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

Args

input_file : str
The archive file path
output_directory : str
The output directory where the archive will be unpacked to a new directory.
recursive : bool, optional
Default True. Recursively unpack all nested archives.
file_type : str, optional
Default None (use extension). The archive file type.
include_file_type : bool, optional
Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
delete_origin : bool, optional
Default False. Delete input_file after unpacking to output_directory.
Expand source code
def unpack(self, input_file: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Unpack an archive, maintaining directory structure. Supported archive formats are: "7z", "bz2", "gz", "rar", "tar", "zip".

    The archive is unpacked into a new directory inside output_directory, named after the archive.
    When recursive is True, each archive found in that new directory is unpacked into it as well;
    those nested calls always use delete_origin=True and do not forward file_type or include_file_type.

    Args:
        input_file (str): The archive file path
        output_directory (str): The output directory where the archive will be unpacked to a new directory.
        recursive (bool, optional): Default True. Recursively unpack all nested archives.
        file_type (str, optional): Default None (use extension). The archive file type.
        include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete input_file after unpacking to output_directory.

    Returns:
        status: The status attribute of the object returned by file_to_file_unpack for input_file.
    """
    # Convert to absolute paths
    input_file = os.path.abspath(input_file)
    output_directory = os.path.abspath(output_directory)

    # Name the new directory after the archive, with or without its extension
    archive_basename = os.path.basename(input_file)
    archive_name = archive_basename if include_file_type else os.path.splitext(archive_basename)[0]
    archive_output_directory = os.path.join(output_directory, archive_name)

    # Unpack
    log.debug(f"Unpacking\n\tsrc: {input_file}\n\tdst: {archive_output_directory}")
    unpack_result = self.file_to_file_unpack(
        input_file=input_file,
        output_directory=archive_output_directory,
        file_type=file_type,
        raise_unsupported=raise_unsupported,
    )
    status = unpack_result.status

    if status in successes.success_codes:
        log.debug(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
    else:
        log.error(f"\n\tinput_file: {input_file}\n\tstatus: {status}")
        if raise_unsupported:
            # Map the status to its exception class, falling back to UnknownErrorCode
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)

    # Reached on success, and also on failure when raise_unsupported is False
    if delete_origin:
        os.remove(input_file)

    if not recursive:
        return status

    # Unpack sub archives; each nested archive is removed once it has been processed
    for subarchive in self.list_archive_paths(archive_output_directory):
        self.unpack(
            input_file=subarchive,
            output_directory=archive_output_directory,
            recursive=recursive,
            raise_unsupported=raise_unsupported,
            delete_origin=True
        )

    return status
def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False)

Unpack a directory of archives, maintaining directory structure.

Args

input_directory : str
The input directory containing archives to unpack.
output_directory : str
The output directory where archives will be unpacked to a new directory.
recursive : bool, optional
Default True. Recursively unpack all nested archives.
file_type : str, optional
Default None (use extension). The archive file type of all archives within the directory.
include_file_type : bool, optional
Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
delete_origin : bool, optional
Default False. Delete each archive in input_directory after unpacking it to output_directory.
Expand source code
def unpack_directory(self, input_directory: str, output_directory: str, recursive: bool = True, file_type: Optional[str] = None, include_file_type: Optional[bool] = False, raise_unsupported: bool = True, delete_origin: bool = False):
    """ Unpack every supported archive found in a directory, maintaining directory structure.

    Args:
        input_directory (str): The input directory containing archives to unpack.
        output_directory (str): The output directory where archives will be unpacked to a new directory.
        recursive (bool, optional): Default True. Recursively unpack all nested archives.
        file_type (str, optional): Default None (use extension). The archive file type of all archives within the directory.
        include_file_type (bool, optional): Default False. Include the archive format in the directory name. Useful when there are multiple same-named archives of different formats.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
        delete_origin (bool, optional): Default False. Delete each archive in input_directory after unpacking it to output_directory.
    """
    # Convert to absolute paths
    input_directory = os.path.abspath(input_directory)
    output_directory = os.path.abspath(output_directory)

    for archive_path in self.list_archive_paths(input_directory):
        # Mirror the archive's location under output_directory; unpack() then
        # creates the per-archive directory inside that mirrored parent.
        relative_archive_path = os.path.relpath(archive_path, input_directory)
        mirrored_archive_path = os.path.join(output_directory, relative_archive_path)
        mirrored_parent_directory = os.path.dirname(mirrored_archive_path)

        self.unpack(
            input_file=archive_path,
            output_directory=mirrored_parent_directory,
            recursive=recursive,
            file_type=file_type,
            include_file_type=include_file_type,
            raise_unsupported=raise_unsupported,
            delete_origin=delete_origin
        )
def version(self)

Returns the Glasswall library version.

Returns

version (str): The Glasswall library version.

Expand source code
def version(self):
    """ Returns the Glasswall library version.

    Returns:
        version (str): The Glasswall library version.
    """
    get_version = self.library.GwArchiveVersion

    # API function declaration: the function returns a C string
    get_version.restype = ct.c_char_p

    # API call
    raw_version = get_version()

    # Convert to Python string
    return ct.string_at(raw_version).decode()