Module glasswall.libraries.editor.editor

Expand source code
import ctypes as ct
import io
import os
from contextlib import contextmanager
from typing import Union

import glasswall
from glasswall import determine_file_type as dft
from glasswall import utils
from glasswall.config.logging import log
from glasswall.libraries.editor import errors, successes
from glasswall.libraries.library import Library


class Editor(Library):
    """ A high level Python wrapper for Glasswall Editor / Core2. """

    def __init__(self, library_path: str):
        super().__init__(library_path)
        self.library = self.load_library(os.path.abspath(library_path))

        # Validate killswitch has not activated
        self.validate_license()

        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")

    def validate_license(self):
        """ Validates the license of the library by attempting to call protect_file on a known supported file.

        Raises:
            LicenseExpired: If the license has expired.
            EditorError: If the license could not be validated.
        """
        # Call protect file on a known good bitmap to see if license has expired
        try:
            self.protect_file(
                input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00",
                raise_unsupported=True
            )
            log.debug(f"{self.__class__.__name__} license validated successfully.")
        except errors.EditorError:
            log.error(f"{self.__class__.__name__} license validation failed.")
            raise

    def version(self):
        """ Returns the Glasswall library version.

        Returns:
            version (str): The Glasswall library version.
        """
        # API function declaration
        self.library.GW2LibVersion.restype = ct.c_char_p

        # API call
        version = self.library.GW2LibVersion()

        # Convert to Python string
        version = ct.string_at(version).decode()

        return version

    def open_session(self):
        """ Open a new Glasswall session.

        Returns:
            session (int): An incrementing integer repsenting the current session.
        """
        # API call
        session = self.library.GW2OpenSession()

        log.debug(f"\n\tsession: {session}")

        return session

    def close_session(self, session: int):
        """ Close the Glasswall session. All resources allocated by the session will be destroyed.

        Args:
            session (int): The session to close.

        Returns:
            None
        """
        if not isinstance(session, int):
            raise TypeError(session)

        # API function declaration
        self.library.GW2CloseSession.argtypes = [ct.c_size_t]

        # Variable initialisation
        ct_session = ct.c_size_t(session)

        # API call
        status = self.library.GW2CloseSession(ct_session)

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        return status

    @contextmanager
    def new_session(self):
        """ Context manager. Opens a new session on entry and closes the session on exit. """
        try:
            session = self.open_session()
            yield session
        finally:
            self.close_session(session)

    def run_session(self, session):
        """ Runs the Glasswall session and begins processing of a file.

        Args:
            session (int): The session to run.

        Returns:
            status (int): The status of the function call.
        """
        # API function declaration
        self.library.GW2RunSession.argtypes = [ct.c_size_t]

        # Variable initialisation
        ct_session = ct.c_size_t(session)

        # API call
        status = self.library.GW2RunSession(ct_session)

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.GW2FileErrorMsg(session)}")
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.GW2FileErrorMsg(session)}")

        return status

    def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False):
        """ Returns an int representing the file type / file format of a file.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path.
            as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.

        Returns:
            file_type (Union[int, str]): The file format.
        """
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)

            # convert to ct.c_char_p of bytes
            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

            # API call
            file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file)

        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            # convert to bytes
            bytes_input_file = utils.as_bytes(input_file)

            # ctypes conversion
            ct_buffer = ct.c_char_p(bytes_input_file)
            ct_butter_length = ct.c_size_t(len(bytes_input_file))

            # API call
            file_type = self.library.GW2DetermineFileTypeFromMemory(
                ct_buffer,
                ct_butter_length
            )

        else:
            raise TypeError(input_file)

        file_type_as_string = dft.file_type_int_to_str(file_type)
        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file

        if not dft.is_success(file_type):
            log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
            raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
        else:
            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")

        if as_string:
            return file_type_as_string

        return file_type

    def get_content_management_policy(self, session: int):
        """ Returns the content management configuration for a given session.

        Args:
            session (int): The current session.

        Returns:
            xml_string (str): The XML string of the current content management configuration.
        """
        # NOTE GW2GetPolicySettings is current not implemented in editor

        # set xml_string as loaded default config
        xml_string = glasswall.content_management.policies.Editor(default="sanitise").text,

        # log.debug(f"xml_string:\n{xml_string}")

        return xml_string

        # # API function declaration
        # self.library.GW2GetPolicySettings.argtypes = [
        #     ct.c_size_t,
        #     ct.c_void_p,
        # ]

        # # Variable initialisation
        # ct_session = ct.c_size_t(session)
        # ct_buffer = ct.c_void_p()
        # ct_butter_length = ct.c_size_t()
        # # ct_file_format = ct.c_int(file_format)

        # # API Call
        # status = self.library.GW2GetPolicySettings(
        #     ct_session,
        #     ct.byref(ct_buffer),
        #     ct.byref(ct_butter_length)
        # )

        # print("GW2GetPolicySettings status:", status)

        # file_bytes = utils.buffer_to_bytes(
        #     ct_buffer,
        #     ct_butter_length,
        # )

        # return file_bytes

    def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, file_format=0):
        """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.

        Args:
            session (int): The current session.
            input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            file_format (int): The file format of the content management policy. 0 is xml.

        Returns:
            status (int): The result of the Glasswall API call.
        """
        # Validate type
        if not isinstance(session, int):
            raise TypeError(session)
        if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(input_file)
        if not isinstance(file_format, int):
            raise TypeError(file_format)

        # Set input_file to default if input_file is None
        if input_file is None:
            input_file = glasswall.content_management.policies.Editor(default="sanitise")

        # Validate xml content is parsable
        utils.validate_xml(input_file)

        gw_return_object = glasswall.GwReturnObj()

        # From file
        if isinstance(input_file, str) and os.path.isfile(input_file):
            # API function declaration
            self.library.GW2RegisterPoliciesFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p,
                ct.c_int,
            ]

            # Variable initialisation
            gw_return_object.ct_session = ct.c_size_t(session)
            gw_return_object.ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
            gw_return_object.ct_file_format = ct.c_int(file_format)

            gw_return_object.status = self.library.GW2RegisterPoliciesFile(
                gw_return_object.ct_session,
                gw_return_object.ct_input_file,
                gw_return_object.ct_file_format
            )

        # From memory
        elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            # Convert bytearray, io.BytesIO to bytes
            if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
                input_file = utils.as_bytes(input_file)
            # Convert string xml or Policy to bytes
            if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)):
                input_file = input_file.encode("utf-8")

            # API function declaration
            self.library.GW2RegisterPoliciesMemory.argtype = [
                ct.c_size_t,
                ct.c_char_p,
                ct.c_int
            ]

            # Variable initialisation
            gw_return_object.ct_session = ct.c_size_t(session)
            gw_return_object.ct_buffer = ct.c_char_p(input_file)
            gw_return_object.ct_buffer_length = ct.c_size_t(len(input_file))
            gw_return_object.ct_file_format = ct.c_int(file_format)

            # API Call
            gw_return_object.status = self.library.GW2RegisterPoliciesMemory(
                gw_return_object.ct_session,
                gw_return_object.ct_buffer,
                gw_return_object.ct_buffer_length,
                gw_return_object.ct_file_format
            )

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

        return gw_return_object

    def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
        """ Register an input file or bytes for the given session.

        Args:
            session (int): The current session.
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.

        Returns:
            status (int): The result of the Glasswall API call.
        """

        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
            raise TypeError(input_file)

        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)

            # API function declaration
            self.library.GW2RegisterInputFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

            # API call
            status = self.library.GW2RegisterInputFile(
                ct_session,
                ct_input_file
            )

        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
            # Convert bytearray and io.BytesIO to bytes
            input_file = utils.as_bytes(input_file)

            # API function declaration
            self.library.GW2RegisterInputMemory.argtypes = [
                ct.c_size_t,
                ct.c_char_p,
                ct.c_size_t,
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_buffer = ct.c_char_p(input_file)
            ct_buffer_length = ct.c_size_t(len(input_file))

            # API call
            status = self.library.GW2RegisterInputMemory(
                ct_session,
                ct_buffer,
                ct_buffer_length
            )

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        return status

    def register_output(self, session, output_file: Union[None, str] = None):
        """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.

        Args:
            session (int): The current session.
            output_file (Union[None, str], optional): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.

        Returns:
            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
        """

        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)

        gw_return_object = glasswall.GwReturnObj()

        if isinstance(output_file, str):
            # API function declaration
            self.library.GW2RegisterOutFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))

            # API call
            gw_return_object.status = self.library.GW2RegisterOutFile(
                ct_session,
                ct_output_file
            )

        else:
            # API function declaration
            self.library.GW2RegisterOutputMemory.argtypes = [
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            gw_return_object.buffer = ct.c_void_p()
            gw_return_object.buffer_length = ct.c_size_t(0)

            # API call
            gw_return_object.status = self.library.GW2RegisterOutputMemory(
                ct_session,
                ct.byref(gw_return_object.buffer),
                ct.byref(gw_return_object.buffer_length)
            )

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

        return gw_return_object

    def register_analysis(self, session: int, output_file: Union[None, str] = None):
        """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.

        Args:
            session (int): The session number.
            output_file (Union[None, str], optional): Default None. The file path where the analysis will be written. None returns the analysis as bytes.

        Returns:
            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
        """

        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)

        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

            # API function declaration
            self.library.GW2RegisterAnalysisFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p,
                ct.c_int,
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
            analysis_file_format = ct.c_int()
            gw_return_object = glasswall.GwReturnObj()

            # API call
            status = self.library.GW2RegisterAnalysisFile(
                ct_session,
                ct_output_file,
                analysis_file_format
            )

        elif isinstance(output_file, type(None)):
            # API function declaration
            self.library.GW2RegisterAnalysisMemory.argtypes = [
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            gw_return_object = glasswall.GwReturnObj()
            gw_return_object.buffer = ct.c_void_p()
            gw_return_object.buffer_length = ct.c_size_t()

            # API call
            status = self.library.GW2RegisterAnalysisMemory(
                ct_session,
                ct.byref(gw_return_object.buffer),
                ct.byref(gw_return_object.buffer_length)
            )

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        gw_return_object.status = status

        return gw_return_object

    def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
            output_file (Union[None, str], optional): The output file path where the protected file will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            file_bytes (bytes): The protected file bytes.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        # Convert string path arguments to absolute paths
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            input_file = os.path.abspath(input_file)
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)
            # make directories that do not exist
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            content_management_policy = os.path.abspath(content_management_policy)

        # Convert memory inputs to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)

        # Check that file type is supported
        try:
            self.determine_file_type(input_file=input_file)
        except dft.errors.FileTypeEnumError:
            if raise_unsupported:
                raise
            else:
                return None

        with utils.CwdHandler(self.library_path):
            with self.new_session() as session:
                content_management_policy = self.set_content_management_policy(session, content_management_policy)
                register_input = self.register_input(session, input_file)
                register_output = self.register_output(session, output_file=output_file)
                status = self.run_session(session)

                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
                if status not in successes.success_codes:
                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    if raise_unsupported:
                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                    else:
                        file_bytes = None
                else:
                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    # Get file bytes
                    if isinstance(output_file, str):
                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                        if not os.path.isfile(output_file):
                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                            file_bytes = None
                        else:
                            with open(output_file, "rb") as f:
                                file_bytes = f.read()
                    else:
                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                        file_bytes = utils.buffer_to_bytes(
                            register_output.buffer,
                            register_output.buffer_length
                        )

                # Ensure memory allocated is not garbage collected
                content_management_policy, register_input, register_output

                return file_bytes

    def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Recursively processes all files in a directory in protect mode using the given content management policy.
        The protected files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to protect.
            output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
        """
        protected_files_dict = {}
        # Call protect_file on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

            protected_bytes = self.protect_file(
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
                content_management_policy=content_management_policy,
            )

            protected_files_dict[relative_path] = protected_bytes

        return protected_files_dict

    def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
            output_file (Union[None, str], optional): The output file path where the analysis file will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            file_bytes (bytes): The analysis file bytes.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        # Convert string path arguments to absolute paths
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            input_file = os.path.abspath(input_file)
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)
            # make directories that do not exist
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            content_management_policy = os.path.abspath(content_management_policy)

        # Convert memory inputs to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)

        # Check that file type is supported
        try:
            self.determine_file_type(input_file=input_file)
        except dft.errors.FileTypeEnumError:
            if raise_unsupported:
                raise
            else:
                return None

        with utils.CwdHandler(self.library_path):
            with self.new_session() as session:
                content_management_policy = self.set_content_management_policy(session, content_management_policy)
                register_input = self.register_input(session, input_file)
                register_analysis = self.register_analysis(session, output_file)
                status = self.run_session(session)

                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
                if status not in successes.success_codes:
                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    if raise_unsupported:
                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                    else:
                        file_bytes = None
                else:
                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    # Get file bytes
                    if isinstance(output_file, str):
                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                        if not os.path.isfile(output_file):
                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                            file_bytes = None
                        else:
                            with open(output_file, "rb") as f:
                                file_bytes = f.read()
                    else:
                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                        file_bytes = utils.buffer_to_bytes(
                            register_analysis.buffer,
                            register_analysis.buffer_length
                        )

                # Ensure memory allocated is not garbage collected
                content_management_policy, register_input, register_analysis

                return file_bytes

    def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to analyse.
            output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
        """
        analysis_files_dict = {}
        # Call analyse_file on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory) + ".xml"
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

            analysis_bytes = self.analyse_file(
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
                content_management_policy=content_management_policy,
            )

            analysis_files_dict[relative_path] = analysis_bytes

        return analysis_files_dict

    def register_export(self, session: int, output_file: Union[None, str] = None):
        """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call.

        Args:
            session (int): The session number.
            output_file (Union[None, str], optional): Default None. The file path where the export will be written. None returns the export as bytes.

        Returns:
            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
        """

        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)

        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

            # API function declaration
            self.library.GW2RegisterExportFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
            gw_return_object = glasswall.GwReturnObj()

            # API Call
            status = self.library.GW2RegisterExportFile(
                ct_session,
                ct_output_file
            )

        elif isinstance(output_file, type(None)):
            # API function declaration
            self.library.GW2RegisterExportMemory.argtypes = [
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            gw_return_object = glasswall.GwReturnObj()
            gw_return_object.buffer = ct.c_void_p()
            gw_return_object.buffer_length = ct.c_size_t()

            # API call
            status = self.library.GW2RegisterExportMemory(
                ct_session,
                ct.byref(gw_return_object.buffer),
                ct.byref(gw_return_object.buffer_length)
            )

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        gw_return_object.status = status

        return gw_return_object

    def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
            output_file (Union[None, str], optional): The output file path where the .zip file will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            file_bytes (bytes): The exported .zip file.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        # Convert string path arguments to absolute paths
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            input_file = os.path.abspath(input_file)
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)
            # make directories that do not exist
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            content_management_policy = os.path.abspath(content_management_policy)

        # Convert memory inputs to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)

        # Check that file type is supported
        try:
            self.determine_file_type(input_file=input_file)
        except dft.errors.FileTypeEnumError:
            if raise_unsupported:
                raise
            else:
                return None

        with utils.CwdHandler(self.library_path):
            with self.new_session() as session:
                content_management_policy = self.set_content_management_policy(session, content_management_policy)
                register_input = self.register_input(session, input_file)
                register_export = self.register_export(session, output_file)
                status = self.run_session(session)

                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
                if status not in successes.success_codes:
                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    if raise_unsupported:
                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                    else:
                        file_bytes = None
                else:
                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    # Get file bytes
                    if isinstance(output_file, str):
                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                        if not os.path.isfile(output_file):
                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                            file_bytes = None
                        else:
                            with open(output_file, "rb") as f:
                                file_bytes = f.read()
                    else:
                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                        file_bytes = utils.buffer_to_bytes(
                            register_export.buffer,
                            register_export.buffer_length
                        )

                # Ensure memory allocated is not garbage collected
                content_management_policy, register_input, register_export

                return file_bytes

    def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to export.
            output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
        """
        export_files_dict = {}
        # Call export_file on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory) + ".zip"
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

            export_bytes = self.export_file(
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
                content_management_policy=content_management_policy,
            )

            export_files_dict[relative_path] = export_bytes

        return export_files_dict

    def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
        """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.

        Args:
            session (int): The session number.
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.

        Returns:
            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
        """
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
            raise TypeError(input_file)

        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)

        gw_return_object = glasswall.GwReturnObj()

        if isinstance(input_file, str):
            input_file = os.path.abspath(input_file)

            # API function declaration
            self.library.GW2RegisterImportFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

            # API Call
            gw_return_object.status = self.library.GW2RegisterImportFile(
                ct_session,
                ct_input_file
            )

        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
            # Convert bytearray and io.BytesIO to bytes
            input_file = utils.as_bytes(input_file)

            # API function declaration
            self.library.GW2RegisterImportMemory.argtypes = [
                ct.c_size_t,
                ct.c_void_p,
                ct.c_size_t
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            gw_return_object.buffer = ct.c_char_p(input_file)
            gw_return_object.buffer_length = ct.c_size_t(len(input_file))

            # API call
            gw_return_object.status = self.library.GW2RegisterImportMemory(
                ct_session,
                gw_return_object.buffer,
                gw_return_object.buffer_length
            )

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

        return gw_return_object

    def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
            output_file (Union[None, str], optional): The output file path where the constructed file will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            file_bytes (bytes): The imported file bytes.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        # Convert string path arguments to absolute paths
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            input_file = os.path.abspath(input_file)
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)
            # make directories that do not exist
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            content_management_policy = os.path.abspath(content_management_policy)

        # Convert memory inputs to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)

        # Check that file type is supported
        try:
            self.determine_file_type(input_file=input_file)
        except dft.errors.FileTypeEnumError:
            if raise_unsupported:
                raise
            else:
                return None

        with utils.CwdHandler(self.library_path):
            with self.new_session() as session:
                content_management_policy = self.set_content_management_policy(session, content_management_policy)
                register_import = self.register_import(session, input_file)
                register_output = self.register_output(session, output_file)
                status = self.run_session(session)

                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
                if status not in successes.success_codes:
                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    if raise_unsupported:
                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                    else:
                        file_bytes = None
                else:
                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    # Get file bytes
                    if isinstance(output_file, str):
                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                        if not os.path.isfile(output_file):
                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                            file_bytes = None
                        else:
                            with open(output_file, "rb") as f:
                                file_bytes = f.read()
                    else:
                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                        file_bytes = utils.buffer_to_bytes(
                            register_output.buffer,
                            register_output.buffer_length
                        )

                # Ensure memory allocated is not garbage collected
                content_management_policy, register_import, register_output

                return file_bytes

    def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
        The constructed files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to import.
            output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
        """
        import_files_dict = {}
        # Call import_file on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            # Remove .zip extension from relative_path
            relative_path = os.path.splitext(relative_path)[0]
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

            import_bytes = self.import_file(
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
                content_management_policy=content_management_policy,
            )

            import_files_dict[relative_path] = import_bytes

        return import_files_dict

    def GW2FileErrorMsg(self, session: int):
        """ Retrieve the Glasswall Session Process error message.

        Args:
            session (int): The session number.

        Returns:
            error_message (str): The Glasswall Session Process error message.
        """
        # Validate arg types
        if not isinstance(session, int):
            raise TypeError(session)

        # API function declaration
        self.library.GW2FileErrorMsg.argtypes = [
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t)
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        ct_buffer = ct.c_void_p()
        ct_buffer_length = ct.c_size_t(0)

        # API call
        status = self.library.GW2FileErrorMsg(
            ct_session,
            ct.byref(ct_buffer),
            ct.byref(ct_buffer_length)
        )

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        # Editor wrote to a buffer, convert it to bytes
        error_bytes = utils.buffer_to_bytes(
            ct_buffer,
            ct_buffer_length
        )

        error_message = error_bytes.decode()

        return error_message

