from pathlib import Path
import sqlite3
import hashlib
from datetime import datetime
from dataclasses import dataclass
from typing import Union
import ctypes
from bindiff.types import FunctionAlgorithm, BasicBlockAlgorithm
[docs]
@dataclass
class File:
    """
    Description of one file diffed in the database.
    Fields are declared in the order used to build an instance
    from a row of the ``file`` table.
    """

    #: Unique ID of the file in database
    id: int
    #: file path
    filename: str
    #: file name
    exefilename: str
    #: SHA256 hash of the file
    hash: str
    #: total number of functions
    functions: int
    #: total number of functions identified as library
    libfunctions: int
    #: number of calls
    calls: int
    #: number of basic blocks
    basicblocks: int
    #: number of basic blocks belonging to library functions
    libbasicblocks: int
    #: number of edges in callgraph
    edges: int
    #: number of edges in callgraph addressing a library
    libedges: int
    #: number of instructions
    instructions: int
    #: number of instructions in library functions
    libinstructions: int
[docs]
@dataclass
class FunctionMatch:
    """
    Pairing of a function of the primary file with a function
    of the secondary file, as recorded in the database.
    """

    #: unique ID of function match in database
    id: int
    #: function address in primary
    address1: int
    #: function name in primary
    name1: str
    #: function address in secondary
    address2: int
    #: function name in secondary
    name2: str
    #: similarity score (0..1)
    similarity: float
    #: confidence of the match (0..1)
    confidence: float
    #: algorithm used for the match
    algorithm: FunctionAlgorithm
[docs]
@dataclass
class BasicBlockMatch:
    """
    Pairing of a basic block of the primary file with a basic block
    of the secondary file, attached to the function match it belongs to.
    """

    #: ID of the match in database
    id: int
    #: FunctionMatch associated with this match
    function_match: FunctionMatch
    #: basic block address in primary
    address1: int
    #: basic block address in secondary
    address2: int
    #: algorithm used to match the basic blocks
    algorithm: BasicBlockAlgorithm
[docs]
class BindiffFile(object):
"""
Bindiff database file.
The class seamlessly parses the database and allows retrieving
and manipulating the results.
It also provides some methods to create a database and to add entries
in the database.
"""
def __init__(self, file: Union[Path, str], permission: str = "ro"):
    """
    Open a Bindiff database and load its metadata, files and
    function / basic block / instruction matches.

    :param file: path to Bindiff database
    :param permission: permission to use for opening database (default: ro)
    """
    self._file = file

    # The database is opened through an URI so that the access mode can be given
    uri = f"file:{str(file)}?mode={permission}"
    self.db = sqlite3.connect(uri, uri=True)

    # Global variables
    #: Overall similarity
    self.similarity: float = None
    #: Overall diffing confidence
    self.confidence: float = None
    #: version of the differ used for diffing
    self.version: str = None
    #: Database creation date
    self.created: datetime = None
    #: Database last modification date
    self.modified: datetime = None
    self._load_metadata(self.db.cursor())

    # Files
    #: Primary file
    self.primary_file: File = None
    #: Secondary file
    self.secondary_file: File = None
    self._load_file(self.db.cursor())

    # Function matches
    #: FunctionMatch indexed by addresses in primary
    self.primary_functions_match: dict[int, FunctionMatch] = {}
    #: FunctionMatch indexed by addresses in secondary
    self.secondary_functions_match: dict[int, FunctionMatch] = {}
    self._load_function_match(self.db.cursor())

    # Basicblock matches: BB-addr -> fun-addr -> match
    # (must come after the function matches, which the loader reads)
    #: Basic block match from primary
    self.primary_basicblock_match: dict[int, dict[int, BasicBlockMatch]] = {}
    #: Basic block match from secondary
    self.secondary_basicblock_match: dict[int, dict[int, BasicBlockMatch]] = {}
    self._load_basicblock_match(self.db.cursor())

    # Instruction matches: {inst_addr : {match_func_addr : match_inst_addr}}
    # (must come after the basic block matches, which the loader reads)
    self.primary_instruction_match: dict[int, dict[int, int]] = {}
    self.secondary_instruction_match: dict[int, dict[int, int]] = {}
    self._load_instruction_match(self.db.cursor())
