Source code for qbindiff.features.graph

# Copyright 2023 Quarkslab
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Features that use the topology of the CFG."""

from __future__ import annotations
import networkx
import hashlib
import community  # type: ignore[import-untyped]
import numpy as np
import math
from typing import no_type_check, TYPE_CHECKING

from qbindiff.features.extractor import (
from qbindiff.loader import Program, Function, Instruction, Operand
from qbindiff.loader.types import OperandType, InstructionGroup, ProgramCapability

    from qbindiff.loader import Operand

class BBlockNb(FunctionFeatureExtractor):
    """
    Number of basic blocks in the function as a feature.
    """

    key = "bnb"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store the node count of the function flow graph under ``key``.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is measured
        :param collector: collector receiving the feature value
        """
        flowgraph_nodes = function.flowgraph.nodes
        collector.add_feature(self.key, len(flowgraph_nodes))
class StronglyConnectedComponents(FunctionFeatureExtractor):
    """
    Number of strongly connected components in a function CFG.
    """

    key = "scc"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Count the strongly connected components of the flow graph and store
        the count under ``key``.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        components = networkx.strongly_connected_components(function.flowgraph)
        # The components come as an iterator: tally them without building a list.
        scc_count = sum(1 for _ in components)
        collector.add_feature(self.key, scc_count)
class BytesHash(FunctionFeatureExtractor):
    """
    Hash of the function, using the instructions sorted by addresses.
    The hashing function used is MD5.
    """

    key = "bh"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Hash the bytes of every basic block, in the order ``function.items()``
        yields them, and store the digest as a float under ``key``.

        :param program: program being visited (not used by this feature)
        :param function: function whose basic block bytes are hashed
        :param collector: collector receiving the feature value
        """
        digest = hashlib.md5()
        # Feeding the blocks one by one gives the same digest as hashing their
        # concatenation.
        for _, bblock in function.items():
            digest.update(bblock.bytes)
        # The 128-bit digest is read as an integer, then converted to float.
        as_integer = int(digest.hexdigest(), 16)
        collector.add_feature(self.key, float(as_integer))
class CyclomaticComplexity(FunctionFeatureExtractor):
    """
    Cyclomatic complexity of the function CFG.
    """

    key = "cc"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Compute ``E - N + 2 * P`` where E is the number of edges, N the number
        of nodes and P the number of weakly connected components, and store it
        under ``key``.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        graph = function.flowgraph
        edge_count = len(function.edges)
        node_count = len(graph.nodes())
        component_count = sum(1 for _ in networkx.weakly_connected_components(graph))
        complexity = edge_count - node_count + 2 * component_count
        collector.add_feature(self.key, complexity)
class MDIndex(FunctionFeatureExtractor):
    """
    MD-Index of the function.
    A slightly modified version of it: notice the topological sort is only
    available for DAG graphs (which may not always be the case)
    """

    key = "mdidx"
    # NOTE(review): the reference that should follow "based on" is missing from
    # this text; restore the original link. The string value is left unchanged.
    help_msg = (
        " MD-Index of the function, based on A slightly modified version of it: "
        "notice the topological sort is only available for DAG graphs "
        "(which may not always be the case) "
    ).strip()

    # There is a problem with networkx stubs for types on DiGraph.{in,out}_degree
    @no_type_check
    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Sum, over every edge ``(src, dst)`` of the function, the value
        ``1 / sqrt(w)`` where ``w`` combines the in/out degrees of both
        endpoints weighted by sqrt(2), sqrt(3), sqrt(5) and sqrt(7). When the
        flow graph admits a topological sort, the position of ``src`` in that
        sort is added to ``w``; otherwise that term is left out.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph and edges are analysed
        :param collector: collector receiving the value (as a float)
        """
        graph = function.flowgraph

        try:
            # Build node -> position once; looking positions up with
            # list.index() for each edge is a linear scan per edge.
            rank = {
                node: position
                for position, node in enumerate(networkx.topological_sort(graph))
            }
        except networkx.NetworkXUnfeasible:
            # Raised for graphs with cycles: no topological term is used then.
            rank = None

        sqrt2, sqrt3, sqrt5, sqrt7 = (math.sqrt(p) for p in (2, 3, 5, 7))

        terms = []
        for src, dst in function.edges:
            # Integer 0 keeps the floating point sum identical to the
            # expression without a topological term.
            position = rank[src] if rank is not None else 0
            weight = (
                position
                + sqrt2 * graph.in_degree(src)
                + sqrt3 * graph.out_degree(src)
                + sqrt5 * graph.in_degree(dst)
                + sqrt7 * graph.out_degree(dst)
            )
            terms.append(1 / math.sqrt(weight))

        value = np.sum(terms)
        collector.add_feature(self.key, float(value))