Classes

class Editor (library_path: str)

A high level Python wrapper for Glasswall Editor / Core2.

Expand source code
class Editor(Library):
    """ A high level Python wrapper for Glasswall Editor / Core2. """

    def __init__(self, library_path: str):
        super().__init__(library_path)
        self.library = self.load_library(os.path.abspath(library_path))

        # Validate killswitch has not activated
        self.validate_license()

        log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")

    def validate_license(self):
        """ Validates the license of the library by attempting to call protect_file on a known supported file.

        Raises:
            LicenseExpired: If the license has expired.
            EditorError: If the license could not be validated.
        """
        # Call protect file on a known good bitmap to see if license has expired
        try:
            self.protect_file(
                input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00",
                raise_unsupported=True
            )
            log.debug(f"{self.__class__.__name__} license validated successfully.")
        except errors.EditorError:
            log.error(f"{self.__class__.__name__} license validation failed.")
            raise

    def version(self):
        """ Returns the Glasswall library version.

        Returns:
            version (str): The Glasswall library version.
        """
        # API function declaration
        self.library.GW2LibVersion.restype = ct.c_char_p

        # API call
        version = self.library.GW2LibVersion()

        # Convert to Python string
        version = ct.string_at(version).decode()

        return version

    def open_session(self):
        """ Open a new Glasswall session.

        Returns:
            session (int): An incrementing integer repsenting the current session.
        """
        # API call
        session = self.library.GW2OpenSession()

        log.debug(f"\n\tsession: {session}")

        return session

    def close_session(self, session: int):
        """ Close the Glasswall session. All resources allocated by the session will be destroyed.

        Args:
            session (int): The session to close.

        Returns:
            None
        """
        if not isinstance(session, int):
            raise TypeError(session)

        # API function declaration
        self.library.GW2CloseSession.argtypes = [ct.c_size_t]

        # Variable initialisation
        ct_session = ct.c_size_t(session)

        # API call
        status = self.library.GW2CloseSession(ct_session)

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        return status

    @contextmanager
    def new_session(self):
        """ Context manager. Opens a new session on entry and closes the session on exit. """
        try:
            session = self.open_session()
            yield session
        finally:
            self.close_session(session)

    def run_session(self, session):
        """ Runs the Glasswall session and begins processing of a file.

        Args:
            session (int): The session to run.

        Returns:
            status (int): The status of the function call.
        """
        # API function declaration
        self.library.GW2RunSession.argtypes = [ct.c_size_t]

        # Variable initialisation
        ct_session = ct.c_size_t(session)

        # API call
        status = self.library.GW2RunSession(ct_session)

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.GW2FileErrorMsg(session)}")
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.GW2FileErrorMsg(session)}")

        return status

    def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False):
        """ Returns an int representing the file type / file format of a file.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path.
            as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.

        Returns:
            file_type (Union[int, str]): The file format.
        """
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)

            # convert to ct.c_char_p of bytes
            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

            # API call
            file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file)

        elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            # convert to bytes
            bytes_input_file = utils.as_bytes(input_file)

            # ctypes conversion
            ct_buffer = ct.c_char_p(bytes_input_file)
            ct_butter_length = ct.c_size_t(len(bytes_input_file))

            # API call
            file_type = self.library.GW2DetermineFileTypeFromMemory(
                ct_buffer,
                ct_butter_length
            )

        else:
            raise TypeError(input_file)

        file_type_as_string = dft.file_type_int_to_str(file_type)
        input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file

        if not dft.is_success(file_type):
            log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
            raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
        else:
            log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")

        if as_string:
            return file_type_as_string

        return file_type

    def get_content_management_policy(self, session: int):
        """ Returns the content management configuration for a given session.

        Args:
            session (int): The current session.

        Returns:
            xml_string (str): The XML string of the current content management configuration.
        """
        # NOTE GW2GetPolicySettings is current not implemented in editor

        # set xml_string as loaded default config
        xml_string = glasswall.content_management.policies.Editor(default="sanitise").text,

        # log.debug(f"xml_string:\n{xml_string}")

        return xml_string

        # # API function declaration
        # self.library.GW2GetPolicySettings.argtypes = [
        #     ct.c_size_t,
        #     ct.c_void_p,
        # ]

        # # Variable initialisation
        # ct_session = ct.c_size_t(session)
        # ct_buffer = ct.c_void_p()
        # ct_butter_length = ct.c_size_t()
        # # ct_file_format = ct.c_int(file_format)

        # # API Call
        # status = self.library.GW2GetPolicySettings(
        #     ct_session,
        #     ct.byref(ct_buffer),
        #     ct.byref(ct_butter_length)
        # )

        # print("GW2GetPolicySettings status:", status)

        # file_bytes = utils.buffer_to_bytes(
        #     ct_buffer,
        #     ct_butter_length,
        # )

        # return file_bytes

    def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, file_format=0):
        """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.

        Args:
            session (int): The current session.
            input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            file_format (int): The file format of the content management policy. 0 is xml.

        Returns:
            status (int): The result of the Glasswall API call.
        """
        # Validate type
        if not isinstance(session, int):
            raise TypeError(session)
        if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(input_file)
        if not isinstance(file_format, int):
            raise TypeError(file_format)

        # Set input_file to default if input_file is None
        if input_file is None:
            input_file = glasswall.content_management.policies.Editor(default="sanitise")

        # Validate xml content is parsable
        utils.validate_xml(input_file)

        gw_return_object = glasswall.GwReturnObj()

        # From file
        if isinstance(input_file, str) and os.path.isfile(input_file):
            # API function declaration
            self.library.GW2RegisterPoliciesFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p,
                ct.c_int,
            ]

            # Variable initialisation
            gw_return_object.ct_session = ct.c_size_t(session)
            gw_return_object.ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
            gw_return_object.ct_file_format = ct.c_int(file_format)

            gw_return_object.status = self.library.GW2RegisterPoliciesFile(
                gw_return_object.ct_session,
                gw_return_object.ct_input_file,
                gw_return_object.ct_file_format
            )

        # From memory
        elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            # Convert bytearray, io.BytesIO to bytes
            if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
                input_file = utils.as_bytes(input_file)
            # Convert string xml or Policy to bytes
            if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)):
                input_file = input_file.encode("utf-8")

            # API function declaration
            self.library.GW2RegisterPoliciesMemory.argtype = [
                ct.c_size_t,
                ct.c_char_p,
                ct.c_int
            ]

            # Variable initialisation
            gw_return_object.ct_session = ct.c_size_t(session)
            gw_return_object.ct_buffer = ct.c_char_p(input_file)
            gw_return_object.ct_buffer_length = ct.c_size_t(len(input_file))
            gw_return_object.ct_file_format = ct.c_int(file_format)

            # API Call
            gw_return_object.status = self.library.GW2RegisterPoliciesMemory(
                gw_return_object.ct_session,
                gw_return_object.ct_buffer,
                gw_return_object.ct_buffer_length,
                gw_return_object.ct_file_format
            )

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

        return gw_return_object

    def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
        """ Register an input file or bytes for the given session.

        Args:
            session (int): The current session.
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.

        Returns:
            status (int): The result of the Glasswall API call.
        """

        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
            raise TypeError(input_file)

        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)

            # API function declaration
            self.library.GW2RegisterInputFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

            # API call
            status = self.library.GW2RegisterInputFile(
                ct_session,
                ct_input_file
            )

        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
            # Convert bytearray and io.BytesIO to bytes
            input_file = utils.as_bytes(input_file)

            # API function declaration
            self.library.GW2RegisterInputMemory.argtypes = [
                ct.c_size_t,
                ct.c_char_p,
                ct.c_size_t,
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_buffer = ct.c_char_p(input_file)
            ct_buffer_length = ct.c_size_t(len(input_file))

            # API call
            status = self.library.GW2RegisterInputMemory(
                ct_session,
                ct_buffer,
                ct_buffer_length
            )

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        return status

    def register_output(self, session, output_file: Union[None, str] = None):
        """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.

        Args:
            session (int): The current session.
            output_file (Union[None, str], optional): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.

        Returns:
            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
        """

        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)

        gw_return_object = glasswall.GwReturnObj()

        if isinstance(output_file, str):
            # API function declaration
            self.library.GW2RegisterOutFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))

            # API call
            gw_return_object.status = self.library.GW2RegisterOutFile(
                ct_session,
                ct_output_file
            )

        else:
            # API function declaration
            self.library.GW2RegisterOutputMemory.argtypes = [
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            gw_return_object.buffer = ct.c_void_p()
            gw_return_object.buffer_length = ct.c_size_t(0)

            # API call
            gw_return_object.status = self.library.GW2RegisterOutputMemory(
                ct_session,
                ct.byref(gw_return_object.buffer),
                ct.byref(gw_return_object.buffer_length)
            )

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

        return gw_return_object

    def register_analysis(self, session: int, output_file: Union[None, str] = None):
        """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.

        Args:
            session (int): The session number.
            output_file (Union[None, str], optional): Default None. The file path where the analysis will be written. None returns the analysis as bytes.

        Returns:
            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
        """

        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)

        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

            # API function declaration
            self.library.GW2RegisterAnalysisFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p,
                ct.c_int,
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
            analysis_file_format = ct.c_int()
            gw_return_object = glasswall.GwReturnObj()

            # API call
            status = self.library.GW2RegisterAnalysisFile(
                ct_session,
                ct_output_file,
                analysis_file_format
            )

        elif isinstance(output_file, type(None)):
            # API function declaration
            self.library.GW2RegisterAnalysisMemory.argtypes = [
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            gw_return_object = glasswall.GwReturnObj()
            gw_return_object.buffer = ct.c_void_p()
            gw_return_object.buffer_length = ct.c_size_t()

            # API call
            status = self.library.GW2RegisterAnalysisMemory(
                ct_session,
                ct.byref(gw_return_object.buffer),
                ct.byref(gw_return_object.buffer_length)
            )

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        gw_return_object.status = status

        return gw_return_object

    def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
            output_file (Union[None, str], optional): The output file path where the protected file will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            file_bytes (bytes): The protected file bytes.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        # Convert string path arguments to absolute paths
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            input_file = os.path.abspath(input_file)
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)
            # make directories that do not exist
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            content_management_policy = os.path.abspath(content_management_policy)

        # Convert memory inputs to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)

        # Check that file type is supported
        try:
            self.determine_file_type(input_file=input_file)
        except dft.errors.FileTypeEnumError:
            if raise_unsupported:
                raise
            else:
                return None

        with utils.CwdHandler(self.library_path):
            with self.new_session() as session:
                content_management_policy = self.set_content_management_policy(session, content_management_policy)
                register_input = self.register_input(session, input_file)
                register_output = self.register_output(session, output_file=output_file)
                status = self.run_session(session)

                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
                if status not in successes.success_codes:
                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    if raise_unsupported:
                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                    else:
                        file_bytes = None
                else:
                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    # Get file bytes
                    if isinstance(output_file, str):
                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                        if not os.path.isfile(output_file):
                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                            file_bytes = None
                        else:
                            with open(output_file, "rb") as f:
                                file_bytes = f.read()
                    else:
                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                        file_bytes = utils.buffer_to_bytes(
                            register_output.buffer,
                            register_output.buffer_length
                        )

                # Ensure memory allocated is not garbage collected
                content_management_policy, register_input, register_output

                return file_bytes

    def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Recursively processes all files in a directory in protect mode using the given content management policy.
        The protected files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to protect.
            output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
        """
        protected_files_dict = {}
        # Call protect_file on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

            protected_bytes = self.protect_file(
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
                content_management_policy=content_management_policy,
            )

            protected_files_dict[relative_path] = protected_bytes

        return protected_files_dict

    def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
            output_file (Union[None, str], optional): The output file path where the analysis file will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            file_bytes (bytes): The analysis file bytes.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        # Convert string path arguments to absolute paths
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            input_file = os.path.abspath(input_file)
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)
            # make directories that do not exist
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            content_management_policy = os.path.abspath(content_management_policy)

        # Convert memory inputs to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)

        # Check that file type is supported
        try:
            self.determine_file_type(input_file=input_file)
        except dft.errors.FileTypeEnumError:
            if raise_unsupported:
                raise
            else:
                return None

        with utils.CwdHandler(self.library_path):
            with self.new_session() as session:
                content_management_policy = self.set_content_management_policy(session, content_management_policy)
                register_input = self.register_input(session, input_file)
                register_analysis = self.register_analysis(session, output_file)
                status = self.run_session(session)

                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
                if status not in successes.success_codes:
                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    if raise_unsupported:
                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                    else:
                        file_bytes = None
                else:
                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    # Get file bytes
                    if isinstance(output_file, str):
                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                        if not os.path.isfile(output_file):
                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                            file_bytes = None
                        else:
                            with open(output_file, "rb") as f:
                                file_bytes = f.read()
                    else:
                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                        file_bytes = utils.buffer_to_bytes(
                            register_analysis.buffer,
                            register_analysis.buffer_length
                        )

                # Ensure memory allocated is not garbage collected
                content_management_policy, register_input, register_analysis

                return file_bytes

    def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to analyse.
            output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
        """
        analysis_files_dict = {}
        # Call analyse_file on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory) + ".xml"
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

            analysis_bytes = self.analyse_file(
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
                content_management_policy=content_management_policy,
            )

            analysis_files_dict[relative_path] = analysis_bytes

        return analysis_files_dict

    def register_export(self, session: int, output_file: Union[None, str] = None):
        """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call.

        Args:
            session (int): The session number.
            output_file (Union[None, str], optional): Default None. The file path where the export will be written. None returns the export as bytes.

        Returns:
            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
        """

        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)

        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)

            # API function declaration
            self.library.GW2RegisterExportFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
            gw_return_object = glasswall.GwReturnObj()

            # API Call
            status = self.library.GW2RegisterExportFile(
                ct_session,
                ct_output_file
            )

        elif isinstance(output_file, type(None)):
            # API function declaration
            self.library.GW2RegisterExportMemory.argtypes = [
                ct.c_size_t,
                ct.POINTER(ct.c_void_p),
                ct.POINTER(ct.c_size_t)
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            gw_return_object = glasswall.GwReturnObj()
            gw_return_object.buffer = ct.c_void_p()
            gw_return_object.buffer_length = ct.c_size_t()

            # API call
            status = self.library.GW2RegisterExportMemory(
                ct_session,
                ct.byref(gw_return_object.buffer),
                ct.byref(gw_return_object.buffer_length)
            )

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        gw_return_object.status = status

        return gw_return_object

    def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
            output_file (Union[None, str], optional): The output file path where the .zip file will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            file_bytes (bytes): The exported .zip file.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        # Convert string path arguments to absolute paths
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            input_file = os.path.abspath(input_file)
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)
            # make directories that do not exist
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            content_management_policy = os.path.abspath(content_management_policy)

        # Convert memory inputs to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)

        # Check that file type is supported
        try:
            self.determine_file_type(input_file=input_file)
        except dft.errors.FileTypeEnumError:
            if raise_unsupported:
                raise
            else:
                return None

        with utils.CwdHandler(self.library_path):
            with self.new_session() as session:
                content_management_policy = self.set_content_management_policy(session, content_management_policy)
                register_input = self.register_input(session, input_file)
                register_export = self.register_export(session, output_file)
                status = self.run_session(session)

                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
                if status not in successes.success_codes:
                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    if raise_unsupported:
                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                    else:
                        file_bytes = None
                else:
                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    # Get file bytes
                    if isinstance(output_file, str):
                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                        if not os.path.isfile(output_file):
                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                            file_bytes = None
                        else:
                            with open(output_file, "rb") as f:
                                file_bytes = f.read()
                    else:
                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                        file_bytes = utils.buffer_to_bytes(
                            register_export.buffer,
                            register_export.buffer_length
                        )

                # Ensure memory allocated is not garbage collected
                content_management_policy, register_input, register_export

                return file_bytes

    def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to export.
            output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
        """
        export_files_dict = {}
        # Call export_file on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory) + ".zip"
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

            export_bytes = self.export_file(
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
                content_management_policy=content_management_policy,
            )

            export_files_dict[relative_path] = export_bytes

        return export_files_dict

    def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
        """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.

        Args:
            session (int): The session number.
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.

        Returns:
            gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
        """
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
            raise TypeError(input_file)

        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)

        gw_return_object = glasswall.GwReturnObj()

        if isinstance(input_file, str):
            input_file = os.path.abspath(input_file)

            # API function declaration
            self.library.GW2RegisterImportFile.argtypes = [
                ct.c_size_t,
                ct.c_char_p
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

            # API Call
            gw_return_object.status = self.library.GW2RegisterImportFile(
                ct_session,
                ct_input_file
            )

        elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
            # Convert bytearray and io.BytesIO to bytes
            input_file = utils.as_bytes(input_file)

            # API function declaration
            self.library.GW2RegisterImportMemory.argtypes = [
                ct.c_size_t,
                ct.c_void_p,
                ct.c_size_t
            ]

            # Variable initialisation
            ct_session = ct.c_size_t(session)
            gw_return_object.buffer = ct.c_char_p(input_file)
            gw_return_object.buffer_length = ct.c_size_t(len(input_file))

            # API call
            gw_return_object.status = self.library.GW2RegisterImportMemory(
                ct_session,
                gw_return_object.buffer,
                gw_return_object.buffer_length
            )

        if gw_return_object.status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
            raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

        return gw_return_object

    def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.

        Args:
            input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
            output_file (Union[None, str], optional): The output file path where the constructed file will be written.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            file_bytes (bytes): The imported file bytes.
        """
        # Validate arg types
        if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
            raise TypeError(input_file)
        if not isinstance(output_file, (type(None), str)):
            raise TypeError(output_file)
        if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
            raise TypeError(content_management_policy)
        if not isinstance(raise_unsupported, bool):
            raise TypeError(raise_unsupported)

        # Convert string path arguments to absolute paths
        if isinstance(input_file, str):
            if not os.path.isfile(input_file):
                raise FileNotFoundError(input_file)
            input_file = os.path.abspath(input_file)
        if isinstance(output_file, str):
            output_file = os.path.abspath(output_file)
            # make directories that do not exist
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
            content_management_policy = os.path.abspath(content_management_policy)

        # Convert memory inputs to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)

        # Check that file type is supported
        try:
            self.determine_file_type(input_file=input_file)
        except dft.errors.FileTypeEnumError:
            if raise_unsupported:
                raise
            else:
                return None

        with utils.CwdHandler(self.library_path):
            with self.new_session() as session:
                content_management_policy = self.set_content_management_policy(session, content_management_policy)
                register_import = self.register_import(session, input_file)
                register_output = self.register_output(session, output_file)
                status = self.run_session(session)

                input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
                if status not in successes.success_codes:
                    log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    if raise_unsupported:
                        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                    else:
                        file_bytes = None
                else:
                    log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                    # Get file bytes
                    if isinstance(output_file, str):
                        # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                        if not os.path.isfile(output_file):
                            log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                            file_bytes = None
                        else:
                            with open(output_file, "rb") as f:
                                file_bytes = f.read()
                    else:
                        # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                        file_bytes = utils.buffer_to_bytes(
                            register_output.buffer,
                            register_output.buffer_length
                        )

                # Ensure memory allocated is not garbage collected
                content_management_policy, register_import, register_output

                return file_bytes

    def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
        """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
        The constructed files are written to output_directory maintaining the same directory structure as input_directory.

        Args:
            input_directory (str): The input directory containing files to import.
            output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files.
            content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
            raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

        Returns:
            import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
        """
        import_files_dict = {}
        # Call import_file on each file in input_directory to output_directory
        for input_file in utils.list_file_paths(input_directory):
            relative_path = os.path.relpath(input_file, input_directory)
            # Remove .zip extension from relative_path
            relative_path = os.path.splitext(relative_path)[0]
            output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

            import_bytes = self.import_file(
                input_file=input_file,
                output_file=output_file,
                raise_unsupported=raise_unsupported,
                content_management_policy=content_management_policy,
            )

            import_files_dict[relative_path] = import_bytes

        return import_files_dict

    def GW2FileErrorMsg(self, session: int):
        """ Retrieve the Glasswall Session Process error message.

        Args:
            session (int): The session number.

        Returns:
            error_message (str): The Glasswall Session Process error message.
        """
        # Validate arg types
        if not isinstance(session, int):
            raise TypeError(session)

        # API function declaration
        self.library.GW2FileErrorMsg.argtypes = [
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t)
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        ct_buffer = ct.c_void_p()
        ct_buffer_length = ct.c_size_t(0)

        # API call
        status = self.library.GW2FileErrorMsg(
            ct_session,
            ct.byref(ct_buffer),
            ct.byref(ct_buffer_length)
        )

        if status not in successes.success_codes:
            log.error(f"\n\tsession: {session}\n\tstatus: {status}")
            raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
        else:
            log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

        # Editor wrote to a buffer, convert it to bytes
        error_bytes = utils.buffer_to_bytes(
            ct_buffer,
            ct_buffer_length
        )

        error_message = error_bytes.decode()

        return error_message

