I am working on a plugin for Binary Ninja where one of the features is to trace functions using Frida. The plugin is written in Python (using Python 3.10), but the Frida commands are in JavaScript. I am trying to load some JS code and make Frida run it (it is my understanding that Frida provides its own VM for JS, but I may have misunderstood).
This code is an object representing the function being traced. The trace_template
is the JS code I want to run.
import frida
class Tracee:
    """
    A process attached through Frida.

    Two tracees compare equal (and hash alike) when their ``pid`` matches;
    ``session`` and ``active`` take no part in the comparison.
    """
    # Frida session handed to the constructor.
    session: frida.core.Session
    # Process id; the only field used by __eq__ and __hash__.
    pid: int
    # Always False right after construction.
    active: bool

    def __init__(self, session, pid):
        """Store the session and pid; a new tracee starts out inactive."""
        self.active = False
        self.pid = pid
        self.session = session

    def __eq__(self, other):
        """Compare by pid; defer to the other operand for foreign types."""
        if isinstance(other, Tracee):
            return self.pid == other.pid
        return NotImplemented

    def __hash__(self):
        """Hash on pid so that equal tracees share a hash."""
        return hash(self.pid)
# Frida (JavaScript) script template, rendered with str.format().
# Placeholders: {binary_name}, {func_addr}, {func_name}.
# Doubled braces ({{ and }}) are str.format escapes for literal JS braces.
# The rendered script attaches an Interceptor to <module base> + {func_addr}
# and send()s a 'call' / 'return' event carrying the first argument or the
# return value as a hex string, then send()s a DebugSymbol lookup of
# {func_name}.
# NOTE(review): {func_addr} is *added* to the module base, so it presumably
# has to be an offset relative to that base rather than an absolute address
# -- confirm against the caller.
trace_template = """
const base_mod = Process.getModuleByName('{binary_name}').base;
const func_ptr = ptr(base_mod.add({func_addr}));
Interceptor.attach(func_ptr, {{
onEnter: function(args) {{
console.log('Entered function! had argument: ', args[0].toInt32().toString(16));
var msg = {{
'event': 'call',
'args': args[0].toInt32().toString(16)
}};
send(msg);
}},
onLeave: function(retval) {{
console.log('Returning from function: ', retval.toInt32().toString(16));
var msg = {{
'event': 'return',
'args': retval.toInt32().toString(16)
}};
send(msg);
}}
}});
send({{
'maybeidk': DebugSymbol.fromName('{func_name}')
}})
"""
This is the function which uses the trace_template
and is supposed to run it.
def trace_functions(self, tracee: Tracee):
    """
    Render trace_template for the first marked function, load it into the
    tracee's Frida session and resume the process.

    :param tracee: attached process to instrument
    """
    if not self.targets:
        # Targets may have been cleared between scheduling and execution.
        # Mark the tracee inactive again so a later pass can retry.
        slog.log_warn("No functions to trace yet!")
        tracee.active = False
        return
    t = self.targets[0]
    fn = self.bv.file.original_filename.split('/')[-1]
    # The template adds func_addr to the module's *runtime* base, so pass the
    # offset from the start of the view, not the absolute analysis address.
    offset = t.lowest_address - self.bv.start
    formatted = trace_template.format(
        binary_name=fn,
        func_addr=hex(offset),
        func_name=t.name)
    slog.log_warn(formatted)
    script = tracee.session.create_script(formatted)
    script.on('message', lambda message, data: self._reactor.schedule(
        lambda: self._on_message(tracee.pid, message)
    ))
    script.load()
    # Keep a reference so the script object outlives this call.
    tracee.script = script
    self._device.resume(tracee.pid)
And that function is part of this file of code, which I’m adding in case more context is necessary:
import json
import binaryninja as bn
import frida
import time
import threading
from enum import Enum
from pathlib import Path
from frida_tools.application import Reactor
from typing import Optional, List, Callable
from json import loads
from .settings import SETTINGS
from .log import system_logger as slog, gadget_logger as glog
from .helper import TRACE_TAG, get_functions_by_tag, mark_func
from .trace import Tracee, trace_template
# Thread running the portal (see start_server); None while no portal exists.
__portal_thread = None
# Set by stop_server to make PortalApp.handle leave its loop.
_stop_event = threading.Event()
class PortalAction(Enum):
    """What the portal loop (PortalApp.handle) does on each pass."""
    # Idle: only logs the marked functions, if any.
    INIT = 1
    # Schedule tracing for every tracee that is not active yet.
    TRACE = 2