class JumpNb(InstructionFeatureExtractor):
    """
    Number of jumps in the function. Requires INSTR_GROUP capability
    """

    key = "jnb"
    required_capabilities = ProgramCapability.INSTR_GROUP

    def visit_instruction(
        self, program: Program, instruction: Instruction, collector: FeatureCollector
    ) -> None:
        """
        Add ``{"value": 1}`` under ``key`` when the instruction belongs to the
        jump group; do nothing otherwise.

        :param program: program being visited (not used by this feature)
        :param instruction: instruction whose groups are inspected
        :param collector: collector receiving the dict feature
        """
        if InstructionGroup.GRP_JUMP not in instruction.groups:
            return
        collector.add_dict_feature(self.key, {"value": 1})
class SmallPrimeNumbers(FunctionFeatureExtractor):
    """
    Small-Prime-Number based on mnemonics, as defined in Bindiff.
    This hash is slightly different from the theoretical implementation.
    Modulo is made at each round, instead of doing it at the end.
    """

    key = "spp"
    # NOTE(review): the reference that should follow "(see" is missing from this
    # text; restore the original link. The string value is left unchanged.
    help_msg = (
        " Small-Prime-Number based on mnemonics, as defined in Bindiff (see "
        "This hash is slightly different from the theoretical implementation. "
        "Modulo is made at each round, instead of doing it at the end. "
    ).strip()

    @staticmethod
    def primesbelow(n: int) -> list[int]:
        """
        Utility function that returns a list of all the primes below n.
        This comes from Diaphora.

        :param n: integer n
        :return: list of prime integers below n
        """

        correction = n % 6 > 1
        # Adjust n depending on its residue modulo 6 before sizing the sieve.
        n = {0: n, 1: n - 1, 2: n + 4, 3: n + 3, 4: n + 2, 5: n + 1}[n % 6]
        # Sieve slot i stands for the number (3 * i + 1) | 1 (see the return).
        sieve = [True] * (n // 3)
        sieve[0] = False
        for i in range(int(n**0.5) // 3 + 1):
            if sieve[i]:
                k = (3 * i + 1) | 1
                sieve[k * k // 3 :: 2 * k] = [False] * ((n // 6 - (k * k) // 6 - 1) // k + 1)
                sieve[(k * k + 4 * k - 2 * k * (i % 2)) // 3 :: 2 * k] = [False] * (
                    (n // 6 - (k * k + 4 * k - 2 * k * (i % 2)) // 6 - 1) // k + 1
                )
        return [2, 3] + [(3 * i + 1) | 1 for i in range(1, n // 3 - correction) if sieve[i]]

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Multiply, modulo 2**64 at each step, the primes associated with the
        mnemonic of every instruction of the function, and store the product
        under ``key``.

        Each distinct mnemonic of the function gets one prime below 4096. The
        mnemonics are sorted before the primes are assigned, so the mapping
        does not depend on set iteration order.

        :param program: program being visited (not used by this feature)
        :param function: function whose instructions are hashed
        :param collector: collector receiving the feature value
        """
        # Read the instruction stream once, in traversal order.
        sequence = [
            ins.mnemonic for _, bblock in function.items() for ins in bblock.instructions
        ]

        # assumes mnemonics are mutually comparable (e.g. all str) — TODO confirm
        prime_of = dict(zip(sorted(set(sequence)), self.primesbelow(4096)))

        value = 1
        for mnemonic in sequence:
            value = (value * prime_of[mnemonic]) % (2**64)

        collector.add_feature(self.key, value)
class ReadWriteAccess(OperandFeatureExtractor):
    """
    Number of memory access in the function. Both read and write.
    """

    key = "rwa"

    def visit_operand(
        self, program: Program, operand: Operand, collector: FeatureCollector
    ) -> None:
        """
        Add ``{"value": 1}`` under ``key`` when the operand type is
        ``OperandType.memory``; do nothing otherwise.

        :param program: program being visited (not used by this feature)
        :param operand: operand whose type is inspected
        :param collector: collector receiving the dict feature
        """
        is_memory_operand = operand.type == OperandType.memory
        if not is_memory_operand:
            return
        collector.add_dict_feature(self.key, {"value": 1})
class MaxParentNb(FunctionFeatureExtractor):
    """
    Maximum number of parent of a basic block in the function.
    """

    key = "maxp"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store under ``key`` the largest predecessor count found among the
        nodes of the flow graph, or 0 when the graph has no node.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        graph = function.flowgraph
        parent_counts = (len(list(graph.predecessors(node))) for node in graph)
        # default covers the graph without nodes
        collector.add_feature(self.key, max(parent_counts, default=0))
class MaxChildNb(FunctionFeatureExtractor):
    """
    Maximum number of children of a basic block in the function.
    """

    key = "maxc"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store under ``key`` the largest successor count found among the nodes
        of the flow graph, or 0 when the graph has no node.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        graph = function.flowgraph
        child_counts = (len(list(graph.successors(node))) for node in graph)
        # default covers the graph without nodes
        collector.add_feature(self.key, max(child_counts, default=0))
class MaxInsNB(FunctionFeatureExtractor):
    """
    Max number of instructions per basic blocks in the function.
    """

    key = "maxins"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store under ``key`` the largest instruction count among the elements
        obtained by iterating ``function``, or 0 when there is none.

        :param program: program being visited (not used by this feature)
        :param function: function whose basic blocks are measured
        :param collector: collector receiving the feature value
        """
        block_sizes = (len(bblock.instructions) for bblock in function)
        # default covers the function without basic blocks
        largest = max(block_sizes, default=0)
        collector.add_feature(self.key, largest)
class MeanInsNB(FunctionFeatureExtractor):
    """
    Mean number of instructions per basic blocks in the function.
    """

    key = "meanins"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store under ``key`` the total number of instructions divided by
        ``len(function)``, or 0 when ``len(function)`` is 0.

        :param program: program being visited (not used by this feature)
        :param function: function whose basic blocks are measured
        :param collector: collector receiving the feature value
        """
        block_count = len(function)
        if block_count == 0:
            # Dividing by zero raises ZeroDivisionError, which an
            # ``except ValueError`` fallback does not catch: handle the empty
            # function explicitly.
            value = 0
        else:
            value = sum(len(bblock.instructions) for bblock in function) / block_count
        collector.add_feature(self.key, value)
class InstNB(FunctionFeatureExtractor):
    """
    Number of instructions in the function.
    """

    key = "totins"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Add up the instruction counts of the elements obtained by iterating
        ``function`` and store the total under ``key``.

        :param program: program being visited (not used by this feature)
        :param function: function whose basic blocks are measured
        :param collector: collector receiving the feature value
        """
        total = 0
        for bblock in function:
            total += len(bblock.instructions)
        collector.add_feature(self.key, total)
class GraphMeanDegree(FunctionFeatureExtractor):
    """
    Mean degree of the function.
    """

    key = "Gmd"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store under ``key`` the sum of the node degrees of the flow graph
        divided by its number of nodes, or 0 when the graph has no node.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        graph = function.flowgraph
        n_node = len(graph)
        if n_node != 0:
            # NOTE(review): the iterable of this sum was missing from the
            # statement (leaving a syntax error); the total degree view is
            # used to match "mean degree" — confirm against upstream.
            value = sum(degree for _, degree in graph.degree()) / n_node
        else:
            value = 0
        collector.add_feature(self.key, value)
class GraphDensity(FunctionFeatureExtractor):
    """
    Density of the function flow graph (CFG).
    """

    key = "Gd"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store under ``key`` the result of ``networkx.density`` on the flow
        graph.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        graph = function.flowgraph
        collector.add_feature(self.key, networkx.density(graph))
class GraphNbComponents(FunctionFeatureExtractor):
    """
    Number of components in the function (non-connected flow graphs).
    (This can happen in the way IDA disassemble functions)
    """

    key = "Gnc"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Count the connected components of the undirected version of the flow
        graph and store the count under ``key``.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        undirected = function.flowgraph.to_undirected()
        component_count = sum(1 for _ in networkx.connected_components(undirected))
        collector.add_feature(self.key, component_count)
class GraphDiameter(FunctionFeatureExtractor):
    """
    Graph diameter of the function flow graph (CFG).
    """

    key = "Gdi"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store under ``key`` the largest diameter among the connected
        components of the undirected flow graph, or 0 when there is no
        component.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        graph = function.flowgraph
        components = list(networkx.connected_components(graph.to_undirected()))

        if not components:
            collector.add_feature(self.key, 0)
            return

        # The diameter is taken component by component, on the undirected
        # version of each induced subgraph.
        diameters = []
        for node_set in components:
            piece = networkx.subgraph(graph, node_set).to_undirected()
            diameters.append(networkx.diameter(piece))
        collector.add_feature(self.key, max(diameters))
class GraphTransitivity(FunctionFeatureExtractor):
    """
    Transitivity of the function flow graph (CFG).
    """

    key = "Gt"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Store under ``key`` the result of ``networkx.transitivity`` on the
        flow graph.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        graph = function.flowgraph
        collector.add_feature(self.key, networkx.transitivity(graph))
class GraphCommunities(FunctionFeatureExtractor):
    """
    Number of graph communities (Louvain modularity).
    """

    key = "Gcom"

    def visit_function(
        self, program: Program, function: Function, collector: FeatureCollector
    ) -> None:
        """
        Partition the undirected flow graph with ``community.best_partition``
        and store under ``key`` the largest partition value, or 0 when the
        function has at most one element, the partition is empty, or no value
        is left after filtering.

        :param program: program being visited (not used by this feature)
        :param function: function whose flow graph is analysed
        :param collector: collector receiving the feature value
        """
        partition = community.best_partition(function.flowgraph.to_undirected())

        value = 0
        if len(function) > 1 and partition:
            # NOTE(review): partition values are compared with the function
            # address; they are presumably community identifiers, so this
            # filter may have been meant for the keys — confirm the intent.
            kept = [label for label in partition.values() if label != function.addr]
            if kept:
                value = max(kept)
        collector.add_feature(self.key, value)