# Copyright 2023 Quarkslab
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Quokka backend loader
"""
# builtin imports
from __future__ import annotations
import logging
import weakref
from functools import cached_property
from collections.abc import Iterator
from typing import Any, TypeAlias, TYPE_CHECKING
# third party imports
import quokka
import quokka.types
import networkx
import capstone # type: ignore[import-untyped]
# local imports
from qbindiff.loader import Data, Structure
from qbindiff.loader.backend import (
AbstractProgramBackend,
AbstractFunctionBackend,
AbstractBasicBlockBackend,
AbstractInstructionBackend,
AbstractOperandBackend,
)
from qbindiff.loader.types import (
FunctionType,
DataType,
StructureType,
ReferenceType,
ReferenceTarget,
OperandType,
InstructionGroup,
ProgramCapability,
)
from qbindiff.types import Addr
from qbindiff.loader.backend.utils import convert_operand_type
if TYPE_CHECKING:
from pypcode import PcodeOp
# Aliases
# capstone is imported as an untyped module (see the `type: ignore` on its import),
# so operand objects coming from it are annotated with a relaxed `Any` alias.
capstoneOperand: TypeAlias = Any # Relaxed typing
# ===== General purpose utils functions =====
def convert_data_type(qbe_data_type: quokka.types.DataType) -> DataType:
"""
Convert a quokka DataType to qbindiff DataType
:param qbe_data_type: the Quokka datatype to convert
:return: the corresponding qbindiff datatype
"""
if qbe_data_type == quokka.types.DataType.ASCII:
return DataType.ASCII
elif qbe_data_type == quokka.types.DataType.BYTE:
return DataType.BYTE
elif qbe_data_type == quokka.types.DataType.WORD:
return DataType.WORD
elif qbe_data_type == quokka.types.DataType.DOUBLE_WORD:
return DataType.DOUBLE_WORD
elif qbe_data_type == quokka.types.DataType.QUAD_WORD:
return DataType.QUAD_WORD
elif qbe_data_type == quokka.types.DataType.OCTO_WORD:
return DataType.OCTO_WORD
elif qbe_data_type == quokka.types.DataType.FLOAT:
return DataType.FLOAT
elif qbe_data_type == quokka.types.DataType.DOUBLE:
return DataType.DOUBLE
else:
return DataType.UNKNOWN
def convert_struct_type(qbe_struct_type: quokka.types.StructureType) -> StructureType:
"""
Convert a quokka StructureType to qbindiff StructureType
:param qbe_struct_type: the Quokka structure to convert
:return: the corresponding qbindiff structure
"""
if qbe_struct_type == quokka.types.StructureType.ENUM:
return StructureType.ENUM
elif qbe_struct_type == quokka.types.StructureType.STRUCT:
return StructureType.STRUCT
elif qbe_struct_type == quokka.types.StructureType.UNION:
return StructureType.UNION
else:
return StructureType.UNKNOWN
def convert_ref_type(qbe_ref_type: quokka.types.ReferenceType) -> ReferenceType:
"""
Convert a quokka ReferenceType to qbindiff ReferenceType
:param qbe_ref_type: the Quokka reference to convert
:return: the corresponding qbindiff reference
"""
if qbe_ref_type == quokka.types.ReferenceType.DATA:
return ReferenceType.DATA
elif qbe_ref_type == quokka.types.ReferenceType.ENUM:
return ReferenceType.ENUM
elif qbe_ref_type == quokka.types.ReferenceType.STRUC:
return ReferenceType.STRUC
else:
return ReferenceType.UNKNOWN
# ===========================================
# ===== Operand backend =====
class OperandBackendQuokka(AbstractOperandBackend):
    """
    Backend loader of an Operand using Quokka
    """

    def __init__(
        self,
        program: weakref.ref[ProgramBackendQuokka],
        cs_instruction: "capstone.CsInsn",
        cs_operand: capstoneOperand,
        cs_operand_position: int,
    ):
        """
        :param program: weak reference to the owning ProgramBackendQuokka
        :param cs_instruction: the capstone instruction the operand belongs to
        :param cs_operand: the capstone operand object
        :param cs_operand_position: index of the operand inside the instruction
        """
        super().__init__()

        self.cs_operand_position = cs_operand_position
        self.cs_operand = cs_operand
        self.cs_instr = cs_instruction
        self._program = program

    def __str__(self) -> str:
        # The textual operand is the comma-separated chunk of the capstone
        # operand string found at this operand's position
        chunks = self.cs_instr.op_str.split(",")
        return chunks[self.cs_operand_position]

    @property
    def program(self) -> ProgramBackendQuokka:
        """
        Dereference the weak reference on ProgramBackendQuokka

        :raises RuntimeError: if the referenced program does not exist anymore
        """

        resolved = self._program()
        if resolved is not None:
            return resolved
        raise RuntimeError(
            "Trying to access an already expired weak reference on ProgramBackendQuokka"
        )

    @property
    def value(self) -> int | None:
        """
        The immediate value of the operand (not addresses), None if the
        operand is not an immediate.
        """

        return self.cs_operand.value.imm if self.is_immediate() else None

    @property
    def type(self) -> OperandType:
        """The operand type, converted from the capstone operand"""

        # The conversion depends on the capstone architecture of the program
        arch = self.program.qb_prog.capstone.arch
        return convert_operand_type(arch, self.cs_operand)
# ===== Instruction backend =====
class InstructionBackendQuokka(AbstractInstructionBackend):
"""
Backend loader of a Instruction using Quokka
"""
def __init__(
self,
program: weakref.ref[ProgramBackendQuokka],
qb_instruction: quokka.instruction.Instruction,
):
super(InstructionBackendQuokka, self).__init__()
self._program = program
self.qb_instr = qb_instruction
self.cs_instr = qb_instruction.cs_inst
if self.cs_instr is None:
logging.error(
f"Capstone could not disassemble instruction at 0x{self.qb_instr.address:x} {self.qb_instr}"
)
def __del__(self):
"""
Clean quokka internal state to deallocate memory
"""
# Clear the reference to capstone object
self.qb_instr._cs_instr = None
# Unload cached instruction
block = self.qb_instr.parent
block._raw_dict[self.qb_instr.address] = self.qb_instr.proto_index
def _cast_references(
self, references: list[quokka.types.ReferenceTarget]
) -> list[ReferenceTarget]:
"""
Cast the quokka references to qbindiff reference types
:param references: list of Quokka references
:returns: list of corresponding references cast to qbindiff type
"""
ret_ref: list[ReferenceTarget] = []
for ref in references:
match ref:
case quokka.data.Data():
data_type = convert_data_type(ref.type)
ret_ref.append(Data(data_type, ref.address, ref.value))
case quokka.structure.Structure(name=name):
ret_ref.append(self.program.get_structure(name))
case quokka.structure.StructureMember(structure=qbe_struct, name=name):
if (
member := self.program.get_structure(qbe_struct.name).member_by_name(name)
) is None:
logging.info(
f"Cannot retrieve the structure member named `{name}`"
f" from structure `{qbe_struct.name}` during reference parsing"
)
else:
ret_ref.append(member)
case quokka.Instruction(): # Not implemented for now
logging.warning("Skipping instruction reference")
return ret_ref
@property
def program(self) -> ProgramBackendQuokka:
"""Wrapper on weak reference on ProgramBackendQuokka"""
if (program := self._program()) is None:
raise RuntimeError(
"Trying to access an already expired weak reference on ProgramBackendQuokka"
)
return program
@property
def addr(self) -> Addr:
"""
The address of the instruction
"""
return self.qb_instr.address
@property
def mnemonic(self) -> str:
"""
Returns the instruction mnemonic as a string
"""
return self.qb_instr.mnemonic
@cached_property
def references(self) -> dict[ReferenceType, list[ReferenceTarget]]:
"""
Returns all the references towards the instruction
:return: dictionary with reference type as key and the corresponding references list as values
"""
ref = {}
for ref_type, references in self.qb_instr.references.items():
ref[convert_ref_type(ref_type)] = self._cast_references(references)
return ref
@property
def operands(self) -> Iterator[OperandBackendQuokka]:
"""
Returns an iterator over backend operand objects
:return: list of Quokka operands
"""
if self.cs_instr is None:
return iter([])
return (
OperandBackendQuokka(self._program, self.cs_instr, o, i)
for i, o in enumerate(self.cs_instr.operands)
)
@property
def groups(self) -> list[InstructionGroup]:
"""
Returns a list of groups of this instruction.
"""
return list(map(InstructionGroup.from_capstone, self.cs_instr.groups))
@property
def id(self) -> int:
"""
Returns the capstone instruction ID as a non negative int. The ID is in the range [0, MAX_ID].
The id is MAX_ID if there is no capstone instruction available.
"""
if self.cs_instr is None:
return self.MAX_ID # Custom defined value representing a "unknown" instruction
return self.cs_instr.id
@property
def comment(self) -> str:
"""
Comment associated with the instruction
"""
return "" # Not supported
@property
def bytes(self) -> bytes:
"""
Returns the bytes representation of the instruction
"""
return bytes(self.qb_instr.bytes)
@property
def pcode_ops(self) -> list[PcodeOp]:
"""
List of PcodeOp associated with the instruction.
Provided with the PCODE capability
"""
return self.qb_instr.pcode_insts
# ===== Basic block backend =====
class BasicBlockBackendQuokka(AbstractBasicBlockBackend):
    """
    Backend loader of a BasicBlock using Quokka
    """

    def __init__(self, program: weakref.ref[ProgramBackendQuokka], qb_block: quokka.block.Block):
        """
        :param program: weak reference to the owning ProgramBackendQuokka
        :param qb_block: the Quokka block wrapped by this backend
        """
        super().__init__()

        self.qb_block = qb_block
        self.program = program

        # Private attributes
        self._addr = qb_block.start

    def __del__(self) -> None:
        """
        Clean quokka internal state by unloading from memory the Block object

        :return: None
        """

        # Put the proto index back in the parent's dictionary, at the block start address
        owner = self.qb_block.parent
        owner._raw_dict[self.qb_block.start] = self.qb_block.proto_index

    def __len__(self) -> int:
        """
        The numbers of instructions in the basic block
        """

        return len(self.qb_block)

    @property
    def addr(self) -> Addr:
        """
        The address of the basic block
        """

        return self._addr

    @property
    def instructions(self) -> Iterator[InstructionBackendQuokka]:
        """
        Returns an iterator over backend instruction objects

        :return: iterator over Quokka instructions
        """

        qb_instructions = self.qb_block.instructions
        # Instruction backends are built lazily, one for each item consumed
        return (
            InstructionBackendQuokka(self.program, qb_instr) for qb_instr in qb_instructions
        )

    @property
    def bytes(self) -> bytes:
        """
        The bytes of all the instructions of the basic block, concatenated
        """

        return b"".join(instruction.bytes for instruction in self.instructions)
# ===== Function backend =====
class FunctionBackendQuokka(AbstractFunctionBackend):
"""
Backend loader of a Function using Quokka
"""
def __init__(
self, program: weakref.ref[ProgramBackendQuokka], qb_func: quokka.function.Function
):
super(FunctionBackendQuokka, self).__init__()
self.qb_prog = qb_func.program
self.qb_func = qb_func
self.program = program
# [TODO] Init all the properties and free the memory of qb_prog/qb_func
@property
def basic_blocks(self) -> Iterator[BasicBlockBackendQuokka]:
"""
Returns an iterator over backend basic blocks objects
:return: Iterator over the Quokka Basic Blocks
"""
# Stop the exploration if it's an imported function
if self.is_import():
return iter([])
return (
BasicBlockBackendQuokka(self.program, self.qb_func.get_block(addr))
for addr in self.qb_func.graph.nodes
)
@property
def addr(self) -> Addr:
"""
The address of the function
"""
return self.qb_func.start
@property
def graph(self) -> networkx.DiGraph:
"""
The Control Flow Graph of the function
"""
return self.qb_func.graph
@cached_property
def parents(self) -> set[Addr]:
"""
Set of function parents in the call graph
"""
parents = set()
for chunk in self.qb_func.callers:
try:
for func in self.qb_prog.get_function_by_chunk(chunk):
parents.add(func.start)
except IndexError:
pass # Sometimes there can be a chunk that is not part of any function
return parents
@cached_property
def children(self) -> set[Addr]:
"""
Set of function children in the call graph
"""
children = set()
for chunk in self.qb_func.calls:
try:
for func in self.qb_prog.get_function_by_chunk(chunk):
children.add(func.start)
except IndexError:
pass # Sometimes there can be a chunk that is not part of any function
return children
@cached_property
def type(self) -> FunctionType:
"""
The type of the function (as defined by IDA)
"""
f_type = self.qb_func.type
if f_type == quokka.types.FunctionType.NORMAL:
return FunctionType.normal
elif f_type == quokka.types.FunctionType.IMPORTED:
return FunctionType.imported
elif f_type == quokka.types.FunctionType.LIBRARY:
return FunctionType.library
elif f_type == quokka.types.FunctionType.THUNK:
return FunctionType.thunk
elif f_type == quokka.types.FunctionType.EXTERN:
return FunctionType.extern
elif f_type == quokka.types.FunctionType.INVALID:
return FunctionType.invalid
else:
raise NotImplementedError(f"Function type {f_type} not implemented")
@property
def name(self) -> str:
"""
The name of the function
"""
return self.qb_func.name
[docs]
def is_import(self) -> bool:
"""
True if the function is imported
:return: whether the fonction is imported
"""
# Should we consider also FunctionType.thunk?
return self.type in (FunctionType.imported, FunctionType.extern)
# ===== Program backend =====
class ProgramBackendQuokka(AbstractProgramBackend):
    """
    Backend loader of a Program using Quokka
    """

    def __init__(self, export_path: str, exec_path: str):
        """
        :param export_path: first argument handed to quokka.Program
        :param exec_path: path of the executable, second argument handed to
                          quokka.Program
        """
        super(ProgramBackendQuokka, self).__init__()

        self.qb_prog = quokka.Program(export_path, exec_path)

        self._exec_path = exec_path
        # Both are filled while iterating over the functions, see `functions`
        self._callgraph = networkx.DiGraph()  # type: ignore[var-annotated]
        self._fun_names: dict[str, Addr] = {}  # {fun_name : fun_address}

    @property
    def functions(self) -> Iterator[FunctionBackendQuokka]:
        """
        Returns an iterator over backend function objects.

        As a side effect, each access also records the function names and the
        call graph edges that `fun_names` and `callgraph` return.
        """

        functions = {}
        for addr, func in self.qb_prog.items():
            # Pass a self (weak) reference for performance
            f = FunctionBackendQuokka(weakref.ref(self), func)
            if addr in functions:
                # The collision is only reported, the latest function wins
                logging.error("Address collision for 0x%x", addr)
            functions[addr] = f
            self._fun_names[f.name] = addr

            # Load the callgraph
            self._callgraph.add_node(addr)
            for c_addr in f.children:
                self._callgraph.add_edge(addr, c_addr)
            for p_addr in f.parents:
                self._callgraph.add_edge(p_addr, addr)

        return iter(functions.values())

    @property
    def name(self) -> str:
        """
        The name of the program, taken from the executable file name
        """

        return self.qb_prog.executable.exec_file.name

    @cached_property
    def structures(self) -> list[Structure]:
        """
        Returns the list of structures defined in program

        :raises RuntimeError: if a Quokka structure has no size
        """

        struct_list = []
        for qbe_struct in self.qb_prog.structures:
            if (struct_size := qbe_struct.size) is None:
                # The f-prefix is required for the structure name to be
                # interpolated in the message
                raise RuntimeError(
                    "Normally this should never happen."
                    f" Struct with size None encountered. name = `{qbe_struct.name}`"
                )
            struct = Structure(convert_struct_type(qbe_struct.type), qbe_struct.name, struct_size)
            for offset, member in qbe_struct.items():
                struct.add_member(
                    offset,
                    convert_data_type(member.type),
                    member.name,
                    member.size,
                    member.value,
                )
            struct_list.append(struct)
        return struct_list

    @cached_property
    def structures_by_name(self) -> dict[str, Structure]:
        """
        Returns the dictionary {name: structure}
        """

        # Hoping that there won't be two struct with the same name
        return {struct.name: struct for struct in self.structures}

    def get_structure(self, name: str) -> Structure:
        """
        Returns structure identified by the name

        :param name: name of the structure
        :raises KeyError: if no structure has that name
        """

        return self.structures_by_name[name]

    @property
    def callgraph(self) -> networkx.DiGraph:
        """
        The callgraph of the program
        """

        return self._callgraph

    @property
    def fun_names(self) -> dict[str, int]:
        """
        Returns a dictionary with function name as key and the function address as value
        """

        return self._fun_names

    @property
    def exec_path(self) -> str:
        """
        Returns the executable path
        """

        return self._exec_path

    @property
    def capabilities(self) -> ProgramCapability:
        """
        Returns the supported capabilities
        """

        return ProgramCapability.INSTR_GROUP | ProgramCapability.PCODE