def stop_server(bv: bn.BinaryView) -> None:
    """
    Signal the portal thread through the stop event and wait for it to end.

    Shows an error box (and logs a warning) when no portal thread exists.
    """
    global __portal_thread
    if __portal_thread is not None:
        slog.log_info("Stopping server...")
        _stop_event.set()
        __portal_thread.join()
        __portal_thread = None
        # Re-arm the event so a later start_server gets a fresh loop.
        _stop_event.clear()
        slog.log_info("Server closed.")
        return

    slog.log_warn("Portal thread not up")
    bn.show_message_box(
        "Error: Portal not instantiated",
        "Frida-portal thread is not running.",
        bn.MessageBoxButtonSet.OKButtonSet,
        bn.MessageBoxIcon.ErrorIcon)
def mark_function(bv: bn.BinaryView, func: bn.Function):
    """
    Mark ``func`` through the helper; when the portal is running, also
    schedule a reload of all marked functions.
    """
    mark_func(bv, func)
    if not is_portal_running():
        return
    __portal_thread.app.schedule_reload()
def is_portal_running() -> bool:
    """Return True only if a portal thread exists and is still alive."""
    thread = __portal_thread
    if thread is None:
        return False
    return thread.is_alive()
def clear_traced_functions(bv: bn.BinaryView):
    """
    Pass every current portal target to mark_func again, then schedule a
    reload. Only logs a warning when the portal is not running.
    """
    if is_portal_running():
        portal = __portal_thread.app
        for target in portal.targets:
            mark_func(bv, target)
        portal.schedule_reload()
    else:
        slog.log_warn("No functions to clear!")
def start_server(bv: bn.BinaryView) -> None:
    """
    Start server thread with PortalApp running.
    Run thread as daemon to not block and be closed when exiting.

    A portal thread that exists but has already died (for instance because
    PortalApp failed to start) no longer blocks a restart: only a live thread
    counts as "already running".
    """
    global __portal_thread
    if is_portal_running():
        bn.show_message_box(
            "Error",
            "Error: Portal already running",
            bn.MessageBoxButtonSet.OKButtonSet,
            bn.MessageBoxIcon.ErrorIcon)
        slog.log_warn("Portal server already created!")
        return

    slog.log_info('Starting frida portal...')
    # The global must be bound before start(): instance_app attaches the app
    # object to this very thread object.
    __portal_thread = threading.Thread(
        target=instance_app, args=(bv, ), daemon=True)
    __portal_thread.start()
def instance_app(bv: bn.BinaryView) -> None:
    """
    Thread target: build the PortalApp, expose it as ``.app`` on the portal
    thread object, then run it.
    """
    portal = PortalApp(bv)
    # Published before run() so other functions can reach the app via the thread.
    __portal_thread.app = portal
    portal.run()