@property
def unmatched_primary_count(self) -> int:
    """
    Number of functions of the primary file without any match:
    functions plus library functions of the primary file, minus
    the number of entries of ``primary_functions_match``.
    """
    primary = self.primary_file
    total = primary.functions + primary.libfunctions
    return total - len(self.primary_functions_match)
@property
def unmatched_secondary_count(self) -> int:
    """
    Returns the number of functions inside secondary that are not matched.

    Computed as functions plus library functions of the secondary file,
    minus the number of entries of ``secondary_functions_match``.
    """
    secondary = self.secondary_file
    total = secondary.functions + secondary.libfunctions
    # Count matches on the secondary side: both dictionaries are keyed by
    # address, so their sizes differ when several matches share an address.
    return total - len(self.secondary_functions_match)
@property
def function_matches(self) -> list[FunctionMatch]:
    """
    Function matches, as a new list built from the values
    of ``primary_functions_match``.
    """
    return [*self.primary_functions_match.values()]
@property
def basicblock_matches(self) -> list[BasicBlockMatch]:
    """
    Basic block matches, flattened from the nested
    ``primary_basicblock_match`` dictionary into a single list.
    """
    flattened = []
    for per_function in self.primary_basicblock_match.values():
        flattened.extend(per_function.values())
    return flattened
def _load_file(self, cursor: sqlite3.Cursor) -> None:
    """
    Read the ``file`` table and build the primary and secondary
    File objects from its first two rows.

    :param cursor: sqlite3 cursor to the DB
    """
    rows = cursor.execute("SELECT * FROM file").fetchall()
    assert len(rows) >= 2
    primary_row, secondary_row = rows[0], rows[1]
    self.primary_file = File(*primary_row)
    self.secondary_file = File(*secondary_row)