Ancestors

Methods

def GW2FileErrorMsg(self, session: int)

Retrieve the Glasswall Session Process error message.

Args

session : int
The session number.

Returns

error_message (str): The Glasswall Session Process error message.

Expand source code
def GW2FileErrorMsg(self, session: int):
    """ Retrieve the Glasswall Session Process error message.

    Args:
        session (int): The session number.

    Returns:
        error_message (str): The Glasswall Session Process error message.
    """
    # Validate arg types
    if not isinstance(session, int):
        raise TypeError(session)

    # API function declaration
    self.library.GW2FileErrorMsg.argtypes = [
        ct.c_size_t,
        ct.POINTER(ct.c_void_p),
        ct.POINTER(ct.c_size_t)
    ]

    # Variable initialisation
    ct_session = ct.c_size_t(session)
    ct_buffer = ct.c_void_p()
    ct_buffer_length = ct.c_size_t(0)

    # API call
    status = self.library.GW2FileErrorMsg(
        ct_session,
        ct.byref(ct_buffer),
        ct.byref(ct_buffer_length)
    )

    if status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {status}")
        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

    # Editor wrote to a buffer, convert it to bytes
    error_bytes = utils.buffer_to_bytes(
        ct_buffer,
        ct_buffer_length
    )

    error_message = error_bytes.decode()

    return error_message
