# Copyright 2023 Quarkslab
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Matcher
"""
# Built-in imports
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
# Third-party imports
import numpy as np
from lapjv import lapjv # type: ignore[import-not-found]
from scipy.sparse import csr_matrix, coo_matrix # type: ignore[import-untyped]
# Local imports
from qbindiff.matcher.squares import find_squares # type: ignore[import-untyped]
from qbindiff.matcher.belief_propagation import BeliefMWM, BeliefQAP
if TYPE_CHECKING:
    from collections.abc import Iterator
    from qbindiff.types import Positive, Ratio, RawMapping, AdjacencyMatrix, Matrix, SimMatrix


def solve_linear_assignment(cost_matrix: Matrix) -> RawMapping:
    """
    Solve the linear assignment problem given the cost matrix.

    :param cost_matrix: cost matrix of the assignment
    :return: raw mapping between the rows and the columns of the matrix
    """

    n, m = cost_matrix.shape
    transposed = n > m
    if transposed:
        n, m = m, n
        cost_matrix = cost_matrix.T
    # Pad the rectangular matrix with zero-cost dummy rows to make it square,
    # as required by the Jonker-Volgenant solver
    full_cost_matrix = np.zeros((m, m), dtype=cost_matrix.dtype)
    full_cost_matrix[:n, :m] = cost_matrix
    col_indices = lapjv(full_cost_matrix)[0][:n]
    if transposed:
        return col_indices, np.arange(n)
    return np.arange(n), col_indices
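

# Usage sketch (illustrative only, made-up values): with a 2x3 cost matrix,
# each row is matched to a column so that the total cost is minimized.
#
#     cost = np.array([[4.0, 1.0, 3.0],
#                      [0.0, 2.0, 5.0]])
#     rows, cols = solve_linear_assignment(cost)
#     # rows == array([0, 1]), cols == array([1, 0]): the two cheapest
#     # disjoint entries (1.0 and 0.0) are selected.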


class Matcher:
    """
    Matches the nodes of two graphs, given a similarity matrix and the two
    adjacency matrices, using belief propagation.
    """

    def __init__(
        self,
        similarity_matrix: SimMatrix,
        primary_adj_matrix: AdjacencyMatrix,
        secondary_adj_matrix: AdjacencyMatrix,
    ):
        self._mapping: RawMapping | None = None  # nodes mapping
        #: Similarity matrix used by the Matcher
        self.sim_matrix = similarity_matrix
        #: Adjacency matrix of the primary graph
        self.primary_adj_matrix = primary_adj_matrix
        #: Adjacency matrix of the secondary graph
        self.secondary_adj_matrix = secondary_adj_matrix
        self._processed = False
        self.sparse_sim_matrix: SimMatrix | None = None
        self.squares_matrix: Matrix | None = None

    def _compute_sparse_sim_matrix(self, sparsity_ratio: Ratio, sparse_row: bool) -> None:
        """
        Generate the sparse similarity matrix given the sparsity_ratio.

        :param sparsity_ratio: ratio of least probable matches to ignore
        :param sparse_row: whether to apply the ratio to each row separately
                           instead of to the whole matrix
        :return: None
        """

        sparsity_size = round(sparsity_ratio * self.sim_matrix.size)
        if sparsity_size == 0:  # Keep the matrix as it is
            self.sparse_sim_matrix = csr_matrix(self.sim_matrix)
            return
        if sparsity_size == self.sim_matrix.size:  # Empty matrix
            self.sparse_sim_matrix = csr_matrix(self.sim_matrix.shape, dtype=self.sim_matrix.dtype)
            return

        if sparse_row:
            # Number of entries to discard in each row
            sparsity_size = round(sparsity_ratio * self.sim_matrix.shape[1])
            # Sort each row of the similarity matrix and keep the indexes of the sorted values
            sorted_indexes = np.argsort(self.sim_matrix, kind="stable")
            mask = []
            for i in range(self.sim_matrix.shape[0]):
                # Replace the discarded values with zeros
                self.sim_matrix[i, sorted_indexes[i, :sparsity_size]] = 0
                # Append the mask for the current row
                mask.append(self.sim_matrix[i] > 0)
            self.sparse_sim_matrix = csr_matrix(mask, dtype=self.sim_matrix.dtype)
            self.sparse_sim_matrix.data[:] = self.sim_matrix[mask]  # type: ignore
        else:
            # Sort the flattened similarity matrix and keep the indexes of the sorted values
            sorted_indexes = np.argsort(self.sim_matrix, axis=None, kind="stable")
            # Replace the discarded values with zeros
            self.sim_matrix.flat[sorted_indexes[:sparsity_size]] = 0
            # Create a mask of the surviving (non-zero) entries
            mask = self.sim_matrix > 0  # type: ignore
            # Build the sparse matrix from the mask, then fill in the actual scores
            csr_data = self.sim_matrix[mask]
            self.sparse_sim_matrix = csr_matrix(mask, dtype=self.sim_matrix.dtype)
            self.sparse_sim_matrix.data[:] = csr_data  # type: ignore
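
    # Illustrative sketch (made-up values, hypothetical `matcher` instance):
    # with sparsity_ratio=0.5 and sparse_row=False, the 4 smallest of the 8
    # entries are discarded globally before building the CSR matrix.
    #
    #     matcher.sim_matrix = np.array([[0.9, 0.1, 0.4, 0.2],
    #                                    [0.3, 0.8, 0.05, 0.7]])
    #     matcher._compute_sparse_sim_matrix(0.5, False)
    #     # matcher.sparse_sim_matrix now only keeps 0.9, 0.4, 0.8 and 0.7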

    def _compute_squares_matrix(self) -> None:
        """
        Generate the sparse squares matrix and store it in self.squares_matrix.

        Given two graphs G1 and G2, a square is a tuple of nodes (nodeA, nodeB, nodeC, nodeD)
        such that all of the following conditions hold:

        - nodeA and nodeD belong to G1
        - nodeB and nodeC belong to G2
        - (nodeA, nodeD) is a directed edge in G1
        - (nodeB, nodeC) is a directed edge in G2
        - (nodeA, nodeB) is an edge in the similarity matrix (non-zero score)
        - (nodeC, nodeD) is an edge in the similarity matrix (non-zero score)

        Note that the nodes are not necessarily distinct, since (nodeX, nodeX) might be
        a valid edge.

        (A) <---sim_edge---> (B)
         |                    |
         |graph_edge          |
         |          graph_edge|
         v                    v
        (D) <---sim_edge---> (C)

        The resulting squares matrix is stored as a csr_matrix of size NxN, where N is the
        number of similarity edges. Every similarity edge is given a unique increasing
        number from 0 to N-1, and there is a square between two similarity edges `e1` and
        `e2` iff self.squares_matrix[e1][e2] == 1.

        The time complexity is O(|sparse_sim_matrix| * average_graph_degree**2)
        """

        # Use the fast Cython algorithm for efficiency
        self.squares_matrix = find_squares(
            self.primary_adj_matrix, self.secondary_adj_matrix, self.sparse_sim_matrix
        )
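
    # Illustrative sketch (toy graphs): two directed graphs with a single
    # edge 0 -> 1 each, and an identity similarity matrix. The similarity
    # edges (0, 0) and (1, 1) then form exactly one square.
    #
    #     adj = np.array([[0, 1], [0, 0]], dtype=bool)  # assumed boolean adjacency
    #     sim = csr_matrix(np.eye(2, dtype=np.float32))
    #     squares = find_squares(adj, adj, sim)          # 2x2 sparse matrix
    #     # squares pairs the two similarity edges of the square above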

    @property
    def mapping(self) -> RawMapping | None:
        """
        Nodes mapping between the two graphs.

        :return: the raw mapping between the nodes of the two input graphs
        """

        return self._mapping

    @property
    def confidence_score(self) -> list[float]:
        """
        Confidence score for each match in the nodes mapping.
        Only meaningful after :meth:`compute` has been run.
        """

        return [self._confidence[idx1, idx2] for idx1, idx2 in zip(*self.mapping)]

    def process(
        self, sparsity_ratio: Ratio = 0.6, sparse_row: bool = False, compute_squares: bool = True
    ) -> None:
        """
        Initialize the matching algorithm.

        :param sparsity_ratio: the ratio of null elements over the entire similarity
                               matrix
        :param sparse_row: when building the sparse similarity matrix, either filter
                           out the elements by considering all the entries of the
                           similarity matrix (sparse_row == False) or consider each
                           row separately (sparse_row == True)
        :param compute_squares: whether to compute the squares matrix
        """

        if self._processed:  # Only do it once
            return
        self._processed = True

        logging.debug(
            f"Computing sparse similarity matrix (ratio {sparsity_ratio} sparse_row {sparse_row})"
        )
        self._compute_sparse_sim_matrix(sparsity_ratio, sparse_row)
        logging.debug(
            "Sparse similarity matrix computed,"
            f" shape: {self.sparse_sim_matrix.shape}"  # type: ignore
            f", nnz elements: {self.sparse_sim_matrix.nnz}"  # type: ignore
        )

        if compute_squares:
            logging.debug("Computing squares matrix")
            self._compute_squares_matrix()
            logging.debug(
                f"Squares matrix computed, shape: {self.squares_matrix.shape}"  # type: ignore
                f", nnz elements: {self.squares_matrix.nnz}"  # type: ignore
            )
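
    # Usage sketch: the sparse structures can be pre-computed explicitly,
    # although compute() calls process() on its own when needed.
    #
    #     matcher.process(sparsity_ratio=0.6, sparse_row=True)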

    def compute(
        self, tradeoff: Ratio = 0.75, epsilon: Positive = 0.5, maxiter: int = 1000
    ) -> Iterator[int]:
        """
        Launch the belief propagation for at most maxiter iterations, using the
        given QBinDiff parameters.

        :param tradeoff: trade-off between the node similarity and the graph
                         structure (1 means similarity only)
        :param epsilon: perturbation added to the similarity matrix
        :param maxiter: maximum number of iterations of the belief propagation
        :return: an iterator over the steps of the belief propagation
        """

        self.process()
        if tradeoff == 1:
            logging.info("[+] switching to Maximum Weight Matching (tradeoff is 1)")
            belief = BeliefMWM(self.sparse_sim_matrix, epsilon)
        else:
            belief = BeliefQAP(self.sparse_sim_matrix, self.squares_matrix, tradeoff, epsilon)

        yield from belief.compute(maxiter)

        score_matrix = self.sparse_sim_matrix.copy()  # type: ignore
        self._confidence = belief.current_marginals
        self._mapping = self.refine(belief.current_mapping, score_matrix)
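
    # Usage sketch (hypothetical input matrices): compute() yields after every
    # belief propagation iteration, which is handy for progress reporting.
    #
    #     matcher = Matcher(sim_matrix, primary_adj, secondary_adj)
    #     for niter in matcher.compute(tradeoff=0.75, epsilon=0.5):
    #         pass  # e.g. update a progress bar
    #     primary_idx, secondary_idx = matcher.mapping
    #     scores = matcher.confidence_score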

    def refine(self, mapping: RawMapping, score_matrix: SimMatrix) -> RawMapping:
        """
        Refine the mapping between the nodes of the two graphs
        by matching the yet unassigned nodes.

        :param mapping: initial mapping
        :param score_matrix: similarity matrix
        :return: updated raw mapping
        """

        primary, secondary = mapping
        assert len(primary) == len(secondary)

        # All the nodes have already been assigned
        if len(primary) == min(score_matrix.shape):
            return mapping

        primary_missing = np.setdiff1d(range(score_matrix.shape[0]), primary)
        secondary_missing = np.setdiff1d(range(score_matrix.shape[1]), secondary)
        score_matrix = score_matrix[primary_missing][:, secondary_missing]
        nnz_indices = score_matrix.nonzero()
        score_matrix = score_matrix.toarray()

        # Give the zero elements a high cost so that the solver avoids them
        lap_scores = np.full(score_matrix.shape, 1000000, dtype=score_matrix.dtype)
        # The LAP solver minimizes the cost, but a high score means a good match,
        # hence the negation
        lap_scores[nnz_indices] = -score_matrix[nnz_indices]

        primary_ass, secondary_ass = solve_linear_assignment(lap_scores)
        return np.hstack((primary, primary_missing[primary_ass])), np.hstack(
            (secondary, secondary_missing[secondary_ass])
        )
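
    # Illustrative sketch (made-up values, hypothetical `matcher` instance):
    # node 0 of each graph is already matched; refine() pairs the remaining
    # nodes through the LAP solver on the leftover scores.
    #
    #     scores = csr_matrix(np.array([[0.9, 0.0, 0.0],
    #                                   [0.0, 0.1, 0.8],
    #                                   [0.0, 0.7, 0.2]]))
    #     partial = (np.array([0]), np.array([0]))
    #     matcher.refine(partial, scores)
    #     # -> (array([0, 1, 2]), array([0, 2, 1]))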