How to use the Frida Interceptor from Python

I working on a plugin for Binary Ninja where one of the features is to trace functions using Frida. The plugin is written in python (using python 3.10) but the Frida commands are in JavaScript. I am trying to load some JS code and make Frida run it (It is part of my understanding that Frida provides its own VM for JS, but I may have misunderstood).

This code is an object representing the function being traced. The trace_template is the JS code I want to run.

import frida

class Tracee:
    session: frida.core.Session
    pid: int
    active: bool
    def __init__(self, session, pid):
        self.session = session
        self.pid = pid
        self.active = False

    def __eq__(self, other):
        if not isinstance(other, Tracee):
            return NotImplemented
        return self.pid == other.pid

    def __hash__(self):
        return hash(self.pid)

trace_template = """
const base_mod = Process.getModuleByName('{binary_name}').base;
const func_ptr = ptr(base_mod.add({func_addr}));

Interceptor.attach(func_ptr, {{
    onEnter: function(args) {{
        console.log('Entered function! had argument: ', args[0].toInt32().toString(16));
        var msg = {{
            'event': 'call',
            'args': args[0].toInt32().toString(16)
        }};
        send(msg);
    }},
    onLeave: function(retval) {{
        console.log('Returning from function: ', retval.toInt32().toString(16));
        var msg = {{
            'event': 'return',
            'args': retval.toInt32().toString(16)
        }};
        send(msg);
    }}
}});
send({{
    'maybeidk': DebugSymbol.fromName('{func_name}')
}})
"""

This is the function which uses the trace_template and is supposed to run it.

def trace_functions(self, tracee: Tracee):
    t = self.targets[0]
    fn = self.bv.file.original_filename.split('/')[-1]
    formatted = trace_template.format(
                    binary_name=fn,
                    func_addr=t.lowest_address,
                    func_name=t.name)
    slog.log_warn(formatted)
    script = tracee.session.create_script(formatted)
    script.on('message', lambda message,data: self._reactor.schedule(
            lambda: self._on_message(tracee.pid, message)
    ))
    script.load()
    self._device.resume(tracee.pid)

And that function is part of this file of code, which I’m adding in case more context is necessary:

import json
import binaryninja as bn
import frida
import time
import threading

from enum import Enum
from pathlib import Path
from frida_tools.application import Reactor
from typing import Optional, List, Callable
from json import loads

from .settings import SETTINGS
from .log import system_logger as slog, gadget_logger as glog
from .helper import TRACE_TAG, get_functions_by_tag, mark_func
from .trace import Tracee, trace_template

__portal_thread = None
_stop_event = threading.Event()

class PortalAction(Enum):
    INIT = 1
    TRACE = 2

def stop_server(bv: bn.BinaryView) -> None:
    """
    Stops server with stop_event
    """
    global __portal_thread, _stop_event
    if __portal_thread is None:
        slog.log_warn("Portal thread not up")
        bn.show_message_box(
                "Error: Portal not instantiated",
                "Frida-portal thread is not running.",
                bn.MessageBoxButtonSet.OKButtonSet,
                bn.MessageBoxIcon.ErrorIcon)
        return
    slog.log_info("Stopping server...")
    _stop_event.set()
    __portal_thread.join()
    __portal_thread = None
    _stop_event.clear()
    slog.log_info("Server closed.")

def mark_function(bv: bn.BinaryView, func: bn.Function):
    """
    Uses helper to mark function and schedule a reload of all marked functions
    """
    global __portal_thread
    mark_func(bv, func)
    if is_portal_running():
        __portal_thread.app.schedule_reload()

def is_portal_running() -> bool:
    return __portal_thread is not None and __portal_thread.is_alive()

def clear_traced_functions(bv: bn.BinaryView):
    if not is_portal_running():
        slog.log_warn("No functions to clear!")
        return
    for mark in __portal_thread.app.targets:
        mark_func(bv, mark)
    __portal_thread.app.schedule_reload()