def analyse_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)

Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.

Args

input_directory : str
The input directory containing files to analyse.
output_directory : Union[None, str]
The output directory where the analysis files will be written, or None to not write files.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
Default None (sanitise). The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

Expand source code
def analyse_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Analyses all files in a directory and its subdirectories. The analysis files are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing files to analyse.
        output_directory (Union[None, str]): The output directory where the analysis files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        analysis_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
    """
    analysis_files_dict = {}
    # Call analyse_file on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(input_file, input_directory) + ".xml"
        output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

        analysis_bytes = self.analyse_file(
            input_file=input_file,
            output_file=output_file,
            raise_unsupported=raise_unsupported,
            content_management_policy=content_management_policy,
        )

        analysis_files_dict[relative_path] = analysis_bytes

    return analysis_files_dict
def analyse_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)

Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The input file path or bytes.
output_file : Union[None, str], optional
The output file path where the analysis file will be written.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
The content management policy to apply to the session.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

file_bytes (bytes): The analysis file bytes.

Expand source code
def analyse_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Analyses a file, returning the analysis bytes. The analysis is written to output_file if it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the analysis file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (bytes): The analysis file bytes.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Convert string path arguments to absolute paths
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # make directories that do not exist
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Convert memory inputs to bytes
    if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file = utils.as_bytes(input_file)

    # Check that file type is supported
    try:
        self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if raise_unsupported:
            raise
        else:
            return None

    with utils.CwdHandler(self.library_path):
        with self.new_session() as session:
            content_management_policy = self.set_content_management_policy(session, content_management_policy)
            register_input = self.register_input(session, input_file)
            register_analysis = self.register_analysis(session, output_file)
            status = self.run_session(session)

            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
            if status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                if raise_unsupported:
                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                else:
                    file_bytes = None
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                # Get file bytes
                if isinstance(output_file, str):
                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                    if not os.path.isfile(output_file):
                        log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                        file_bytes = None
                    else:
                        with open(output_file, "rb") as f:
                            file_bytes = f.read()
                else:
                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                    file_bytes = utils.buffer_to_bytes(
                        register_analysis.buffer,
                        register_analysis.buffer_length
                    )

            # Ensure memory allocated is not garbage collected
            content_management_policy, register_input, register_analysis

            return file_bytes
def close_session(self, session: int)

Close the Glasswall session. All resources allocated by the session will be destroyed.

Args

session : int
The session to close.

Returns

None

Expand source code
def close_session(self, session: int):
    """ Close the Glasswall session. All resources allocated by the session will be destroyed.

    Args:
        session (int): The session to close.

    Returns:
        None
    """
    if not isinstance(session, int):
        raise TypeError(session)

    # API function declaration
    self.library.GW2CloseSession.argtypes = [ct.c_size_t]

    # Variable initialisation
    ct_session = ct.c_size_t(session)

    # API call
    status = self.library.GW2CloseSession(ct_session)

    if status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {status}")
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

    return status
def determine_file_type(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], as_string: bool = False)

Returns an int representing the file type / file format of a file.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The input file, can be a local path.
as_string : bool, optional
Return file type as string, eg: "bmp" instead of: 29. Defaults to False.

Returns

file_type (Union[int, str]): The file format.

Expand source code
def determine_file_type(self, input_file: Union[str, bytes, bytearray, io.BytesIO], as_string: bool = False):
    """ Returns an int representing the file type / file format of a file.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file, can be a local path.
        as_string (bool, optional): Return file type as string, eg: "bmp" instead of: 29. Defaults to False.

    Returns:
        file_type (Union[int, str]): The file format.
    """
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)

        # convert to ct.c_char_p of bytes
        ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

        # API call
        file_type = self.library.GW2DetermineFileTypeFromFile(ct_input_file)

    elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        # convert to bytes
        bytes_input_file = utils.as_bytes(input_file)

        # ctypes conversion
        ct_buffer = ct.c_char_p(bytes_input_file)
        ct_butter_length = ct.c_size_t(len(bytes_input_file))

        # API call
        file_type = self.library.GW2DetermineFileTypeFromMemory(
            ct_buffer,
            ct_butter_length
        )

    else:
        raise TypeError(input_file)

    file_type_as_string = dft.file_type_int_to_str(file_type)
    input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file

    if not dft.is_success(file_type):
        log.warning(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")
        raise dft.int_class_map.get(file_type, dft.errors.UnknownErrorCode)(file_type)
    else:
        log.debug(f"\n\tfile_type: {file_type}\n\tfile_type_as_string: {file_type_as_string}\n\tinput_file: {input_file_repr}")

    if as_string:
        return file_type_as_string

    return file_type
def export_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)

Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.

Args

input_directory : str
The input directory containing files to export.
output_directory : Union[None, str]
The output directory where the export files will be written, or None to not write files.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
Default None (sanitise). The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

Expand source code
def export_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Exports all files in a directory and its subdirectories. The export files are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing files to export.
        output_directory (Union[None, str]): The output directory where the export files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        export_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
    """
    export_files_dict = {}
    # Call export_file on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(input_file, input_directory) + ".zip"
        output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

        export_bytes = self.export_file(
            input_file=input_file,
            output_file=output_file,
            raise_unsupported=raise_unsupported,
            content_management_policy=content_management_policy,
        )

        export_files_dict[relative_path] = export_bytes

    return export_files_dict