class PortalApp:
"""
Portal app based on example found under frida_python
Uses Reactor from frida_tools to handle async tasks
"""
bv: bn.BinaryView
_reactor: Reactor
_session: Optional[frida.core.Session]
cluster_params: frida.EndpointParameters
control_params: frida.EndpointParameters
action: PortalAction
targets: List[bn.Function]
tracees: List[Tracee]
def __init__(self, bv: bn.BinaryView):
self._reactor = Reactor(run_until_return=self.handle)
self.bv = bv
try:
cluster_ip = bv.query_metadata('fridalens_cluster_ip')
cluster_port = int(bv.query_metadata('fridalens_cluster_port'))
control_ip = bv.query_metadata('fridalens_control_ip')
control_port = int(bv.query_metadata('fridalens_control_port'))
except:
print('No saved settings, using defailtnAddress=127.0.0.1ncluster port=27052ncontrol port=27042')
cluster_ip = "127.0.0.1"
cluster_port = 27052
control_ip = "127.0.0.1"
control_port = 27042
cluster_params = frida.EndpointParameters(
address=cluster_ip,
port=cluster_port
)
control_params = frida.EndpointParameters(
address=control_ip, port=control_port, authentication=None)
service = frida.PortalService(cluster_params, control_params)
self._service = service
self._device = service.device
self._session = None
self.action = PortalAction.INIT
self.targets = get_functions_by_tag(bv, TRACE_TAG)
self.tracees = []
service.on("node-connected", lambda *args: self._reactor.schedule(
lambda: self._on_node_connected(*args)
))
service.on("node-joined", lambda *args: self._reactor.schedule(
lambda: self._on_node_joined(*args)
))
service.on("node-left", lambda *args: self._reactor.schedule(
lambda: self._on_node_left(*args)
))
service.on("node-disconnected", lambda *args: self._reactor.schedule(
lambda: self._on_node_disconnected(*args)
))
service.on("message", lambda *args: self._reactor.schedule(
lambda: self._on_message(*args)
))
service.on("controller-connected", lambda *args: self._reactor.schedule(
lambda: self._on_controller_connected(*args)
))
service.on("controller-disconnected", lambda *args: self._reactor.schedule(
lambda: self._on_controller_disconnected(*args)
))
service.on("subscribe", lambda *args: self._reactor.schedule(
lambda: self._on_subscribe(*args)
))
def _on_node_connected(self, connection_id, remote_addr):
slog.log_info(f"Node connected: {connection_id}, Address: {remote_addr}")
def _on_node_disconnected(self, connection_id, remote_addr):
slog.log_info(f"Node disconnected: {connection_id}, Address: {remote_addr}")
def _on_detached(self, pid: int, reason: str):
slog.log_info(f"[DETACH] Tracee: {pid} reason: {reason}")
def _on_node_joined(self, connection_id, app):
"""
Happens along with connecting, but passes application info
Do most of logic here
"""
slog.log_info(f"Node joined: {connection_id}, Application: {app}")
sesh = self._device.attach(app.pid)
#TODO
sesh.on("detached",
lambda reason: self._reactor.schedule(
lambda: self._on_detached(app.pid, reason)
))
tracee = Tracee(sesh, app.pid)
if tracee not in self.tracees:
self.tracees.append(tracee)
self.action = PortalAction.TRACE
else:
slog.log_warn(f"Repeated tracee triggered node_joined; PID: {tracee.pid}")
def _on_node_left(self, connection_id, app):
"""
Happens along with disconnecting, but passes application info
Do most of logic here
"""
slog.log_info(f"Node left: {connection_id}, Address: {app}")
self.tracees = [tracee for tracee in self.tracees if tracee.pid != app.pid]
#TODO rename to idle or something similar
if len(self.tracees) == 0:
self.action = PortalAction.INIT
def _on_controller_connected(self, connection_id, remote_addr):
slog.log_info(f"Controller Connected: {connection_id}, Address: {remote_addr}")
def _on_controller_disconnected(self, connection_id, remote_addr):
slog.log_info(f"Controller disconnected: {connection_id}, Address: {remote_addr}")
def _on_message(self, pid: int, msg: str):
"""
Takes messages sent from gadgets/controllers and handles them
Msg is sent as str, convert to dict using json.loads()
"""
msg = loads(msg)
match msg['type']:
case "log":
payload = msg['payload']
level = msg['level']
glog.log_info(f"[PID {pid}-{level}] {payload}")
case _:
glog.log_warn(f"[PID {pid}] {msg}")
def _on_subscribe(self, connection_id):
slog.log_info(f'New subscription: {connection_id}')
def run(self):
slog.log_info("Starting portal run")
self._reactor.schedule(self._start)
self._reactor.run()
def _start(self):
self._service.start()
self._device.enable_spawn_gating()
def _stop(self):
for tracee in self.tracees:
self._reactor.schedule(lambda: tracee.session.detach())
self._service.stop()
def _reload(self):
targets = get_functions_by_tag(self.bv, TRACE_TAG)
self.targets = targets
slog.log_info(f"Targets found: {targets}")
if len(self.tracees) > 0:
self.action = PortalAction.TRACE
def schedule_reload(self):
"""
KISS for now ig
"""
self._reactor.schedule(self._reload)
def trace_functions(self, tracee: Tracee):
t = self.targets[0]
fn = self.bv.file.original_filename.split('/')[-1]
formatted = trace_template.format(
binary_name=fn,
func_addr=t.lowest_address,
func_name=t.name)
slog.log_warn(formatted)
script = tracee.session.create_script(formatted)
script.on('message', lambda message,data: self._reactor.schedule(
lambda: self._on_message(tracee.pid, message)
))
script.load()
self._device.resume(tracee.pid)
def handle(self, reactor):
global _stop_event
while not _stop_event.is_set():
match self.action:
case PortalAction.INIT:
slog.log_info("Idling...")
if len(self.targets) > 0:
slog.log_info(f"Functions marked: {self.targets}")
case PortalAction.TRACE:
slog.log_info(f"Tracees: {list(map(lambda t: t.pid, self.tracees))}")
if len(self.targets) <= 0:
slog.log_warn("No functions to trace yet!")
else:
for t in list(filter(lambda x: not x.active, self.tracees)):
t.active = True
self._reactor.schedule(
lambda: self.trace_functions(t)
)
time.sleep(1)
slog.log_info("Finishing up")
self._stop()