def start_server(bv: bn.BinaryView) -> None:
    """
    Start server thread with PortalApp running.
    Run thread as daemon to not block and be closed when exiting.
    """
    global __portal_thread
    if __portal_thread is not None:
        bn.show_message_box(
                "Error",
                "Error: Portal already running",
                bn.MessageBoxButtonSet.OKButtonSet,
                bn.MessageBoxIcon.ErrorIcon)
        slog.log_warn("Portal server already created!")
    else:
        slog.log_info('Starting frida portal...')
        __portal_thread = threading.Thread(target=instance_app, args=(bv, ))
        __portal_thread.daemon = True
        __portal_thread.start()

def instance_app(bv: bn.BinaryView) -> None:
    global __portal_thread
    app = PortalApp(bv)
    __portal_thread.app = app
    app.run()

class PortalApp:
    """
    Portal app based on example found under frida_python
    Uses Reactor from frida_tools to handle async tasks
    """
    bv: bn.BinaryView
    _reactor: Reactor
    _session: Optional[frida.core.Session]
    cluster_params: frida.EndpointParameters
    control_params: frida.EndpointParameters
    action: PortalAction
    targets: List[bn.Function]
    tracees: List[Tracee]

    def __init__(self, bv: bn.BinaryView):
        self._reactor = Reactor(run_until_return=self.handle)
        self.bv = bv
        try:
            cluster_ip      = bv.query_metadata('fridalens_cluster_ip')
            cluster_port    = int(bv.query_metadata('fridalens_cluster_port'))
            control_ip      = bv.query_metadata('fridalens_control_ip')
            control_port    = int(bv.query_metadata('fridalens_control_port'))
        except:
            print('No saved settings, using defailtnAddress=127.0.0.1ncluster port=27052ncontrol port=27042')
            cluster_ip      = "127.0.0.1"
            cluster_port    = 27052
            control_ip      = "127.0.0.1"
            control_port    = 27042

        cluster_params = frida.EndpointParameters(
            address=cluster_ip,
            port=cluster_port
        )
        control_params = frida.EndpointParameters(
            address=control_ip, port=control_port, authentication=None)

        service = frida.PortalService(cluster_params, control_params)
        self._service = service
        self._device = service.device
        self._session = None
        self.action = PortalAction.INIT
        self.targets = get_functions_by_tag(bv, TRACE_TAG)
        self.tracees = []

        service.on("node-connected", lambda *args: self._reactor.schedule(
            lambda: self._on_node_connected(*args)
            ))
        service.on("node-joined", lambda *args: self._reactor.schedule(
            lambda: self._on_node_joined(*args)
            ))
        service.on("node-left", lambda *args: self._reactor.schedule(
            lambda: self._on_node_left(*args)
            ))
        service.on("node-disconnected", lambda *args: self._reactor.schedule(
            lambda: self._on_node_disconnected(*args)
            ))
        service.on("message", lambda *args: self._reactor.schedule(
            lambda: self._on_message(*args)
            ))
        service.on("controller-connected", lambda *args: self._reactor.schedule(
            lambda: self._on_controller_connected(*args)
            ))
        service.on("controller-disconnected", lambda *args: self._reactor.schedule(
            lambda: self._on_controller_disconnected(*args)
            ))
        service.on("subscribe", lambda *args: self._reactor.schedule(
            lambda: self._on_subscribe(*args)
            ))

    def _on_node_connected(self, connection_id, remote_addr):
        slog.log_info(f"Node connected: {connection_id}, Address: {remote_addr}")

    def _on_node_disconnected(self, connection_id, remote_addr):
        slog.log_info(f"Node disconnected: {connection_id}, Address: {remote_addr}")

    def _on_detached(self, pid: int, reason: str):
        slog.log_info(f"[DETACH] Tracee: {pid} reason: {reason}")

    def _on_node_joined(self, connection_id, app):
        """
        Happens along with connecting, but passes application info
        Do most of logic here
        """
        slog.log_info(f"Node joined: {connection_id}, Application: {app}")
        sesh = self._device.attach(app.pid)
        #TODO
        sesh.on("detached",
                lambda reason: self._reactor.schedule(
                    lambda: self._on_detached(app.pid, reason)
            ))
        tracee = Tracee(sesh, app.pid)
        if tracee not in self.tracees:
                self.tracees.append(tracee)
                self.action = PortalAction.TRACE
        else:
            slog.log_warn(f"Repeated tracee triggered node_joined; PID: {tracee.pid}")

    def _on_node_left(self, connection_id, app):
        """
        Happens along with disconnecting, but passes application info
        Do most of logic here
        """
        slog.log_info(f"Node left: {connection_id}, Address: {app}")
        self.tracees = [tracee for tracee in self.tracees if tracee.pid != app.pid]
        #TODO rename to idle or something similar
        if len(self.tracees) == 0:
            self.action = PortalAction.INIT

    def _on_controller_connected(self, connection_id, remote_addr):
        slog.log_info(f"Controller Connected: {connection_id}, Address: {remote_addr}")

    def _on_controller_disconnected(self, connection_id, remote_addr):
        slog.log_info(f"Controller disconnected: {connection_id}, Address: {remote_addr}")


    def _on_message(self, pid: int, msg: str):
        """
        Takes messages sent from gadgets/controllers and handles them
        Msg is sent as str, convert to dict using json.loads()
        """
        msg = loads(msg)
        match msg['type']:
            case "log":
                payload = msg['payload']
                level = msg['level']
                glog.log_info(f"[PID {pid}-{level}] {payload}")
            case _:
                glog.log_warn(f"[PID {pid}] {msg}")

    def _on_subscribe(self, connection_id):
        slog.log_info(f'New subscription: {connection_id}')

    def run(self):
        slog.log_info("Starting portal run")
        self._reactor.schedule(self._start)
        self._reactor.run()

    def _start(self):
        self._service.start()
        self._device.enable_spawn_gating()

    def _stop(self):
        for tracee in self.tracees:
            self._reactor.schedule(lambda: tracee.session.detach())
        self._service.stop()

    def _reload(self):
        targets = get_functions_by_tag(self.bv, TRACE_TAG)
        self.targets = targets
        slog.log_info(f"Targets found: {targets}")
        if len(self.tracees) > 0:
            self.action = PortalAction.TRACE

    def schedule_reload(self):
        """
        KISS for now ig
        """
        self._reactor.schedule(self._reload)

    def trace_functions(self, tracee: Tracee):
        t = self.targets[0]
        fn = self.bv.file.original_filename.split('/')[-1]
        formatted = trace_template.format(
                        binary_name=fn,
                        func_addr=t.lowest_address,
                        func_name=t.name)
        slog.log_warn(formatted)
        script = tracee.session.create_script(formatted)
        script.on('message', lambda message,data: self._reactor.schedule(
                lambda: self._on_message(tracee.pid, message)
        ))
        script.load()
        self._device.resume(tracee.pid)

    def handle(self, reactor):
        global _stop_event
        while not _stop_event.is_set():
            match self.action:
                case PortalAction.INIT:
                    slog.log_info("Idling...")
                    if len(self.targets) > 0:
                        slog.log_info(f"Functions marked: {self.targets}")
                case PortalAction.TRACE:
                    slog.log_info(f"Tracees: {list(map(lambda t: t.pid, self.tracees))}")
                    if len(self.targets) <= 0:
                        slog.log_warn("No functions to trace yet!")
                    else:
                        for t in list(filter(lambda x: not x.active, self.tracees)):
                            t.active = True
                            self._reactor.schedule(
                                    lambda: self.trace_functions(t)
                            )
            time.sleep(1)
        slog.log_info("Finishing up")
        self._stop()