def export_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)

Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The input file path or bytes.
output_file : Union[None, str], optional
The output file path where the .zip file will be written.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
The content management policy to apply to the session.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

file_bytes (bytes): The exported .zip file.

Expand source code
def export_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Export a file, returning the .zip file bytes. The .zip file is written to output_file if it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the .zip file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (bytes): The exported .zip file.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Convert string path arguments to absolute paths
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # make directories that do not exist
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Convert memory inputs to bytes
    if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file = utils.as_bytes(input_file)

    # Check that file type is supported
    try:
        self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if raise_unsupported:
            raise
        else:
            return None

    with utils.CwdHandler(self.library_path):
        with self.new_session() as session:
            content_management_policy = self.set_content_management_policy(session, content_management_policy)
            register_input = self.register_input(session, input_file)
            register_export = self.register_export(session, output_file)
            status = self.run_session(session)

            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
            if status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                if raise_unsupported:
                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                else:
                    file_bytes = None
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                # Get file bytes
                if isinstance(output_file, str):
                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                    if not os.path.isfile(output_file):
                        log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                        file_bytes = None
                    else:
                        with open(output_file, "rb") as f:
                            file_bytes = f.read()
                else:
                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                    file_bytes = utils.buffer_to_bytes(
                        register_export.buffer,
                        register_export.buffer_length
                    )

            # Ensure memory allocated is not garbage collected
            content_management_policy, register_input, register_export

            return file_bytes
