Module glasswall.libraries.word_search.word_search
Expand source code
import ctypes as ct
import io
import os
from typing import Optional, Union
import glasswall
from glasswall import utils
from glasswall.config.logging import log
from glasswall.libraries.library import Library
from glasswall.libraries.word_search import errors, successes
class WordSearch(Library):
""" A high level Python wrapper for Glasswall WordSearch. """
def __init__(self, library_path: str):
super().__init__(library_path=library_path)
self.library = self.load_library(os.path.abspath(library_path))
log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}")
def version(self):
""" Returns the Glasswall library version.
Returns:
version (str): The Glasswall library version.
"""
# API function declaration
self.library.GwWordSearchVersion.restype = ct.c_char_p
# API call
version = self.library.GwWordSearchVersion()
# Convert to Python string
version = ct.string_at(version).decode()
return version
@glasswall.utils.deprecated_alias(xml_config="content_management_policy")
def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
""" Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.
Args:
input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes.
content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
output_file (Union[None, str], optional): Default None. If str, write output_file to that path.
output_report (Union[None, str], optional): Default None. If str, write output_file to that path.
homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs json file path or bytes.
raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns:
gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
"""
# Validate arg types
if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)):
raise TypeError(input_file)
if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)):
raise TypeError(content_management_policy)
if not isinstance(output_file, (type(None), str)):
raise TypeError(output_file)
if not isinstance(output_report, (type(None), str)):
raise TypeError(output_report)
if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)):
raise TypeError(homoglyphs)
# Convert string path arguments to absolute paths
if isinstance(output_file, str):
output_file = os.path.abspath(output_file)
if isinstance(output_report, str):
output_report = os.path.abspath(output_report)
# Convert inputs to bytes
if isinstance(input_file, str):
with open(input_file, "rb") as f:
input_file_bytes = f.read()
elif isinstance(input_file, (bytes, bytearray, io.BytesIO)):
input_file_bytes = utils.as_bytes(input_file)
# warn if input_file is 0 bytes
if not input_file_bytes:
log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}")
if isinstance(homoglyphs, str):
with open(homoglyphs, "rb") as f:
homoglyphs_bytes = f.read()
elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)):
homoglyphs_bytes = utils.as_bytes(homoglyphs)
elif isinstance(homoglyphs, type(None)):
# Load default
with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f:
homoglyphs_bytes = f.read()
if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy):
with open(content_management_policy, "rb") as f:
content_management_policy = f.read()
content_management_policy = utils.validate_xml(content_management_policy)
# Variable initialisation
ct_input_buffer = ct.c_char_p(input_file_bytes)
ct_input_buffer_length = ct.c_size_t(len(input_file_bytes))
ct_output_buffer = ct.c_void_p()
ct_output_buffer_length = ct.c_size_t()
ct_output_report_buffer = ct.c_void_p()
ct_output_report_buffer_length = ct.c_size_t()
ct_homoglyphs = ct.c_char_p(homoglyphs_bytes)
ct_content_management_policy = ct.c_char_p(content_management_policy.encode())
gw_return_object = glasswall.GwReturnObj()
with utils.CwdHandler(new_cwd=self.library_path):
gw_return_object.status = self.library.GwWordSearch(
ct_input_buffer,
ct_input_buffer_length,
ct.byref(ct_output_buffer),
ct.byref(ct_output_buffer_length),
ct.byref(ct_output_report_buffer),
ct.byref(ct_output_report_buffer_length),
ct_homoglyphs,
ct_content_management_policy
)
gw_return_object.output_file = utils.buffer_to_bytes(
ct_output_buffer,
ct_output_buffer_length
)
gw_return_object.output_report = utils.buffer_to_bytes(
ct_output_report_buffer,
ct_output_report_buffer_length
)
input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file
output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}"
output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}"
homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs
if gw_return_object.status not in successes.success_codes:
log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}")
if raise_unsupported:
raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status)
else:
log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}")
# Write output file
if isinstance(output_file, str):
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, "wb") as f:
f.write(gw_return_object.output_file)
# Write output report
if isinstance(output_report, str):
os.makedirs(os.path.dirname(output_report), exist_ok=True)
with open(output_report, "wb") as f:
f.write(gw_return_object.output_report)
if input_file_bytes and not gw_return_object.output_file:
# input_file_bytes was not empty but output_file is unexpectedly empty
log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}")
if raise_unsupported:
raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}")
if input_file_bytes and not gw_return_object.output_report:
# input_file_bytes was not empty but output_report is unexpectedly empty
log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}")
if raise_unsupported:
raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}")
return gw_return_object
@glasswall.utils.deprecated_alias(xml_config="content_management_policy")
def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True):
""" Redacts all files in a directory and it's subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.
Args:
input_directory (str): The input directory containing files to redact.
output_directory (str): The output directory where the redacted files will be written.
output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written.
content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs file path, str, or bytes.
raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns:
redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
"""
redacted_files_dict = {}
# Call redact_file on each file in input_directory
for input_file in utils.list_file_paths(input_directory):
relative_path = os.path.relpath(input_file, input_directory)
# Construct paths for output file and output report
output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path)
output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml")
result = self.redact_file(
input_file=input_file,
output_file=output_file,
output_report=output_report,
homoglyphs=homoglyphs,
content_management_policy=content_management_policy,
raise_unsupported=raise_unsupported,
)
redacted_files_dict[relative_path] = result
return redacted_files_dict
Classes
class WordSearch (library_path: str)
-
A high level Python wrapper for Glasswall WordSearch.
Expand source code
class WordSearch(Library): """ A high level Python wrapper for Glasswall WordSearch. """ def __init__(self, library_path: str): super().__init__(library_path=library_path) self.library = self.load_library(os.path.abspath(library_path)) log.info(f"Loaded Glasswall {self.__class__.__name__} version {self.version()} from {self.library_path}") def version(self): """ Returns the Glasswall library version. Returns: version (str): The Glasswall library version. """ # API function declaration self.library.GwWordSearchVersion.restype = ct.c_char_p # API call version = self.library.GwWordSearchVersion() # Convert to Python string version = ct.string_at(version).decode() return version @glasswall.utils.deprecated_alias(xml_config="content_management_policy") def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True): """ Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply. output_file (Union[None, str], optional): Default None. If str, write output_file to that path. output_report (Union[None, str], optional): Default None. If str, write output_file to that path. homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs json file path or bytes. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(output_report, (type(None), str)): raise TypeError(output_report) if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)): raise TypeError(homoglyphs) # Convert string path arguments to absolute paths if isinstance(output_file, str): output_file = os.path.abspath(output_file) if isinstance(output_report, str): output_report = os.path.abspath(output_report) # Convert inputs to bytes if isinstance(input_file, str): with open(input_file, "rb") as f: input_file_bytes = f.read() elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file_bytes = utils.as_bytes(input_file) # warn if input_file is 0 bytes if not input_file_bytes: log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}") if isinstance(homoglyphs, str): with open(homoglyphs, "rb") as f: homoglyphs_bytes = f.read() elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)): homoglyphs_bytes = utils.as_bytes(homoglyphs) elif isinstance(homoglyphs, type(None)): # Load default with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f: homoglyphs_bytes = f.read() if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): with open(content_management_policy, "rb") as f: content_management_policy = f.read() content_management_policy = utils.validate_xml(content_management_policy) # Variable initialisation ct_input_buffer = ct.c_char_p(input_file_bytes) ct_input_buffer_length = ct.c_size_t(len(input_file_bytes)) ct_output_buffer = ct.c_void_p() ct_output_buffer_length = ct.c_size_t() ct_output_report_buffer = ct.c_void_p() ct_output_report_buffer_length = ct.c_size_t() ct_homoglyphs = ct.c_char_p(homoglyphs_bytes) ct_content_management_policy = ct.c_char_p(content_management_policy.encode()) gw_return_object = glasswall.GwReturnObj() with utils.CwdHandler(new_cwd=self.library_path): gw_return_object.status = self.library.GwWordSearch( ct_input_buffer, ct_input_buffer_length, ct.byref(ct_output_buffer), ct.byref(ct_output_buffer_length), ct.byref(ct_output_report_buffer), ct.byref(ct_output_report_buffer_length), ct_homoglyphs, ct_content_management_policy ) gw_return_object.output_file = utils.buffer_to_bytes( ct_output_buffer, ct_output_buffer_length ) gw_return_object.output_report = utils.buffer_to_bytes( ct_output_report_buffer, ct_output_report_buffer_length ) input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}" output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}" homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs if gw_return_object.status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}") if raise_unsupported: raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}") # Write output file if isinstance(output_file, str): os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, "wb") as f: f.write(gw_return_object.output_file) # Write output report if isinstance(output_report, str): os.makedirs(os.path.dirname(output_report), exist_ok=True) with open(output_report, "wb") as f: f.write(gw_return_object.output_report) if input_file_bytes and not gw_return_object.output_file: # input_file_bytes was not empty but output_file is unexpectedly empty log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}") if raise_unsupported: raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}") if input_file_bytes and not gw_return_object.output_report: # input_file_bytes was not empty but output_report is unexpectedly empty log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}") if raise_unsupported: raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}") return gw_return_object @glasswall.utils.deprecated_alias(xml_config="content_management_policy") def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True): """ Redacts all files in a directory and it's subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing files to redact. output_directory (str): The output directory where the redacted files will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written. content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply. homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs file path, str, or bytes. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ redacted_files_dict = {} # Call redact_file on each file in input_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) # Construct paths for output file and output report output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") result = self.redact_file( input_file=input_file, output_file=output_file, output_report=output_report, homoglyphs=homoglyphs, content_management_policy=content_management_policy, raise_unsupported=raise_unsupported, ) redacted_files_dict[relative_path] = result return redacted_files_dict
Ancestors
Methods
def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, _io.BytesIO, Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO] = None, raise_unsupported: bool = True)
-
Redacts all files in a directory and it's subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory.
Args
input_directory
:str
- The input directory containing files to redact.
output_directory
:str
- The output directory where the redacted files will be written.
output_report_directory
:Optional[str]
, optional- Default None. If str, the output directory where analysis reports for each redacted file will be written.
- content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
- homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs file path, str, or bytes.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
@glasswall.utils.deprecated_alias(xml_config="content_management_policy") def redact_directory(self, input_directory: str, content_management_policy: Union[str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy], output_directory: Optional[str] = None, output_report_directory: Optional[str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True): """ Redacts all files in a directory and it's subdirectories using the given content_management_policy and homoglyphs file. The redacted files are written to output_directory maintaining the same directory structure as input_directory. Args: input_directory (str): The input directory containing files to redact. output_directory (str): The output directory where the redacted files will be written. output_report_directory (Optional[str], optional): Default None. If str, the output directory where analysis reports for each redacted file will be written. content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply. homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs file path, str, or bytes. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: redacted_files_dict (dict): A dictionary of file paths relative to input_directory, and glasswall.GwReturnObj with attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ redacted_files_dict = {} # Call redact_file on each file in input_directory for input_file in utils.list_file_paths(input_directory): relative_path = os.path.relpath(input_file, input_directory) # Construct paths for output file and output report output_file = None if output_directory is None else os.path.join(os.path.abspath(output_directory), relative_path) output_report = None if output_report_directory is None else os.path.join(os.path.abspath(output_report_directory), relative_path + ".xml") result = self.redact_file( input_file=input_file, output_file=output_file, output_report=output_report, homoglyphs=homoglyphs, content_management_policy=content_management_policy, raise_unsupported=raise_unsupported, ) redacted_files_dict[relative_path] = result return redacted_files_dict
def redact_file(self, input_file: Union[str, bytes, bytearray, _io.BytesIO], content_management_policy: Union[str, bytes, bytearray, _io.BytesIO], output_file: Optional[str] = None, output_report: Optional[str] = None, homoglyphs: Union[ForwardRef(None), str, bytes, bytearray, _io.BytesIO] = None, raise_unsupported: bool = True)
-
Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report.
Args
input_file
:Union[str, bytes, bytearray, io.BytesIO]
- The input file path or bytes.
- content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply.
output_file
:Union[None, str]
, optional- Default None. If str, write output_file to that path.
output_report
:Union[None, str]
, optional- Default None. If str, write output_file to that path.
- homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs json file path or bytes.
raise_unsupported
:bool
, optional- Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False.
Returns
gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes)
Expand source code
@glasswall.utils.deprecated_alias(xml_config="content_management_policy") def redact_file(self, input_file: Union[str, bytes, bytearray, io.BytesIO], content_management_policy: Union[str, bytes, bytearray, io.BytesIO], output_file: Union[None, str] = None, output_report: Union[None, str] = None, homoglyphs: Union[None, str, bytes, bytearray, io.BytesIO] = None, raise_unsupported: bool = True): """ Redacts text from input_file using the given content_management_policy and homoglyphs file, optionally writing the redacted file and report to the paths specified by output_file and output_report. Args: input_file (Union[str, bytes, bytearray, io.BytesIO]): The input file path or bytes. content_management_policy (Union[str, bytes, bytearray, io.BytesIO)]): The content management policy to apply. output_file (Union[None, str], optional): Default None. If str, write output_file to that path. output_report (Union[None, str], optional): Default None. If str, write output_file to that path. homoglyphs (Union[None, str, bytes, bytearray, io.BytesIO)], optional): Default None. The homoglyphs json file path or bytes. raise_unsupported (bool, optional): Default True. Raise exceptions when Glasswall encounters an error. Fail silently if False. Returns: gw_return_object (glasswall.GwReturnObj): An instance of class glasswall.GwReturnObj containing attributes: "status" (int), "output_file" (bytes), "output_report" (bytes) """ # Validate arg types if not isinstance(input_file, (str, bytes, bytearray, io.BytesIO)): raise TypeError(input_file) if not isinstance(content_management_policy, (str, bytes, bytearray, io.BytesIO, glasswall.content_management.policies.policy.Policy)): raise TypeError(content_management_policy) if not isinstance(output_file, (type(None), str)): raise TypeError(output_file) if not isinstance(output_report, (type(None), str)): raise TypeError(output_report) if not isinstance(homoglyphs, (type(None), str, bytes, bytearray, io.BytesIO)): raise TypeError(homoglyphs) # Convert string path arguments to absolute paths if isinstance(output_file, str): output_file = os.path.abspath(output_file) if isinstance(output_report, str): output_report = os.path.abspath(output_report) # Convert inputs to bytes if isinstance(input_file, str): with open(input_file, "rb") as f: input_file_bytes = f.read() elif isinstance(input_file, (bytes, bytearray, io.BytesIO)): input_file_bytes = utils.as_bytes(input_file) # warn if input_file is 0 bytes if not input_file_bytes: log.warning(f"input_file is 0 bytes\n\tinput_file: {input_file}") if isinstance(homoglyphs, str): with open(homoglyphs, "rb") as f: homoglyphs_bytes = f.read() elif isinstance(homoglyphs, (bytes, bytearray, io.BytesIO)): homoglyphs_bytes = utils.as_bytes(homoglyphs) elif isinstance(homoglyphs, type(None)): # Load default with open(os.path.join(glasswall._ROOT, "config", "word_search", "homoglyphs.json"), "rb") as f: homoglyphs_bytes = f.read() if isinstance(content_management_policy, str) and os.path.isfile(content_management_policy): with open(content_management_policy, "rb") as f: content_management_policy = f.read() content_management_policy = utils.validate_xml(content_management_policy) # Variable initialisation ct_input_buffer = ct.c_char_p(input_file_bytes) ct_input_buffer_length = ct.c_size_t(len(input_file_bytes)) ct_output_buffer = ct.c_void_p() ct_output_buffer_length = ct.c_size_t() ct_output_report_buffer = ct.c_void_p() ct_output_report_buffer_length = ct.c_size_t() ct_homoglyphs = ct.c_char_p(homoglyphs_bytes) ct_content_management_policy = ct.c_char_p(content_management_policy.encode()) gw_return_object = glasswall.GwReturnObj() with utils.CwdHandler(new_cwd=self.library_path): gw_return_object.status = self.library.GwWordSearch( ct_input_buffer, ct_input_buffer_length, ct.byref(ct_output_buffer), ct.byref(ct_output_buffer_length), ct.byref(ct_output_report_buffer), ct.byref(ct_output_report_buffer_length), ct_homoglyphs, ct_content_management_policy ) gw_return_object.output_file = utils.buffer_to_bytes( ct_output_buffer, ct_output_buffer_length ) gw_return_object.output_report = utils.buffer_to_bytes( ct_output_report_buffer, ct_output_report_buffer_length ) input_file_repr = f"{type(input_file_bytes)} length {len(input_file_bytes)}" if not isinstance(input_file, str) else input_file output_file_repr = f"{type(gw_return_object.output_file)} length {len(gw_return_object.output_file)}" output_report_repr = f"{type(gw_return_object.output_report)} length {len(gw_return_object.output_report)}" homoglyphs_repr = f"{type(homoglyphs_bytes)} length {len(homoglyphs_bytes)}" if not isinstance(homoglyphs, str) else homoglyphs if gw_return_object.status not in successes.success_codes: log.error(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}") if raise_unsupported: raise errors.error_codes.get(gw_return_object.status, errors.UnknownErrorCode)(gw_return_object.status) else: log.debug(f"\n\tinput_file: {input_file_repr}\n\toutput_file: {output_file_repr}\n\toutput_report: {output_report_repr}\n\thomoglyphs: {homoglyphs_repr}\n\tstatus: {gw_return_object.status}\n\tcontent_management_policy:\n{content_management_policy}") # Write output file if isinstance(output_file, str): os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, "wb") as f: f.write(gw_return_object.output_file) # Write output report if isinstance(output_report, str): os.makedirs(os.path.dirname(output_report), exist_ok=True) with open(output_report, "wb") as f: f.write(gw_return_object.output_report) if input_file_bytes and not gw_return_object.output_file: # input_file_bytes was not empty but output_file is unexpectedly empty log.error(f"output_file empty\n\tinput_file: {input_file_repr}\n\tct_output_buffer: {ct_output_buffer}\n\tct_output_buffer_length: {ct_output_buffer_length}\n\toutput_file: {gw_return_object.output_file}") if raise_unsupported: raise errors.WordSearchError(f"Unexpected empty output_file after calling GwWordSearch\n\toutput_file: {output_file}") if input_file_bytes and not gw_return_object.output_report: # input_file_bytes was not empty but output_report is unexpectedly empty log.error(f"output_report empty\n\tinput_file: {input_file_repr}\n\tct_output_report_buffer: {ct_output_report_buffer}\n\tct_output_report_buffer_length: {ct_output_report_buffer_length}") if raise_unsupported: raise errors.WordSearchError(f"Unexpected empty output_report after calling GwWordSearch\n\toutput_report: {output_report}") return gw_return_object
def version(self)
-
Returns the Glasswall library version.
Returns
version (str): The Glasswall library version.
Expand source code
def version(self): """ Returns the Glasswall library version. Returns: version (str): The Glasswall library version. """ # API function declaration self.library.GwWordSearchVersion.restype = ct.c_char_p # API call version = self.library.GwWordSearchVersion() # Convert to Python string version = ct.string_at(version).decode() return version