Source code for idascript.ida

import enum
import subprocess
import logging
from idascript import IDA_BINARY
from pathlib import Path
from multiprocessing import Pool, Queue, Manager
import queue
import os
from typing import List, Optional, Iterable, Union, Generator

TIMEOUT_RETURNCODE: int = -1


[docs] class IDAException(Exception): """ Base class for exceptions in the module. """ pass
[docs] class IDANotStared(IDAException): """ This exception is raised when attempting to call a function of the `IDA` class before having called `start`. """ pass
[docs] class IDAModeNotSet(IDAException): """ This exception is raised when the IDA Mode has not been set before calling `start`. """ pass
[docs] class MultiIDAAlreadyRunning(IDAException): """ Exception raised if the `map` function of MultiIDA is called while another map operation is still pending. Design choices disallow launching two MultiIDA.map function in the same time. """ pass
class IDAMode(enum.Enum): """ Different modes possible for the IDA class """ # Default value NOTSET = enum.auto() # Used when IDA will be launched for an IDAPython script IDAPYTHON = enum.auto() # Used when IDA will be launched directly DIRECT = enum.auto()
[docs] class IDA: """ Class representing an IDA execution on a given file with a given script. This class is a wrapper to subprocess IDA. """ def __init__(self, binary_file: Union[Path, str], script_file: Optional[Union[str, Path]] = None, script_params: Optional[List[str]] = None, timeout: Optional[float] = None): """ :param binary_file: path of the binary file to analyse :param script_file: path to the Python script to execute on the binary (if required) :param script_params: additional parameters to send either to the script or IDA directly """ if not Path(binary_file).exists(): raise FileNotFoundError("Binary file: %s" % binary_file) self.bin_file: Path = Path(binary_file).resolve() #: File to the binary self._process = None self.script_file: Optional[Path] = None #: script file to execute self.params: List[str] = [] #: list of paramaters given to IDA self.timeout: Optional[float] = timeout #: Timeout for IDA execution if script_file is not None: # Mode IDAPython self._set_idapython(script_file, script_params) else: # Direct mode self._set_direct(script_params) def _set_idapython(self, script_file: Union[Path, str], script_params: List[str] = None) -> None: """ Set IDAPython script parameter. :param script_file: path to the script to execute on the binary file :param script_params: additional parameters sent to the script (available via idc.ARGV in idapython) """ if not Path(script_file).exists(): raise FileNotFoundError("Script file: %s" % script_file) if script_params is None: script_params = [] if script_params: if not isinstance(script_params, list): raise TypeError("script_params parameter should be a list") self.script_file = Path(script_file).resolve() self.params = [x.replace('"', '\\"') for x in script_params] if script_params else [] self.mode = IDAMode.IDAPYTHON def _set_direct(self, script_options: List[str]) -> None: """ Set parameters script in direct mode :param script_options: List of script options :return: None """ for option in script_options: if ':' not in option: raise TypeError('Options must have a ":"') self.params.append(f'-O{option}') self.mode = IDAMode.DIRECT
[docs] def start(self) -> None: """ Start the IDA process on the binary. """ cmd_line = [IDA_BINARY.as_posix(), '-A'] if self.mode == IDAMode.IDAPYTHON: params = " "+" ".join(self.params) if self.params else "" cmd_line.append('-S%s%s' % (self.script_file.as_posix(), params)) elif self.mode == IDAMode.DIRECT: cmd_line.extend(self.params) else: raise cmd_line.append(self.bin_file.as_posix()) logging.debug(f"run: {' '.join(cmd_line)}") env = os.environ env["TVHEADLESS"] = "1" env["TERM"] = "xterm" self._process = subprocess.Popen( cmd_line, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, # See `https://www.hex-rays.com/blog/igor-tip-of-the-week-08-batch-mode-under-the-hood/`_ env=env )
@property def returncode(self) -> Optional[int]: """ Get the returncode of the process. Raise IDANotStart if called before launching the process. """ if self._process: return self._process.returncode else: raise IDANotStared() @property def terminated(self) -> bool: """ Boolean function returning True if the process is terminated """ if self._process: if self._process.poll() is not None: return True else: return False else: raise IDANotStared() @property def pid(self) -> int: """ Returns the PID of the IDA process :return: int (PID of the process) """ if self._process: return self._process.pid else: raise IDANotStared()
[docs] def wait(self) -> int: """ Wait for the process to finish. This function hangs until the process terminate. A timeout can be given which raises TimeoutExpired if the timeout is exceeded (subprocess mechanism). """ if self._process: try: return self._process.wait(self.timeout) except subprocess.TimeoutExpired: self._process.terminate() return TIMEOUT_RETURNCODE else: raise IDANotStared()
[docs] def terminate(self) -> None: """ Call terminate on the IDA process (kill -15) """ if self._process: self._process.terminate() else: raise IDANotStared()
[docs] def kill(self) -> None: """ Call kill on the IDA subprocess (kill -9) """ if self._process: self._process.kill() else: raise IDANotStared()
[docs] class MultiIDA: """ Class to trigger multiple IDA processes concurrently on a bunch of files. """ @staticmethod def _worker(ingress, egress, script_file, params, timeout) -> None: while True: try: file = ingress.get(timeout=0.5) ida = IDA(file, script_file, params, timeout) ida.start() res = ida.wait() egress.put((file, res)) except queue.Empty: pass except KeyboardInterrupt: break
[docs] @staticmethod def map(generator: Iterable[Path], script: Union[str, Path] = None, params: List[str] = None, workers: int = None, timeout: Optional[float] = None) -> Generator[tuple[int, Path], None, None]: """ Iterator the generator sent and apply the script file on each file concurrently on a bunch of IDA workers. The function consume the generator as fast as it can occupy all the workers and yield a tuple (return code, path file) everytime an IDA process as terminated. :param generator: Iterable of file paths strings (or Path) :param script: path to the script to execute :param params: list of parameters to send to the script :param workers: number of workers to trigger in parallel :param timeout: timeout for IDA runs (-1 means infinity) :return: generator of files processed (return code, file path) """ manager = Manager() ingress = manager.Queue() egress = manager.Queue() pool = Pool(workers) # Launch all workers for i in range(workers): pool.apply_async(MultiIDA._worker, (ingress, egress, script, params, timeout)) # Pre-fill ingress queue total = 0 for file in generator: ingress.put(file) total += 1 i = 0 while True: path, res = egress.get() i += 1 yield path, res # once all items have been processed if i == total: break pool.terminate()