def get_content_management_policy(self, session: int)

Returns the content management configuration for a given session.

Args

session : int
The current session.

Returns

xml_string (str): The XML string of the current content management configuration.

Expand source code
def get_content_management_policy(self, session: int):
    """ Returns the content management configuration for a given session.

    Args:
        session (int): The current session.

    Returns:
        xml_string (str): The XML string of the current content management configuration.
    """
    # NOTE GW2GetPolicySettings is current not implemented in editor

    # set xml_string as loaded default config
    xml_string = glasswall.content_management.policies.Editor(default="sanitise").text,

    # log.debug(f"xml_string:\n{xml_string}")

    return xml_string

    # # API function declaration
    # self.library.GW2GetPolicySettings.argtypes = [
    #     ct.c_size_t,
    #     ct.c_void_p,
    # ]

    # # Variable initialisation
    # ct_session = ct.c_size_t(session)
    # ct_buffer = ct.c_void_p()
    # ct_butter_length = ct.c_size_t()
    # # ct_file_format = ct.c_int(file_format)

    # # API Call
    # status = self.library.GW2GetPolicySettings(
    #     ct_session,
    #     ct.byref(ct_buffer),
    #     ct.byref(ct_butter_length)
    # )

    # print("GW2GetPolicySettings status:", status)

    # file_bytes = utils.buffer_to_bytes(
    #     ct_buffer,
    #     ct_butter_length,
    # )

    # return file_bytes
def import_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)

Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced. The constructed files are written to output_directory maintaining the same directory structure as input_directory.

Args

input_directory : str
The input directory containing files to import.
output_directory : Union[None, str]
The output directory where the constructed files will be written, or None to not write files.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
Default None (sanitise). The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

Expand source code
def import_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Imports all files in a directory and its subdirectories. Files are expected as .zip but this is not forced.
    The constructed files are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing files to import.
        output_directory (Union[None, str]): The output directory where the constructed files will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        import_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
    """
    import_files_dict = {}
    # Call import_file on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(input_file, input_directory)
        # Remove .zip extension from relative_path
        relative_path = os.path.splitext(relative_path)[0]
        output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

        import_bytes = self.import_file(
            input_file=input_file,
            output_file=output_file,
            raise_unsupported=raise_unsupported,
            content_management_policy=content_management_policy,
        )

        import_files_dict[relative_path] = import_bytes

    return import_files_dict
def import_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)

Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The .zip input file path or bytes.
output_file : Union[None, str], optional
The output file path where the constructed file will be written.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
The content management policy to apply to the session.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

file_bytes (bytes): The imported file bytes.

Expand source code
def import_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Import a .zip file, constructs a file from the .zip file and returns the file bytes. The file is written to output_file if it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The .zip input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the constructed file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (bytes): The imported file bytes.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Convert string path arguments to absolute paths
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # make directories that do not exist
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Convert memory inputs to bytes
    if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file = utils.as_bytes(input_file)

    # Check that file type is supported
    try:
        self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if raise_unsupported:
            raise
        else:
            return None

    with utils.CwdHandler(self.library_path):
        with self.new_session() as session:
            content_management_policy = self.set_content_management_policy(session, content_management_policy)
            register_import = self.register_import(session, input_file)
            register_output = self.register_output(session, output_file)
            status = self.run_session(session)

            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
            if status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                if raise_unsupported:
                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                else:
                    file_bytes = None
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                # Get file bytes
                if isinstance(output_file, str):
                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                    if not os.path.isfile(output_file):
                        log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                        file_bytes = None
                    else:
                        with open(output_file, "rb") as f:
                            file_bytes = f.read()
                else:
                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                    file_bytes = utils.buffer_to_bytes(
                        register_output.buffer,
                        register_output.buffer_length
                    )

            # Ensure memory allocated is not garbage collected
            content_management_policy, register_import, register_output

            return file_bytes
def new_session(self)

Context manager. Opens a new session on entry and closes the session on exit.

Expand source code
@contextmanager
def new_session(self):
    """ Context manager. Opens a new session on entry and closes the session on exit. """
    try:
        session = self.open_session()
        yield session
    finally:
        self.close_session(session)
def open_session(self)

Open a new Glasswall session.

Returns

session (int): An incrementing integer repsenting the current session.

Expand source code
def open_session(self):
    """ Open a new Glasswall session.

    Returns:
        session (int): An incrementing integer repsenting the current session.
    """
    # API call
    session = self.library.GW2OpenSession()

    log.debug(f"\n\tsession: {session}")

    return session
def protect_directory(self, input_directory: str, output_directory: Optional[str], content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)

Recursively processes all files in a directory in protect mode using the given content management policy. The protected files are written to output_directory maintaining the same directory structure as input_directory.

Args

input_directory : str
The input directory containing files to protect.
output_directory : Union[None, str]
The output directory where the protected file will be written, or None to not write files.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
Default None (sanitise). The content management policy to apply.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.

Expand source code
def protect_directory(self, input_directory: str, output_directory: Union[None, str], content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Recursively processes all files in a directory in protect mode using the given content management policy.
    The protected files are written to output_directory maintaining the same directory structure as input_directory.

    Args:
        input_directory (str): The input directory containing files to protect.
        output_directory (Union[None, str]): The output directory where the protected file will be written, or None to not write files.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        protected_files_dict (dict): A dictionary of file paths relative to input_directory, and file bytes.
    """
    protected_files_dict = {}
    # Call protect_file on each file in input_directory to output_directory
    for input_file in utils.list_file_paths(input_directory):
        relative_path = os.path.relpath(input_file, input_directory)
        output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)

        protected_bytes = self.protect_file(
            input_file=input_file,
            output_file=output_file,
            raise_unsupported=raise_unsupported,
            content_management_policy=content_management_policy,
        )

        protected_files_dict[relative_path] = protected_bytes

    return protected_files_dict
