Source code for qbindiff.features.wlgk

# Copyright 2023 Quarkslab
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Features releated to the WLGK

Features related to the Weisfeiler-Lehman Graph Kernel framework.
"""

import random
from functools import cache
from collections import defaultdict, Counter
from abc import ABCMeta, abstractmethod

from qbindiff.features.extractor import FunctionFeatureExtractor, FeatureCollector
from qbindiff.loader import Program, Function, BasicBlock


class LSH(metaclass=ABCMeta):
    """
    Abstract class representing a Locality Sensitive Hashing function.
    It defines the interface to the function.
    """

    @abstractmethod
    def __init__(self, node: BasicBlock):
        raise NotImplementedError()

    @abstractmethod
    def __add__(self, lsh: "LSH") -> "LSH":
        raise NotImplementedError()

    @abstractmethod
    def __iadd__(self, lsh: "LSH") -> "LSH":
        raise NotImplementedError()

    @abstractmethod
    def __call__(self) -> bytes:
        """Return the hash assigned to the node"""
        raise NotImplementedError()
    @abstractmethod
    def add(self, lsh: "LSH") -> None:
        """Add the hash lsh to the current hash"""
        raise NotImplementedError()
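
# Illustrative sketch (not part of the original module): a minimal concrete LSH
# showing how the interface above is meant to be implemented. It labels a basic
# block only by its number of instructions, which is far too coarse for real use;
# the name InstrCountLSH is hypothetical. Note that visit_function below also
# relies on a copy() method, so a usable LSH should provide one as BOWLSH does.
class InstrCountLSH(LSH):
    def __init__(self, node: BasicBlock | None = None):
        # Assumes BasicBlock is iterable over its instructions, as BOWLSH does below
        self.count = 0 if node is None else sum(1 for _ in node)

    def __add__(self, lsh: "InstrCountLSH") -> "InstrCountLSH":
        res = InstrCountLSH()
        res.count = self.count + lsh.count
        return res

    def __iadd__(self, lsh: "InstrCountLSH") -> "InstrCountLSH":
        self.count += lsh.count
        return self

    def __call__(self) -> bytes:
        # 4-byte hash, mirroring the size used by BOWLSH
        return (self.count & 0xFFFFFFFF).to_bytes(4, "big")

    def add(self, lsh: "LSH") -> None:
        self.__iadd__(lsh)

    def copy(self) -> "InstrCountLSH":
        res = InstrCountLSH()
        res.count = self.count
        return res
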
class BOWLSH(LSH):
    """
    Extract the bag-of-words representation of a block.
    The hashes are 4 bytes long.
    """

    def __init__(self, node: BasicBlock | None = None):
        self.bag = defaultdict(int)
        if node is not None:
            for instr in node:
                self.bag[instr.id] += 1

    def __iadd__(self, lsh: "BOWLSH") -> "BOWLSH":
        for k, v in lsh.bag.items():
            self.bag[k] += v
        return self

    def __add__(self, lsh: "BOWLSH") -> "BOWLSH":
        res = self.copy()
        res += lsh
        return res
    def copy(self) -> "BOWLSH":
        res = BOWLSH()
        res.bag = self.bag.copy()
        return res
    def __call__(self) -> bytes:
        """Return the hash assigned to the node"""
        resHash = 0
        for hp in BOWLSH.hyperplanes:
            resHash <<= 1
            prod = sum(hp[k] * v for k, v in self.bag.items())
            if prod >= 0:
                resHash |= 1
        return resHash.to_bytes(4, "big")
    def add(self, lsh: "LSH") -> None:
        """Add the hash lsh to the current hash"""
        self.__iadd__(lsh)
    @classmethod
    @property
    @cache
    def hyperplanes(cls):
        """
        Generate the hyperplanes for the LSH.
        Each hyperplane is identified by its normal vector v from R^2000: v * x = 0.
        The dimension 2000 should be sufficient to characterize the basic asm blocks.

        Warning: this method will leak memory as the hyperplanes will never be deallocated.
        """
        hyperplanes = []
        random.seed(0)
        for k in range(32):
            hyperplanes.append([2 * random.random() - 1 for i in range(2000)])
        return hyperplanes
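
# Illustrative sketch (not part of the original module): the hash above is a
# SimHash-style scheme. Each of the 32 random hyperplanes splits R^2000 in two;
# the bag-of-words vector is dotted with every hyperplane normal and the sign of
# the result yields one bit of the 4-byte hash, so similar bags tend to collide.
# The instruction ids used here (1, 7) are arbitrary placeholders.
def _bowlsh_hash_example() -> tuple[bytes, bytes]:
    a = BOWLSH()
    a.bag[1] = 3  # pretend instruction id 1 occurs 3 times in the block
    a.bag[7] = 1
    b = a.copy()
    b.bag[7] += 1  # a slightly different block
    # Similar bags fall on the same side of most hyperplanes, hence close hashes
    return a(), b()
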
class WeisfeilerLehman(FunctionFeatureExtractor):
    """
    Weisfeiler-Lehman Graph Kernel feature.
    It's strongly suggested to use the cosine distance with this feature.

    Options: ['max_passes': int]
    """

    key = "wlgk"

    def __init__(self, *args, lsh: type[LSH] | None = None, max_passes: int = -1, **kwargs):
        """
        Extract a feature vector by using a custom defined node labeling scheme.

        :param lsh: The Locality Sensitive Hashing function to use. Must inherit from LSH.
                    If None is specified then BOWLSH is used.
        :param max_passes: The maximum number of iterations allowed. If it is set to -1
                           then no limit is specified.
        """
        super(WeisfeilerLehman, self).__init__(*args, **kwargs)
        self.max_passes = max_passes
        if lsh is None:
            self.lsh = BOWLSH
        else:
            self.lsh = lsh
    def visit_function(self, program: Program, function: Function, collector: FeatureCollector):
        labels = []  # Labels for each node at step i
        map_node_to_index = {}
        adjacency = defaultdict(list)

        # Label each node of the graph
        for bb_addr, bb in function.items():
            labels.append(self.lsh(bb))
            # Map the basic block address to our node index
            map_node_to_index[bb_addr] = len(labels) - 1

        # Adjacency
        for source, target in function.edges:
            adjacency[map_node_to_index[source]].append(map_node_to_index[target])

        vec = [l() for l in labels]
        prev_counter = 0
        if self.max_passes == -1:
            iterations = len(labels)
        else:
            iterations = min(self.max_passes, len(labels))
        for step in range(iterations):
            # Recalculate labels at each step
            newLabels = []
            for node, label in enumerate(labels):
                newLabels.append(label.copy())
                for n in adjacency[node]:
                    newLabels[node] += labels[n]
            labels = newLabels

            new_hashes = [l() for l in labels]
            counter = sorted(Counter(new_hashes).values())

            # Early stop
            if counter == prev_counter:
                break
            prev_counter = counter
            vec.extend(new_hashes)

        # Generate the frequency vector of the labels
        collector.add_dict_feature(self.key, dict(Counter(vec)))
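
# Illustrative sketch (not part of the original module): the relabeling loop of
# visit_function replayed on a hand-built 3-node graph (0 -> 1, 0 -> 2). At each
# pass a node's label absorbs the labels of its successors, and the multiset of
# hashes collected across the passes forms the frequency vector handed to the
# collector. The instruction ids and the graph itself are arbitrary placeholders.
def _wl_relabeling_example() -> dict[bytes, int]:
    labels = []
    for ids in ([1, 1, 2], [2, 3], [4]):  # one fake bag of instruction ids per node
        label = BOWLSH()
        for i in ids:
            label.bag[i] += 1
        labels.append(label)
    adjacency = {0: [1, 2], 1: [], 2: []}

    vec = [l() for l in labels]
    for _ in range(len(labels)):
        new_labels = [label.copy() for label in labels]
        for node in adjacency:
            for succ in adjacency[node]:
                new_labels[node] += labels[succ]
        labels = new_labels
        vec.extend(l() for l in labels)
    # This dict is what would be passed to collector.add_dict_feature
    return dict(Counter(vec))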