# Copyright 2023 Quarkslab
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Features that uses the topology of the CFG
"""
from __future__ import annotations
import networkx
import hashlib
import community # type: ignore[import-untyped]
import numpy as np
import math
from typing import no_type_check, TYPE_CHECKING
from qbindiff.features.extractor import (
FunctionFeatureExtractor,
InstructionFeatureExtractor,
OperandFeatureExtractor,
FeatureCollector,
)
from qbindiff.loader import Program, Function, Instruction, Operand
from qbindiff.loader.types import OperandType, InstructionGroup, ProgramCapability
if TYPE_CHECKING:
from qbindiff.loader import Operand
[docs]
class BBlockNb(FunctionFeatureExtractor):
"""
Number of basic blocks in the function as a feature.
"""
key = "bnb"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
value = len(function.flowgraph.nodes)
collector.add_feature(self.key, value)
[docs]
class StronglyConnectedComponents(FunctionFeatureExtractor):
"""
Number of strongly connected components in a function CFG.
"""
key = "scc"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
value = len(list(networkx.strongly_connected_components(function.flowgraph)))
collector.add_feature(self.key, value)
[docs]
class BytesHash(FunctionFeatureExtractor):
"""
Hash of the function, using the instructions sorted by addresses.
The hashing function used is MD5.
"""
key = "bh"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
bytes_seq = b""
for bba, bb in function.items():
bytes_seq += bb.bytes
value = float(int(hashlib.md5(bytes_seq).hexdigest(), 16))
collector.add_feature(self.key, value)
[docs]
class CyclomaticComplexity(FunctionFeatureExtractor):
"""
Cyclomatic complexity of the function CFG.
"""
key = "cc"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
e = len(function.edges)
n = len([n for n in function.flowgraph.nodes()])
components = len([c for c in networkx.weakly_connected_components(function.flowgraph)])
value = e - n + 2 * components
collector.add_feature(self.key, value)
[docs]
class MDIndex(FunctionFeatureExtractor):
"""
MD-Index of the function, based on
`<https://www.sto.nato.int/publications/STO%20Meeting%20Proceedings/RTO-MP-IST-091/MP-IST-091-26.pdf>`_.
A slightly modified version of it: notice the topological sort is only available for DAG graphs
(which may not always be the case)
"""
key = "mdidx"
help_msg = """
MD-Index of the function, based on https://www.sto.nato.int/publications/STO%20Meeting%20Proceedings/RTO-MP-IST-091/MP-IST-091-26.pdf.
A slightly modified version of it: notice the topological sort is only available for
DAG graphs (which may not always be the case)
""".strip()
# There is a problem with networkx stubs for types on DiGraph.{in,out}_degree
[docs]
@no_type_check
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
try:
topological_sort = list(networkx.topological_sort(function.flowgraph))
sort_ok = True
except networkx.NetworkXUnfeasible:
sort_ok = False
if sort_ok:
value = np.sum(
[
1
/ math.sqrt(
topological_sort.index(src)
+ math.sqrt(2) * function.flowgraph.in_degree(src)
+ math.sqrt(3) * function.flowgraph.out_degree(src)
+ math.sqrt(5) * function.flowgraph.in_degree(dst)
+ math.sqrt(7) * function.flowgraph.out_degree(dst)
)
for (src, dst) in function.edges
]
)
else:
value = np.sum(
[
1
/ math.sqrt(
math.sqrt(2) * function.flowgraph.in_degree(src)
+ math.sqrt(3) * function.flowgraph.out_degree(src)
+ math.sqrt(5) * function.flowgraph.in_degree(dst)
+ math.sqrt(7) * function.flowgraph.out_degree(dst)
)
for (src, dst) in function.edges
]
)
collector.add_feature(self.key, float(value))
[docs]
class JumpNb(InstructionFeatureExtractor):
"""
Number of jumps in the function.
Requires INSTR_GROUP capability
"""
key = "jnb"
required_capabilities = ProgramCapability.INSTR_GROUP
[docs]
def visit_instruction(
self, program: Program, instruction: Instruction, collector: FeatureCollector
) -> None:
if InstructionGroup.GRP_JUMP in instruction.groups:
collector.add_dict_feature(self.key, {"value": 1})
[docs]
class ReadWriteAccess(OperandFeatureExtractor):
"""
Number of memory access in the function.
Both read and write.
"""
key = "rwa"
[docs]
def visit_operand(
self, program: Program, operand: Operand, collector: FeatureCollector
) -> None:
if operand.type == OperandType.memory:
collector.add_dict_feature(self.key, {"value": 1})
[docs]
class MaxParentNb(FunctionFeatureExtractor):
"""
Maximum number of parent of a basic block in the function.
"""
key = "maxp"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
try:
value = max(
len(list(function.flowgraph.predecessors(bblock))) for bblock in function.flowgraph
)
except ValueError:
value = 0
collector.add_feature(self.key, value)
[docs]
class MaxChildNb(FunctionFeatureExtractor):
"""
Maximum number of children of a basic block in the function.
"""
key = "maxc"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
try:
value = max(
len(list(function.flowgraph.successors(bblock))) for bblock in function.flowgraph
)
except ValueError:
value = 0
collector.add_feature(self.key, value)
[docs]
class MaxInsNB(FunctionFeatureExtractor):
"""
Max number of instructions per basic blocks in the function.
"""
key = "maxins"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
try:
value = max(len(bblock.instructions) for bblock in function)
except ValueError:
value = 0
collector.add_feature(self.key, value)
[docs]
class MeanInsNB(FunctionFeatureExtractor):
"""
Mean number of instructions per basic blocks in the function.
"""
key = "meanins"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
try:
value = sum(len(bblock.instructions) for bblock in function) / len(function)
except ValueError:
value = 0
collector.add_feature(self.key, value)
[docs]
class InstNB(FunctionFeatureExtractor):
"""
Number of instructions in the function.
"""
key = "totins"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
value = sum(len(bblock.instructions) for bblock in function)
collector.add_feature(self.key, value)
[docs]
class GraphMeanDegree(FunctionFeatureExtractor):
"""
Mean degree of the function.
"""
key = "Gmd"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
n_node = len(function.flowgraph)
value = sum(d for _, d in function.flowgraph.degree) / n_node if n_node != 0 else 0
collector.add_feature(self.key, value)
[docs]
class GraphDensity(FunctionFeatureExtractor):
"""
Density of the function flow graph (CFG).
"""
key = "Gd"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
value = networkx.density(function.flowgraph)
collector.add_feature(self.key, value)
[docs]
class GraphNbComponents(FunctionFeatureExtractor):
"""
Number of components in the function (non-connected flow graphs).
(This can happen in the way IDA disassemble functions)
"""
key = "Gnc"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
value = len(list(networkx.connected_components(function.flowgraph.to_undirected())))
collector.add_feature(self.key, value)
[docs]
class GraphDiameter(FunctionFeatureExtractor):
"""
Graph diameter of the function flow graph (CFG).
"""
key = "Gdi"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
components = list(networkx.connected_components(function.flowgraph.to_undirected()))
if components:
value = max(
networkx.diameter(networkx.subgraph(function.flowgraph, x).to_undirected())
for x in components
)
else:
value = 0
collector.add_feature(self.key, value)
[docs]
class GraphTransitivity(FunctionFeatureExtractor):
"""
Transitivity of the function flow graph (CFG).
"""
key = "Gt"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
value = networkx.transitivity(function.flowgraph)
collector.add_feature(self.key, value)
[docs]
class GraphCommunities(FunctionFeatureExtractor):
"""
Number of graph communities (Louvain modularity).
"""
key = "Gcom"
[docs]
def visit_function(
self, program: Program, function: Function, collector: FeatureCollector
) -> None:
partition = community.best_partition(function.flowgraph.to_undirected())
if (len(function) > 1) and partition:
p_list = [x for x in partition.values() if x != function.addr]
value = max(p_list) if p_list else 0
else:
value = 0
collector.add_feature(self.key, value)