def protect_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, content_management_policy: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, raise_unsupported: bool = True)

Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.

Args

input_file : Union[str, bytes, bytearray, io.BytesIO]
The input file path or bytes.
output_file : Union[None, str], optional
The output file path where the protected file will be written.
content_management_policy : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
The content management policy to apply to the session.
raise_unsupported : bool, optional
Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

Returns

file_bytes (bytes): The protected file bytes.

Expand source code
def protect_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, content_management_policy: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, raise_unsupported: bool = True):
    """ Protects a file using the current content management configuration, returning the file bytes. The protected file is written to output_file if it is provided.

    Args:
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
        output_file (Union[None, str], optional): The output file path where the protected file will be written.
        content_management_policy (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): The content management policy to apply to the session.
        raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.

    Returns:
        file_bytes (bytes): The protected file bytes.
    """
    # Validate arg types
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
        raise TypeError(input_file)
    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)
    if not isinstance(content_management_policy, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(content_management_policy)
    if not isinstance(raise_unsupported, bool):
        raise TypeError(raise_unsupported)

    # Convert string path arguments to absolute paths
    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)
        input_file = os.path.abspath(input_file)
    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)
        # make directories that do not exist
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
    if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
        content_management_policy = os.path.abspath(content_management_policy)

    # Convert memory inputs to bytes
    if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
        input_file = utils.as_bytes(input_file)

    # Check that file type is supported
    try:
        self.determine_file_type(input_file=input_file)
    except dft.errors.FileTypeEnumError:
        if raise_unsupported:
            raise
        else:
            return None

    with utils.CwdHandler(self.library_path):
        with self.new_session() as session:
            content_management_policy = self.set_content_management_policy(session, content_management_policy)
            register_input = self.register_input(session, input_file)
            register_output = self.register_output(session, output_file=output_file)
            status = self.run_session(session)

            input_file_repr = f"{type(input_file)} length {len(input_file)}" if isinstance(input_file, (bytes, bytearray,)) else input_file.__sizeof__() if isinstance(input_file, io.BytesIO) else input_file
            if status not in successes.success_codes:
                log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                if raise_unsupported:
                    raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
                else:
                    file_bytes = None
            else:
                log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file}\n\tsession: {session}\n\tstatus: {status}")
                # Get file bytes
                if isinstance(output_file, str):
                    # File to file and memory to file, Editor wrote to a file, read it to get the file bytes
                    if not os.path.isfile(output_file):
                        log.error(f"Editor returned success code: {status} but no output file was found: {output_file}")
                        file_bytes = None
                    else:
                        with open(output_file, "rb") as f:
                            file_bytes = f.read()
                else:
                    # File to memory and memory to memory, Editor wrote to a buffer, convert it to bytes
                    file_bytes = utils.buffer_to_bytes(
                        register_output.buffer,
                        register_output.buffer_length
                    )

            # Ensure memory allocated is not garbage collected
            content_management_policy, register_input, register_output

            return file_bytes
def register_analysis(self, session: int, output_file: Optional[str] = None)

Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.

Args

session : int
The session number.
output_file : Union[None, str], optional
Default None. The file path where the analysis will be written. None returns the analysis as bytes.

Returns

gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.

