diff --git a/.gitmodules b/.gitmodules index 10250bf..e69de29 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +0,0 @@ -[submodule "tools/sandbox_toolkit"] - path = tools/sandbox_toolkit - url = https://github.com/sektioneins/sandbox_toolkit diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/discord.xml b/.idea/discord.xml new file mode 100644 index 0000000..30bab2a --- /dev/null +++ b/.idea/discord.xml @@ -0,0 +1,7 @@ + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..03d9549 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..dc9ea49 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..c25b1f1 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/sandblaster.iml b/.idea/sandblaster.iml new file mode 100644 index 0000000..8a05c6e --- /dev/null +++ b/.idea/sandblaster.iml @@ -0,0 +1,12 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index ccede70..c48d3b9 100644 --- a/README.md +++ b/README.md @@ -6,58 +6,50 @@ The technical report [SandBlaster: Reversing the Apple Sandbox](https://arxiv.or SandBlaster relied on previous work by [Dionysus Blazakis](https://github.com/dionthegod/XNUSandbox) and Stefan Esser's [code](https://github.com/sektioneins/sandbox_toolkit) and [slides](https://www.slideshare.net/i0n1c/ruxcon-2014-stefan-esser-ios8-containers-sandboxes-and-entitlements). -The reverser (in the `reverse-sandbox/` folder) runs on any Python running platform. The helper tools in `tools/sandbox_toolkit/` run on macOS only. +The reverser (in the `reverse-sandbox/` folder) and the helper tool (in the `helpers/` folder) run on any Python running platform. SandBlaster may be installed and run standalone, though we recommend installing and running it from within [iExtractor](https://github.com/malus-security/iExtractor). Check the [iExtractor documentation](https://github.com/malus-security/iExtractor/blob/master/README.md) for information. ## Installation -SandBlaster requires Python for the reverser (in `reverse-sandbox/`), Bash for helper scripts (in `helpers/`) and tools from the [sandbox_toolkit](https://github.com/sektioneins/sandbox_toolkit) (in `tools/`). 
-
-After cloning the SandBlaster repository, you have to clone required tools as submodules:
+SandBlaster requires Python 2 for the reverser (in `reverse-sandbox/`) and Python 3 with the `lief` library for the helper script (in `helpers/`).
+After cloning the SandBlaster repository, you have to install `lief` for Python 3:

```
-git submodule update --init tools/sandbox_toolkit
+pip3 install lief
```

-Then you build the `extract_sbops` and `extract_sbprofiles` tools:
-
-```
-cd tools/sandbox_toolkit/extract_sbops
-make
-cd ../extract_sbprofiles
-make
-```
+If the installation of `lief` fails, you need to compile it yourself. More information about how to compile it can be found on the [wiki page](https://lief.quarkslab.com/doc/stable/compilation.html).

## Usage

-In order to use SandBlaster you need access to the binary sandbox profiles and the sandbox operations, a set of strings that define sandbox-specific actions. Sandbox operations are extracted from the kernelcache using the `helpers/extract_sandbox_operations` script, which in turn calls `tools/sandbox_toolkit/extract_sbops/extract_sbops`. Sandbox profiles are extracted either from the kernel sandbox extension (as a bundle from iOS >= 9) or from the `sandboxd` file in the iOS filesystem (for iOS <= 8) using the `helpers/extract_sandbox_profiles` script, which in turn calls `tools/sandbox_toolkit/extract_sbprofiles/extract_sbprofiles`.
+In order to use SandBlaster you need access to the binary sandbox profiles and the sandbox operations, a set of strings that define sandbox-specific actions. Sandbox operations and sandbox profiles are extracted using the `helpers/extract_sandbox_data.py` script. Sandbox profiles are extracted from the kernel sandbox extension (as a bundle for iOS 4 and 9-11), from the kernel cache (as a bundle for iOS 12) or from the `sandboxd` file in the iOS filesystem (for iOS 5-8). Sandbox operations are extracted either from the kernel extension (for iOS 4-11) or from the kernel cache (for iOS 12).

So, as input data, SandBlaster requires the kernelcache, the kernel sandbox extension and the `sandboxd` file. Information and scripts on extracting them from a publicly available IPSW (*iPhone Software*) file are presented by [iExtractor](https://github.com/malus-security/iExtractor).

-Below are the steps and commands to reverse the sandbox profiles for iOS 8.4.1, assuming the kernelcache and the `sandboxd` file are available:
+Below are the steps and commands to reverse the sandbox profiles for iOS 8.4.1, assuming the sandbox kernel extension (`com.apple.security.sandbox.kext`) and the `sandboxd` file are available:

```
-# Extract sandbox operations from kernelcache.
+# Extract sandbox operations from the kernel sandbox extension.
cd helpers/
-./extract_sandbox_operations iPad2,1_8.4.1_12H321.kernelcache.mach.arm 8.4.1 > iPad2,1_8.4.1_12H321.sb_ops
+./extract_sandbox_data.py -o iPad2,1_8.4.1_12H321.sb_ops iPad2,1_8.4.1_12H321.com.apple.security.sandbox.kext 8.4.1
# Extract binary sandbox profile files from sandboxd.
mkdir iPad2,1_8.4.1_12H321.sandbox_profiles
-./extract_sandbox_profiles iPad2,1_8.4.1_12H321.sandboxd 8.4.1 iPad2,1_8.4.1_12H321.sandbox_profiles/
+./extract_sandbox_data.py -O iPad2,1_8.4.1_12H321.sandbox_profiles/ iPad2,1_8.4.1_12H321.sandboxd 8.4.1
# Reverse all binary sandbox profiles.
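+# (note on the loop below: reverse_sandbox.py must be run from the
+# reverse-sandbox/ directory; -r gives the iOS release, -o the extracted
+# sandbox operations file and -d the output directory)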
cd ../reverse-sandbox/
mkdir iPad2,1_8.4.1_12H321.reversed_profiles
for i in ../helpers/iPad2,1_8.4.1_12H321.sandbox_profiles/*; do python reverse_sandbox.py -r 8.4.1 -o ../helpers/iPad2,1_8.4.1_12H321.sb_ops -d iPad2,1_8.4.1_12H321.reversed_profiles/ "$i"; done
```

-Below are the steps and commands to reverse the sandbox profiles for iOS 9.3, assuming the kernelcache and the kernel sandbox extension (`com.apple.security.sandbox.kext`) are available:
+Below are the steps and commands to reverse the sandbox profiles for iOS 9.3, assuming the sandbox kernel extension (`com.apple.security.sandbox.kext`) is available:

```
-# Extract sandbox operations from kernelcache.
+# Extract sandbox operations from the kernel sandbox extension.
cd helpers/
-./extract_sandbox_operations iPhone5,1_9.3_13E237.kernelcache.mach.arm 9.3 > iPhone5,1_9.3_13E237.sb_ops
+./extract_sandbox_data.py -o iPhone5,1_9.3_13E237.sb_ops iPhone5,1_9.3_13E237.com.apple.security.sandbox.kext 9.3
# Extract sandbox profile bundle from kernel sandbox extension.
-./extract_sandbox_profiles iPhone5,1_9.3_13E237.com.apple.security.sandox.kext 9.3
+./extract_sandbox_data.py -O . iPhone5,1_9.3_13E237.com.apple.security.sandbox.kext 9.3
cd ../reverse-sandbox/
# Reverse all binary sandbox profiles in sandbox bundle.
mkdir iPhone5,1_9.3_13E237.reversed_profiles
@@ -67,7 +59,7 @@ python reverse_sandbox.py -r 9.3 -o ../helpers/iPhone5,1_9.3_13E237.sb_ops -d iP
 python reverse_sandbox.py -r 9.3 -o ../helpers/iPhone5,1_9.3_13E237.sb_ops -d iPhone5,1_9.3_13E237.reversed_profiles/ ../helpers/sandbox_bundle
```

-The extraction of the binary sandbox profiles differs between iOS <= 8 and iOS >= 9. For iOS 7 and iOS 8 the binary sandbox profiles are stored in the `sandboxd` file. Since iOS >= 9 the binary sandbox profiles are stored in a sandbox bundle in the kernel sandbox extension. The `helpers/extract_sandbox_profiles` script extracts them appropriately depending on the iOS version.
+The extraction of the binary sandbox profiles differs between iOS <= 8 and iOS >= 9. For iOS <= 8 the binary sandbox profiles are stored in the `sandboxd` file, while since iOS 9 they are stored in a sandbox bundle in the kernel sandbox extension. The `helpers/extract_sandbox_data.py` script extracts them appropriately depending on the iOS version.

The `-psb` option for `reverse_sandbox.py` prints out the sandbox profiles that are part of a sandbox bundle, without doing the actual reversing.

@@ -75,8 +67,6 @@ The `reverse_sandbox.py` script needs to be run in its directory (`reverse-sandb

## Internals

-The `tools/` subfolder in the repository stores external tools, in this case [sandbox_toolkit](https://github.com/sektioneins/sandbox_toolkit) used for extracting the sandbox operations and the binary sandbox profiles.
-
-The `helpers/` subfolder contains helper scripts that provide a nicer interface for the external tools.
+The `helpers/` subfolder contains the helper script used to extract the sandbox operations and the binary sandbox profiles.

The actual reverser is part of the `reverse-sandbox/` folder. Files here can be categorized as follows:

@@ -90,6 +80,4 @@ The actual reverser is part of the `reverse-sandbox/` folder. Files here can be

## Supported iOS Versions

-SandBlaster works for iOS version 7 onwards including iOS 11. Apple has been making updates to the binary format of the sandbox profiles: since iOS 9 sandbox profiles are stored in a bundle, since iOS 10 strings are aggregated together in a specialied binary format. iOS 11 didn't bring any change to the format.
-
-Earlier version of iOS (<= 6) use a different format that SandBlaster doesn't (yet) support. Contributions are welcome.
+SandBlaster works for iOS version 4 onwards including iOS 12.
Apple has been making updates to the binary format of the sandbox profiles: since iOS 9 sandbox profiles are stored in a bundle, since iOS 10 strings are aggregated together in a specialized binary format. iOS 11 didn't bring any change to the format.

diff --git a/helpers/extract_sandbox_data.py b/helpers/extract_sandbox_data.py
index bcf5505..cd5b2fb 100755
--- a/helpers/extract_sandbox_data.py
+++ b/helpers/extract_sandbox_data.py
@@ -9,367 +9,630 @@
 CONST_SECTION = '__const'
 DATA_SECTION = '__data'
+

 def binary_get_word_size(binary: lief.MachO.Binary):
-    """Returns word size of the given binary. It returns 4 for 32bit MachO
-    binaries and 8 for 64bit binaries.
+    """Gets the word size of the given binary.
+
+    The Mach-O binary has 'magic' bytes. These bytes can be used to check
+    whether the binary is 32bit or 64bit.
+    Note: iOS 4 and 5 sandbox profiles are different from the other sandbox
+    profiles as they have no magic values.
+
+    Args:
+        binary: A sandbox profile in its binary form.
+
+    Returns:
+        4: for 32bit MachO binaries
+        8: for 64bit MachO binaries
     """
-    assert(binary.header.magic in
-        [lief.MachO.MACHO_TYPES.MAGIC, lief.MachO.MACHO_TYPES.MAGIC_64])
+
+    assert (binary.header.magic in
+            [lief.MachO.MACHO_TYPES.MAGIC, lief.MachO.MACHO_TYPES.MAGIC_64])
+
     return 4 if binary.header.magic == lief.MachO.MACHO_TYPES.MAGIC else 8


 def unpack(bytes_list):
-    """Unpack bytes
+    """Unpacks bytes.
+
+    The information is stored as little endian, so '<' is needed;
+    'I' is used for 32bit values and 'Q' for 64bit values.
+
+    Args:
+        bytes_list: A packed list of bytes.
+
+    Returns:
+        The unpacked 'higher-order' equivalent.
     """
-    return struct.unpack(' 0: vaddr_str = str_sect.virtual_address + strs[0] xref_vaddrs = get_xref(binary, vaddr_str)
+    if len(xref_vaddrs) > 0:
+        sects = [binary.section_from_virtual_address(x) for x in xref_vaddrs]
+        sects = [s for s in sects if 'const' in s.name.lower()]
+        assert len(sects) >= 1 and all([sects[0] == s for s in sects])
+        return sects[0]
+
     seg = binary.get_segment('__DATA')
     if seg:
         sects = [s for s in seg.sections if s.name == CONST_SECTION]
-        assert(len(sects) <= 1)
+        assert len(sects) <= 1
+
         if len(sects) == 1:
             return sects[0]
-    return binary.get_section(CONST_SECTION)
+
+    return binary.get_section(CONST_SECTION)


-def get_data_section(binary: lief.MachO.Binary):
-    """Returns the data section from the given MachO binary
-    """
-    seg = binary.get_segment('__DATA')
-    if seg:
-        sects = [s for s in seg.sections if s.name == DATA_SECTION]
-        assert(len(sects) == 1)
-        return sects[0]
-    return binary.get_section(DATA_SECTION)


+def is_vaddr_in_section(vaddr, section):
+    """Checks if given virtual address is inside given section.
+
+    Args:
+        vaddr: A virtual address.
+        section: A section of the binary.
-def is_vaddr_in_section(vaddr, section):
-    """Checks if given virtual address is inside given section. Returns true
-    if the previous condition is satisfied and false otherwise.
+
+    Returns:
+        True: if the address is inside the section
+        False: otherwise
     """
+
     return vaddr >= section.virtual_address \
         and vaddr < section.virtual_address + section.size


-def extract_data_tables_from_section(binary, to_data, section):
+def unpack_pointer(addr_size, binary, vaddr):
+    """Unpacks a pointer and untags it if necessary.
+
+    Args:
+        binary: A sandbox profile in its binary form.
+        vaddr: A virtual address.
+        addr_size: The size of an address (4 or 8).
+
+    Returns:
+        A pointer.
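+
+    Note: on 64bit binaries the raw value read from memory carries tag bits
+    in its upper bytes, which is why it is passed through untag_pointer()
+    below before being used as a virtual address.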
+    """
+
+    ptr = unpack(
+        binary.get_content_from_virtual_address(vaddr, addr_size))
+    if addr_size == 8:
+        ptr = untag_pointer(ptr)
+    return ptr
+
+
+def extract_data_tables_from_section(binary: lief.MachO.Binary, to_data, section):
     """ Generic implementation of table search. A table is formed of adjacent
-    pointers to data. In order to check if the data is valid the provided
-    to_data function is used. This function should return None for invalid data
-    and anything else otherwise. This function searches for tables just in the
-    given section. It returns an array of tables(arrays of data).
+    pointers to data.
+
+    Args:
+        binary: A sandbox profile in its binary form.
+        to_data: Function that checks if the data is valid. This function
+            returns None for invalid data and anything else otherwise.
+        section: A section of the binary.
+
+    Returns:
+        An array of tables (arrays of data).
     """
+
     addr_size = binary_get_word_size(binary)
-    startaddr = section.virtual_address
-    endaddr = section.virtual_address + section.size
+    start_addr = section.virtual_address
+    end_addr = section.virtual_address + section.size
     tables = []
-    vaddr = startaddr
-    while vaddr <= endaddr - addr_size:
-        ptr = unpack(
-            binary.get_content_from_virtual_address(vaddr, addr_size))
-        if addr_size == 8:
-            ptr = untag_pointer(ptr)
+    vaddr = start_addr
+
+    while vaddr <= end_addr - addr_size:
+        ptr = unpack_pointer(addr_size, binary, vaddr)
+
         data = to_data(binary, ptr)
-        if data == None:
+        if data is None:
             vaddr += addr_size
             continue
+
         table = [data]
         vaddr += addr_size
-        while vaddr <= endaddr - addr_size:
-            ptr = unpack(
-                binary.get_content_from_virtual_address(vaddr, addr_size))
-            if addr_size == 8:
-                ptr = untag_pointer(ptr)
+
+        while vaddr <= end_addr - addr_size:
+            ptr = unpack_pointer(addr_size, binary, vaddr)
+
             data = to_data(binary, ptr)
-            if data == None:
+            if data is None:
                 break
+
             table.append(data)
             vaddr += addr_size
+
         if table not in tables:
             tables.append(table)
+
        vaddr += addr_size
+
    return tables


 def extract_string_tables(binary: lief.MachO.Binary):
     """Extracts string tables from the given MachO binary.
+
+    Args:
+        binary: A sandbox profile in its binary form.
+
+    Returns:
+        The string tables.
     """
     return extract_data_tables_from_section(binary,
-        binary_get_string_from_address, get_tables_section(binary))
+                                            binary_get_string_from_address,
+                                            get_tables_section(binary))


 def extract_separated_profiles(binary, string_tables):
-    """Extract separated profiles from given MachO bianry. It requires all
-    string tables. This function is intented to be used for older version
+    """Extracts separated profiles from the given MachO binary. It requires all
+    string tables. This function is intended to be used for older versions
     of iOS(<=7) because in newer versions the sandbox profiles are bundled.
+
+    Args:
+        binary: A sandbox profile in its binary form.
+        string_tables: The extracted string tables.
+
+    Returns:
+        A zip object with profiles.
     """

     def get_profile_names():
-        """Returns the names of the sandbox profiles.
+        """Extracts the profile names.
+
+        Returns:
+            A list with the names of the sandbox profiles.
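+
+        Note: a table of profile names is recognized by the sentinel entries
+        'default' and '0123456789abcdef', which delimit the runs of names
+        (see transform() below); the run containing 'com.apple.sandboxd' is
+        the one selected.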
""" - def transform(v): - if len(v) <= 3: + def transform(arr): + if len(arr) <= 3: return None - r = [] - tmp =[] - for val in v: + + ans = [] + tmp = [] + for val in arr: if val in ['default', '0123456789abcdef']: - r.append(tmp) + ans.append(tmp) tmp = [] else: tmp.append(val) - r.append(tmp) - return r + ans.append(tmp) + return ans def get_sol(posible): - r = [v for v in posible - if 'com.apple.sandboxd' in v ] - assert(len(r) == 1) - return r[0] + ans = [arr for arr in posible + if 'com.apple.sandboxd' in arr] + assert len(ans) == 1 + return ans[0] profile_names_v = [transform(v) for v in string_tables] - profile_names_v = [v for v in profile_names_v if v != None] + profile_names_v = [v for v in profile_names_v if v is not None] profile_names_v = [x for v in profile_names_v for x in v] return get_sol(profile_names_v) def get_profile_contents(): - """Returns the contents of the sandbox profiles. + """Extracts the profile names. + + Returns: + The contents of the sandbox profiles. """ def get_profile_content(binary, vaddr): addr_size = binary_get_word_size(binary) - section = get_data_section(binary) + section = get_section_from_segment(binary, "__DATA", DATA_SECTION) + if not is_vaddr_in_section(vaddr, section): return None - data = binary.get_content_from_virtual_address(vaddr, 2*addr_size) - if len(data) != 2*addr_size: + + data = binary.get_content_from_virtual_address(vaddr, 2 * addr_size) + if len(data) != 2 * addr_size: return None + data_vaddr = unpack(data[:addr_size]) size = unpack(data[addr_size:]) if not is_vaddr_in_section(vaddr, section): return None + data = binary.get_content_from_virtual_address(data_vaddr, size) if len(data) != size: return None return bytes(data) - contents_v = [v for v in extract_data_tables_from_section(binary, - get_profile_content, get_tables_section(binary)) if len(v) > 3] - assert(len(contents_v) == 1) + contents_v = [v for v in + extract_data_tables_from_section(binary, + get_profile_content, + get_tables_section(binary)) + if len(v) > 3] + + assert len(contents_v) == 1 return contents_v[0] profile_names = get_profile_names() profile_contents = get_profile_contents() - assert(len(profile_names) == len(profile_contents)) + + assert len(profile_names) == len(profile_contents) return zip(profile_names, profile_contents) -def extract_sbops(binary, string_tables): - """ Extracts sandbox operations from a given MachO binary which has the - string tables provided in the string_tables param. +def extract_sbops(string_tables): + """ Extracts sandbox operations from a given MachO binary. + If the sandbox profiles are stored either in sandboxd or sandbox kernel + extension, the operations are stored always in the kernel extension. + The sandbox operations are stored similar to the separated sandbox profiles + but this time we have only one table: the name table. + + Args: + string_tables: The binary's string tables. + + Returns: + The sandbox operations. 
""" - def transform(v): - if len(v) <= 3: + + def transform(arr): + if len(arr) <= 3: return None + idxs = [] - for idx,val in enumerate(v): + for idx, val in enumerate(arr): if val == 'default': idxs.append(idx) - return [v[idx:] for idx in idxs] - def get_sol(posible): - assert(len(posible) >= 1) + return [arr[idx:] for idx in idxs] + + def get_sol(possible): + assert len(possible) >= 1 + sol = [] - if len(posible) > 1: - cnt = min(len(v) for v in posible) - for vals in zip(*[v[:cnt] for v in posible]): - if not all(v == vals[0] for v in vals): + if len(possible) > 1: + cnt = min(len(arr) for arr in possible) + for vals in zip(*[val[:cnt] for val in possible]): + if not all(val == vals[0] for val in vals): break sol.append(vals[0]) else: - sol.append(posible[0][0]) - for c in posible[0][1:]: - if c in ['HOME','default']: + sol.append(possible[0][0]) + for pos in possible[0][1:]: + if pos in ['HOME', 'default']: break - sol.append(c) + sol.append(pos) + return sol sbops_v = [transform(v) for v in string_tables] - sbops_v = [v for v in sbops_v if v != None and v != []] + sbops_v = [v for v in sbops_v if v is not None and v != []] sbops_v = [x for v in sbops_v for x in v] + return get_sol(sbops_v) def get_ios_major_version(version: str): - """Returns the major iOS version from a given version + """Extracts the major iOS version from a given version. + + Args: + version: A string with the 'full' version. + Returns: + An integer with the major iOS version. + """ return int(version.split('.')[0]) -def findall(searchin, pattern): - """Returns the indexes of all substrings equal to pattern inside - searchin string. +def findall(searching, pattern): + """Finds all the substring in the given string. + + Args: + searching: A string. + pattern: A pattern that needs to be searched in the searching string. + + Returns: + The indexes of all substrings equal to pattern inside searching string. """ - i = searchin.find(pattern) + i = searching.find(pattern) while i != -1: yield i - i = searchin.find(pattern, i+1) + i = searching.find(pattern, i + 1) -def check_regex(data: bytes, base_index: int): - """ Checks if the regular expression(from sandbox profile) at offset +def check_regex(data: bytes, base_index: int, ios_version: int): + """ Checks if the regular expression (from sandbox profile) at offset base_index from data is valid for newer versions of iOS(>=8). + + Args: + data: An array of bytes. + base_index: The starting index. + ios_version: An integer representing the iOS version. + + Returns: + True: if the regular expression is valid for iOS version >= 8. + False: otherwise. """ if base_index + 0x10 > len(data): return False - size = struct.unpack('I', data[base_index+0x4: base_index+0x8])[0] + + if ios_version >= 13: + size = struct.unpack('I', data[base_index + 0x2: base_index + 0x6])[0] + else: + size = struct.unpack('I', data[base_index + 0x4: base_index + 0x8])[0] + if size > 0x1000 or size < 0x8 or base_index + size + 4 > len(data): return False + if version != 3: return False - subsize = struct.unpack('= 13: + sub_size = struct.unpack('= 13). + + Args: + base_index: The starting index. + count: Bundle size. + data: An array of bytes. + Returns: + The new base index and an offset. + """ + + re_offset = base_index + 12 + op_nodes_count = struct.unpack('=8). + + Args: + data: An array of bytes. + base_index: The starting index. + ios_version: An integer representing the iOS version. + + Returns: + True: if the sandbox profile bundle is valid. + False: otherwise. 
""" if len(data) - base_index < 50: return False - re_offset, aux = struct.unpack('<2H', data[base_index+2:base_index+6]) - if ios_version >= 12: - count = (aux - re_offset)*4 + re_offset, aux = struct.unpack('<2H', data[base_index + 2:base_index + 6]) + + if ios_version >= 13: + count = struct.unpack('= 12: + count = (aux - re_offset) * 4 # bundle should be big if count < 0x10: return False else: count = aux + if count > 0x1000 or re_offset < 0x10: return False - re_offset = base_index + re_offset*8 - if len(data) - re_offset < count * 2: - return False - for off_index in range(re_offset, re_offset + 2*count, 2): - index = struct.unpack('= 13: + base_index, re_offset = unpack_for_newer_ios(base_index, count, data) + + else: + re_offset = base_index + re_offset * 8 + if len(data) - re_offset < count * 2: + return False + + for off_index in range(re_offset, re_offset + 2 * count, 2): + index = struct.unpack('\n" - (name, argument) = self.value.values() - if argument == None: - result_str += (level+1)*"\t" + "\n" - else: - arg = str(argument).replace('&', '&').replace('"', '"').replace('\'', ''').replace('<', '<').replace('>', '>') - result_str += (level+1)*"\t" + "\n" - result_str += level*"\t" + "\n" - else: - (name, argument) = self.value.values() - if argument == None: - result_str += level*"\t" + "\n" - else: - arg = str(argument).replace('&', '&').replace('"', '"').replace('\'', ''').replace('<', '<').replace('>', '>') - result_str += level*"\t" + "\n" - elif self.is_type_require_entitlement(): - if self.is_not: - result_str += level*"\t" + "\n" - level += 1 - result_str += level*"\t" + "', '>') - result_str += " value=\"" + _tmp + "\" />\n" - else: - _tmp = str(n.value)[21:-1].replace('&', '&').replace('"', '"').replace('\'', ''').replace('<', '<').replace('>', '>') - result_str += " value=\"" + _tmp + "\">\n" - result_str += i.recursive_xml_str(level+1, self.is_not) - result_str += level*"\t" + "\n" - if self.is_not: - level -= 1 - result_str += level*"\t" + "\n" - else: - result_str += level*"\t" + "\n" - for i, v in enumerate(self.value): - result_str += v.recursive_xml_str(level+1, recursive_is_not) - result_str += level*"\t" + "\n" - return result_str - - def __str__(self): - return self.recursive_str(1, False) - - def str_debug(self): - return self.recursive_str_debug(1, False) - - def str_simple(self): - if self.is_type_single(): - return self.value.str_debug() - elif self.is_type_require_any(): - return "require-any" - elif self.is_type_require_all(): - return "require-all" - elif self.is_type_require_entitlement(): - return self.value.str_debug()[1:-1] - elif self.is_type_start(): - return "start" - else: - return "unknown-type" - - def str_print_debug(self): - if self.is_type_single(): - return (self.value.str_debug(), None) - elif self.is_type_require_any(): - return ("(require-any", ")") - elif self.is_type_require_all(): - return ("(require-all", ")") - elif self.is_type_require_entitlement(): - return (self.value.str_debug()[:-1], ")") - elif self.is_type_start(): - return (None, None) - else: - return ("unknown-type", None) - - def str_print(self): - if self.is_type_single(): - return (str(self.value), None) - elif self.is_type_require_any(): - return ("(require-any", ")") - elif self.is_type_require_all(): - return ("(require-all", ")") - elif self.is_type_require_entitlement(): - return (str(self.value)[:-1], ")") - elif self.is_type_start(): - return (None, None) - else: - return ("unknown-type", None) - - def str_print_not(self): - result_str = "" - if 
self.is_type_single(): - if self.is_not: - value = str(self.value) - if "(require-any" in value: - result_str = self.value.str_not() - else: - result_str += "(require-not " + str(self.value) + ")" - return result_str - - def xml_str(self): - return self.recursive_xml_str(3, False) - - -class ReducedEdge(): - start = None - end = None - - def __init__(self, start=None, end=None): - self.start = start - self.end = end - - def str_debug(self): - return self.start.str_debug() + " -> " + self.end.str_debug() - - def str_simple(self): - #print "start: %s" % (self.start.str_simple()) - #print "end: %s" % (self.end.str_simple()) - return "%s -----> %s" % (self.start.str_simple(), self.end.str_simple()) - - def __str__(self): - return str(self.start) + " -> " + str(self.end) - - -class ReducedGraph(): - vertices = [] - edges = [] - final_vertices = [] - reduce_changes_occurred = False - - def __init__(self): - self.vertices = [] - self.edges = [] - self.final_vertices = [] - self.reduce_changes_occurred = False - - def add_vertice(self, v): - self.vertices.append(v) - - def add_edge(self, e): - self.edges.append(e) - - def add_edge_by_vertices(self, v_start, v_end): - e = ReducedEdge(v_start, v_end) - self.edges.append(e) - - def set_final_vertices(self): - self.final_vertices = [] - for v in self.vertices: - is_final = True - for e in self.edges: - if v == e.start: - is_final = False - break - if is_final: - self.final_vertices.append(v) - - def contains_vertice(self, v): - return v in self.vertices - - def contains_edge(self, e): - return e in self.edges - - def contains_edge_by_vertices(self, v_start, v_end): - for e in self.edges: - if e.start == v_start and e.end == v_end: - return True - return False - - def get_vertice_by_value(self, value): - for v in self.vertices: - if v.is_type_single(): - if v.value == value: - return v - - def get_edge_by_vertices(self, v_start, v_end): - for e in self.edges: - if e.start == v_start and e.end == v_end: - return e - return None - - def remove_vertice(self, v): - edges_copy = list(self.edges) - for e in edges_copy: - if e.start == v or e.end == v: - self.edges.remove(e) - if v in self.vertices: - self.vertices.remove(v) - - def remove_vertice_update_decision(self, v): - edges_copy = list(self.edges) - for e in edges_copy: - if e.start == v: - self.edges.remove(e) - if e.end == v: - e.start.decision = v.decision - self.edges.remove(e) - if v in self.vertices: - self.vertices.remove(v) - - def remove_edge(self, e): - if e in self.edges: - self.edges.remove(e) - - def remove_edge_by_vertices(self, v_start, v_end): - e = self.get_edge_by_vertices(v_start, v_end) - if e: - self.edges.remove(e) - - def replace_vertice_in_edge_start(self, old, new): - global replace_occurred - for e in self.edges: - if e.start == old: - e.start = new - replace_occurred = True - else: - if isinstance(e.start.value, list): - e.start.replace_in_list(old, new) - if replace_occurred: - e.start.decision = new.decision - - def replace_vertice_in_edge_end(self, old, new): - global replace_occurred - for e in self.edges: - if e.end == old: - e.end = new - replace_occurred = True - else: - if isinstance(e.end.value, list): - e.end.replace_in_list(old, new) - if replace_occurred: - e.end.decision = new.decision - - def replace_vertice_in_single_vertices(self, old, new): - for v in self.vertices: - if len(self.get_next_vertices(v)) == 0 and len(self.get_prev_vertices(v)) == 0: - if isinstance(v.value, list): - v.replace_in_list(old, new) - - def replace_vertice_list(self, old, new): - for 
v in self.vertices: - if isinstance(v.value, list): - v.replace_sublist_in_list(old, new) - if set(self.get_next_vertices(v)) == set(old): - for n in old: - self.remove_edge_by_vertices(v, n) - self.add_edge_by_vertices(v, new) - if set(self.get_prev_vertices(v)) == set(old): - for n in old: - self.remove_edge_by_vertices(n, v) - self.add_edge_by_vertices(new, v) - - def get_next_vertices(self, v): - next_vertices = [] - for e in self.edges: - if e.start == v: - next_vertices.append(e.end) - return next_vertices - - def get_prev_vertices(self, v): - prev_vertices = [] - for e in self.edges: - if e.end == v: - prev_vertices.append(e.start) - return prev_vertices - - def get_start_vertices(self): - start_vertices = [] - for v in self.vertices: - if not self.get_prev_vertices(v): - start_vertices.append(v) - return start_vertices - - def get_end_vertices(self): - end_vertices = [] - for v in self.vertices: - if not self.get_next_vertices(v): - end_vertices.append(v) - return end_vertices - - def reduce_next_vertices(self, v): - next_vertices = self.get_next_vertices(v) - if len(next_vertices) <= 1: - return - self.reduce_changes_occurred = True - new_vertice = ReducedVertice("require-any", next_vertices, next_vertices[0].decision) - add_to_final = False - for n in next_vertices: - self.remove_edge_by_vertices(v, n) - self.replace_vertice_list(next_vertices, new_vertice) - for n in next_vertices: - if n in self.final_vertices: - self.final_vertices.remove(n) - add_to_final = True - # If no more next vertices, remove vertice. - if not self.get_next_vertices(n): - if n in self.vertices: - self.vertices.remove(n) - self.add_edge_by_vertices(v, new_vertice) - self.add_vertice(new_vertice) - if add_to_final: - self.final_vertices.append(new_vertice) - - def reduce_prev_vertices(self, v): - prev_vertices = self.get_prev_vertices(v) - if len(prev_vertices) <= 1: - return - self.reduce_changes_occurred = True - new_vertice = ReducedVertice("require-any", prev_vertices, v.decision) - for p in prev_vertices: - self.remove_edge_by_vertices(p, v) - self.replace_vertice_list(prev_vertices, new_vertice) - for p in prev_vertices: - # If no more prev vertices, remove vertice. 
- if not self.get_prev_vertices(p): - if p in self.vertices: - self.vertices.remove(p) - self.add_vertice(new_vertice) - self.add_edge_by_vertices(new_vertice, v) - - def reduce_vertice_single_prev(self, v): - global replace_occurred - prev = self.get_prev_vertices(v) - if len(prev) != 1: - logger.debug("not a single prev for node") - return - p = prev[0] - nexts = self.get_next_vertices(p) - if len(nexts) > 1 or nexts[0] != v: - logger.debug("multiple nexts for prev") - return - require_all_vertices = [] - if p.is_type_require_all(): - require_all_vertices.extend(p.value) - else: - require_all_vertices.append(p) - if v.is_type_require_all(): - require_all_vertices.extend(v.value) - else: - require_all_vertices.append(v) - new_vertice = ReducedVertice("require-all", require_all_vertices, v.decision) - self.remove_edge_by_vertices(p, v) - replace_occurred = False - self.replace_vertice_in_edge_start(v, new_vertice) - self.replace_vertice_in_edge_end(p, new_vertice) - self.replace_vertice_in_single_vertices(p, new_vertice) - self.replace_vertice_in_single_vertices(v, new_vertice) - self.remove_vertice(p) - self.remove_vertice(v) - if not replace_occurred: - self.add_vertice(new_vertice) - if v in self.final_vertices: - self.final_vertices.remove(v) - self.final_vertices.append(new_vertice) - - def reduce_vertice_single_next(self, v): - global replace_occurred - next = self.get_next_vertices(v) - if len(next) != 1: - return - n = next[0] - prevs = self.get_prev_vertices(n) - if len(prevs) > 1 or prevs[0] != v: - return - require_all_vertices = [] - if v.is_type_require_all(): - require_all_vertices.extend(v.value) - else: - require_all_vertices.append(v) - if n.is_type_require_all(): - require_all_vertices.extend(n.value) - else: - require_all_vertices.append(n) - new_vertice = ReducedVertice("require-all", require_all_vertices, n.decision) - self.remove_edge_by_vertices(v, n) - replace_occurred = False - self.replace_vertice_in_edge_start(n, new_vertice) - self.replace_vertice_in_edge_end(e, new_vertice) - self.replace_vertice_in_single_vertices(v, new_vertice) - self.replace_vertice_in_single_vertices(n, new_vertice) - self.remove_vertice(v) - self.remove_vertice(n) - if not replace_occurred: - self.add_vertice(new_vertice) - if n in self.final_vertices: - self.final_vertices.remove(n) - self.final_vertices.append(new_vertice) - - def reduce_graph(self): - self.set_final_vertices() - - logger.debug("before everything:\n" + self.str_simple()) - # Do until no more changes. - while True: - self.reduce_changes_occurred = False - copy_vertices = list(self.vertices) - for v in copy_vertices: - self.reduce_next_vertices(v) - if self.reduce_changes_occurred == False: - break - logger.debug("after next:\n" + self.str_simple()) - # Do until no more changes. - while True: - self.reduce_changes_occurred = False - copy_vertices = list(self.vertices) - for v in copy_vertices: - self.reduce_prev_vertices(v) - if self.reduce_changes_occurred == False: - break - logger.debug("after next/prev:\n" + self.str_simple()) - - # Reduce graph starting from final vertices. Keep going until - # final vertices don't change during an iteration. 
- while True: - copy_final_vertices = list(self.final_vertices) - for v in copy_final_vertices: - logger.debug("reducing single prev vertex: " + v.str_debug()) - self.reduce_vertice_single_prev(v) - logger.debug("### new graph is:") - logger.debug(self.str_simple()) - if set(copy_final_vertices) == set(self.final_vertices): - break - for e in self.edges: - v = e.end - logger.debug("reducing single prev vertex: " + v.str_debug()) - self.reduce_vertice_single_prev(v) - logger.debug("after everything:\n" + self.str_simple()) - - def reduce_graph_with_metanodes(self): - # Add require-any metanode if current node has multiple successors. - copy_vertices = list(self.vertices) - for v in copy_vertices: - nlist = self.get_next_vertices(v) - if len(nlist) >= 2: - new_node = ReducedVertice("require-any", None, None) - self.add_vertice(new_node) - self.add_edge_by_vertices(v, new_node) - for n in nlist: - self.remove_edge_by_vertices(v, n) - self.add_edge_by_vertices(new_node, n) - - start_list = self.get_start_vertices() - new_node = ReducedVertice("start", None, None) - self.add_vertice(new_node) - for s in start_list: - self.add_edge_by_vertices(new_node, s) - - # Add require-all metanode if current node has a require-any as a predecessor and is followed by another node. - copy_vertices = list(self.vertices) - for v in copy_vertices: - prev_vertices = list(self.get_prev_vertices(v)) - next_vertices = list(self.get_next_vertices(v)) - for p in prev_vertices: - if (p.is_type_require_any() or p.is_type_start()) and next_vertices: - # Except for when a require-entitlement ending block. - if v.is_type_require_entitlement(): - has_next_nexts = False - for n in next_vertices: - if n.is_type_require_any(): - for n2 in self.get_next_vertices(n): - if self.get_next_vertices(n2): - has_next_nexts = True - break - else: - if self.get_next_vertices(n): - has_next_nexts = True - break - if not has_next_nexts: - continue - new_node = ReducedVertice("require-all", None, None) - self.add_vertice(new_node) - self.remove_edge_by_vertices(p, v) - self.add_edge_by_vertices(p, new_node) - self.add_edge_by_vertices(new_node, v) - - def str_simple_with_metanodes(self): - logger.debug("==== vertices:\n") - for v in self.vertices: - logger.debug(v.str_simple()) - logger.debug("==== edges:\n") - for e in self.edges: - logger.debug(e.str_simple()) - - def str_simple(self): - message = "==== vertices:\n" - for v in self.vertices: - message += "decision: " + str(v.decision) + "\t" + v.str_debug() + "\n" - message += "==== final vertices:\n" - for v in self.final_vertices: - message += "decision: " + str(v.decision) + "\t" + v.str_debug() + "\n" - message += "==== edges:\n" - for e in self.edges: - message += "\t" + e.str_debug() + "\n" - return message - - def __str__(self): - result_str = "" - for v in self.vertices: - result_str += "(" + str(v.decision) + " " - if len(self.get_next_vertices(v)) == 0 and len(self.get_next_vertices(v)) == 0: - if v in self.final_vertices: - result_str += str(v) + "\n" - result_str += ")\n" - for e in self.edges: - result_str += str(e) + "\n" - result_str += "\n" - return result_str - - def remove_builtin_filters(self): - copy_vertices = list(self.vertices) - for v in copy_vertices: - if re.search("###\$\$\$\*\*\*", str(v)): - self.remove_vertice_update_decision(v) - - def reduce_integrated_vertices(self, integrated_vertices): - if len(integrated_vertices) == 0: - return (None, None) - if len(integrated_vertices) > 1: - return (ReducedVertice("require-any", integrated_vertices, 
integrated_vertices[0].decision), integrated_vertices[0].decision) - require_all_vertices = [] - v = integrated_vertices[0] - decision = None - while True: - if not re.search("entitlement-value #t", str(v)): - require_all_vertices.append(v) - next_vertices = self.get_next_vertices(v) - if decision == None and v.decision != None: - decision = v.decision - self.remove_vertice(v) - if v in self.final_vertices: - self.final_vertices.remove(v) - if next_vertices: - v = next_vertices[0] - else: - break - if len(require_all_vertices) == 0: - return (None, v.decision) - if len(require_all_vertices) == 1: - return (ReducedVertice(value=require_all_vertices[0].value, decision=require_all_vertices[0].decision, is_not=require_all_vertices[0].is_not), v.decision) - return (ReducedVertice("require-all", require_all_vertices, require_all_vertices[len(require_all_vertices)-1].decision), v.decision) - - def aggregate_require_entitlement(self, v): - next_vertices = [] - prev_vertices = self.get_prev_vertices(v) - integrated_vertices = [] - for n in self.get_next_vertices(v): - if not re.search("entitlement-value", str(n)): - next_vertices.append(n) - break - integrated_vertices.append(n) - current_list = [ n ] - while current_list: - current = current_list.pop() - for n2 in self.get_next_vertices(current): - if not re.search("entitlement-value", str(n2)): - self.remove_edge_by_vertices(current, n2) - next_vertices.append(n2) - else: - current_list.append(n2) - new_vertice = ReducedVertice(type="require-entitlement", value=(v, None), decision=None, is_not=v.is_not) - for p in prev_vertices: - self.remove_edge_by_vertices(p, v) - self.add_edge_by_vertices(p, new_vertice) - for n in next_vertices: - self.remove_edge_by_vertices(v, n) - self.add_edge_by_vertices(new_vertice, n) - for i in integrated_vertices: - self.remove_edge_by_vertices(v, i) - self.remove_vertice(v) - self.add_vertice(new_vertice) - if v in self.final_vertices: - self.final_vertices.remove(v) - self.final_vertices.append(new_vertice) - (new_integrate, decision) = self.reduce_integrated_vertices(integrated_vertices) - for i in integrated_vertices: - self.remove_vertice(i) - if i in self.final_vertices: - self.final_vertices.remove(i) - new_vertice.set_integrated_vertice(new_integrate) - new_vertice.set_decision(decision) - - def aggregate_require_entitlement_nodes(self): - copy_vertices = list(self.vertices) - idx = 0 - while idx < len(copy_vertices): - v = copy_vertices[idx] - if re.search("require-entitlement", str(v)): - self.aggregate_require_entitlement(v) - idx += 1 - - def cleanup_filters(self): - self.remove_builtin_filters() - self.aggregate_require_entitlement_nodes() - - def remove_builtin_filters_with_metanodes(self): - copy_vertices = list(self.vertices) - for v in copy_vertices: - if re.search("###\$\$\$\*\*\*", v.str_simple()): - self.remove_vertice(v) - elif re.search("entitlement-value #t", v.str_simple()): - self.remove_vertice(v) - elif re.search("entitlement-value-regex #\"\.\"", v.str_simple()): - v.value.non_terminal.argument = "#\".+\"" - elif re.search("global-name-regex #\"\.\"", v.str_simple()): - v.value.non_terminal.argument = "#\".+\"" - elif re.search("local-name-regex #\"\.\"", v.str_simple()): - v.value.non_terminal.argument = "#\".+\"" - - def replace_require_entitlement_with_metanodes(self, v): - prev_list = self.get_prev_vertices(v) - next_list = self.get_next_vertices(v) - new_node = ReducedVertice(type="require-entitlement", value=v.value, decision=None, is_not=v.is_not) - self.add_vertice(new_node) - 
self.remove_vertice(v) - for p in prev_list: - self.add_edge_by_vertices(p, new_node) - for n in next_list: - self.add_edge_by_vertices(new_node, n) - - def aggregate_require_entitlement_with_metanodes(self): - copy_vertices = list(self.vertices) - for v in copy_vertices: - if re.search("require-entitlement", str(v)): - self.replace_require_entitlement_with_metanodes(v) - - def cleanup_filters_with_metanodes(self): - self.remove_builtin_filters_with_metanodes() - self.aggregate_require_entitlement_with_metanodes() - - def print_vertices_with_operation(self, operation, out_f): - allow_vertices = [v for v in self.vertices if v.decision == "allow"] - deny_vertices = [v for v in self.vertices if v.decision == "deny"] - if allow_vertices: - out_f.write("(allow %s " % (operation)) - if len(allow_vertices) > 1: - for v in allow_vertices: - out_f.write("\n" + 8*" " + str(v)) - else: - out_f.write(str(allow_vertices[0])) - out_f.write(")\n") - if deny_vertices: - out_f.write("(deny %s " % (operation)) - if len(deny_vertices) > 1: - for v in deny_vertices: - out_f.write("\n" + 8*" " + str(v)) - else: - out_f.write(str(deny_vertices[0])) - out_f.write(")\n") - - def print_vertices_with_operation_metanodes(self, operation, default_is_allow, out_f): - # Return if only start node in list. - if len(self.vertices) == 1 and self.vertices[0].is_type_start(): - return - # Use reverse of default rule. - if default_is_allow: - out_f.write("(deny %s" % (operation)) - else: - out_f.write("(allow %s" % (operation)) - vlist = [] - start_list = self.get_start_vertices() - start_list.reverse() - vlist.insert(0, (None, 0)) - for s in start_list: - vlist.insert(0, (s, 1)) - while True: - if not vlist: - break - (cnode, indent) = vlist.pop(0) - if not cnode: - out_f.write(")") - continue - (first, last) = cnode.str_print() - if first: - if cnode.is_not: - if cnode.str_print_not() != "": - out_f.write("\n" + indent * "\t" + cnode.str_print_not()) - else: - out_f.write("\n" + indent * "\t" + "(require-not " + first) - if cnode.is_type_require_any() or cnode.is_type_require_all() or cnode.is_type_require_entitlement(): - vlist.insert(0, (None, indent)) - else: - out_f.write(")") - else: - out_f.write("\n" + indent * "\t" + first) - if last: - vlist.insert(0, (None, indent)) - next_vertices_list = self.get_next_vertices(cnode) - if next_vertices_list: - if cnode.is_type_require_any() or cnode.is_type_require_all() or cnode.is_type_require_entitlement(): - indent += 1 - next_vertices_list.reverse() - if cnode.is_type_require_entitlement(): - pos = 0 - for n in next_vertices_list: - if (n.is_type_single() and not re.search("entitlement-value", n.str_simple())) or \ - n.is_type_require_entitlement(): - vlist.insert(pos + 1, (n, indent-1)) - else: - vlist.insert(0, (n, indent)) - pos += 1 - else: - for n in next_vertices_list: - vlist.insert(0, (n, indent)) - out_f.write("\n") - - def dump_xml(self, operation, out_f): - allow_vertices = [v for v in self.vertices if v.decision == "allow"] - deny_vertices = [v for v in self.vertices if v.decision == "deny"] - if allow_vertices: - out_f.write("\t\n" % (operation)) - out_f.write("\t\t\n") - for v in allow_vertices: - out_f.write(v.xml_str()) - out_f.write("\t\t\n") - out_f.write("\t\n") - if deny_vertices: - out_f.write("\t\n" % (operation)) - out_f.write("\t\t\n") - for v in deny_vertices: - out_f.write(v.xml_str()) - out_f.write("\t\t\n") - out_f.write("\t\n") - - -def reduce_operation_node_graph(g): - # Create reduced graph. 
- rg = ReducedGraph() - for node_iter in g.keys(): - rv = ReducedVertice(value=node_iter, decision=g[node_iter]["decision"], is_not=g[node_iter]["not"]) - rg.add_vertice(rv) - - for node_iter in g.keys(): - rv = rg.get_vertice_by_value(node_iter) - for node_next in g[node_iter]["list"]: - rn = rg.get_vertice_by_value(node_next) - rg.add_edge_by_vertices(rv, rn) - - # Handle special case for require-not (require-enitlement (...)). - l = len(g.keys()) - for idx, node_iter in enumerate(g.keys()): - rv = rg.get_vertice_by_value(node_iter) - if not re.search("require-entitlement", str(rv)): - continue - if not rv.is_not: - continue - c_idx = idx - while True: - c_idx += 1 - if c_idx >= l: - break - rn = rg.get_vertice_by_value(g.keys()[c_idx]) - if not re.search("entitlement-value", str(rn)): - break - prevs_rv = rg.get_prev_vertices(rv) - prevs_rn = rg.get_prev_vertices(rn) - if sorted(prevs_rv) != sorted(prevs_rn): - continue - for pn in prevs_rn: - rg.remove_edge_by_vertices(rn, pn) - rg.add_edge_by_vertices(rv, rn) - - rg.cleanup_filters_with_metanodes() - for node_iter in g.keys(): - rv = rg.get_vertice_by_value(node_iter) - rg.reduce_graph_with_metanodes() - return rg - def main(): if len(sys.argv) != 4: @@ -1722,14 +471,15 @@ def main(): num_regex = struct.unpack(" " + self.end.str_debug() + + def str_simple(self): + return "%s -----> %s" % (self.start.str_simple(), self.end.str_simple()) + + def __str__(self): + return str(self.start) + " -> " + str(self.end) diff --git a/reverse-sandbox/reduced_graph.py b/reverse-sandbox/reduced_graph.py new file mode 100644 index 0000000..9703737 --- /dev/null +++ b/reverse-sandbox/reduced_graph.py @@ -0,0 +1,652 @@ +import re +from reduced_edge import * +from reduced_node import * +from operation_node import * + + +class ReducedGraph: + vertices = [] + edges = [] + final_vertices = [] + reduce_changes_occurred = False + + def __init__(self): + self.vertices = [] + self.edges = [] + self.final_vertices = [] + self.reduce_changes_occurred = False + + def add_vertice(self, v): + self.vertices.append(v) + + def add_edge(self, e): + self.edges.append(e) + + def add_edge_by_vertices(self, v_start, v_end): + e = ReducedEdge(v_start, v_end) + self.edges.append(e) + + def set_final_vertices(self): + self.final_vertices = [] + for v in self.vertices: + is_final = True + for e in self.edges: + if v == e.start: + is_final = False + break + if is_final: + self.final_vertices.append(v) + + def contains_vertice(self, v): + return v in self.vertices + + def contains_edge(self, e): + return e in self.edges + + def contains_edge_by_vertices(self, v_start, v_end): + for e in self.edges: + if e.start == v_start and e.end == v_end: + return True + return False + + def get_vertice_by_value(self, value): + for v in self.vertices: + if v.is_type_single(): + if v.value == value: + return v + + def get_edge_by_vertices(self, v_start, v_end): + for e in self.edges: + if e.start == v_start and e.end == v_end: + return e + return None + + def remove_vertice(self, v): + edges_copy = list(self.edges) + for e in edges_copy: + if e.start == v or e.end == v: + self.edges.remove(e) + if v in self.vertices: + self.vertices.remove(v) + + def remove_vertice_update_decision(self, v): + edges_copy = list(self.edges) + for e in edges_copy: + if e.start == v: + self.edges.remove(e) + if e.end == v: + e.start.decision = v.decision + self.edges.remove(e) + if v in self.vertices: + self.vertices.remove(v) + + def remove_edge(self, e): + if e in self.edges: + self.edges.remove(e) + + def 
remove_edge_by_vertices(self, v_start, v_end): + e = self.get_edge_by_vertices(v_start, v_end) + if e: + self.edges.remove(e) + + def replace_vertice_in_edge_start(self, old, new): + global replace_occurred + for e in self.edges: + if e.start == old: + e.start = new + replace_occurred = True + else: + if isinstance(e.start.value, list): + e.start.replace_in_list(old, new) + if replace_occurred: + e.start.decision = new.decision + + def replace_vertice_in_edge_end(self, old, new): + global replace_occurred + for e in self.edges: + if e.end == old: + e.end = new + replace_occurred = True + else: + if isinstance(e.end.value, list): + e.end.replace_in_list(old, new) + if replace_occurred: + e.end.decision = new.decision + + def replace_vertice_in_single_vertices(self, old, new): + for v in self.vertices: + if len(self.get_next_vertices(v)) == 0 and len(self.get_prev_vertices(v)) == 0: + if isinstance(v.value, list): + v.replace_in_list(old, new) + + def replace_vertice_list(self, old, new): + for v in self.vertices: + if isinstance(v.value, list): + v.replace_sublist_in_list(old, new) + if set(self.get_next_vertices(v)) == set(old): + for n in old: + self.remove_edge_by_vertices(v, n) + self.add_edge_by_vertices(v, new) + if set(self.get_prev_vertices(v)) == set(old): + for n in old: + self.remove_edge_by_vertices(n, v) + self.add_edge_by_vertices(new, v) + + def get_next_vertices(self, v): + next_vertices = [] + for e in self.edges: + if e.start == v: + next_vertices.append(e.end) + return next_vertices + + def get_prev_vertices(self, v): + prev_vertices = [] + for e in self.edges: + if e.end == v: + prev_vertices.append(e.start) + return prev_vertices + + def get_start_vertices(self): + start_vertices = [] + for v in self.vertices: + if not self.get_prev_vertices(v): + start_vertices.append(v) + return start_vertices + + def get_end_vertices(self): + end_vertices = [] + for v in self.vertices: + if not self.get_next_vertices(v): + end_vertices.append(v) + return end_vertices + + def reduce_next_vertices(self, v): + next_vertices = self.get_next_vertices(v) + if len(next_vertices) <= 1: + return + self.reduce_changes_occurred = True + new_vertice = ReducedVertice("require-any", next_vertices, next_vertices[0].decision) + add_to_final = False + for n in next_vertices: + self.remove_edge_by_vertices(v, n) + self.replace_vertice_list(next_vertices, new_vertice) + for n in next_vertices: + if n in self.final_vertices: + self.final_vertices.remove(n) + add_to_final = True + # If no more next vertices, remove vertice. + if not self.get_next_vertices(n): + if n in self.vertices: + self.vertices.remove(n) + self.add_edge_by_vertices(v, new_vertice) + self.add_vertice(new_vertice) + if add_to_final: + self.final_vertices.append(new_vertice) + + def reduce_prev_vertices(self, v): + prev_vertices = self.get_prev_vertices(v) + if len(prev_vertices) <= 1: + return + self.reduce_changes_occurred = True + new_vertice = ReducedVertice("require-any", prev_vertices, v.decision) + for p in prev_vertices: + self.remove_edge_by_vertices(p, v) + self.replace_vertice_list(prev_vertices, new_vertice) + for p in prev_vertices: + # If no more prev vertices, remove vertice. 
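+            # (its edges were folded into the new require-any vertex above)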
+ if not self.get_prev_vertices(p): + if p in self.vertices: + self.vertices.remove(p) + self.add_vertice(new_vertice) + self.add_edge_by_vertices(new_vertice, v) + + def reduce_vertice_single_prev(self, v): + global replace_occurred + prev = self.get_prev_vertices(v) + if len(prev) != 1: + logger.debug("not a single prev for node") + return + p = prev[0] + nexts = self.get_next_vertices(p) + if len(nexts) > 1 or nexts[0] != v: + logger.debug("multiple nexts for prev") + return + require_all_vertices = [] + if p.is_type_require_all(): + require_all_vertices.extend(p.value) + else: + require_all_vertices.append(p) + if v.is_type_require_all(): + require_all_vertices.extend(v.value) + else: + require_all_vertices.append(v) + new_vertice = ReducedVertice("require-all", require_all_vertices, v.decision) + self.remove_edge_by_vertices(p, v) + replace_occurred = False + self.replace_vertice_in_edge_start(v, new_vertice) + self.replace_vertice_in_edge_end(p, new_vertice) + self.replace_vertice_in_single_vertices(p, new_vertice) + self.replace_vertice_in_single_vertices(v, new_vertice) + self.remove_vertice(p) + self.remove_vertice(v) + if not replace_occurred: + self.add_vertice(new_vertice) + if v in self.final_vertices: + self.final_vertices.remove(v) + self.final_vertices.append(new_vertice) + + def reduce_vertice_single_next(self, v): + global replace_occurred + next = self.get_next_vertices(v) + if len(next) != 1: + return + n = next[0] + prevs = self.get_prev_vertices(n) + if len(prevs) > 1 or prevs[0] != v: + return + self.extend_require_all(n, v) + + def extend_require_all(self, n, v): + global replace_occurred + require_all_vertices = [] + if v.is_type_require_all(): + require_all_vertices.extend(v.value) + else: + require_all_vertices.append(v) + if n.is_type_require_all(): + require_all_vertices.extend(n.value) + else: + require_all_vertices.append(n) + new_vertice = ReducedVertice("require-all", require_all_vertices, n.decision) + self.remove_edge_by_vertices(v, n) + replace_occurred = False + self.replace_vertice_in_edge_start(n, new_vertice) + self.replace_vertice_in_edge_end(v, new_vertice) + self.replace_vertice_in_single_vertices(v, new_vertice) + self.replace_vertice_in_single_vertices(n, new_vertice) + self.remove_vertice(v) + self.remove_vertice(n) + if not replace_occurred: + self.add_vertice(new_vertice) + if n in self.final_vertices: + self.final_vertices.remove(n) + self.final_vertices.append(new_vertice) + + def reduce_graph(self): + self.set_final_vertices() + + logger.debug("before everything:\n" + self.str_simple()) + # Do until no more changes. + while True: + self.reduce_changes_occurred = False + copy_vertices = list(self.vertices) + for v in copy_vertices: + self.reduce_next_vertices(v) + if not self.reduce_changes_occurred: + break + logger.debug("after next:\n" + self.str_simple()) + # Do until no more changes. + while True: + self.reduce_changes_occurred = False + copy_vertices = list(self.vertices) + for v in copy_vertices: + self.reduce_prev_vertices(v) + if self.reduce_changes_occurred == False: + break + logger.debug("after next/prev:\n" + self.str_simple()) + + # Reduce graph starting from final vertices. Keep going until + # final vertices don't change during an iteration. 
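+        # Each pass folds a final vertex that has exactly one predecessor
+        # into a combined require-all vertex (see reduce_vertice_single_prev).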
+        while True:
+            copy_final_vertices = list(self.final_vertices)
+            for v in copy_final_vertices:
+                logger.debug("reducing single prev vertex: " + v.str_debug())
+                self.reduce_vertice_single_prev(v)
+            logger.debug("### new graph is:")
+            logger.debug(self.str_simple())
+            if set(copy_final_vertices) == set(self.final_vertices):
+                break
+        for e in self.edges:
+            v = e.end
+            logger.debug("reducing single prev vertex: " + v.str_debug())
+            self.reduce_vertice_single_prev(v)
+        logger.debug("after everything:\n" + self.str_simple())
+
+    def reduce_graph_with_metanodes(self):
+        # Add require-any metanode if current node has multiple successors.
+        copy_vertices = list(self.vertices)
+        for v in copy_vertices:
+            nlist = self.get_next_vertices(v)
+            if len(nlist) >= 2:
+                new_node = ReducedVertice("require-any", None, None)
+                self.add_vertice(new_node)
+                self.add_edge_by_vertices(v, new_node)
+                for n in nlist:
+                    self.remove_edge_by_vertices(v, n)
+                    self.add_edge_by_vertices(new_node, n)
+
+        start_list = self.get_start_vertices()
+        new_node = ReducedVertice("start", None, None)
+        self.add_vertice(new_node)
+        for s in start_list:
+            self.add_edge_by_vertices(new_node, s)
+
+        # Add require-all metanode if current node has a require-any as a predecessor and is followed by another node.
+        copy_vertices = list(self.vertices)
+        for v in copy_vertices:
+            prev_vertices = list(self.get_prev_vertices(v))
+            next_vertices = list(self.get_next_vertices(v))
+            for p in prev_vertices:
+                if (p.is_type_require_any() or p.is_type_start()) and next_vertices:
+                    # Except for when a require-entitlement ending block.
+                    if v.is_type_require_entitlement():
+                        has_next_nexts = False
+                        for n in next_vertices:
+                            if n.is_type_require_any():
+                                for n2 in self.get_next_vertices(n):
+                                    if self.get_next_vertices(n2):
+                                        has_next_nexts = True
+                                        break
+                            else:
+                                if self.get_next_vertices(n):
+                                    has_next_nexts = True
+                                    break
+                        if not has_next_nexts:
+                            continue
+                    new_node = ReducedVertice("require-all", None, None)
+                    self.add_vertice(new_node)
+                    self.remove_edge_by_vertices(p, v)
+                    self.add_edge_by_vertices(p, new_node)
+                    self.add_edge_by_vertices(new_node, v)
+
+    def str_simple_with_metanodes(self):
+        logger.debug("==== vertices:\n")
+        for v in self.vertices:
+            logger.debug(v.str_simple())
+        logger.debug("==== edges:\n")
+        for e in self.edges:
+            logger.debug(e.str_simple())
+
+    def str_simple(self):
+        message = "==== vertices:\n"
+        for v in self.vertices:
+            message += "decision: " + str(v.decision) + "\t" + v.str_debug() + "\n"
+        message += "==== final vertices:\n"
+        for v in self.final_vertices:
+            message += "decision: " + str(v.decision) + "\t" + v.str_debug() + "\n"
+        message += "==== edges:\n"
+        for e in self.edges:
+            message += "\t" + e.str_debug() + "\n"
+        return message
+
+    def __str__(self):
+        result_str = ""
+        for v in self.vertices:
+            result_str += "(" + str(v.decision) + " "
+            if len(self.get_next_vertices(v)) == 0 and len(self.get_prev_vertices(v)) == 0:
+                if v in self.final_vertices:
+                    result_str += str(v) + "\n"
+            result_str += ")\n"
+        for e in self.edges:
+            result_str += str(e) + "\n"
+        result_str += "\n"
+        return result_str
+
+    def remove_builtin_filters(self):
+        copy_vertices = list(self.vertices)
+        for v in copy_vertices:
+            if re.search("###\$\$\$\*\*\*", str(v)):
+                self.remove_vertice_update_decision(v)
+
+    def reduce_integrated_vertices(self, integrated_vertices):
+        if len(integrated_vertices) == 0:
+            return (None, None)
+        if len(integrated_vertices) > 1:
+            return (ReducedVertice("require-any", integrated_vertices,
+    def reduce_integrated_vertices(self, integrated_vertices):
+        if len(integrated_vertices) == 0:
+            return (None, None)
+        if len(integrated_vertices) > 1:
+            return (ReducedVertice("require-any", integrated_vertices,
+                    integrated_vertices[0].decision), integrated_vertices[0].decision)
+        require_all_vertices = []
+        v = integrated_vertices[0]
+        decision = None
+        while True:
+            if not re.search("entitlement-value #t", str(v)):
+                require_all_vertices.append(v)
+            next_vertices = self.get_next_vertices(v)
+            if decision == None and v.decision != None:
+                decision = v.decision
+            self.remove_vertice(v)
+            if v in self.final_vertices:
+                self.final_vertices.remove(v)
+            if next_vertices:
+                v = next_vertices[0]
+            else:
+                break
+        if len(require_all_vertices) == 0:
+            return (None, v.decision)
+        if len(require_all_vertices) == 1:
+            return (ReducedVertice(value=require_all_vertices[0].value, decision=require_all_vertices[0].decision, is_not=require_all_vertices[0].is_not), v.decision)
+        return (ReducedVertice("require-all", require_all_vertices, require_all_vertices[len(require_all_vertices)-1].decision), v.decision)
+
+    def aggregate_require_entitlement(self, v):
+        next_vertices = []
+        prev_vertices = self.get_prev_vertices(v)
+        integrated_vertices = []
+        for n in self.get_next_vertices(v):
+            if not re.search("entitlement-value", str(n)):
+                next_vertices.append(n)
+                break
+            integrated_vertices.append(n)
+        # NOTE: relies on Python keeping the last loop value of n in scope.
+        current_list = [ n ]
+        while current_list:
+            current = current_list.pop()
+            for n2 in self.get_next_vertices(current):
+                if not re.search("entitlement-value", str(n2)):
+                    self.remove_edge_by_vertices(current, n2)
+                    next_vertices.append(n2)
+                else:
+                    current_list.append(n2)
+        new_vertice = ReducedVertice(type="require-entitlement", value=(v, None), decision=None, is_not=v.is_not)
+        for p in prev_vertices:
+            self.remove_edge_by_vertices(p, v)
+            self.add_edge_by_vertices(p, new_vertice)
+        for n in next_vertices:
+            self.remove_edge_by_vertices(v, n)
+            self.add_edge_by_vertices(new_vertice, n)
+        for i in integrated_vertices:
+            self.remove_edge_by_vertices(v, i)
+        self.remove_vertice(v)
+        self.add_vertice(new_vertice)
+        if v in self.final_vertices:
+            self.final_vertices.remove(v)
+            self.final_vertices.append(new_vertice)
+        (new_integrate, decision) = self.reduce_integrated_vertices(integrated_vertices)
+        for i in integrated_vertices:
+            self.remove_vertice(i)
+            if i in self.final_vertices:
+                self.final_vertices.remove(i)
+        new_vertice.set_integrated_vertice(new_integrate)
+        new_vertice.set_decision(decision)
+
+    def aggregate_require_entitlement_nodes(self):
+        copy_vertices = list(self.vertices)
+        idx = 0
+        while idx < len(copy_vertices):
+            v = copy_vertices[idx]
+            if re.search("require-entitlement", str(v)):
+                self.aggregate_require_entitlement(v)
+            idx += 1
+
+    def cleanup_filters(self):
+        self.remove_builtin_filters()
+        self.aggregate_require_entitlement_nodes()
+
+    def remove_builtin_filters_with_metanodes(self):
+        copy_vertices = list(self.vertices)
+        for v in copy_vertices:
+            if re.search("###\$\$\$\*\*\*", v.str_simple()):
+                self.remove_vertice(v)
+            elif re.search("entitlement-value #t", v.str_simple()):
+                self.remove_vertice(v)
+            elif re.search("entitlement-value-regex #\"\.\"", v.str_simple()):
+                v.value.non_terminal.argument = "#\".+\""
+            elif re.search("global-name-regex #\"\.\"", v.str_simple()):
+                v.value.non_terminal.argument = "#\".+\""
+            elif re.search("local-name-regex #\"\.\"", v.str_simple()):
+                v.value.non_terminal.argument = "#\".+\""
+
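+    # The metanode variant keeps the entitlement subtree in place and only
+    # re-types the node, so the printer can emit a (require-entitlement ...)
+    # block around it.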
+    def replace_require_entitlement_with_metanodes(self, v):
+        prev_list = self.get_prev_vertices(v)
+        next_list = self.get_next_vertices(v)
+        new_node = ReducedVertice(type="require-entitlement", value=v.value, decision=None, is_not=v.is_not)
+        self.add_vertice(new_node)
+        self.remove_vertice(v)
+        for p in prev_list:
+            self.add_edge_by_vertices(p, new_node)
+        for n in next_list:
+            self.add_edge_by_vertices(new_node, n)
+
+    def aggregate_require_entitlement_with_metanodes(self):
+        copy_vertices = list(self.vertices)
+        for v in copy_vertices:
+            if re.search("require-entitlement", str(v)):
+                self.replace_require_entitlement_with_metanodes(v)
+
+    def cleanup_filters_with_metanodes(self):
+        self.remove_builtin_filters_with_metanodes()
+        self.aggregate_require_entitlement_with_metanodes()
+
+    def print_vertices_with_operation(self, operation, out_f):
+        allow_vertices = [v for v in self.vertices if v.decision == "allow"]
+        deny_vertices = [v for v in self.vertices if v.decision == "deny"]
+        if allow_vertices:
+            out_f.write("(allow %s " % (operation))
+            if len(allow_vertices) > 1:
+                for v in allow_vertices:
+                    out_f.write("\n" + 8*" " + str(v))
+            else:
+                out_f.write(str(allow_vertices[0]))
+            out_f.write(")\n")
+        if deny_vertices:
+            out_f.write("(deny %s " % (operation))
+            if len(deny_vertices) > 1:
+                for v in deny_vertices:
+                    out_f.write("\n" + 8*" " + str(v))
+            else:
+                out_f.write(str(deny_vertices[0]))
+            out_f.write(")\n")
+
+    def print_vertices_with_operation_metanodes(self, operation, default_is_allow, out_f):
+        # Return if the start node is the only node in the list.
+        if len(self.vertices) == 1 and self.vertices[0].is_type_start():
+            return
+        # Use the reverse of the default rule.
+        if default_is_allow:
+            out_f.write("(deny %s" % (operation))
+        else:
+            out_f.write("(allow %s" % (operation))
+        vlist = []
+        start_list = self.get_start_vertices()
+        start_list.reverse()
+        vlist.insert(0, (None, 0))
+        for s in start_list:
+            vlist.insert(0, (s, 1))
+        while True:
+            if not vlist:
+                break
+            (cnode, indent) = vlist.pop(0)
+            if not cnode:
+                out_f.write(")")
+                continue
+            (first, last) = cnode.str_print()
+            if first:
+                if cnode.is_not:
+                    if cnode.str_print_not() != "":
+                        out_f.write("\n" + indent * "\t" + cnode.str_print_not())
+                    else:
+                        out_f.write("\n" + indent * "\t" + "(require-not " + first)
+                        if cnode.is_type_require_any() or cnode.is_type_require_all() or cnode.is_type_require_entitlement():
+                            vlist.insert(0, (None, indent))
+                        else:
+                            out_f.write(")")
+                else:
+                    out_f.write("\n" + indent * "\t" + first)
+                    if last:
+                        vlist.insert(0, (None, indent))
+            next_vertices_list = self.get_next_vertices(cnode)
+            if next_vertices_list:
+                if cnode.is_type_require_any() or cnode.is_type_require_all() or cnode.is_type_require_entitlement():
+                    indent += 1
+                next_vertices_list.reverse()
+                if cnode.is_type_require_entitlement():
+                    pos = 0
+                    for n in next_vertices_list:
+                        if (n.is_type_single() and not re.search("entitlement-value", n.str_simple())) or \
+                                n.is_type_require_entitlement():
+                            vlist.insert(pos + 1, (n, indent-1))
+                        else:
+                            vlist.insert(0, (n, indent))
+                        pos += 1
+                else:
+                    for n in next_vertices_list:
+                        vlist.insert(0, (n, indent))
+        out_f.write("\n")
+
+    def dump_xml(self, operation, out_f):
+        allow_vertices = [v for v in self.vertices if v.decision == "allow"]
+        deny_vertices = [v for v in self.vertices if v.decision == "deny"]
+        if allow_vertices:
+            out_f.write("\t<operation name=\"%s\" action=\"allow\">\n" % (operation))
+            out_f.write("\t\t<filters>\n")
+            for v in allow_vertices:
+                out_f.write(v.xml_str())
+            out_f.write("\t\t</filters>\n")
+            out_f.write("\t</operation>\n")
+        if deny_vertices:
+            out_f.write("\t<operation name=\"%s\" action=\"deny\">\n" % (operation))
+            out_f.write("\t\t<filters>\n")
+            for v in deny_vertices:
+                out_f.write(v.xml_str())
+            out_f.write("\t\t</filters>\n")
+            out_f.write("\t</operation>\n")
+
+
+def reduce_operation_node_graph(g):
+    # Create reduced graph.
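+    # The input g maps each operation node to a dict with the keys
+    # "decision" ("allow"/"deny"), "not" (negation flag) and "list"
+    # (successor nodes).  Hypothetical driver code:
+    #     g = operation_node.build_operation_node_graph(...)
+    #     rg = reduce_operation_node_graph(g)
+    #     rg.print_vertices_with_operation_metanodes(op, default_is_allow, out_f)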
+    rg = ReducedGraph()
+    for node_iter in g.keys():
+        rv = ReducedVertice(value=node_iter, decision=g[node_iter]["decision"], is_not=g[node_iter]["not"])
+        rg.add_vertice(rv)
+
+    for node_iter in g.keys():
+        rv = rg.get_vertice_by_value(node_iter)
+        for node_next in g[node_iter]["list"]:
+            rn = rg.get_vertice_by_value(node_next)
+            rg.add_edge_by_vertices(rv, rn)
+
+    # Handle the special case of require-not (require-entitlement (...)).
+    l = len(g.keys())
+    for idx, node_iter in enumerate(g.keys()):
+        rv = rg.get_vertice_by_value(node_iter)
+        if not re.search("require-entitlement", str(rv)):
+            continue
+        if not rv.is_not:
+            continue
+        c_idx = idx
+        while True:
+            c_idx += 1
+            if c_idx >= l:
+                break
+            rn = rg.get_vertice_by_value(g.keys()[c_idx])
+            if not re.search("entitlement-value", str(rn)):
+                break
+            prevs_rv = rg.get_prev_vertices(rv)
+            prevs_rn = rg.get_prev_vertices(rn)
+            if sorted(prevs_rv) != sorted(prevs_rn):
+                continue
+            for pn in prevs_rn:
+                rg.remove_edge_by_vertices(rn, pn)
+            rg.add_edge_by_vertices(rv, rn)
+
+    rg.cleanup_filters_with_metanodes()
+    rg.reduce_graph_with_metanodes()
+    return rg
\ No newline at end of file
diff --git a/reverse-sandbox/reduced_node.py b/reverse-sandbox/reduced_node.py
new file mode 100644
index 0000000..040325f
--- /dev/null
+++ b/reverse-sandbox/reduced_node.py
@@ -0,0 +1,274 @@
+class ReducedVertice():
+    TYPE_SINGLE = "single"
+    TYPE_START = "start"
+    TYPE_REQUIRE_ANY = "require-any"
+    TYPE_REQUIRE_ALL = "require-all"
+    TYPE_REQUIRE_ENTITLEMENT = "require-entitlement"
+    type = TYPE_SINGLE
+    is_not = False
+    value = None
+    decision = None
+
+    def __init__(self, type=TYPE_SINGLE, value=None, decision=None, is_not=False):
+        self.type = type
+        self.value = value
+        self.decision = decision
+        self.is_not = is_not
+
+    def set_value(self, value):
+        self.value = value
+
+    def set_type(self, type):
+        self.type = type
+
+    def _replace_in_list(self, lst, old, new):
+        global replace_occurred
+        tmp_list = list(lst)
+        for i, v in enumerate(tmp_list):
+            if isinstance(v.value, list):
+                self._replace_in_list(v.value, old, new)
+            else:
+                if v == old:
+                    lst[i] = new
+                    replace_occurred = True
+                    return
+
+    def replace_in_list(self, old, new):
+        if isinstance(self.value, list):
+            self._replace_in_list(self.value, old, new)
+
+    def _replace_sublist_in_list(self, lst, old, new):
+        global replace_occurred
+        all_found = True
+        for v in old:
+            if v not in lst:
+                all_found = False
+                break
+        if all_found:
+            for v in old:
+                lst.remove(v)
+            lst.append(new)
+            replace_occurred = True
+            return
+
+        for i, v in enumerate(lst):
+            if isinstance(v.value, list):
+                self._replace_sublist_in_list(v.value, old, new)
+            else:
+                return
+
+    def replace_sublist_in_list(self, old, new):
+        if isinstance(self.value, list):
+            self._replace_sublist_in_list(self.value, old, new)
+
+    def set_decision(self, decision):
+        self.decision = decision
+
+    def set_type_single(self):
+        self.type = self.TYPE_SINGLE
+
+    def set_type_start(self):
+        self.type = self.TYPE_START
+
+    def set_type_require_entitlement(self):
+        self.type = self.TYPE_REQUIRE_ENTITLEMENT
+
+    def set_type_require_any(self):
+        self.type = self.TYPE_REQUIRE_ANY
+
+    def set_type_require_all(self):
+        self.type = self.TYPE_REQUIRE_ALL
+
+    def set_integrated_vertice(self, integrated_vertice):
+        (n, i) = self.value
+        self.value = (n, integrated_vertice)
+
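+    # self.value depends on the node type: a filter wrapper for "single"
+    # nodes, a list of vertices for "require-all"/"require-any", and a
+    # (node, integrated_vertice) tuple for "require-entitlement".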
+    def is_type_single(self):
+        return self.type == self.TYPE_SINGLE
+
+    def is_type_start(self):
+        return self.type == self.TYPE_START
+
+    def is_type_require_entitlement(self):
+        return self.type == self.TYPE_REQUIRE_ENTITLEMENT
+
+    def is_type_require_all(self):
+        return self.type == self.TYPE_REQUIRE_ALL
+
+    def is_type_require_any(self):
+        return self.type == self.TYPE_REQUIRE_ANY
+
+    def recursive_str(self, level, recursive_is_not):
+        result_str = ""
+        if self.is_type_single():
+            if self.is_not and not recursive_is_not:
+                value = str(self.value)
+                if "(require-any" in value:
+                    result_str = self.value.str_not()
+                else:
+                    result_str += "(require-not " + str(self.value) + ")"
+            else:
+                result_str += str(self.value)
+        elif self.is_type_require_entitlement():
+            ent_str = ""
+            (n, i) = self.value
+            if i == None:
+                ent_str += str(n.value)
+            else:
+                ent_str += str(n.value)[:-1] + " "
+                ent_str += i.recursive_str(level, self.is_not)
+                ent_str += ")"
+            if self.is_not:
+                result_str += "(require-not " + ent_str + ")"
+            else:
+                result_str += ent_str
+        else:
+            if level == 1:
+                result_str += "\n" + 13*' '
+            result_str += "(" + self.type
+            level += 1
+            for i, v in enumerate(self.value):
+                if i == 0:
+                    result_str += " " + v.recursive_str(level, recursive_is_not)
+                else:
+                    result_str += "\n" + 13*level*' ' + v.recursive_str(level, recursive_is_not)
+            result_str += ")"
+        return result_str
+
+    def recursive_str_debug(self, level, recursive_is_not):
+        result_str = ""
+        if self.is_type_single():
+            if self.is_not and not recursive_is_not:
+                result_str += "(require-not " + self.value.str_debug() + ")"
+            else:
+                result_str += self.value.str_debug()
+        elif self.is_type_require_entitlement():
+            ent_str = ""
+            (n, i) = self.value
+            if i == None:
+                ent_str += n.value.str_debug()
+            else:
+                ent_str += n.value.str_debug()[:-1] + " "
+                ent_str += i.recursive_str_debug(level, self.is_not)
+                ent_str += ")"
+            if self.is_not:
+                result_str += "(require-not " + ent_str + ")"
+            else:
+                result_str += ent_str
+        else:
+            if level == 1:
+                result_str += "\n" + 13*' '
+            result_str += "(" + self.type
+            level += 1
+            for i, v in enumerate(self.value):
+                if i == 0:
+                    result_str += " " + v.recursive_str_debug(level, recursive_is_not)
+                else:
+                    result_str += "\n" + 13*level*' ' + v.recursive_str_debug(level, recursive_is_not)
+            result_str += ")"
+        return result_str
+
+    def recursive_xml_str(self, level, recursive_is_not):
+        result_str = ""
+        if self.is_type_single():
+            if self.is_not and not recursive_is_not:
+                result_str += level*"\t" + "<require type=\"require-not\">\n"
+                (name, argument) = self.value.values()
+                if argument == None:
+                    result_str += (level+1)*"\t" + "<filter name=\"" + name + "\" />\n"
+                else:
+                    arg = str(argument).replace('&', '&amp;').replace('"', '&quot;').replace('\'', '&apos;').replace('<', '&lt;').replace('>', '&gt;')
+                    result_str += (level+1)*"\t" + "<filter name=\"" + name + "\" argument=\"" + arg + "\" />\n"
+                result_str += level*"\t" + "</require>\n"
+            else:
+                (name, argument) = self.value.values()
+                if argument == None:
+                    result_str += level*"\t" + "<filter name=\"" + name + "\" />\n"
+                else:
+                    arg = str(argument).replace('&', '&amp;').replace('"', '&quot;').replace('\'', '&apos;').replace('<', '&lt;').replace('>', '&gt;')
+                    result_str += level*"\t" + "<filter name=\"" + name + "\" argument=\"" + arg + "\" />\n"
+        elif self.is_type_require_entitlement():
+            if self.is_not:
+                result_str += level*"\t" + "<require type=\"require-not\">\n"
+                level += 1
+            result_str += level*"\t" + "<require type=\"require-entitlement\""
+            (n, i) = self.value
+            if i == None:
+                _tmp = str(n.value)[21:-1].replace('&', '&amp;').replace('"', '&quot;').replace('\'', '&apos;').replace('<', '&lt;').replace('>', '&gt;')
+                result_str += " value=\"" + _tmp + "\" />\n"
+            else:
+                _tmp = str(n.value)[21:-1].replace('&', '&amp;').replace('"', '&quot;').replace('\'', '&apos;').replace('<', '&lt;').replace('>', '&gt;')
+                result_str += " value=\"" + _tmp + "\">\n"
+                result_str += i.recursive_xml_str(level+1, self.is_not)
+                result_str += level*"\t" + "</require>\n"
+            if self.is_not:
+                level -= 1
+                result_str += level*"\t" + "</require>\n"
+        else:
+            result_str += level*"\t" + "<require type=\"" + self.type + "\">\n"
+            for i, v in enumerate(self.value):
+                result_str += v.recursive_xml_str(level+1, recursive_is_not)
+            result_str += level*"\t" + "</require>\n"
+        return result_str
+
+    def __str__(self):
+        return self.recursive_str(1, False)
+
+    def str_debug(self):
+        return self.recursive_str_debug(1, False)
+
+    def str_simple(self):
+        if self.is_type_single():
+            return self.value.str_debug()
+        elif self.is_type_require_any():
+            return "require-any"
+        elif self.is_type_require_all():
+            return "require-all"
+        elif self.is_type_require_entitlement():
+            return self.value.str_debug()[1:-1]
+        elif self.is_type_start():
+            return "start"
+        else:
+            return "unknown-type"
+
+    def str_print_debug(self):
+        if self.is_type_single():
+            return (self.value.str_debug(), None)
+        elif self.is_type_require_any():
+            return ("(require-any", ")")
+        elif self.is_type_require_all():
+            return ("(require-all", ")")
+        elif self.is_type_require_entitlement():
+            return (self.value.str_debug()[:-1], ")")
+        elif self.is_type_start():
+            return (None, None)
+        else:
+            return ("unknown-type", None)
+
+    def str_print(self):
+        if self.is_type_single():
+            return (str(self.value), None)
+        elif self.is_type_require_any():
+            return ("(require-any", ")")
+        elif self.is_type_require_all():
+            return ("(require-all", ")")
+        elif self.is_type_require_entitlement():
+            return (str(self.value)[:-1], ")")
+        elif self.is_type_start():
+            return (None, None)
+        else:
+            return ("unknown-type", None)
+
+    def str_print_not(self):
+        result_str = ""
+        if self.is_type_single():
+            if self.is_not:
+                value = str(self.value)
+                if "(require-any" in value:
+                    result_str = self.value.str_not()
+                else:
+                    result_str += "(require-not " + str(self.value) + ")"
+        return result_str
+
+    def xml_str(self):
+        return self.recursive_xml_str(3, False)
\ No newline at end of file
diff --git a/reverse-sandbox/regex_parser_v3.py b/reverse-sandbox/regex_parser_v3.py
index 2547a44..4c89da7 100644
--- a/reverse-sandbox/regex_parser_v3.py
+++ b/reverse-sandbox/regex_parser_v3.py
@@ -73,7 +73,7 @@ def parse_character_class(re, i, regex_list):
         values.append(re[i+2*j+1])
     first = values[0]
     last = values[2*num-1]
-    # In case of exlucdes.
+    # In case of excludes.
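+    # A first endpoint that sorts after the last one marks an excluded
+    # character class, rendered below with a leading "^".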
     if (first > last):
         node_type = "class_exclude"
         value += "^"
diff --git a/reverse-sandbox/reverse_sandbox.py b/reverse-sandbox/reverse_sandbox.py
index 01b58e5..2662648 100644
--- a/reverse-sandbox/reverse_sandbox.py
+++ b/reverse-sandbox/reverse_sandbox.py
@@ -26,10 +26,14 @@ logger = logging.getLogger(__name__)
 
-def extract_string_from_offset(f, offset):
+def extract_string_from_offset(f, offset, ios_version):
     """Extract string (literal) from given offset."""
-    f.seek(offset * 8)
-    len = struct.unpack("<H", f.read(2))[0]
+    if ios_version >= 13:
+        f.seek(get_base_addr(f, ios_version) + offset * 8)
+        len = struct.unpack("<H", f.read(2))[0]
+    else:
+        f.seek(offset * 8)
+        len = struct.unpack("<H", f.read(2))[0]
     return f.read(len)
 
     out_f.write("\t<operation name=\"%s\" action=\"%s\" />\n" % (operation, node.terminal))
@@ -131,7 +137,9 @@ def is_ios_more_than_10_release(release):
 
 def display_sandbox_profiles(f, re_table_offset, num_sb_ops, ios_version):
     logger.info("Printing sandbox profiles from bundle")
-    if ios_version >= 12:
+    if ios_version >= 13:
+        f.seek(6)
+    elif ios_version >= 12:
         f.seek(12)
     elif ios_version >= 10:
         f.seek(10)
@@ -139,25 +147,40 @@ def display_sandbox_profiles(f, re_table_offset, num_sb_ops, ios_version):
         f.seek(6)
     num_profiles = struct.unpack("<H", f.read(2))[0]
-    if ios_version >= 12:
-        f.seek(14 + (num_sb_ops + 2) * 2 * num_profiles)
-    elif ios_version >= 10:
-        f.seek(12 + (num_sb_ops + 2) * 2 * num_profiles)
+    if ios_version >= 13:
+        f.seek(2)
+        num_operation_nodes = struct.unpack("<H", f.read(2))[0]
+    elif ios_version >= 12:
+        f.seek(14 + (num_sb_ops + 2) * 2 * num_profiles)
+    elif ios_version >= 10:
+        f.seek(12 + (num_sb_ops + 2) * 2 * num_profiles)
+    else:
+        f.seek(8 + (num_sb_ops + 2) * 2 * num_profiles)
+    while True:
+        word = struct.unpack("<H", f.read(2))[0]
+        if word == 0:
+            break
     for i in range(0, num_profiles):
-        if ios_version >= 12:
+        if ios_version >= 13:
+            f.seek(8)
+            regex_table_count = struct.unpack('<H', f.read(2))[0]
+        elif ios_version >= 12:
             f.seek(14 + (num_sb_ops + 2) * 2 * i)
         elif ios_version >= 10:
             f.seek(12 + (num_sb_ops + 2) * 2 * i)
@@ -166,25 +189,61 @@ def display_sandbox_profiles(f, re_table_offset, num_sb_ops, ios_version):
         name_offset = struct.unpack("<H", f.read(2))[0]
+        if ios_version >= 13 and vars_offset > 0:
+            f.seek(vars_offset + i*2)
+        else:
+            f.seek(vars_offset*8 + i*2)
         current_offset = struct.unpack("<H", f.read(2))[0]
+        if current_offset > 0:
+            len = struct.unpack("<H", f.read(2))[0]
+    if get_ios_major_version(args.release) >= 13:
+        # extract operation node table count
+        f.seek(2)
+        op_nodes_count = struct.unpack('<H', f.read(2))[0]
+    if get_ios_major_version(args.release) >= 13:
+        re_table_offset = 12
+    else:
+        re_table_offset = struct.unpack("<H", f.read(2))[0]
     if get_ios_major_version(args.release) >= 12:
         f.seek(8)
     re_table_count = struct.unpack("<H", f.read(2))[0]
     if re_table_count > 0:
-        f.seek(re_table_offset * 8)
+        if get_ios_major_version(args.release) >= 13:
+            f.seek(re_table_offset)
+        else:
+            f.seek(re_table_offset * 8)
+
         re_offsets_table = struct.unpack("<%dH" % re_table_count, f.read(2 * re_table_count))
         for offset in re_offsets_table:
-            f.seek(offset * 8)
-            re_length = struct.unpack("<H", f.read(2))[0]
+            f.seek(get_base_addr(f, get_ios_major_version(args.release)) + offset * 8)
+            re_length = struct.unpack("<H", f.read(2))[0]
+    if get_ios_major_version(args.release) >= 13:
+        # get the regex table entries
+        f.seek(8)
+        regex_table_count = struct.unpack('<H', f.read(2))[0]
+    elif get_ios_major_version(args.release) >= 12:
         f.seek(4)
         vars_offset = struct.unpack("<H", f.read(2))[0]
     elif get_ios_major_version(args.release) >= 10:
         f.seek(6)
         vars_offset = struct.unpack("<H", f.read(2))[0]
-    if get_ios_major_version(args.release) >= 12:
-        f.seek(14 + (num_sb_ops + 2) * 2 * num_profiles)
-    elif get_ios_major_version(args.release) >= 10:
-        f.seek(12 + (num_sb_ops + 2) * 2 * num_profiles)
+    if get_ios_major_version(args.release) >= 13:
+        f.seek(2)
+        num_operation_nodes = struct.unpack("<H", f.read(2))[0]
+    elif get_ios_major_version(args.release) >= 12:
+        f.seek(14 + (num_sb_ops + 2) * 2 * num_profiles)
+    elif get_ios_major_version(args.release) >= 10:
+        f.seek(12 + (num_sb_ops + 2) * 2 * num_profiles)
+    else:
+        f.seek(8 + (num_sb_ops + 2) * 2 * num_profiles)
+    while True:
+        word = struct.unpack("<H", f.read(2))[0]
+        if word == 0:
+            break
     for i in range(0, num_profiles):
-        if get_ios_major_version(args.release) >= 12:
+        if get_ios_major_version(args.release) >= 13:
+            f.seek(8)
+            regex_table_count = struct.unpack('<H', f.read(2))[0]
+        elif get_ios_major_version(args.release) >= 12:
             f.seek(14 + (num_sb_ops + 2) * 2 * i)
         elif get_ios_major_version(args.release) >= 10:
             f.seek(12 + (num_sb_ops + 2) * 2 * i)
@@ -328,7 +423,8 @@ def main():
         name_offset = struct.unpack("<H", f.read(2))[0]
-        if get_ios_major_version(args.release) >= 12:
+        if get_ios_major_version(args.release) >= 13:
+            f.seek(8)
+            regex_table_count = struct.unpack('<H', f.read(2))[0]
+        elif get_ios_major_version(args.release) >= 12:
             f.seek(14 + (num_sb_ops + 2) * 2 * i + 4)
         elif get_ios_major_version(args.release) >= 10:
             f.seek(12 + (num_sb_ops + 2) * 2 * i + 4)
@@ -357,14 +462,14 @@ def main():
         f.seek(10)
         num_vars = struct.unpack("<H", f.read(2))[0]
     elif get_ios_major_version(args.release) >= 10:
         f.seek(6)
         vars_offset = struct.unpack("<H", f.read(2))[0]
     elif get_ios_major_version(args.release) >= 6:
         f.seek(6)
diff --git a/reverse-sandbox/sandbox_filter.py b/reverse-sandbox/sandbox_filter.py
index 9706652..1df0f12 100644
--- a/reverse-sandbox/sandbox_filter.py
+++ b/reverse-sandbox/sandbox_filter.py
@@ -12,15 +12,27 @@ logging.config.fileConfig("logger.config")
 logger = logging.getLogger(__name__)
 
-import logging as logger
 
 ios_major_version = 4
 keep_builtin_filters = False
 global_vars = []
+base_addr = 0
 
 
 def get_filter_arg_string_by_offset(f, offset):
     """Extract string (literal) from given offset."""
-    f.seek(offset * 8)
+    f.seek(base_addr + offset * 8)
+    if ios_major_version >= 13:
+        len = struct.unpack("<H", f.read(2))[0]
+    elif ios_major_version >= 10:
         f.seek(offset * 8)
@@ -42,10 +54,23 @@ def get_filter_arg_string_by_offset_with_type(f, offset):
     """Extract string from given offset and consider type byte."""
     global ios_major_version
     global keep_builtin_filters
-    f.seek(offset * 8)
+    f.seek(base_addr + offset * 8)
+    if ios_major_version >= 13:
+        len = struct.unpack("<H", f.read(2))[0]
+    elif ios_major_version >= 10:
-        f.seek(offset * 8)
+        f.seek(base_addr + offset * 8)
     s = f.read(4+len)
     logger.info("binary string is " + s.encode("hex"))
     ss = reverse_string.SandboxString()
@@ -79,14 +104,17 @@ def get_filter_arg_string_by_offset_no_skip(f, offset):
     """Extract string from given offset and ignore type byte."""
-    f.seek(offset * 8)
-    len = struct.unpack("<H", f.read(2))[0]
+    f.seek(base_addr + offset * 8)
+    if ios_major_version >= 13:
+        len = struct.unpack("<H", f.read(2))[0]