def _load_metadata(self, cursor: sqlite3.Cursor) -> None:
    """
    Load diffing metadata as stored in the DB file.

    Fills ``version``, ``created``, ``modified``, ``similarity``
    and ``confidence`` from the first row of the ``metadata`` table.

    :param cursor: sqlite3 cursor to the DB
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    query = "SELECT version, created, modified, similarity, confidence FROM metadata"
    version, created, modified, similarity, confidence = cursor.execute(query).fetchone()

    self.version = version
    # Attributes are only assigned once parsed, so they never hold raw strings
    self.created = datetime.strptime(created, timestamp_format)
    self.modified = datetime.strptime(modified, timestamp_format)
    # Scores are rounded to 3 decimals
    self.similarity = float(f"{similarity:.3f}")
    self.confidence = float(f"{confidence:.3f}")
def _load_function_match(self, cursor: sqlite3.Cursor) -> None:
    """
    Fill the function match dictionaries from the ``function`` table.

    :param cursor: sqlite3 cursor to the DB
    """

    def to_unsigned(value):
        # Reinterpret the stored value as an unsigned 64-bit integer
        return ctypes.c_ulonglong(value).value

    query = "SELECT id, address1, name1, address2, name2, similarity, confidence, algorithm FROM function"
    for row in cursor.execute(query):
        match_id, raw_addr1, name1, raw_addr2, name2, similarity, confidence, algorithm = row
        address1 = to_unsigned(raw_addr1)
        address2 = to_unsigned(raw_addr2)
        match = FunctionMatch(
            match_id,
            address1,
            name1,
            address2,
            name2,
            similarity,
            confidence,
            FunctionAlgorithm(algorithm),
        )
        # The same object is reachable from both sides of the diff
        self.primary_functions_match[address1] = match
        self.secondary_functions_match[address2] = match
def _load_basicblock_match(self, cursor: sqlite3.Cursor) -> None:
    """
    Load matched basic blocks stored in a DB file.

    Basic block addresses are converted to unsigned 64-bit integers, the same
    way the function and instruction loaders convert their addresses, so that
    addresses with the highest bit set are not exposed as negative numbers.

    :param cursor: sqlite3 cursor to the DB
    :raises KeyError: if a basic block refers to a function match that is
                      not part of ``function_matches``
    """

    def to_unsigned(value: int) -> int:
        # Reinterpret the stored value as an unsigned 64-bit integer
        return ctypes.c_ulonglong(value).value

    functions_by_id = {match.id: match for match in self.function_matches}
    query = "SELECT id, functionid, address1, address2, algorithm FROM basicblock"
    for match_id, fun_id, raw_addr1, raw_addr2, algorithm in cursor.execute(query):
        fun_match = functions_by_id[fun_id]
        bb_addr1, bb_addr2 = to_unsigned(raw_addr1), to_unsigned(raw_addr2)
        bb_match = BasicBlockMatch(
            match_id, fun_match, bb_addr1, bb_addr2, BasicBlockAlgorithm(algorithm)
        )
        # As a basic block address can be in multiple functions, use a nested
        # dictionary: BB address -> function address -> match
        self.primary_basicblock_match.setdefault(bb_addr1, {})[fun_match.address1] = bb_match
        self.secondary_basicblock_match.setdefault(bb_addr2, {})[fun_match.address2] = bb_match
def _load_instruction_match(self, cursor: sqlite3.Cursor) -> None:
    """
    Fill the instruction match dictionaries from the ``instruction`` table.
    Each instruction address maps to {function address: matched instruction address}.

    :param cursor: sqlite3 cursor to the DB
    """
    blocks_by_id = {block.id: block for block in self.basicblock_matches}
    rows = cursor.execute("SELECT basicblockid, address1, address2 FROM instruction")
    for block_id, raw_addr1, raw_addr2 in rows:
        # Reinterpret the stored values as unsigned 64-bit integers
        inst_addr1 = ctypes.c_ulonglong(raw_addr1).value
        inst_addr2 = ctypes.c_ulonglong(raw_addr2).value
        function = blocks_by_id[block_id].function_match
        # Record the pairing in both directions
        self.primary_instruction_match.setdefault(inst_addr1, {})[function.address1] = inst_addr2
        self.secondary_instruction_match.setdefault(inst_addr2, {})[function.address2] = inst_addr1
[docs]
@staticmethod
def init_database(db: sqlite3.Connection) -> None:
    """
    Initialize the database by creating all the tables.

    Creates the tables file, metadata, functionalgorithm, function,
    basicblockalgorithm, basicblock and instruction, then registers one
    basic block algorithm. The changes are committed.

    :param db: connection to the database to initialize
    """
    # fmt: off
    schema = (
        "\n"
        "CREATE TABLE file (id INTEGER PRIMARY KEY, filename TEXT, exefilename TEXT, hash CHARACTER(40),\n"
        "functions INT, libfunctions INT, calls INT, basicblocks INT, libbasicblocks INT, edges INT,\n"
        "libedges INT, instructions INT, libinstructions INT)",

        "\n"
        "CREATE TABLE metadata (version TEXT, file1 INTEGER, file2 INTEGER, description TEXT, created DATE,\n"
        "modified DATE, similarity DOUBLE PRECISION, confidence DOUBLE PRECISION,\n"
        "FOREIGN KEY(file1) REFERENCES file(id), FOREIGN KEY(file2) REFERENCES file(id))",

        "CREATE TABLE functionalgorithm (id SMALLINT PRIMARY KEY, name TEXT)",

        "\n"
        "CREATE TABLE function (id INTEGER PRIMARY KEY, address1 BIGINT, name1 TEXT, address2 BIGINT,\n"
        "name2 TEXT, similarity DOUBLE PRECISION, confidence DOUBLE PRECISION, flags INTEGER,\n"
        "algorithm SMALLINT, evaluate BOOLEAN, commentsported BOOLEAN, basicblocks INTEGER,\n"
        "edges INTEGER, instructions INTEGER, UNIQUE(address1, address2),\n"
        "FOREIGN KEY(algorithm) REFERENCES functionalgorithm(id))",

        "CREATE TABLE basicblockalgorithm (id INTEGER PRIMARY KEY, name TEXT)",

        "\n"
        "CREATE TABLE basicblock (id INTEGER, functionid INT, address1 BIGINT, address2 BIGINT,\n"
        "algorithm SMALLINT, evaluate BOOLEAN, PRIMARY KEY(id), FOREIGN KEY(functionid) REFERENCES function(id),\n"
        "FOREIGN KEY(algorithm) REFERENCES basicblockalgorithm(id))",

        "\n"
        "CREATE TABLE instruction (basicblockid INT, address1 BIGINT, address2 BIGINT,\n"
        "FOREIGN KEY(basicblockid) REFERENCES basicblock(id))",
    )
    # fmt: on
    cursor = db.cursor()
    for statement in schema:
        cursor.execute(statement)
    db.commit()

    # The name is bound as a parameter: a double-quoted literal is only accepted
    # by SQLite as a legacy quirk and is rejected when that quirk is disabled.
    cursor.execute(
        "INSERT INTO basicblockalgorithm(name) VALUES (?)",
        ("basicBlock: edges prime product",),
    )
    db.commit()
[docs]
@staticmethod
def create(
    filename: str,
    primary: str,
    secondary: str,
    version: str,
    desc: str,
    similarity: float,
    confidence: float,
) -> "BindiffFile":
    """
    Create a new Bindiff database object in the file given in `filename`.
    It only takes two binaries. Any previous content of `filename` is discarded.

    The hash of a binary is left empty when its path is not an existing
    regular file.

    :param filename: database file path
    :param primary: path to primary binary
    :param secondary: path to secondary binary
    :param version: version of the differ used
    :param desc: description of the database
    :param similarity: similarity score between the two binaries
    :param confidence: confidence of results
    :return: instance of BindiffFile (ready to be filled)
    """
    # Truncate (or create) the target so that the schema is created in an empty file
    with open(filename, "w"):
        pass

    # One timestamp for both columns: modified has to be filled, so it is
    # initialized to exactly the creation time
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    db = sqlite3.connect(filename)
    try:
        BindiffFile.init_database(db)
        cursor = db.cursor()

        # Primary is inserted first, then secondary: the metadata row below
        # refers to them as file ids 1 and 2
        for binary in (Path(primary), Path(secondary)):
            digest = hashlib.sha256(binary.read_bytes()).hexdigest() if binary.is_file() else ""
            cursor.execute(
                """INSERT INTO file (filename, exefilename, hash) VALUES (:filename, :name, :hash)""",
                {"filename": str(binary), "name": binary.name, "hash": digest},
            )

        cursor.execute(
            "\n"
            "INSERT INTO metadata (version, file1, file2, description, created, modified, similarity, confidence)\n"
            "VALUES (:version, 1, 2, :desc, :created, :modified, :similarity, :confidence)\n",
            {
                "version": version,
                "desc": desc,
                "created": timestamp,
                "modified": timestamp,
                "similarity": similarity,
                "confidence": confidence,
            },
        )
        db.commit()
    finally:
        # Release the connection even if the initialization failed
        db.close()
    return BindiffFile(filename, permission="rw")
[docs]
def add_function_match(
    self,
    fun_addr1: int,
    fun_addr2: int,
    fun_name1: str,
    fun_name2: str,
    similarity: float,
    confidence: float = 0.0,
    identical_bbs: int = 0,
) -> int:
    """
    Add a function match in database.

    Addresses are stored as signed 64-bit integers, so addresses with the
    highest bit set can be written and are given back unsigned by the loader.
    The insertion is not committed by this method.

    :param fun_addr1: primary function address
    :param fun_addr2: secondary function address
    :param fun_name1: primary function name
    :param fun_name2: secondary function name
    :param similarity: similarity score between the two functions
    :param confidence: confidence score between the two functions
    :param identical_bbs: number of identical basic blocks
    :return: id of the row inserted in database.
    """

    def to_signed(address: int) -> int:
        # Values above 2**63 - 1 cannot be bound as a SQLite INTEGER; wrap
        # them to the signed 64-bit value with the same bit pattern.
        return ctypes.c_longlong(address).value

    # NOTE(review): the algorithm column is left NULL here; confirm that
    # FunctionAlgorithm accepts it when the row is loaded back.
    cursor = self.db.cursor()
    cursor.execute(
        "\n"
        "INSERT INTO function (address1, address2, name1, name2, similarity, confidence, basicblocks)\n"
        "VALUES (:address1, :address2, :name1, :name2, :similarity, :confidence, :identical_bbs)\n",
        {
            "address1": to_signed(fun_addr1),
            "address2": to_signed(fun_addr2),
            "name1": fun_name1,
            "name2": fun_name2,
            "similarity": similarity,
            "confidence": confidence,
            "identical_bbs": identical_bbs,
        },
    )
    return cursor.lastrowid
[docs]
def add_basic_block_match(
    self, fun_addr1: int, fun_addr2: int, bb_addr1: int, bb_addr2: int
) -> int:
    """
    Add a basic block match in database.

    The function match is looked up from its two addresses; if no such
    function match exists, the basic block is stored with a NULL function id.
    Addresses are stored (and looked up) as signed 64-bit integers, so
    addresses with the highest bit set can be used.
    The insertion is not committed by this method.

    :param fun_addr1: function address of basic block in primary
    :param fun_addr2: function address of basic block in secondary
    :param bb_addr1: basic block address in primary
    :param bb_addr2: basic block address in secondary
    :return: id of the row inserted in database.
    """

    def to_signed(address: int) -> int:
        # Values above 2**63 - 1 cannot be bound as a SQLite INTEGER; wrap
        # them to the signed 64-bit value with the same bit pattern.
        return ctypes.c_longlong(address).value

    cursor = self.db.cursor()
    cursor.execute(
        "\n"
        "INSERT INTO basicblock (functionid, address1, address2, algorithm)\n"
        "VALUES ((SELECT id FROM function WHERE address1=:function_address1 AND address2=:function_address2), :address1, :address2, :algorithm)\n",
        {
            "function_address1": to_signed(fun_addr1),
            "function_address2": to_signed(fun_addr2),
            "address1": to_signed(bb_addr1),
            "address2": to_signed(bb_addr2),
            # presumably the id of the single algorithm registered at
            # database initialization -- verify against init_database
            "algorithm": "1",
        },
    )
    return cursor.lastrowid
[docs]
def add_instruction_match(self, entry: int, inst_addr1: int, inst_addr2: int) -> None:
    """
    Add an instruction match in database.

    Addresses are stored as signed 64-bit integers, so addresses with the
    highest bit set can be written and are given back unsigned by the loader.
    The insertion is not committed by this method.

    :param entry: basic block match identifier in database
    :param inst_addr1: instruction address in primary
    :param inst_addr2: instruction address in secondary
    """

    def to_signed(address: int) -> int:
        # Values above 2**63 - 1 cannot be bound as a SQLite INTEGER; wrap
        # them to the signed 64-bit value with the same bit pattern.
        return ctypes.c_longlong(address).value

    cursor = self.db.cursor()
    cursor.execute(
        "\n"
        "INSERT INTO instruction (basicblockid, address1, address2) VALUES (:basicblockid, :address1, :address2)\n",
        {
            "address1": to_signed(inst_addr1),
            "address2": to_signed(inst_addr2),
            "basicblockid": entry,
        },
    )
[docs]
def update_file_infos(
    self, entry_id: int, fun_count: int, lib_count: int, bb_count: int, inst_count: int
) -> None:
    """
    Store the statistics of a binary (function, basic block count ...)
    in its row of the ``file`` table, then commit.

    :param entry_id: entry of the binary in database (row id)
    :param fun_count: number of functions
    :param lib_count: number of functions flagged as libraries
    :param bb_count: number of basic blocks
    :param inst_count: number of instructions
    """
    statement = (
        "\n"
        "UPDATE file\n"
        "SET functions = :functions, libfunctions = :libfunctions, basicblocks = :basicblocks, instructions = :instructions\n"
        "WHERE id = :entry_id\n"
    )
    bindings = dict(
        entry_id=str(entry_id),
        functions=fun_count,
        libfunctions=lib_count,
        basicblocks=bb_count,
        instructions=inst_count,
    )
    self.db.cursor().execute(statement, bindings)
    self.db.commit()