Expand source code
def register_analysis(self, session: int, output_file: Union[None, str] = None):
    """ Registers an analysis file for the given session. The analysis file will be created during the session's run_session call.

    Args:
        session (int): The session number.
        output_file (Union[None, str], optional): Default None. The file path where the analysis will be written. None returns the analysis as bytes.

    Returns:
        gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
    """

    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

        # API function declaration
        self.library.GW2RegisterAnalysisFile.argtypes = [
            ct.c_size_t,
            ct.c_char_p,
            ct.c_int,
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
        analysis_file_format = ct.c_int()
        gw_return_object = glasswall.GwReturnObj()

        # API call
        status = self.library.GW2RegisterAnalysisFile(
            ct_session,
            ct_output_file,
            analysis_file_format
        )

    elif isinstance(output_file, type(None)):
        # API function declaration
        self.library.GW2RegisterAnalysisMemory.argtypes = [
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t)
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        gw_return_object = glasswall.GwReturnObj()
        gw_return_object.buffer = ct.c_void_p()
        gw_return_object.buffer_length = ct.c_size_t()

        # API call
        status = self.library.GW2RegisterAnalysisMemory(
            ct_session,
            ct.byref(gw_return_object.buffer),
            ct.byref(gw_return_object.buffer_length)
        )

    if status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {status}")
        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

    gw_return_object.status = status

    return gw_return_object
def register_export(self, session: int, output_file: Optional[str] = None)

Registers a file to be exported for the given session. The export file will be created during the session's run_session call.

Args

session : int
The session number.
output_file : Union[None, str], optional
Default None. The file path where the export will be written. None returns the export as bytes.

Returns

gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.

Expand source code
def register_export(self, session: int, output_file: Union[None, str] = None):
    """ Registers a file to be exported for the given session. The export file will be created during the session's run_session call.

    Args:
        session (int): The session number.
        output_file (Union[None, str], optional): Default None. The file path where the export will be written. None returns the export as bytes.

    Returns:
        gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
    """

    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)

    if isinstance(output_file, str):
        output_file = os.path.abspath(output_file)

        # API function declaration
        self.library.GW2RegisterExportFile.argtypes = [
            ct.c_size_t,
            ct.c_char_p
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        ct_output_file = ct.c_char_p(output_file.encode("utf-8"))
        gw_return_object = glasswall.GwReturnObj()

        # API Call
        status = self.library.GW2RegisterExportFile(
            ct_session,
            ct_output_file
        )

    elif isinstance(output_file, type(None)):
        # API function declaration
        self.library.GW2RegisterExportMemory.argtypes = [
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t)
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        gw_return_object = glasswall.GwReturnObj()
        gw_return_object.buffer = ct.c_void_p()
        gw_return_object.buffer_length = ct.c_size_t()

        # API call
        status = self.library.GW2RegisterExportMemory(
            ct_session,
            ct.byref(gw_return_object.buffer),
            ct.byref(gw_return_object.buffer_length)
        )

    if status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {status}")
        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

    gw_return_object.status = status

    return gw_return_object
def register_import(self, session: int, input_file: Union[str, bytes, bytearray, _io.BytesIO])

Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.

Args

session : int
The session number.
input_file : Union[str, bytes, bytearray, io.BytesIO]
The input file path or bytes.

Returns

gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.

Expand source code
def register_import(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
    """ Registers a .zip file to be imported for the given session. The constructed file will be created during the session's run_session call.

    Args:
        session (int): The session number.
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.

    Returns:
        gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
    """
    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
        raise TypeError(input_file)

    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)

    gw_return_object = glasswall.GwReturnObj()

    if isinstance(input_file, str):
        input_file = os.path.abspath(input_file)

        # API function declaration
        self.library.GW2RegisterImportFile.argtypes = [
            ct.c_size_t,
            ct.c_char_p
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

        # API Call
        gw_return_object.status = self.library.GW2RegisterImportFile(
            ct_session,
            ct_input_file
        )

    elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
        # Convert bytearray and io.BytesIO to bytes
        input_file = utils.as_bytes(input_file)

        # API function declaration
        self.library.GW2RegisterImportMemory.argtypes = [
            ct.c_size_t,
            ct.c_void_p,
            ct.c_size_t
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        gw_return_object.buffer = ct.c_char_p(input_file)
        gw_return_object.buffer_length = ct.c_size_t(len(input_file))

        # API call
        gw_return_object.status = self.library.GW2RegisterImportMemory(
            ct_session,
            gw_return_object.buffer,
            gw_return_object.buffer_length
        )

    if gw_return_object.status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
        raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

    return gw_return_object
def register_input(self, session: int, input_file: Union[str, bytes, bytearray, _io.BytesIO])

Register an input file or bytes for the given session.

Args

session : int
The current session.
input_file : Union[str, bytes, bytearray, io.BytesIO]
The input file path or bytes.

Returns

status (int): The result of the Glasswall API call.

Expand source code
def register_input(self, session: int, input_file: Union[str, bytes, bytearray, io.BytesIO]):
    """ Register an input file or bytes for the given session.

    Args:
        session (int): The current session.
        input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.

    Returns:
        status (int): The result of the Glasswall API call.
    """

    if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO,)):
        raise TypeError(input_file)

    if isinstance(input_file, str):
        if not os.path.isfile(input_file):
            raise FileNotFoundError(input_file)

        # API function declaration
        self.library.GW2RegisterInputFile.argtypes = [
            ct.c_size_t,
            ct.c_char_p
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        ct_input_file = ct.c_char_p(input_file.encode("utf-8"))

        # API call
        status = self.library.GW2RegisterInputFile(
            ct_session,
            ct_input_file
        )

    elif isinstance(input_file, (bytes, bytearray, io.BytesIO,)):
        # Convert bytearray and io.BytesIO to bytes
        input_file = utils.as_bytes(input_file)

        # API function declaration
        self.library.GW2RegisterInputMemory.argtypes = [
            ct.c_size_t,
            ct.c_char_p,
            ct.c_size_t,
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        ct_buffer = ct.c_char_p(input_file)
        ct_buffer_length = ct.c_size_t(len(input_file))

        # API call
        status = self.library.GW2RegisterInputMemory(
            ct_session,
            ct_buffer,
            ct_buffer_length
        )

    if status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {status}")
        raise errors.error_codes.get(status, errors.UnknownErrorCode)(status)
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {status}")

    return status
def register_output(self, session, output_file: Optional[str] = None)

Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.

Args

session : int
The current session.
output_file : Union[None, str], optional
If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.

Returns

gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.

Expand source code
def register_output(self, session, output_file: Union[None, str] = None):
    """ Register an output file for the given session. If output_file is None the file will be returned as 'buffer' and 'buffer_length' attributes.

    Args:
        session (int): The current session.
        output_file (Union[None, str], optional): If specified, during run session the file will be written to output_file, otherwise the file will be written to the glasswall.GwReturnObj 'buffer' and 'buffer_length' attributes.

    Returns:
        gw_return_object (glasswall.GwReturnObj): A GwReturnObj instance with the attribute 'status' indicating the result of the function call. If output_file is None (memory mode), 'buffer', and 'buffer_length' are included containing the file content and file size.
    """

    if not isinstance(output_file, (type(None), str)):
        raise TypeError(output_file)

    gw_return_object = glasswall.GwReturnObj()

    if isinstance(output_file, str):
        # API function declaration
        self.library.GW2RegisterOutFile.argtypes = [
            ct.c_size_t,
            ct.c_char_p
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        ct_output_file = ct.c_char_p(output_file.encode("utf-8"))

        # API call
        gw_return_object.status = self.library.GW2RegisterOutFile(
            ct_session,
            ct_output_file
        )

    else:
        # API function declaration
        self.library.GW2RegisterOutputMemory.argtypes = [
            ct.c_size_t,
            ct.POINTER(ct.c_void_p),
            ct.POINTER(ct.c_size_t)
        ]

        # Variable initialisation
        ct_session = ct.c_size_t(session)
        gw_return_object.buffer = ct.c_void_p()
        gw_return_object.buffer_length = ct.c_size_t(0)

        # API call
        gw_return_object.status = self.library.GW2RegisterOutputMemory(
            ct_session,
            ct.byref(gw_return_object.buffer),
            ct.byref(gw_return_object.buffer_length)
        )

    if gw_return_object.status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
        raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

    return gw_return_object
def run_session(self, session)

Runs the Glasswall session and begins processing of a file.

Args

session : int
The session to run.

Returns

status (int): The status of the function call.

Expand source code
def run_session(self, session):
    """ Runs the Glasswall session and begins processing of a file.

    Args:
        session (int): The session to run.

    Returns:
        status (int): The status of the function call.
    """
    # API function declaration
    self.library.GW2RunSession.argtypes = [ct.c_size_t]

    # Variable initialisation
    ct_session = ct.c_size_t(session)

    # API call
    status = self.library.GW2RunSession(ct_session)

    if status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.GW2FileErrorMsg(session)}")
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {status}\n\tGW2FileErrorMsg: {self.GW2FileErrorMsg(session)}")

    return status
def set_content_management_policy(self, session: int, input_file: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO, ForwardRef('Policy')] = None, file_format=0)

Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.

Args

session : int
The current session.
input_file : Union[None, str, bytes, bytearray, io.BytesIO, Policy], optional
Default None (sanitise). The content management policy to apply.
file_format : int
The file format of the content management policy. 0 is xml.

Returns

status (int): The result of the Glasswall API call.

Expand source code
def set_content_management_policy(self, session: int, input_file: Union[None, str, bytes, bytearray, io.BytesIO, "glasswall.content_management.policies.policy.Policy"] = None, file_format=0):
    """ Sets the content management policy configuration. If input_file is None then default settings (sanitise) are applied.

    Args:
        session (int): The current session.
        input_file (Union[None, str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], optional): Default None (sanitise). The content management policy to apply.
        file_format (int): The file format of the content management policy. 0 is xml.

    Returns:
        status (int): The result of the Glasswall API call.
    """
    # Validate type
    if not isinstance(session, int):
        raise TypeError(session)
    if not isinstance(input_file, (type(None), str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        raise TypeError(input_file)
    if not isinstance(file_format, int):
        raise TypeError(file_format)

    # Set input_file to default if input_file is None
    if input_file is None:
        input_file = glasswall.content_management.policies.Editor(default="sanitise")

    # Validate xml content is parsable
    utils.validate_xml(input_file)

    gw_return_object = glasswall.GwReturnObj()

    # From file
    if isinstance(input_file, str) and os.path.isfile(input_file):
        # API function declaration
        self.library.GW2RegisterPoliciesFile.argtypes = [
            ct.c_size_t,
            ct.c_char_p,
            ct.c_int,
        ]

        # Variable initialisation
        gw_return_object.ct_session = ct.c_size_t(session)
        gw_return_object.ct_input_file = ct.c_char_p(input_file.encode("utf-8"))
        gw_return_object.ct_file_format = ct.c_int(file_format)

        gw_return_object.status = self.library.GW2RegisterPoliciesFile(
            gw_return_object.ct_session,
            gw_return_object.ct_input_file,
            gw_return_object.ct_file_format
        )

    # From memory
    elif isinstance(input_file, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
        # Convert bytearray, io.BytesIO to bytes
        if isinstance(input_file, (bytes, bytearray, io.BytesIO)):
            input_file = utils.as_bytes(input_file)
        # Convert string xml or Policy to bytes
        if isinstance(input_file, (str, glasswall.content_management.policies.policy.Policy)):
            input_file = input_file.encode("utf-8")

        # API function declaration
        self.library.GW2RegisterPoliciesMemory.argtype = [
            ct.c_size_t,
            ct.c_char_p,
            ct.c_int
        ]

        # Variable initialisation
        gw_return_object.ct_session = ct.c_size_t(session)
        gw_return_object.ct_buffer = ct.c_char_p(input_file)
        gw_return_object.ct_buffer_length = ct.c_size_t(len(input_file))
        gw_return_object.ct_file_format = ct.c_int(file_format)

        # API Call
        gw_return_object.status = self.library.GW2RegisterPoliciesMemory(
            gw_return_object.ct_session,
            gw_return_object.ct_buffer,
            gw_return_object.ct_buffer_length,
            gw_return_object.ct_file_format
        )

    if gw_return_object.status not in successes.success_codes:
        log.error(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")
        raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
    else:
        log.debug(f"\n\tsession: {session}\n\tstatus: {gw_return_object.status}")

    return gw_return_object
def validate_license(self)

Validates the license of the library by attempting to call protect_file on a known supported file.

Raises

LicenseExpired
If the license has expired.
EditorError
If the license could not be validated.
Expand source code
def validate_license(self):
    """ Validates the license of the library by attempting to call protect_file on a known supported file.

    Raises:
        LicenseExpired: If the license has expired.
        EditorError: If the license could not be validated.
    """
    # Call protect file on a known good bitmap to see if license has expired
    try:
        self.protect_file(
            input_file=b"BM:\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x18\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\x00",
            raise_unsupported=True
        )
        log.debug(f"{self.__class__.__name__} license validated successfully.")
    except errors.EditorError:
        log.error(f"{self.__class__.__name__} license validation failed.")
        raise
def version(self)

Returns the Glasswall library version.

Returns

version (str): The Glasswall library version.

Expand source code
def version(self):
    """ Returns the Glasswall library version.

    Returns:
        version (str): The Glasswall library version.
    """
    # API function declaration
    self.library.GW2LibVersion.restype = ct.c_char_p

    # API call
    version = self.library.GW2LibVersion()

    # Convert to Python string
    version = ct.string_at(version).decode()

    return version