Skip to content

Darwin subsystems

Subsystems available on Darwin (macOS/iOS) clients, reached as attributes of the client (e.g. p.fs, p.network, p.processes).

Filesystem

rpcclient.clients.darwin.subsystems.fs

DarwinFs

Bases: Fs['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
class DarwinFs(Fs["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Darwin filesystem helpers: stat64/lstat64, extended attributes, statfs64 and chflags."""

    async def stat(self, path: str | PurePath) -> Container:
        """Return stat64 info for a remote path."""
        return await do_stat(self._client, "stat64", path)

    async def lstat(self, path: str | PurePath) -> Container:
        """Return lstat64 info for a remote path (does not follow symlinks)."""
        return await do_stat(self._client, "lstat64", path)

    async def setxattr(self, path: str | PurePath, name: str, value: bytes) -> None:
        """Set an extended attribute value.

        Raises through ``raise_errno_exception`` when the remote call returns -1.
        """
        count = (await self._client.symbols.setxattr(path, name, value, len(value), 0, 0)).c_int64
        if count == -1:
            await self._client.raise_errno_exception(f"failed to setxattr(): {path}")

    async def removexattr(self, path: str | PurePath, name: str) -> None:
        """Remove an extended attribute.

        Raises through ``raise_errno_exception`` when the remote call returns -1.
        """
        count = (await self._client.symbols.removexattr(path, name, 0)).c_int64
        if count == -1:
            await self._client.raise_errno_exception(f"failed to removexattr(): {path}")

    async def listxattr(self, path: str | PurePath) -> list[str]:
        """List extended attribute names.

        The required buffer size is queried first, so name lists of any length
        are supported instead of being capped by a fixed-size buffer.
        """
        # With a NULL buffer listxattr(2) returns the size of the name list.
        size = (await self._client.symbols.listxattr(path, 0, 0, 0)).c_int64
        if size == -1:
            await self._client.raise_errno_exception(f"failed to listxattr(): {path}")
        if size == 0:
            return []
        async with self._client.safe_malloc(size) as xattributes_names:
            count = (await self._client.symbols.listxattr(path, xattributes_names, size, 0)).c_int64
            if count == -1:
                await self._client.raise_errno_exception(f"failed to listxattr(): {path}")
            # Names are NUL-terminated, so the final split element is always empty.
            return [s.decode() for s in (await xattributes_names.peek(count)).split(b"\x00")[:-1]]

    async def getxattr(self, path: str | PurePath, name: str) -> bytes:
        """Get an extended attribute value.

        The value size is queried first, so values of any length are supported
        instead of being capped by a fixed-size buffer.
        """
        # With a NULL buffer getxattr(2) returns the current size of the attribute.
        size = (await self._client.symbols.getxattr(path, name, 0, 0, 0, 0)).c_int64
        if size == -1:
            await self._client.raise_errno_exception(f"failed to getxattr(): {path}")
        if size == 0:
            return b""
        async with self._client.safe_malloc(size) as value:
            count = (await self._client.symbols.getxattr(path, name, value, size, 0, 0)).c_int64
            if count == -1:
                await self._client.raise_errno_exception(f"failed to getxattr(): {path}")
            return await value.peek(count)

    async def dictxattr(self, path: str | PurePath) -> dict[str, bytes]:
        """Get a dictionary mapping every extended attribute name to its value."""
        result = {}
        for k in await self.listxattr(path):
            result[k] = await self.getxattr(path, k)
        return result

    async def statfs(self, path: str | PurePath) -> Container:
        """Return statfs64 info for a remote path."""
        async with self._client.safe_malloc(statfs64.sizeof()) as buf:
            if await self._client.symbols.statfs64(path, buf) != 0:
                await self._client.raise_errno_exception(f"statfs failed for: {path}")
            return await buf.parse(statfs64)

    async def chflags(self, path: str | PurePath, flags: int = 0) -> None:
        """Set BSD file flags on a remote path."""
        if (await self._client.symbols.chflags(path, flags)) != 0:
            await self._client.raise_errno_exception(f"chflags failed for: {path}")

stat async

stat(path: str | PurePath) -> Container

Return stat64 info for a remote path.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def stat(self, path: str | PurePath) -> Container:
    """Stat a remote path via ``stat64`` and return the parsed result."""
    stat_info = await do_stat(self._client, "stat64", path)
    return stat_info

lstat async

lstat(path: str | PurePath) -> Container

Return lstat64 info for a remote path (does not follow symlinks).

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def lstat(self, path: str | PurePath) -> Container:
    """Stat a remote path via ``lstat64`` (symlinks are not followed) and return the parsed result."""
    link_info = await do_stat(self._client, "lstat64", path)
    return link_info

setxattr async

setxattr(path: str | PurePath, name: str, value: bytes) -> None

Set an extended attribute value.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def setxattr(self, path: str | PurePath, name: str, value: bytes) -> None:
    """Set an extended attribute on a remote path.

    Raises through ``raise_errno_exception`` when the remote call returns -1.
    """
    result = await self._client.symbols.setxattr(path, name, value, len(value), 0, 0)
    if result.c_int64 != -1:
        return
    await self._client.raise_errno_exception(f"failed to setxattr(): {path}")

removexattr async

removexattr(path: str | PurePath, name: str) -> None

Remove an extended attribute.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def removexattr(self, path: str | PurePath, name: str) -> None:
    """Remove an extended attribute from a remote path.

    Raises through ``raise_errno_exception`` when the remote call returns -1.
    """
    result = await self._client.symbols.removexattr(path, name, 0)
    if result.c_int64 != -1:
        return
    await self._client.raise_errno_exception(f"failed to removexattr(): {path}")

listxattr async

listxattr(path: str | PurePath) -> list[str]

List extended attribute names.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def listxattr(self, path: str | PurePath) -> list[str]:
    """List extended attribute names.

    The required buffer size is queried first, so name lists of any length
    are supported instead of being capped by a fixed-size buffer.
    """
    # With a NULL buffer listxattr(2) returns the size of the name list.
    size = (await self._client.symbols.listxattr(path, 0, 0, 0)).c_int64
    if size == -1:
        await self._client.raise_errno_exception(f"failed to listxattr(): {path}")
    if size == 0:
        return []
    async with self._client.safe_malloc(size) as xattributes_names:
        count = (await self._client.symbols.listxattr(path, xattributes_names, size, 0)).c_int64
        if count == -1:
            await self._client.raise_errno_exception(f"failed to listxattr(): {path}")
        # Names are NUL-terminated, so the final split element is always empty.
        return [s.decode() for s in (await xattributes_names.peek(count)).split(b"\x00")[:-1]]

getxattr async

getxattr(path: str | PurePath, name: str) -> bytes

Get an extended attribute value.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def getxattr(self, path: str | PurePath, name: str) -> bytes:
    """Get an extended attribute value.

    The value size is queried first, so values of any length are supported
    instead of being capped by a fixed-size buffer.
    """
    # With a NULL buffer getxattr(2) returns the current size of the attribute.
    size = (await self._client.symbols.getxattr(path, name, 0, 0, 0, 0)).c_int64
    if size == -1:
        await self._client.raise_errno_exception(f"failed to getxattr(): {path}")
    if size == 0:
        return b""
    async with self._client.safe_malloc(size) as value:
        count = (await self._client.symbols.getxattr(path, name, value, size, 0, 0)).c_int64
        if count == -1:
            await self._client.raise_errno_exception(f"failed to getxattr(): {path}")
        return await value.peek(count)

dictxattr async

dictxattr(path: str | PurePath) -> dict[str, bytes]

Get a dictionary of all extended attributes.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def dictxattr(self, path: str | PurePath) -> dict[str, bytes]:
    """Map every extended attribute name of a remote path to its value."""
    attribute_names = await self.listxattr(path)
    # Values are fetched one at a time, in the order the names were listed.
    return {attr: await self.getxattr(path, attr) for attr in attribute_names}

chflags async

chflags(path: str | PurePath, flags: int = 0) -> None

Set BSD file flags on a remote path.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def chflags(self, path: str | PurePath, flags: int = 0) -> None:
    """Apply BSD file flags to a remote path.

    Raises through ``raise_errno_exception`` when the remote call returns non-zero.
    """
    status = await self._client.symbols.chflags(path, flags)
    if status != 0:
        await self._client.raise_errno_exception(f"chflags failed for: {path}")

do_stat async

do_stat(client: DarwinClient[DarwinSymbol], stat_name: str, filename: str | PurePath) -> Container

Return a stat64 struct for a remote path.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/fs.py
async def do_stat(client: "DarwinClient[DarwinSymbol]", stat_name: str, filename: str | PurePath) -> Container:
    """Call the remote stat-family function named *stat_name* and parse its ``stat64`` output.

    A temporary remote buffer of ``stat64.sizeof()`` bytes receives the result.
    Raises through ``raise_errno_exception`` when the call returns non-zero.
    """
    async with client.safe_malloc(stat64.sizeof()) as stat_buf:
        stat_symbol = client.symbols[stat_name]
        if await stat_symbol.call(filename, stat_buf) != 0:
            await client.raise_errno_exception(f"failed to stat(): {filename}")
        return await stat_buf.parse(stat64)

Network

rpcclient.clients.darwin.subsystems.network

DarwinNetwork

Bases: Network['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Network utils

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/network.py
class DarwinNetwork(Network["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Network utils"""

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """Bind the subsystem to *client*."""
        super().__init__(client)

    async def proxy_settings(self) -> dict:
        """Get proxy settings"""
        return await (await self._client.symbols.CFNetworkCopySystemProxySettings()).py(dict)

    async def set_http_proxy(self, ip: str, port: int) -> None:
        """Set http proxy

        Writes a ``Proxies`` entry pointing both HTTP and HTTPS at ``ip:port``
        into the system configuration plist, then kills ``configd``.
        """
        async with await self._client.preferences.sc.open(SYSTEM_CONFIGURATION_PLIST) as config:
            await config.set(
                "Proxies",
                {
                    "HTTPProxyType": 1,
                    "HTTPEnable": 1,
                    "HTTPPort": port,
                    "HTTPSProxy": ip,
                    "HTTPSPort": port,
                    "HTTPProxy": ip,
                    "HTTPSEnable": 1,
                    "BypassAllowed": 0,
                },
            )
        # Same process name as remove_http_proxy(); presumably configd is relaunched
        # and picks up the new plist - confirm.
        await (await self._client.processes.get_by_basename("configd")).kill(SIGKILL)

    async def remove_http_proxy(self) -> None:
        """Remove http proxy settings"""
        async with await self._client.preferences.sc.open(SYSTEM_CONFIGURATION_PLIST) as config:
            await config.remove("Proxies")
        await (await self._client.processes.get_by_basename("configd")).kill(SIGKILL)

    async def remove_certificate_pinning(self) -> None:
        """Remove pinning rules from trustd"""
        async with self._client.fs.remote_file(PINNING_RULED_DB) as local_db_file:
            # truncate pinning rules; always close the connection, even on failure
            conn = sqlite3.connect(local_db_file)
            try:
                conn.execute("DELETE FROM rules")
                conn.commit()
            finally:
                conn.close()

            # push new db
            await self._client.fs.push(local_db_file, PINNING_RULED_DB, force=True)

        # restart trustd for changes to take effect
        await self.kill_trustd()

    async def restore_certificate_pinning(self) -> None:
        """Restore pinning rules from trustd"""
        if await self._client.fs.accessible(PINNING_RULED_DB):
            await self._client.fs.remove(PINNING_RULED_DB)

        # Upon startup trustd reloads pinning rules from the last MobileAsset
        await self.kill_trustd()

    async def kill_trustd(self) -> None:
        """Kill trustd processes"""
        for process in await self._client.processes.list():
            if await type(process).basename(process) == "trustd":
                await process.kill(SIGKILL)

    async def flush_dns(self) -> None:
        """Flush DNS cache"""
        await (await self._client.processes.get_by_basename("mDNSResponder")).kill(SIGKILL)

proxy_settings async

proxy_settings() -> dict

Get proxy settings

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/network.py
async def proxy_settings(self) -> dict:
    """Return the result of ``CFNetworkCopySystemProxySettings`` converted to a Python dict."""
    settings = await self._client.symbols.CFNetworkCopySystemProxySettings()
    return await settings.py(dict)

set_http_proxy async

set_http_proxy(ip: str, port: int) -> None

Set http proxy

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/network.py
async def set_http_proxy(self, ip: str, port: int) -> None:
    """Set http proxy

    Writes a ``Proxies`` entry pointing both HTTP and HTTPS at ``ip:port``
    into the system configuration plist, then kills ``configd``.
    """
    async with await self._client.preferences.sc.open(SYSTEM_CONFIGURATION_PLIST) as config:
        await config.set(
            "Proxies",
            {
                "HTTPProxyType": 1,
                "HTTPEnable": 1,
                "HTTPPort": port,
                "HTTPSProxy": ip,
                "HTTPSPort": port,
                "HTTPProxy": ip,
                "HTTPSEnable": 1,
                "BypassAllowed": 0,
            },
        )
    # The basename previously carried a stray ")" ("configd)") and so could not
    # match the configd process.
    await (await self._client.processes.get_by_basename("configd")).kill(SIGKILL)

remove_http_proxy async

remove_http_proxy() -> None

Remove http proxy settings

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/network.py
async def remove_http_proxy(self) -> None:
    """Drop the ``Proxies`` entry from the system configuration plist, then kill ``configd``."""
    preferences = await self._client.preferences.sc.open(SYSTEM_CONFIGURATION_PLIST)
    async with preferences as config:
        await config.remove("Proxies")
    configd = await self._client.processes.get_by_basename("configd")
    await configd.kill(SIGKILL)

remove_certificate_pinning async

remove_certificate_pinning() -> None

Remove pinning rules from trustd

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/network.py
async def remove_certificate_pinning(self) -> None:
    """Remove pinning rules from trustd

    Opens a local copy of the pinning rules database, deletes every row of its
    ``rules`` table, pushes the file back and restarts trustd.
    """
    async with self._client.fs.remote_file(PINNING_RULED_DB) as local_db_file:
        # truncate pinning rules; always close the connection, even on failure
        conn = sqlite3.connect(local_db_file)
        try:
            conn.execute("DELETE FROM rules")
            conn.commit()
        finally:
            conn.close()

        # push new db
        await self._client.fs.push(local_db_file, PINNING_RULED_DB, force=True)

    # restart trustd for changes to take effect
    await self.kill_trustd()

restore_certificate_pinning async

restore_certificate_pinning() -> None

Restore trustd's pinning rules: remove the modified rules database (if present) and restart trustd so it reloads them.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/network.py
async def restore_certificate_pinning(self) -> None:
    """Delete the pinning rules database when it is accessible, then restart trustd."""
    fs = self._client.fs
    db_present = await fs.accessible(PINNING_RULED_DB)
    if db_present:
        await fs.remove(PINNING_RULED_DB)

    # On startup trustd reloads its pinning rules from the last MobileAsset
    await self.kill_trustd()

kill_trustd async

kill_trustd() -> None

Kill trustd processes

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/network.py
async def kill_trustd(self) -> None:
    """Send SIGKILL to every listed process whose basename is ``trustd``."""
    processes = await self._client.processes.list()
    for candidate in processes:
        basename = await type(candidate).basename(candidate)
        if basename != "trustd":
            continue
        await candidate.kill(SIGKILL)

flush_dns async

flush_dns() -> None

Flush DNS cache

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/network.py
async def flush_dns(self) -> None:
    """Flush the DNS cache by killing ``mDNSResponder``."""
    responder = await self._client.processes.get_by_basename("mDNSResponder")
    await responder.kill(SIGKILL)

Processes

rpcclient.clients.darwin.subsystems.processes

Thread

Bases: ClientBound[DarwinClientT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
class Thread(ClientBound[DarwinClientT_co]):
    """Base wrapper around a thread id; state and scheduling operations are left to subclasses."""

    def __init__(self, client: DarwinClientT_co, thread_id: int) -> None:
        """Store the owning client and the thread id."""
        self._client = client
        self._thread_id: int = thread_id

    @property
    def thread_id(self) -> int:
        """The Mach thread id this wrapper was created with."""
        return self._thread_id

    async def get_state(self) -> Container:
        """Return the architecture-specific thread state (not implemented in the base class)."""
        raise NotImplementedError

    async def set_state(self, state: dict) -> None:
        """Set the architecture-specific thread state (not implemented in the base class)."""
        raise NotImplementedError

    async def resume(self) -> None:
        """Resume execution of the thread (not implemented in the base class)."""
        raise NotImplementedError

    async def suspend(self) -> None:
        """Suspend execution of the thread (not implemented in the base class)."""
        raise NotImplementedError

    def __repr__(self) -> str:
        """Return ``<ClassName TID:thread_id>``."""
        class_name = self.__class__.__name__
        return f"<{class_name} TID:{self._thread_id}>"

thread_id property

thread_id: int

Return the Mach thread id.

get_state async

get_state() -> Container

Return the architecture-specific thread state.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_state(self) -> Container:
    """Return the architecture-specific thread state (not implemented in the base class)."""
    raise NotImplementedError

set_state async

set_state(state: dict) -> None

Set the architecture-specific thread state.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def set_state(self, state: dict) -> None:
    """Set the architecture-specific thread state (not implemented in the base class)."""
    raise NotImplementedError

resume async

resume() -> None

Resume execution of the thread.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def resume(self) -> None:
    """Resume execution of the thread (not implemented in the base class)."""
    raise NotImplementedError

suspend async

suspend() -> None

Suspend execution of the thread.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def suspend(self) -> None:
    """Suspend execution of the thread (not implemented in the base class)."""
    raise NotImplementedError

IntelThread64

Bases: Thread[DarwinClientT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
class IntelThread64(Thread[DarwinClientT_co]):
    """Thread wrapper that reads and writes state using the x86_64 thread-state layout."""

    async def get_state(self) -> Container:
        """Read the thread state via ``thread_get_state`` and parse it as ``x86_thread_state64_t``.

        Raises:
            BadReturnValueError: if ``thread_get_state`` returns a truthy value.
        """
        state_size = x86_thread_state64_t.sizeof()
        # The count handed to thread_get_state is expressed in 32-bit words.
        state_count = state_size // Int32ul.sizeof()
        async with (
            self._client.safe_malloc(state_size) as p_state,
            self._client.safe_malloc(state_size) as p_thread_state_count,
        ):
            await p_thread_state_count.setindex(0, state_count)
            failed = await self._client.symbols.thread_get_state(
                self._thread_id, x86_THREAD_STATE64, p_state, p_thread_state_count
            )
            if failed:
                raise BadReturnValueError("thread_get_state() failed")
            return await p_state.parse(x86_thread_state64_t)

    async def set_state(self, state: dict) -> None:
        """Build *state* as ``x86_thread_state64_t`` and apply it via ``thread_set_state``.

        Raises:
            BadReturnValueError: if ``thread_set_state`` returns a truthy value.
        """
        raw_state = x86_thread_state64_t.build(state)
        state_count = x86_thread_state64_t.sizeof() // Int32ul.sizeof()
        failed = await self._client.symbols.thread_set_state(
            self._thread_id, x86_THREAD_STATE64, raw_state, state_count
        )
        if failed:
            raise BadReturnValueError("thread_set_state() failed")

get_state async

get_state() -> Container

Fetch the x86_64 thread state.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_state(self) -> Container:
    """Read the thread state via ``thread_get_state`` and parse it as ``x86_thread_state64_t``.

    Raises:
        BadReturnValueError: if ``thread_get_state`` returns a truthy value.
    """
    state_size = x86_thread_state64_t.sizeof()
    # The count handed to thread_get_state is expressed in 32-bit words.
    state_count = state_size // Int32ul.sizeof()
    async with (
        self._client.safe_malloc(state_size) as p_state,
        self._client.safe_malloc(state_size) as p_thread_state_count,
    ):
        await p_thread_state_count.setindex(0, state_count)
        failed = await self._client.symbols.thread_get_state(
            self._thread_id, x86_THREAD_STATE64, p_state, p_thread_state_count
        )
        if failed:
            raise BadReturnValueError("thread_get_state() failed")
        return await p_state.parse(x86_thread_state64_t)

set_state async

set_state(state: dict) -> None

Set the x86_64 thread state.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def set_state(self, state: dict) -> None:
    """Build *state* as ``x86_thread_state64_t`` and apply it via ``thread_set_state``.

    Raises:
        BadReturnValueError: if ``thread_set_state`` returns a truthy value.
    """
    raw_state = x86_thread_state64_t.build(state)
    # The count handed to thread_set_state is expressed in 32-bit words.
    state_count = x86_thread_state64_t.sizeof() // Int32ul.sizeof()
    failed = await self._client.symbols.thread_set_state(
        self._thread_id, x86_THREAD_STATE64, raw_state, state_count
    )
    if failed:
        raise BadReturnValueError("thread_set_state() failed")

ArmThread64

Bases: Thread[DarwinClientT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
class ArmThread64(Thread[DarwinClientT_co]):
    """Thread wrapper that uses the ARM64 thread-state layout and supports suspend/resume."""

    async def get_state(self) -> Container:
        """Read the thread state via ``thread_get_state`` and parse it as ``arm_thread_state64_t``.

        Raises:
            BadReturnValueError: if ``thread_get_state`` returns a truthy value.
        """
        state_size = arm_thread_state64_t.sizeof()
        async with (
            self._client.safe_malloc(state_size) as p_state,
            self._client.safe_malloc(state_size) as p_thread_state_count,
        ):
            await p_thread_state_count.setindex(0, ARM_THREAD_STATE64_COUNT)
            failed = await self._client.symbols.thread_get_state(
                self._thread_id, ARMThreadFlavors.ARM_THREAD_STATE64, p_state, p_thread_state_count
            )
            if failed:
                raise BadReturnValueError("thread_get_state() failed")
            return await p_state.parse(arm_thread_state64_t)

    async def set_state(self, state: dict) -> None:
        """Build *state* as ``arm_thread_state64_t`` and apply it via ``thread_set_state``.

        Raises:
            BadReturnValueError: if ``thread_set_state`` returns a truthy value.
        """
        raw_state = arm_thread_state64_t.build(state)
        failed = await self._client.symbols.thread_set_state(
            self._thread_id, ARMThreadFlavors.ARM_THREAD_STATE64, raw_state, ARM_THREAD_STATE64_COUNT
        )
        if failed:
            raise BadReturnValueError("thread_set_state() failed")

    async def suspend(self) -> None:
        """Call ``thread_suspend`` on this thread; raise BadReturnValueError on a truthy result."""
        failed = await self._client.symbols.thread_suspend(self._thread_id)
        if failed:
            raise BadReturnValueError("thread_suspend() failed")

    async def resume(self) -> None:
        """Call ``thread_resume`` on this thread; raise BadReturnValueError on a truthy result."""
        failed = await self._client.symbols.thread_resume(self._thread_id)
        if failed:
            raise BadReturnValueError("thread_resume() failed")

get_state async

get_state() -> Container

Fetch the ARM64 thread state.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_state(self) -> Container:
    """Read the thread state via ``thread_get_state`` and parse it as ``arm_thread_state64_t``.

    Raises:
        BadReturnValueError: if ``thread_get_state`` returns a truthy value.
    """
    state_size = arm_thread_state64_t.sizeof()
    async with (
        self._client.safe_malloc(state_size) as p_state,
        self._client.safe_malloc(state_size) as p_thread_state_count,
    ):
        await p_thread_state_count.setindex(0, ARM_THREAD_STATE64_COUNT)
        failed = await self._client.symbols.thread_get_state(
            self._thread_id, ARMThreadFlavors.ARM_THREAD_STATE64, p_state, p_thread_state_count
        )
        if failed:
            raise BadReturnValueError("thread_get_state() failed")
        return await p_state.parse(arm_thread_state64_t)

set_state async

set_state(state: dict) -> None

Set the ARM64 thread state.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def set_state(self, state: dict) -> None:
    """Build *state* as ``arm_thread_state64_t`` and apply it via ``thread_set_state``.

    Raises:
        BadReturnValueError: if ``thread_set_state`` returns a truthy value.
    """
    raw_state = arm_thread_state64_t.build(state)
    failed = await self._client.symbols.thread_set_state(
        self._thread_id, ARMThreadFlavors.ARM_THREAD_STATE64, raw_state, ARM_THREAD_STATE64_COUNT
    )
    if failed:
        raise BadReturnValueError("thread_set_state() failed")

suspend async

suspend() -> None

Suspend execution of the ARM64 thread.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def suspend(self) -> None:
    """Call ``thread_suspend`` on this thread; raise BadReturnValueError on a truthy result."""
    failed = await self._client.symbols.thread_suspend(self._thread_id)
    if failed:
        raise BadReturnValueError("thread_suspend() failed")

resume async

resume() -> None

Resume execution of the ARM64 thread.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def resume(self) -> None:
    """Call ``thread_resume`` on this thread; raise BadReturnValueError on a truthy result."""
    failed = await self._client.symbols.thread_resume(self._thread_id)
    if failed:
        raise BadReturnValueError("thread_resume() failed")

Region dataclass

Bases: Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@dataclasses.dataclass
class Region(Generic[DarwinSymbolT_co]):
    """Description of one memory region of a process."""

    region_type: str
    # First address of the region, as a symbol bound to the owning process.
    start: "ProcessSymbol[DarwinSymbolT_co]"
    # Address at which the region ends.
    end: int
    vsize: str | None
    protection: str
    protection_max: str
    region_detail: str | None

    @property
    def size(self) -> int:
        """Number of bytes between ``start`` and ``end``."""
        byte_count = self.end - self.start
        return byte_count

size property

size: int

Return the region size in bytes.

Frame dataclass

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@dataclasses.dataclass
class Frame:
    """A single backtrace frame."""

    depth: int
    address: int
    section: str
    offset: int
    symbol_name: str

    def __repr__(self):
        """Render as ``<Frame [depth] 0xaddress (section + 0xoffset) symbol_name>``."""
        class_name = self.__class__.__name__
        location = f"{self.section} + 0x{self.offset:x}"
        return f"<{class_name} [{self.depth:3}] 0x{self.address:x} ({location}) {self.symbol_name}>"

Backtrace dataclass

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@dataclasses.dataclass(kw_only=True)
class Backtrace:
    """Structured form of a ``VMUBacktrace`` description string."""

    flavor: str
    time_start: float | None = None
    time_end: float | None = None
    pid: int
    thread_id: int
    dispatch_queue_serial_num: int
    frames: list[Frame]

    @staticmethod
    async def _from_backtrace(vmu_backtrace: DarwinSymbol) -> "Backtrace":
        """Parse a VMU backtrace description into structured fields.

        Raises:
            ValueError: if the description header does not have the expected format.
        """
        backtrace = await (await vmu_backtrace.objc_call("description")).py(str)
        match = re.match(
            (
                r"VMUBacktrace \(Flavor: (?P<flavor>.+?) Simple Time: (?P<time>.+?) "
                r"Process: (?P<pid>\d+) Thread: (?P<thread_id>.+?)  Dispatch queue serial num: "
                r"(?P<dispatch_queue_serial_num>\d+)\)"
            ),
            backtrace,
        )
        # Explicit check instead of ``assert``: assertions are stripped under ``-O``,
        # which would turn a malformed description into an obscure AttributeError.
        if match is None:
            raise ValueError(f"unexpected VMUBacktrace description: {backtrace!r}")

        # NOTE: the captured ``time`` group is not mapped onto time_start/time_end,
        # so both keep their ``None`` defaults.
        return Backtrace(
            flavor=match.group("flavor"),
            pid=int(match.group("pid")),
            thread_id=int(match.group("thread_id"), 16),
            dispatch_queue_serial_num=int(match.group("dispatch_queue_serial_num")),
            frames=[
                Frame(
                    depth=int(frame[0]),
                    address=int(frame[1], 0),
                    section=frame[2],
                    offset=int(frame[3], 0),
                    symbol_name=frame[4],
                )
                for frame in re.findall(_BACKTRACE_FRAME_REGEX, backtrace)
            ],
        )

    def __repr__(self) -> str:
        """Return a formatted backtrace representation, one indented line per frame."""
        header = f"<{self.__class__.__name__} PID: {self.pid} TID: {self.thread_id}\n"
        frame_lines = "".join(f"    {frame}\n" for frame in self.frames)
        return f"{header}{frame_lines}>"

ProcessSymbol

Bases: AbstractSymbol, ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
class ProcessSymbol(AbstractSymbol, ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Symbol whose memory accesses are routed through a ``Process`` object."""

    def __init__(self, value: int, process: "Process[DarwinSymbolT_co]") -> None:
        """Bind the symbol to *process* and to that process's client.

        NOTE(review): ``value`` is not stored here; presumably it is consumed by
        the ``AbstractSymbol`` base - confirm.
        """
        self._client: DarwinClient[DarwinSymbolT_co] = process._client
        self.process: Process[DarwinSymbolT_co] = process

    def _symbol_from_value(self, value: int) -> Self:
        """Create a symbol of the same type and process for *value*."""
        symbol_type = type(self)
        return symbol_type(value, self.process)

    async def peek(self, count: int, offset: int = 0) -> bytes:
        """Read *count* bytes of process memory starting *offset* bytes past this address."""
        address = self + offset
        return await self.process.peek(address, count)

    async def poke(self, buf: bytes, offset: int = 0) -> None:
        """Write *buf* to process memory starting *offset* bytes past this address."""
        address = self + offset
        return await self.process.poke(address, buf)

    async def peek_str(self, encoding="utf-8") -> str:
        """Read a string from process memory at this address using *encoding*."""
        text = await self.process.peek_str(self, encoding)
        return text

    async def dl_info(self) -> Container:
        """Always raise: dl_info is not available for remote process symbols."""
        raise NotImplementedError("dl_info isn't implemented for remote process symbols")

    async def name(self) -> str:
        """Look up the symbol name for this address through the owning process."""
        symbol_name = await self.process.get_symbol_name(self)
        return symbol_name

    async def filename(self) -> str:
        """Return the path of the image the owning process reports for this symbol's name."""
        symbol_name = await type(self).name(self)
        image = await self.process.get_symbol_image(symbol_name)
        return image.path

    async def get_dl_info(self) -> "Container":
        """Call ``dladdr`` on this address through the client and parse the result.

        Raises through ``raise_errno_exception`` when ``dladdr`` returns 0.
        """
        dl_info = Dl_info(self._client)
        info_size = dl_info.sizeof()
        async with self._client.safe_malloc(info_size) as info:
            found = await self._client.symbols.dladdr(self, info)
            if found == 0:
                await self._client.raise_errno_exception(f"failed to extract info for: {self}")
            raw_info = await info.read(info_size)
            return dl_info.parse(raw_info)

    @property
    def arch(self) -> object:
        """Architecture reported by the underlying client."""
        return self._client.arch

    @property
    def endianness(self) -> str:
        """Endianness reported by the underlying client."""
        return self._client._endianness

    def call(self, *args, **kwargs) -> NoReturn:
        """Always raise: a process symbol cannot be called as a function."""
        raise RpcClientException("ProcessSymbol is not callable")

    __call__ = call

peek async

peek(count: int, offset: int = 0) -> bytes

Read bytes from process memory at this address.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def peek(self, count: int, offset: int = 0) -> bytes:
    """Read *count* bytes of process memory starting *offset* bytes past this address."""
    address = self + offset
    return await self.process.peek(address, count)

poke async

poke(buf: bytes, offset: int = 0) -> None

Write bytes to process memory at this address.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def poke(self, buf: bytes, offset: int = 0) -> None:
    """Write *buf* to process memory starting *offset* bytes past this address."""
    address = self + offset
    return await self.process.poke(address, buf)

peek_str async

peek_str(encoding='utf-8') -> str

Peek a string at this symbol's address.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def peek_str(self, encoding="utf-8") -> str:
    """Read a string from process memory at this address using *encoding*."""
    text = await self.process.peek_str(self, encoding)
    return text

dl_info async

dl_info() -> Container

Raise because dl_info is not available for remote process symbols.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def dl_info(self) -> Container:
    """Always raise: dl_info is not available for remote process symbols."""
    message = "dl_info isn't implemented for remote process symbols"
    raise NotImplementedError(message)

name async

name() -> str

Return the symbol name for this address.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def name(self) -> str:
    """Look up the symbol name for this address through the owning process."""
    symbol_name = await self.process.get_symbol_name(self)
    return symbol_name

filename async

filename() -> str

Return the image path containing this symbol.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def filename(self) -> str:
    """Return the path of the image the owning process reports for this symbol's name."""
    symbol_name = await type(self).name(self)
    image = await self.process.get_symbol_image(symbol_name)
    return image.path

call

call(*args, **kwargs) -> NoReturn

Disallow calling a process symbol as a function.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
def call(self, *args, **kwargs) -> NoReturn:
    """Always raise: a process symbol cannot be called as a function."""
    message = "ProcessSymbol is not callable"
    raise RpcClientException(message)

MachPortInfo dataclass

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@dataclasses.dataclass
class MachPortInfo:
    """Record describing one Mach port name and the rights held on it."""

    task: int
    pid: int
    name: int
    # Right names; "recv" and "send" are the values inspected by the properties below.
    rights: list[str]
    ipc_object: int
    dead: bool
    proc_name: str | None = None
    thread_info: MachPortThreadInfo | None = None

    @property
    def has_recv_right(self) -> bool:
        """Whether ``"recv"`` is among the port's rights."""
        held_rights = self.rights
        return "recv" in held_rights

    @property
    def has_send_right(self) -> bool:
        """Whether ``"send"`` is among the port's rights."""
        held_rights = self.rights
        return "send" in held_rights

has_recv_right property

has_recv_right: bool

Return True if the port has a receive right.

has_send_right property

has_send_right: bool

Return True if the port has a send right.

SymbolOwner

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
class SymbolOwner(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Pair of opaque CoreSymbolication symbol-owner values tied to a client and a process."""

    def __init__(
        self,
        client: "DarwinClient[DarwinSymbolT_co]",
        process: "Process[DarwinSymbolT_co]",
        symbol_owner_opaque1: ProcessSymbol[DarwinSymbolT_co],
        symbol_owner_opaque2: ProcessSymbol[DarwinSymbolT_co],
    ) -> None:
        """Store the client, the process and the two opaque symbol-owner values."""
        self._client: DarwinClient[DarwinSymbolT_co] = client
        self._process: Process[DarwinSymbolT_co] = process
        # the two halves are always handed to the CS* functions together, in this order
        self.symbol_owner_opaque1: ProcessSymbol[DarwinSymbolT_co] = symbol_owner_opaque1
        self.symbol_owner_opaque2: ProcessSymbol[DarwinSymbolT_co] = symbol_owner_opaque2

    async def get_symbol_address(self, name: str) -> ProcessSymbol[DarwinSymbolT_co]:
        """Look `name` up in this symbol owner and return it as a process symbol.

        The raw result of `CSSymbolOwnerGetSymbolWithName` is fed to `CSSymbolGetRange`,
        whose return value is wrapped by the process' `get_process_symbol`.
        """
        symbols = self._client.symbols
        raw_symbol = await symbols.CSSymbolOwnerGetSymbolWithName(
            self.symbol_owner_opaque1,
            self.symbol_owner_opaque2,
            name,
            return_raw=True,
        )
        symbol_range = await symbols.CSSymbolGetRange(raw_symbol.x0, raw_symbol.x1)
        return self._process.get_process_symbol(symbol_range)

get_symbol_address async

get_symbol_address(name: str) -> ProcessSymbol[DarwinSymbolT_co]

Resolve a symbol address by name.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_symbol_address(self, name: str) -> ProcessSymbol[DarwinSymbolT_co]:
    """Look `name` up in this symbol owner and return it as a process symbol.

    The raw result of `CSSymbolOwnerGetSymbolWithName` is fed to `CSSymbolGetRange`,
    whose return value is wrapped by the process' `get_process_symbol`.
    """
    symbols = self._client.symbols
    raw_symbol = await symbols.CSSymbolOwnerGetSymbolWithName(
        self.symbol_owner_opaque1,
        self.symbol_owner_opaque2,
        name,
        return_raw=True,
    )
    symbol_range = await symbols.CSSymbolGetRange(raw_symbol.x0, raw_symbol.x1)
    return self._process.get_process_symbol(symbol_range)

Symbolicator

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
class Symbolicator(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Pair of opaque CoreSymbolication symbolicator values tied to a client and a process."""

    def __init__(
        self,
        client: "DarwinClient[DarwinSymbolT_co]",
        process: "Process[DarwinSymbolT_co]",
        symbolicator_opaque1: ProcessSymbol[DarwinSymbolT_co],
        symbolicator_opaque2: ProcessSymbol[DarwinSymbolT_co],
    ) -> None:
        """Store the client, the process and the two opaque symbolicator values."""
        self._client: DarwinClient[DarwinSymbolT_co] = client
        self._process: Process[DarwinSymbolT_co] = process
        # the two halves are always handed to the CS* functions together, in this order
        self.symbolicator_opaque1: ProcessSymbol[DarwinSymbolT_co] = symbolicator_opaque1
        self.symbolicator_opaque2: ProcessSymbol[DarwinSymbolT_co] = symbolicator_opaque2

    async def get_symbol_owner(self, library_basename: str) -> SymbolOwner[DarwinSymbolT_co]:
        """Build a `SymbolOwner` for the library called `library_basename`.

        The lookup is made with `CSSymbolicatorGetSymbolOwnerWithNameAtTime` at `kCSNow`;
        its raw `x0`/`x1` pair becomes the new owner's two opaque values.
        """
        raw_owner = await self._client.symbols.CSSymbolicatorGetSymbolOwnerWithNameAtTime(
            self.symbolicator_opaque1,
            self.symbolicator_opaque2,
            library_basename,
            kCSNow,
            return_raw=True,
        )
        opaque1, opaque2 = raw_owner.x0, raw_owner.x1
        return SymbolOwner(self._client, self._process, opaque1, opaque2)

get_symbol_owner async

get_symbol_owner(library_basename: str) -> SymbolOwner[DarwinSymbolT_co]

Resolve a symbol owner by library basename.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_symbol_owner(self, library_basename: str) -> SymbolOwner[DarwinSymbolT_co]:
    """Build a `SymbolOwner` for the library called `library_basename`.

    The lookup is made with `CSSymbolicatorGetSymbolOwnerWithNameAtTime` at `kCSNow`;
    its raw `x0`/`x1` pair becomes the new owner's two opaque values.
    """
    raw_owner = await self._client.symbols.CSSymbolicatorGetSymbolOwnerWithNameAtTime(
        self.symbolicator_opaque1,
        self.symbolicator_opaque2,
        library_basename,
        kCSNow,
        return_raw=True,
    )
    opaque1, opaque2 = raw_owner.x0, raw_owner.x1
    return SymbolOwner(self._client, self._process, opaque1, opaque2)

Process

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
class Process(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    PEEK_STR_CHUNK_SIZE = 0x100

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]", pid: int) -> None:
        """Bind the wrapper to `client` and `pid` and choose the thread class for the client's arch."""
        self._client = client
        self._pid: int = pid

        # ARM64 clients use ArmThread64; any other arch falls back to IntelThread64
        self._thread_class = ArmThread64 if self._client.arch == ARCH_ARM64 else IntelThread64

    async def kill(self, sig: int = SIGTERM) -> None:
        """Deliver `sig` to this pid through the client's `processes` subsystem."""
        processes = self._client.processes
        return await processes.kill(self._pid, sig)

    async def waitpid(self, flags: int = 0) -> int:
        """Run `waitpid` on this pid through the client's `processes` subsystem and return its result."""
        processes = self._client.processes
        return await processes.waitpid(self._pid, flags)

    async def peek(self, address: int, size: int) -> bytes:
        """Read `size` bytes at `address` in the task's memory.

        :raises BadReturnValueError: if `vm_read_overwrite` returns a non-zero value
        """
        async with (
            self._client.safe_malloc(size) as buf,
            self._client.safe_malloc(8) as p_size,
        ):
            # the size out-parameter is pre-loaded with the requested length
            await p_size.setindex(0, size)
            task_port = await type(self).task_read(self)
            failed = await self._client.symbols.vm_read_overwrite(task_port, address, size, buf, p_size)
            if failed:
                raise BadReturnValueError("vm_read() failed")
            return await buf.peek(size)

    async def peek_str(self, address: int, encoding="utf-8") -> str:
        """Read a NUL-terminated string starting at `address` and decode it.

        Memory is read in chunks of `PEEK_STR_CHUNK_SIZE`; whenever a read fails the
        chunk size is halved and the read is retried at the same address.

        :raises RuntimeError: if the chunk size reaches zero before a terminator is seen
        """
        chunk_size = self.PEEK_STR_CHUNK_SIZE
        collected = b""

        while chunk_size:
            try:
                collected += await self.peek(address, chunk_size)
            except BadReturnValueError:
                # unreadable at this size: retry the same address with a smaller chunk
                chunk_size //= 2
                continue
            if b"\x00" in collected:
                return collected.partition(b"\x00")[0].decode(encoding)
            address += chunk_size

        raise RuntimeError("Failed to find string terminator")

    async def poke(self, address: int, buf: bytes) -> None:
        """Write `buf` at `address` in the task's memory.

        :raises BadReturnValueError: if `vm_write` returns a non-zero value
        """
        task_port = await type(self).task(self)
        failed = await self._client.symbols.vm_write(task_port, address, buf, len(buf))
        if failed:
            raise BadReturnValueError("vm_write() failed")

    async def get_symbol_name(self, address: int) -> str:
        """Return the name of the symbol found at `address`.

        :raises NotImplementedError: when the client arch is not ARCH_ARM64
        :raises SymbolAbsentError: when `symbolForAddress:` yields an all-zero result
        """
        if self._client.arch != ARCH_ARM64:
            raise NotImplementedError("implemented only on ARCH_ARM64")

        identifier = await type(self).vmu_object_identifier(self)
        cs_symbol = await identifier.objc_call_raw("symbolForAddress:", address)
        if cs_symbol.x0 == 0 and cs_symbol.x1 == 0:
            raise SymbolAbsentError()

        name_ptr = await self._client.symbols.CSSymbolGetName(cs_symbol.x0, cs_symbol.x1)
        return await name_ptr.peek_str()

    async def get_symbol_image(self, name: str) -> Image:
        """Return the first loaded image in which `name` resolves to a truthy address.

        :raises ProcessSymbolAbsentError: when no loaded image resolves the symbol
        """
        loaded_images = await type(self).images(self)
        for candidate in loaded_images:
            library = posixpath.basename(candidate.path)
            if await self.get_symbol_address(name, library):
                return candidate

        raise ProcessSymbolAbsentError()

    async def get_symbol_class_info(self, address: int) -> DarwinSymbolT_co:
        """Return the result of `classInfoForMemory:length:` for 8 bytes at `address`."""
        identifier = await type(self).vmu_object_identifier(self)
        return await identifier.objc_call("classInfoForMemory:length:", address, 8)

    async def get_symbol_address(
        self, name: str, lib: str | None = None
    ) -> DarwinSymbolT_co | ProcessSymbol[DarwinSymbolT_co]:
        """Resolve the address of symbol `name`.

        Without `lib`, the containing image is searched for first and its basename is
        used as the library. The address is returned as a client symbol when this
        process is the client's own pid, otherwise as a process symbol.
        """
        if lib is None:
            image = await self.get_symbol_image(name)
            return await self.get_symbol_address(name, posixpath.basename(image.path))

        identifier = await type(self).vmu_object_identifier(self)
        address = (await identifier.objc_call("addressOfSymbol:inLibrary:", name, lib)).c_uint64
        if self.pid == await self._client.get_pid():
            return self._client.symbol(address)
        return self.get_process_symbol(address)

    async def loaded_classes(self) -> AsyncGenerator[LoadedClass]:
        """Yield a `LoadedClass` for each entry of the object identifier's `realizedClasses`."""
        identifier = await type(self).vmu_object_identifier(self)
        realized_classes = await identifier.objc_call("realizedClasses")
        class_count = await realized_classes.objc_call("count")

        # indices are walked from 1 up to and including the reported count
        for index in range(1, class_count + 1):
            class_info = await realized_classes.objc_call("classInfoForIndex:", index)
            class_name = await (await class_info.objc_call("className")).py(str)
            type_name = await (await class_info.objc_call("typeName")).py(str)
            binary_path = await (await class_info.objc_call("binaryPath")).py(str)
            yield LoadedClass(name=class_name, type_name=type_name, binary_path=binary_path)

    async def images(self) -> list[Image]:
        """Return the images listed in the task's dyld all-image-infos structure.

        :raises BadReturnValueError: if `task_info(TASK_DYLD_INFO)` returns a non-zero value
        """
        loaded = []

        async with (
            self._client.safe_malloc(task_dyld_info_data_t.sizeof()) as dyld_info,
            self._client.safe_calloc(8) as count,
        ):
            await count.setindex(0, TASK_DYLD_INFO_COUNT)
            task_port = await type(self).task_read(self)
            if await self._client.symbols.task_info(task_port, TASK_DYLD_INFO, dyld_info, count):
                raise BadReturnValueError("task_info(TASK_DYLD_INFO) failed")
            dyld_info_data = await dyld_info.parse(task_dyld_info_data_t)

        # the header is read out of the target's memory, then the image array it points at
        raw_header = await self.peek(dyld_info_data.all_image_info_addr, dyld_info_data.all_image_info_size)
        all_image_infos = all_image_infos_t.parse(raw_header)

        entry_count = all_image_infos.infoArrayCount
        raw_entries = await self.peek(all_image_infos.infoArray, entry_count * dyld_image_info_t.sizeof())
        for entry in Array(entry_count, dyld_image_info_t).parse(raw_entries):
            image_path = await self.peek_str(entry.imageFilePath)
            loaded.append(Image(address=self.get_process_symbol(entry.imageLoadAddress), path=image_path))
        return loaded

    async def app_images(self) -> list[Image]:
        """Return the loaded images whose path contains `APP_SUFFIX`."""
        selected = []
        for image in await type(self).images(self):
            if APP_SUFFIX in image.path:
                selected.append(image)
        return selected

    async def threads(self) -> list[Thread]:
        """Return the list of threads in the process.

        Calls `task_threads` with the task read port and wraps every returned
        32-bit thread id in the thread class selected for the client's arch.

        :raises BadReturnValueError: if `task_threads` returns a non-zero value
        """
        result = []

        async with self._client.safe_malloc(8) as threads, self._client.safe_malloc(4) as count:
            # the count out-parameter is a 4-byte allocation, so index it as 4-byte items
            count.item_size = 4
            if await self._client.symbols.task_threads(await type(self).task_read(self), threads, count):
                raise BadReturnValueError("task_threads() failed")

            # `threads` holds a pointer to the id array: peek count * 4 bytes from it and
            # parse them as `count` little-endian 32-bit integers
            # NOTE(review): the array returned by task_threads is not released here — confirm
            # whether it should be deallocated
            for tid in Array((await count.getindex(0)).c_uint32, Int32ul).parse(
                await (await threads.getindex(0)).peek(await count.getindex(0) * 4)
            ):
                result.append(self._thread_class(self._client, tid))

        return result

    @property
    def pid(self) -> int:
        """The pid this wrapper was created with."""
        process_id = self._pid
        return process_id

    async def fds(self) -> list[Fd]:
        """get a list of process opened file descriptors

        Converts every entry of `fd_structs()` into a typed descriptor object:
        `FileFd` for vnodes, `KQueueFd`, `PipeFd`, `SharedMemoryFd`, a class looked up
        in `SOCKET_TYPE_DATACLASS` for TCP/IN sockets and `UnixFd` for UN sockets.
        Entries of any other type or socket kind are left out of the result.
        """
        result = []
        for fdstruct in await self.fd_structs():
            fd = fdstruct.fd
            parsed = fdstruct.struct

            if fd.proc_fdtype == PROX_FDTYPE_VNODE:
                result.append(FileFd(fd=fd.proc_fd, path=parsed.pvip.vip_path))

            elif fd.proc_fdtype == PROX_FDTYPE_KQUEUE:
                result.append(KQueueFd(fd=fd.proc_fd))

            elif fd.proc_fdtype == PROX_FDTYPE_PIPE:
                result.append(PipeFd(fd=fd.proc_fd))

            elif fd.proc_fdtype == PROX_FDTYPE_PSHM:
                result.append(SharedMemoryFd(fd=fd.proc_fd, path=parsed.pshminfo.pshm_name))

            elif fd.proc_fdtype == PROX_FDTYPE_SOCKET:
                if parsed.psi.soi_kind in (so_kind_t.SOCKINFO_TCP, so_kind_t.SOCKINFO_IN):
                    # the result class is selected by socket family first, then by socket kind
                    correct_class = SOCKET_TYPE_DATACLASS[parsed.psi.soi_family][parsed.psi.soi_kind]

                    # TCP sockets nest the in-socket info one level deeper than plain IN sockets
                    if parsed.psi.soi_kind == so_kind_t.SOCKINFO_TCP:
                        info = parsed.psi.soi_proto.pri_tcp.tcpsi_ini
                    else:
                        info = parsed.psi.soi_proto.pri_in
                    # NOTE(review): the addr4 member is read regardless of soi_family —
                    # confirm this is the intended field for non-IPv4 sockets
                    result.append(
                        correct_class(
                            fd=fd.proc_fd,
                            local_address=info.insi_laddr.ina_46.i46a_addr4,
                            local_port=info.insi_lport,
                            remote_address=info.insi_faddr.ina_46.i46a_addr4,
                            remote_port=info.insi_fport,
                        )
                    )

                elif parsed.psi.soi_kind == so_kind_t.SOCKINFO_UN:
                    result.append(UnixFd(fd=fd.proc_fd, path=parsed.psi.soi_proto.pri_un.unsi_addr.ua_sun.sun_path))

        return result

    async def fd_structs(self) -> list[FdStruct]:
        """get a list of process opened file descriptors as raw structs

        Lists the descriptors with `proc_pidinfo(PROC_PIDLISTFDS)` and, for vnode,
        socket, pipe and pshm descriptors, fetches and parses the per-descriptor
        structure with `proc_pidfdinfo`. KQueue descriptors are reported with
        `struct=None`; descriptors of any other type are left out.

        :raises BadReturnValueError: if the listing returns 0, or a per-descriptor query
            fails with an errno other than EBADF
        """
        result = []
        # first call with a null buffer: the return value is used as the list buffer size
        size = await self._client.symbols.proc_pidinfo(self.pid, PROC_PIDLISTFDS, 0, 0, 0)

        vi_size = 8196  # should be enough for all structs
        async with self._client.safe_malloc(vi_size) as vi_buf:
            async with self._client.safe_malloc(size) as fdinfo_buf:
                # second call fills the buffer; its return value is the byte count parsed below
                size = int(await self._client.symbols.proc_pidinfo(self.pid, PROC_PIDLISTFDS, 0, fdinfo_buf, size))
                if not size:
                    raise BadReturnValueError("proc_pidinfo(PROC_PIDLISTFDS) failed")

                for fd in Array(size // proc_fdinfo.sizeof(), proc_fdinfo).parse(await fdinfo_buf.peek(size)):
                    if fd.proc_fdtype == PROX_FDTYPE_VNODE:
                        # file
                        vs = await self._client.symbols.proc_pidfdinfo(
                            self.pid, fd.proc_fd, PROC_PIDFDVNODEPATHINFO, vi_buf, vi_size
                        )
                        if not vs:
                            # EBADF is skipped rather than reported, for every descriptor type below
                            if await self._client.get_errno() == errno.EBADF:
                                # lsof treats this as fine
                                continue
                            raise BadReturnValueError(
                                f"proc_pidinfo(PROC_PIDFDVNODEPATHINFO) failed for fd: {fd.proc_fd} "
                                f"({await self._client.get_last_error()})"
                            )

                        result.append(
                            FdStruct(
                                fd=fd,
                                struct=vnode_fdinfowithpath.parse(await vi_buf.peek(vnode_fdinfowithpath.sizeof())),
                            )
                        )

                    elif fd.proc_fdtype == PROX_FDTYPE_KQUEUE:
                        # no per-descriptor query is made for kqueues
                        result.append(FdStruct(fd=fd, struct=None))

                    elif fd.proc_fdtype == PROX_FDTYPE_SOCKET:
                        # socket
                        vs = await self._client.symbols.proc_pidfdinfo(
                            self.pid, fd.proc_fd, PROC_PIDFDSOCKETINFO, vi_buf, vi_size
                        )
                        if not vs:
                            if await self._client.get_errno() == errno.EBADF:
                                # lsof treats this as fine
                                continue
                            raise BadReturnValueError(
                                f"proc_pidinfo(PROC_PIDFDSOCKETINFO) failed ({await self._client.get_last_error()})"
                            )

                        # unlike the other branches, the whole scratch buffer is peeked here
                        result.append(FdStruct(fd=fd, struct=socket_fdinfo.parse(await vi_buf.peek(vi_size))))

                    elif fd.proc_fdtype == PROX_FDTYPE_PIPE:
                        # pipe
                        vs = await self._client.symbols.proc_pidfdinfo(
                            self.pid, fd.proc_fd, PROC_PIDFDPIPEINFO, vi_buf, vi_size
                        )
                        if not vs:
                            if await self._client.get_errno() == errno.EBADF:
                                # lsof treats this as fine
                                continue
                            raise BadReturnValueError(
                                f"proc_pidinfo(PROC_PIDFDPIPEINFO) failed ({await self._client.get_last_error()})"
                            )

                        result.append(FdStruct(fd=fd, struct=pipe_info.parse(await vi_buf.peek(pipe_info.sizeof()))))

                    elif fd.proc_fdtype == PROX_FDTYPE_PSHM:
                        # POSIX shared memory
                        vs = await self._client.symbols.proc_pidfdinfo(
                            self.pid, fd.proc_fd, PROC_PIDFDPSHMINFO, vi_buf, vi_size
                        )
                        if not vs:
                            if await self._client.get_errno() == errno.EBADF:
                                continue
                            raise BadReturnValueError(
                                f"proc_pidinfo(PROC_PIDFDPSHMINFO) failed ({await self._client.get_last_error()})"
                            )

                        result.append(
                            FdStruct(fd=fd, struct=pshm_fdinfo.parse(await vi_buf.peek(pshm_fdinfo.sizeof())))
                        )

            return result

    async def task_all_info(self) -> Container:
        """Return the parsed `proc_taskallinfo` structure of the process.

        The structure is obtained with `proc_pidinfo(PROC_PIDTASKALLINFO)`.

        :raises BadReturnValueError: if `proc_pidinfo` returns 0
        """
        async with self._client.safe_malloc(proc_taskallinfo.sizeof()) as pti:
            if not await self._client.symbols.proc_pidinfo(
                self.pid, PROC_PIDTASKALLINFO, 0, pti, proc_taskallinfo.sizeof()
            ):
                raise BadReturnValueError("proc_pidinfo(PROC_PIDTASKALLINFO) failed")
            return await pti.parse(proc_taskallinfo)

    async def task_vm_info(self) -> Container:
        """Return the parsed `task_vm_info_data_t` obtained from `task_info(TASK_VM_INFO)`.

        :raises BadReturnValueError: if `task_info` returns a non-zero value
        """
        info_size = task_vm_info_data_t.sizeof()
        async with (
            self._client.safe_malloc(info_size) as vm_info,
            self._client.safe_calloc(8) as count,
        ):
            await count.setindex(0, TASK_VM_INFO_COUNT)
            task_port = await type(self).task_read(self)
            failed = await self._client.symbols.task_info(task_port, TASK_VM_INFO, vm_info, count)
            if failed:
                raise BadReturnValueError("task_info(TASK_VM_INFO) failed")
            return await vm_info.parse(task_vm_info_data_t)

    async def backtraces(self) -> list[Backtrace]:
        """Sample all threads of the task with `VMUSampler` and return one `Backtrace` per sample."""
        sampler_class = await self._client.symbols.objc_getClass("VMUSampler")
        samples = await sampler_class.objc_call("sampleAllThreadsOfTask:", await type(self).task_read(self))
        sample_count = await samples.objc_call("count")
        return [
            await Backtrace._from_backtrace(await samples.objc_call("objectAtIndex:", index))
            for index in range(sample_count)
        ]

    @cached_async_method
    async def vmu_proc_info(self) -> DarwinSymbolT_co:
        """Allocate a `VMUProcInfo` and initialise it with this task's read port."""
        proc_info_class = await self._client.symbols.objc_getClass("VMUProcInfo")
        instance = await proc_info_class.objc_call("alloc")
        return await instance.objc_call("initWithTask:", await type(self).task_read(self))

    @cached_async_method
    async def vmu_region_identifier(self) -> DarwinSymbolT_co:
        """Allocate a `VMUVMRegionIdentifier` and initialise it with this task's read port."""
        region_identifier_class = await self._client.symbols.objc_getClass("VMUVMRegionIdentifier")
        instance = await region_identifier_class.objc_call("alloc")
        return await instance.objc_call("initWithTask:", await type(self).task_read(self))

    @cached_async_method
    async def vmu_object_identifier(self) -> DarwinSymbolT_co:
        """Allocate a `VMUObjectIdentifier` and initialise it with this task's read port."""
        object_identifier_class = await self._client.symbols.objc_getClass("VMUObjectIdentifier")
        instance = await object_identifier_class.objc_call("alloc")
        return await instance.objc_call("initWithTask:", await type(self).task_read(self))

    @cached_async_method
    async def symbolicator(self) -> Symbolicator[DarwinSymbolT_co]:
        """Create a `Symbolicator` from `CSSymbolicatorCreateWithTask` on this task's read port."""
        task_port = await type(self).task_read(self)
        raw = await self._client.symbols.CSSymbolicatorCreateWithTask(task_port, return_raw=True)
        # the raw x0/x1 pair is the symbolicator's two opaque values
        return Symbolicator(self._client, self, raw.x0, raw.x1)

    @cached_async_method
    async def task_read(self) -> int:
        """Return the task read port for this process.

        For the client's own pid this is `mach_task_self()`; otherwise the port is
        requested with `task_read_for_pid`.

        :raises BadReturnValueError: if `task_read_for_pid` returns a non-zero value
        """
        self_task_port = await self._client.symbols.mach_task_self()

        if self.pid == await self._client.get_pid():
            return self_task_port

        async with self._client.safe_malloc(8) as p_task:
            ret = await self._client.symbols.task_read_for_pid(self_task_port, self.pid, p_task)
            if ret:
                raise BadReturnValueError("task_read_for_pid() failed")
            # A mach port name is 32 bits wide, so only the low 4 bytes of the (uninitialised)
            # 8-byte buffer are written by the call; reading all 8 bytes as c_int64 could fold
            # leftover heap bytes into the returned value.
            return (await p_task.getindex(0)).c_uint32

    @cached_async_method
    async def task(self) -> int:
        """Return the full task port for this process.

        For the client's own pid this is `mach_task_self()`; otherwise the port is
        requested with `task_for_pid`.

        :raises BadReturnValueError: if `task_for_pid` returns a non-zero value
        """
        self_task_port = await self._client.symbols.mach_task_self()

        if self.pid == await self._client.get_pid():
            return self_task_port

        async with self._client.safe_malloc(8) as p_task:
            if await self._client.symbols.task_for_pid(self_task_port, self.pid, p_task):
                raise BadReturnValueError("task_for_pid() failed")
            # A mach port name is 32 bits wide, so only the low 4 bytes of the (uninitialised)
            # 8-byte buffer are written by the call; reading all 8 bytes as c_int64 could fold
            # leftover heap bytes into the returned value.
            return (await p_task.getindex(0)).c_uint32

    @cached_async_method
    async def path(self) -> str | None:
        """Return the path reported by the remote `proc_pidpath` for this pid.

        `None` is returned when `proc_pidpath` reports a falsy length.
        """
        async with self._client.safe_malloc(MAXPATHLEN) as path_buf:
            path_len = await self._client.symbols.proc_pidpath(self.pid, path_buf, MAXPATHLEN)
            if not path_len:
                return None
            raw_path = await path_buf.peek(path_len)
            return raw_path.decode()

    @cached_async_method
    async def basename(self) -> str | None:
        """Return the final component of the process path, or None when there is no path."""
        full_path = await type(self).path(self)
        if not full_path:
            return None
        return PurePath(full_path).name

    @cached_async_method
    async def _get_pbsd(self) -> Container:
        """Return the `pbsd` member of `task_all_info()`."""
        all_info = await self.task_all_info()
        return all_info.pbsd

    async def name(self) -> str:
        """Return `pbi_name` from the process' `pbsd` info."""
        bsd_info = await self._get_pbsd()
        return bsd_info.pbi_name

    async def ppid(self) -> int:
        """Return `pbi_ppid` (parent pid) from the process' `pbsd` info."""
        bsd_info = await self._get_pbsd()
        return bsd_info.pbi_ppid

    async def uid(self) -> int:
        """Return `pbi_uid` from the process' `pbsd` info."""
        bsd_info = await self._get_pbsd()
        return bsd_info.pbi_uid

    async def gid(self) -> int:
        """Return `pbi_gid` from the process' `pbsd` info."""
        bsd_info = await self._get_pbsd()
        return bsd_info.pbi_gid

    async def ruid(self) -> int:
        """Return `pbi_ruid` (real user id) from the process' `pbsd` info."""
        bsd_info = await self._get_pbsd()
        return bsd_info.pbi_ruid

    async def rgid(self) -> int:
        """Return `pbi_rgid` (real group id) from the process' `pbsd` info."""
        bsd_info = await self._get_pbsd()
        return bsd_info.pbi_rgid

    # memoised result of start_time(); None until the first successful lookup
    _start_time: datetime | None = None

    async def start_time(self) -> datetime:
        """Return the process start time, computing it on first use.

        The value comes from the raw `startTime` result of the process' VMUProcInfo:
        `x0` is taken as seconds and `x1` as nanoseconds.

        :raises NotImplementedError: when the client arch is not ARCH_ARM64
        """
        if self._start_time is not None:
            return self._start_time

        if self._client.arch != ARCH_ARM64:
            raise NotImplementedError("implemented only on ARCH_ARM64")

        proc_info = await type(self).vmu_proc_info(self)
        raw_time = await proc_info.objc_call_raw("startTime")
        seconds, nanoseconds = raw_time.x0, raw_time.x1
        self._start_time = datetime.fromtimestamp(seconds + (nanoseconds / (10**9)))
        return self._start_time

    async def parent(self) -> Self:
        """Build a wrapper of the same class for this process' parent pid."""
        parent_pid = await type(self).ppid(self)
        return type(self)(self._client, parent_pid)

    async def environ(self) -> list[str]:
        """Return the `envVars` of the process' VMUProcInfo converted to a Python list."""
        proc_info = await type(self).vmu_proc_info(self)
        env_vars = await proc_info.objc_call("envVars")
        return await env_vars.py(list)

    async def arguments(self) -> list[str]:
        """Return the `arguments` of the process' VMUProcInfo converted to a Python list."""
        proc_info = await type(self).vmu_proc_info(self)
        argument_array = await proc_info.objc_call("arguments")
        return await argument_array.py(list)

    async def raw_procargs2(self) -> bytes:
        """Fetch the `CTL.KERN` / `KERN.PROCARGS2` sysctl value for this pid, unparsed."""
        sysctl = self._client.sysctl
        return await sysctl.get(CTL.KERN, KERN.PROCARGS2, self.pid)

    async def procargs2(self) -> Container:
        """Return `raw_procargs2()` parsed with `procargs2_t`."""
        raw_buffer = await type(self).raw_procargs2(self)
        return procargs2_t.parse(raw_buffer)

    async def get_regions(self) -> list[Region[DarwinSymbolT_co]]:
        """Return the list of VM regions for the process.

        The regions are taken from the textual description (`cfdesc`) of the region
        identifier's `regions` array, one line per region. Each line is split into
        words and walked left to right: region type, address range, optional virtual
        size, protection pair and an optional detail word. Lines whose address range
        does not start with "0x" are skipped.
        """
        result: list[Region[DarwinSymbolT_co]] = []

        # remove the '()' wrapping the list:
        #   (
        #       "item1",
        #       "item2",
        #   )
        regions_sym = await (await type(self).vmu_region_identifier(self)).objc_call("regions")
        buf = cast(str, await type(regions_sym).cfdesc(regions_sym)).split("\n")[1:-1]

        for line in buf:
            # remove line prefix and suffix and split into words
            line = line[_CF_STRING_ARRAY_PREFIX_LEN:-_CF_STRING_ARRAY_SUFFIX_LEN].split()
            # `i` is the cursor into the words of the current line
            i = 0

            region_type = line[i]
            i += 1

            while "-" not in line[i]:
                # skip till address range
                i += 1

            address_range = line[i]
            i += 1
            if not address_range.startswith("0x"):
                continue
            start, end = address_range.split("-")

            start = int(start, 16)
            end = int(end, 16)
            vsize = None
            # the word after the range may carry the virtual size as "V=<size>]"
            if "V=" in line[i]:
                vsize = line[i].split("V=", 1)[1].split("]", 1)[0]

            # advance to the "<protection>/<max protection>" word
            while "/" not in line[i]:
                i += 1

            protection = line[i].split("/")
            i += 1

            # a word following the protection pair, when present, is kept as the detail
            region_detail = None
            if len(line) >= i + 1:
                region_detail = line[i]

            result.append(
                Region(
                    region_type=region_type,
                    start=self.get_process_symbol(start),
                    end=end,
                    vsize=vsize,
                    protection=protection[0],
                    protection_max=protection[1],
                    region_detail=region_detail,
                )
            )

        return result

    async def regions(self) -> list[Region[DarwinSymbolT_co]]:
        """Alias of `get_regions()`: return the VM regions of the process."""
        region_list = await self.get_regions()
        return region_list

    async def cdhash(self) -> bytes:
        """Return `CDHASH_SIZE` bytes of code directory hash obtained through `csops`.

        :raises BadReturnValueError: if `csops` returns a non-zero value
        """
        async with self._client.safe_malloc(CDHASH_SIZE) as hash_buf:
            # operation number 5 was found by reversing online-auth-agent
            status = await self._client.symbols.csops(self.pid, 5, hash_buf, CDHASH_SIZE)
            if status != 0:
                raise BadReturnValueError(f"failed to get cdhash for {self.pid}")
            return await hash_buf.peek(CDHASH_SIZE)

    def get_process_symbol(self, address: int) -> ProcessSymbol[DarwinSymbolT_co]:
        """Wrap `address` in a `ProcessSymbol` bound to this process."""
        symbol = ProcessSymbol(address, self)
        return symbol

    async def vm_allocate(self, size: int) -> ProcessSymbol[DarwinSymbolT_co]:
        """Allocate `size` bytes in the task with `VM_FLAGS_ANYWHERE` and return the address.

        :raises BadReturnValueError: if the remote `vm_allocate` returns a non-zero value
        """
        async with self._client.safe_malloc(8) as out_address:
            task_port = await type(self).task(self)
            failed = await self._client.symbols.vm_allocate(task_port, out_address, size, VM_FLAGS_ANYWHERE)
            if failed:
                raise BadReturnValueError("vm_allocate() failed")
            allocated = await out_address.getindex(0)
            return self.get_process_symbol(allocated)

    async def dump_app(self, output_dir: str | PurePath, chunk_size: int = CHUNK_SIZE) -> None:
        """
        Dump every app image of the process into `output_dir` with its encryption patched out.

        Based on:
        https://github.com/AloneMonkey/frida-ios-dump/blob/master/dump.js

        For each image returned by `app_images()` the on-disk file (the matching slice
        when the file is FAT) is copied locally, the `cryptid` field is zeroed and the
        encrypted range is overwritten with the bytes read from the process memory.

        :param output_dir: local directory that receives the dumped files; the part of
            each image path after `APP_SUFFIX` is recreated beneath it
        :param chunk_size: number of bytes requested per read of the remote file
        :raises RuntimeError: if a FAT file has no slice matching the in-memory header,
            or the image has no LC_ENCRYPTION_INFO / LC_ENCRYPTION_INFO_64 command
        """
        for image in await type(self).app_images(self):
            relative_name = image.path.split(APP_SUFFIX, 1)[1]
            # kept under its own name so the Path is not shadowed by the open file handle
            output_path = Path(output_dir).expanduser() / relative_name
            output_path.parent.mkdir(exist_ok=True, parents=True)
            logger.debug(f"dumping: {output_path}")

            with open(output_path, "wb") as output_file:
                # NOTE(review): the image address is handed to parse_stream as the stream —
                # confirm ProcessSymbol provides the stream interface construct expects
                macho_in_memory = mach_header_t.parse_stream(image.address)

                async with await self._client.fs.open(image.path, "r") as fat_file:
                    # locating the correct MachO offset within the FAT image (if FAT)
                    magic_in_fs = await fat_file.parse(Int32ul)
                    await fat_file.seek(0, SEEK_SET)
                    if magic_in_fs in (FAT_CIGAM, FAT_MAGIC):
                        parsed_fat = await fat_file.parse(fat_header)
                        for arch in parsed_fat.archs:
                            if (
                                arch.cputype == macho_in_memory.cputype
                                and arch.cpusubtype == macho_in_memory.cpusubtype
                            ):
                                # correct MachO offset found
                                file_offset = arch.offset
                                break
                        else:
                            raise RuntimeError("Failed to find file offset.")

                        await fat_file.seek(file_offset, SEEK_SET)

                    # perform actual MachO dump: copy until the remote file is exhausted
                    output_file.seek(0, SEEK_SET)
                    while chunk := await fat_file.read(chunk_size):
                        output_file.write(chunk)

                    # locate the encryption load command; an image without one is an error
                    for load_command in macho_in_memory.load_commands:
                        if load_command.cmd in (
                            LOAD_COMMAND_TYPE.LC_ENCRYPTION_INFO_64,
                            LOAD_COMMAND_TYPE.LC_ENCRYPTION_INFO,
                        ):
                            offset_cryptid = load_command.data.cryptid_offset - image.address
                            crypt_offset = load_command.data.cryptoff
                            crypt_size = load_command.data.cryptsize
                            break
                    else:
                        raise RuntimeError("Failed to find LC_ENCRYPTION_INFO or LC_ENCRYPTION_INFO_64 command.")

                    # the for/else above guarantees the three crypt values are bound here
                    output_file.seek(offset_cryptid, SEEK_SET)
                    output_file.write(b"\x00" * 4)  # cryptid = 0
                    output_file.seek(crypt_offset, SEEK_SET)
                    output_file.flush()
                    # Memory reads are coroutines in this client and must be awaited; the
                    # range is read through this process' own peek().
                    output_file.write(await self.peek(image.address + crypt_offset, crypt_size))

    async def get_mach_port_cross_ref_info(self) -> list[MachPortCrossRefInfo]:
        """List this process' mach ports together with every holder of the receive right.

        All ports reported by the client's processes subsystem are grouped by
        ``ipc_object``; each port owned by this pid is then paired with every
        entry of its group whose ``has_recv_right`` is set.
        """
        mine: list[MachPortInfo] = []
        by_object: dict[int, list[MachPortInfo]] = {}
        async for entry in self._client.processes.get_mach_ports():
            if entry.pid == self.pid:
                mine.append(entry)
            by_object.setdefault(entry.ipc_object, []).append(entry)

        cross_refs = []
        for own in mine:
            for holder in by_object[own.ipc_object]:
                if not holder.has_recv_right:
                    continue
                cross_refs.append(
                    MachPortCrossRefInfo(
                        name=own.name,
                        ipc_object=own.ipc_object,
                        recv_right_pid=holder.pid,
                        recv_right_proc_name=holder.proc_name,
                    )
                )
        return cross_refs

    async def get_memgraph_snapshot(self) -> bytes:
        """Return the plist representation of a memory graph of this process as bytes.

        VMUProcessObjectGraph is tried first; when that raises
        UnrecognizedSelectorError the VMUTaskMemoryScanner flow is used instead.
        Whichever scanner object was last created is released before returning.
        """
        scanner = None
        try:
            # first attempt via new API
            graph_class = await self._client.symbols.objc_getClass("VMUProcessObjectGraph")
            task_port = await type(self).task_read(self)
            scanner = await graph_class.objc_call("createWithTask:", task_port)
            plist = await scanner.objc_call("plistRepresentationWithOptions:", 0)
            plist_bytes = await plist.py(bytes)
        except UnrecognizedSelectorError:
            # if failed, attempt with old API
            scanner_class = await self._client.symbols.objc_getClass("VMUTaskMemoryScanner")
            allocated = await scanner_class.objc_call("alloc")
            task_port = await type(self).task_read(self)
            scanner = await allocated.objc_call("initWithTask:", task_port)
            await scanner.objc_call("addRootNodesFromTask")
            await scanner.objc_call("addMallocNodesFromTask")
            graph = await scanner.objc_call("processSnapshotGraph")
            plist = await graph.objc_call("plistRepresentationWithOptions:", 0)
            plist_bytes = await plist.py(bytes)
        finally:
            if scanner is not None:
                # free object scanner so process can resume
                await scanner.objc_call("release")
        return plist_bytes

    def __repr__(self) -> str:
        """Return ``<ClassName PID:… PATH:…>``; PATH is a placeholder until ``_path`` is set."""
        cached_path = getattr(self, "_path", "<not cached yet>")
        class_name = self.__class__.__name__
        return f"<{class_name} PID:{self.pid} PATH:{cached_path}>"

pid property

pid: int

Return the process id.

kill async

kill(sig: int = SIGTERM) -> None

Send a signal to the remote process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def kill(self, sig: int = SIGTERM) -> None:
    """Forward ``sig`` and this process' pid to the client's ``processes.kill``."""
    processes = self._client.processes
    return await processes.kill(self._pid, sig)

waitpid async

waitpid(flags: int = 0) -> int

Wait for the remote process to change state and return status.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def waitpid(self, flags: int = 0) -> int:
    """Forward this process' pid and ``flags`` to the client's ``processes.waitpid``."""
    processes = self._client.processes
    status = await processes.waitpid(self._pid, flags)
    return status

peek async

peek(address: int, size: int) -> bytes

peek at memory address

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def peek(self, address: int, size: int) -> bytes:
    """Read ``size`` bytes at ``address`` of this process through vm_read_overwrite().

    Raises BadReturnValueError when vm_read_overwrite() returns a non-zero value.
    """
    client = self._client
    async with client.safe_malloc(size) as data, client.safe_malloc(8) as p_out_size:
        await p_out_size.setindex(0, size)
        remote_read = client.symbols.vm_read_overwrite
        task_port = await type(self).task_read(self)
        status = await remote_read(task_port, address, size, data, p_out_size)
        if status:
            raise BadReturnValueError("vm_read() failed")
        return await data.peek(size)

peek_str async

peek_str(address: int, encoding='utf-8') -> str

peek string at memory address

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def peek_str(self, address: int, encoding="utf-8") -> str:
    """Read a NUL-terminated string starting at ``address`` and decode it.

    Memory is read in chunks of PEEK_STR_CHUNK_SIZE bytes; whenever a read
    raises BadReturnValueError the chunk size is halved and the read retried.
    Raises RuntimeError once the chunk size reaches zero without a terminator.
    """
    chunk_len = self.PEEK_STR_CHUNK_SIZE
    collected = b""

    while chunk_len:
        try:
            collected += await self.peek(address, chunk_len)
        except BadReturnValueError:
            chunk_len //= 2
            continue

        terminator = collected.find(b"\x00")
        if terminator != -1:
            return collected[:terminator].decode(encoding)
        address += chunk_len

    raise RuntimeError("Failed to find string terminator")

poke async

poke(address: int, buf: bytes) -> None

poke at memory address

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def poke(self, address: int, buf: bytes) -> None:
    """Write ``buf`` at ``address`` of this process through vm_write().

    Raises BadReturnValueError when vm_write() returns a non-zero value.
    """
    remote_write = self._client.symbols.vm_write
    task_port = await type(self).task(self)
    status = await remote_write(task_port, address, buf, len(buf))
    if status:
        raise BadReturnValueError("vm_write() failed")

get_symbol_name async

get_symbol_name(address: int) -> str

Resolve a symbol name for the given address.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_symbol_name(self, address: int) -> str:
    """Return the name of the symbol at ``address`` (ARCH_ARM64 clients only).

    Raises NotImplementedError on other architectures and SymbolAbsentError
    when ``symbolForAddress:`` returns zero in both x0 and x1.
    """
    if self._client.arch != ARCH_ARM64:
        raise NotImplementedError("implemented only on ARCH_ARM64")

    identifier = await type(self).vmu_object_identifier(self)
    cs_symbol = await identifier.objc_call_raw("symbolForAddress:", address)
    if cs_symbol.x0 == 0 and cs_symbol.x1 == 0:
        raise SymbolAbsentError()

    name_ptr = await self._client.symbols.CSSymbolGetName(cs_symbol.x0, cs_symbol.x1)
    return await name_ptr.peek_str()

get_symbol_image async

get_symbol_image(name: str) -> Image

Find the image containing the named symbol.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_symbol_image(self, name: str) -> Image:
    """Return the first loaded image in which ``name`` resolves to a truthy address.

    Raises ProcessSymbolAbsentError when no image resolves the symbol.
    """
    for candidate in await type(self).images(self):
        library = posixpath.basename(candidate.path)
        if await self.get_symbol_address(name, library):
            return candidate

    raise ProcessSymbolAbsentError()

get_symbol_class_info async

get_symbol_class_info(address: int) -> DarwinSymbolT_co

Return Objective-C class info for the given address.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_symbol_class_info(self, address: int) -> DarwinSymbolT_co:
    """Return the result of ``classInfoForMemory:length:`` for ``address`` with length 8."""
    identifier = await type(self).vmu_object_identifier(self)
    return await identifier.objc_call("classInfoForMemory:length:", address, 8)

get_symbol_address async

get_symbol_address(name: str, lib: str | None = None) -> DarwinSymbolT_co | ProcessSymbol[DarwinSymbolT_co]

Resolve a symbol address, optionally within a library.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_symbol_address(
    self, name: str, lib: str | None = None
) -> DarwinSymbolT_co | ProcessSymbol[DarwinSymbolT_co]:
    """Resolve the address of ``name``.

    Without ``lib`` the containing image is located first and the lookup is
    repeated with that image's basename. The result is a client symbol when this
    process is the client's own process, otherwise a process symbol.
    """
    if lib is None:
        image = await self.get_symbol_image(name)
        return await self.get_symbol_address(name, posixpath.basename(image.path))

    identifier = await type(self).vmu_object_identifier(self)
    found = await identifier.objc_call("addressOfSymbol:inLibrary:", name, lib)
    address = found.c_uint64
    if self.pid == await self._client.get_pid():
        return self._client.symbol(address)
    return self.get_process_symbol(address)

loaded_classes async

loaded_classes() -> AsyncGenerator[LoadedClass]

Yield realized Objective-C classes for the process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def loaded_classes(self) -> AsyncGenerator[LoadedClass]:
    """Yield one LoadedClass per entry of the object identifier's ``realizedClasses``."""
    identifier = await type(self).vmu_object_identifier(self)
    realized = await identifier.objc_call("realizedClasses")
    total = await realized.objc_call("count")

    # indices passed to classInfoForIndex: run from 1 to count inclusive
    for index in range(1, total + 1):
        info = await realized.objc_call("classInfoForIndex:", index)
        class_name = await (await info.objc_call("className")).py(str)
        class_type = await (await info.objc_call("typeName")).py(str)
        class_binary = await (await info.objc_call("binaryPath")).py(str)
        yield LoadedClass(name=class_name, type_name=class_type, binary_path=class_binary)

images async

images() -> list[Image]

get loaded image list

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def images(self) -> list[Image]:
    """Return the images listed in the task's dyld all-image-infos array.

    The array location comes from task_info(TASK_DYLD_INFO); each entry yields
    an Image holding the load address (as a process symbol) and the file path.
    Raises BadReturnValueError when task_info() returns a non-zero value.
    """
    client = self._client
    async with (
        client.safe_malloc(task_dyld_info_data_t.sizeof()) as info_buf,
        client.safe_calloc(8) as p_count,
    ):
        await p_count.setindex(0, TASK_DYLD_INFO_COUNT)
        remote_task_info = client.symbols.task_info
        task_port = await type(self).task_read(self)
        if await remote_task_info(task_port, TASK_DYLD_INFO, info_buf, p_count):
            raise BadReturnValueError("task_info(TASK_DYLD_INFO) failed")
        dyld_info = await info_buf.parse(task_dyld_info_data_t)

    header_bytes = await self.peek(dyld_info.all_image_info_addr, dyld_info.all_image_info_size)
    all_infos = all_image_infos_t.parse(header_bytes)

    array_bytes = await self.peek(all_infos.infoArray, all_infos.infoArrayCount * dyld_image_info_t.sizeof())

    loaded = []
    for entry in Array(all_infos.infoArrayCount, dyld_image_info_t).parse(array_bytes):
        image_path = await self.peek_str(entry.imageFilePath)
        load_address = self.get_process_symbol(entry.imageLoadAddress)
        loaded.append(Image(address=load_address, path=image_path))
    return loaded

app_images async

app_images() -> list[Image]

Return images that belong to the app bundle.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def app_images(self) -> list[Image]:
    """Return the loaded images whose path contains APP_SUFFIX."""
    bundle_images = []
    for candidate in await type(self).images(self):
        if APP_SUFFIX in candidate.path:
            bundle_images.append(candidate)
    return bundle_images

threads async

threads() -> list[Thread]

Return the list of threads in the process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def threads(self) -> list[Thread]:
    """Return one thread object per thread id reported by task_threads().

    Raises BadReturnValueError when task_threads() returns a non-zero value.
    """
    async with self._client.safe_malloc(8) as p_list, self._client.safe_malloc(4) as p_count:
        # the count is read back as a 4-byte item
        p_count.item_size = 4
        remote_task_threads = self._client.symbols.task_threads
        task_port = await type(self).task_read(self)
        if await remote_task_threads(task_port, p_list, p_count):
            raise BadReturnValueError("task_threads() failed")

        thread_count = (await p_count.getindex(0)).c_uint32
        list_address = await p_list.getindex(0)
        raw_ids = await list_address.peek(await p_count.getindex(0) * 4)
        thread_ids = Array(thread_count, Int32ul).parse(raw_ids)
        result = [self._thread_class(self._client, tid) for tid in thread_ids]

    return result

fds async

fds() -> list[Fd]

get a list of process opened file descriptors

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def fds(self) -> list[Fd]:
    """Build typed descriptor objects from the raw entries returned by fd_structs().

    Vnode, kqueue, pipe, shared-memory and socket entries are converted; socket
    entries are converted only for the TCP, IN and UN socket kinds. Every other
    entry is skipped.
    """
    descriptors = []
    for entry in await self.fd_structs():
        fd = entry.fd
        parsed = entry.struct
        fd_type = fd.proc_fdtype

        if fd_type == PROX_FDTYPE_VNODE:
            descriptors.append(FileFd(fd=fd.proc_fd, path=parsed.pvip.vip_path))
            continue

        if fd_type == PROX_FDTYPE_KQUEUE:
            descriptors.append(KQueueFd(fd=fd.proc_fd))
            continue

        if fd_type == PROX_FDTYPE_PIPE:
            descriptors.append(PipeFd(fd=fd.proc_fd))
            continue

        if fd_type == PROX_FDTYPE_PSHM:
            descriptors.append(SharedMemoryFd(fd=fd.proc_fd, path=parsed.pshminfo.pshm_name))
            continue

        if not (fd_type == PROX_FDTYPE_SOCKET):
            continue

        sock = parsed.psi
        if sock.soi_kind in (so_kind_t.SOCKINFO_TCP, so_kind_t.SOCKINFO_IN):
            # the descriptor class is selected by address family and socket kind
            fd_class = SOCKET_TYPE_DATACLASS[sock.soi_family][sock.soi_kind]

            if sock.soi_kind == so_kind_t.SOCKINFO_TCP:
                inet = sock.soi_proto.pri_tcp.tcpsi_ini
            else:
                inet = sock.soi_proto.pri_in

            descriptors.append(
                fd_class(
                    fd=fd.proc_fd,
                    local_address=inet.insi_laddr.ina_46.i46a_addr4,
                    local_port=inet.insi_lport,
                    remote_address=inet.insi_faddr.ina_46.i46a_addr4,
                    remote_port=inet.insi_fport,
                )
            )
        elif sock.soi_kind == so_kind_t.SOCKINFO_UN:
            descriptors.append(UnixFd(fd=fd.proc_fd, path=sock.soi_proto.pri_un.unsi_addr.ua_sun.sun_path))

    return descriptors

fd_structs async

fd_structs() -> list[FdStruct]

get a list of process opened file descriptors as raw structs

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def fd_structs(self) -> list[FdStruct]:
    """Return the open file descriptors of the process as raw parsed structs.

    The descriptor list comes from proc_pidinfo(PROC_PIDLISTFDS). Vnode, socket,
    pipe and shared-memory descriptors are then queried with proc_pidfdinfo();
    kqueue descriptors are recorded with ``struct=None``; other types are
    skipped. A descriptor whose query fails with EBADF is skipped as well.

    Raises BadReturnValueError when listing fails or a query fails with any
    errno other than EBADF.
    """
    client = self._client
    collected = []
    list_len = await client.symbols.proc_pidinfo(self.pid, PROC_PIDLISTFDS, 0, 0, 0)

    scratch_len = 8196  # should be enough for all structs
    async with client.safe_malloc(scratch_len) as scratch, client.safe_malloc(list_len) as fd_list:
        list_len = int(await client.symbols.proc_pidinfo(self.pid, PROC_PIDLISTFDS, 0, fd_list, list_len))
        if not list_len:
            raise BadReturnValueError("proc_pidinfo(PROC_PIDLISTFDS) failed")

        async def query(fd, flavor, parser, whole_buffer, failure):
            """Run proc_pidfdinfo() for one descriptor and record the parsed struct."""
            status = await client.symbols.proc_pidfdinfo(self.pid, fd.proc_fd, flavor, scratch, scratch_len)
            if not status:
                if await client.get_errno() == errno.EBADF:
                    # lsof treats this as fine
                    return
                raise BadReturnValueError(failure.format(fd=fd.proc_fd, error=await client.get_last_error()))

            # socket info is parsed from the whole scratch buffer, the others from sizeof() bytes
            read_len = scratch_len if whole_buffer else parser.sizeof()
            collected.append(FdStruct(fd=fd, struct=parser.parse(await scratch.peek(read_len))))

        entries = Array(list_len // proc_fdinfo.sizeof(), proc_fdinfo).parse(await fd_list.peek(list_len))
        for fd in entries:
            fd_type = fd.proc_fdtype

            if fd_type == PROX_FDTYPE_VNODE:
                # file
                await query(
                    fd,
                    PROC_PIDFDVNODEPATHINFO,
                    vnode_fdinfowithpath,
                    False,
                    "proc_pidinfo(PROC_PIDFDVNODEPATHINFO) failed for fd: {fd} ({error})",
                )

            elif fd_type == PROX_FDTYPE_KQUEUE:
                # no per-descriptor query is made for kqueues
                collected.append(FdStruct(fd=fd, struct=None))

            elif fd_type == PROX_FDTYPE_SOCKET:
                # socket
                await query(
                    fd,
                    PROC_PIDFDSOCKETINFO,
                    socket_fdinfo,
                    True,
                    "proc_pidinfo(PROC_PIDFDSOCKETINFO) failed ({error})",
                )

            elif fd_type == PROX_FDTYPE_PIPE:
                # pipe
                await query(
                    fd,
                    PROC_PIDFDPIPEINFO,
                    pipe_info,
                    False,
                    "proc_pidinfo(PROC_PIDFDPIPEINFO) failed ({error})",
                )

            elif fd_type == PROX_FDTYPE_PSHM:
                # shared memory
                await query(
                    fd,
                    PROC_PIDFDPSHMINFO,
                    pshm_fdinfo,
                    False,
                    "proc_pidinfo(PROC_PIDFDPSHMINFO) failed ({error})",
                )

        return collected

task_all_info async

task_all_info() -> Container

Return the parsed proc_taskallinfo struct for the process, fetched with proc_pidinfo(PROC_PIDTASKALLINFO).

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def task_all_info(self) -> Container:
    """Return the parsed proc_taskallinfo struct, fetched with proc_pidinfo(PROC_PIDTASKALLINFO).

    Raises BadReturnValueError when proc_pidinfo() returns a falsy value.
    """
    info_len = proc_taskallinfo.sizeof()
    async with self._client.safe_malloc(info_len) as info_buf:
        status = await self._client.symbols.proc_pidinfo(
            self.pid, PROC_PIDTASKALLINFO, 0, info_buf, proc_taskallinfo.sizeof()
        )
        if not status:
            raise BadReturnValueError("proc_pidinfo(PROC_PIDTASKALLINFO) failed")
        return await info_buf.parse(proc_taskallinfo)

task_vm_info async

task_vm_info() -> Container

get TASK_VM_INFO via task_info.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def task_vm_info(self) -> Container:
    """Return the parsed task_vm_info_data_t obtained from task_info(TASK_VM_INFO).

    Raises BadReturnValueError when task_info() returns a non-zero value.
    """
    client = self._client
    async with (
        client.safe_malloc(task_vm_info_data_t.sizeof()) as info_buf,
        client.safe_calloc(8) as p_count,
    ):
        await p_count.setindex(0, TASK_VM_INFO_COUNT)
        remote_task_info = client.symbols.task_info
        task_port = await type(self).task_read(self)
        status = await remote_task_info(task_port, TASK_VM_INFO, info_buf, p_count)
        if status:
            raise BadReturnValueError("task_info(TASK_VM_INFO) failed")
        return await info_buf.parse(task_vm_info_data_t)

backtraces async

backtraces() -> list[Backtrace]

Collect backtraces for all threads in the process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def backtraces(self) -> list[Backtrace]:
    """Return one Backtrace per object returned by VMUSampler ``sampleAllThreadsOfTask:``."""
    sampler_class = await self._client.symbols.objc_getClass("VMUSampler")
    task_port = await type(self).task_read(self)
    samples = await sampler_class.objc_call("sampleAllThreadsOfTask:", task_port)

    sample_count = await samples.objc_call("count")
    return [
        await Backtrace._from_backtrace(await samples.objc_call("objectAtIndex:", index))
        for index in range(sample_count)
    ]

vmu_proc_info async

vmu_proc_info() -> DarwinSymbolT_co

Return the VMUProcInfo object for this task.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@cached_async_method
async def vmu_proc_info(self) -> DarwinSymbolT_co:
    """Allocate a VMUProcInfo and initialise it with this process' task read port."""
    proc_info_class = await self._client.symbols.objc_getClass("VMUProcInfo")
    instance = await proc_info_class.objc_call("alloc")
    task_port = await type(self).task_read(self)
    return await instance.objc_call("initWithTask:", task_port)

vmu_region_identifier async

vmu_region_identifier() -> DarwinSymbolT_co

Return the VMUVMRegionIdentifier object for this task.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@cached_async_method
async def vmu_region_identifier(self) -> DarwinSymbolT_co:
    """Allocate a VMUVMRegionIdentifier and initialise it with this process' task read port."""
    region_identifier_class = await self._client.symbols.objc_getClass("VMUVMRegionIdentifier")
    instance = await region_identifier_class.objc_call("alloc")
    task_port = await type(self).task_read(self)
    return await instance.objc_call("initWithTask:", task_port)

vmu_object_identifier async

vmu_object_identifier() -> DarwinSymbolT_co

Return the VMUObjectIdentifier object for this task.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@cached_async_method
async def vmu_object_identifier(self) -> DarwinSymbolT_co:
    """Allocate a VMUObjectIdentifier and initialise it with this process' task read port."""
    object_identifier_class = await self._client.symbols.objc_getClass("VMUObjectIdentifier")
    instance = await object_identifier_class.objc_call("alloc")
    task_port = await type(self).task_read(self)
    return await instance.objc_call("initWithTask:", task_port)

symbolicator async

symbolicator() -> Symbolicator[DarwinSymbolT_co]

Return a symbolicator for this task.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@cached_async_method
async def symbolicator(self) -> Symbolicator[DarwinSymbolT_co]:
    """Create a Symbolicator from the raw result of CSSymbolicatorCreateWithTask()."""
    create_with_task = self._client.symbols.CSSymbolicatorCreateWithTask
    task_port = await type(self).task_read(self)
    # return_raw=True exposes the x0/x1 fields that are handed to Symbolicator
    raw = await create_with_task(task_port, return_raw=True)
    return Symbolicator(self._client, self, raw.x0, raw.x1)

task_read async

task_read() -> int

Return the task read port for this process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@cached_async_method
async def task_read(self) -> int:
    """Return the task read port for this process.

    For the client's own pid this is the result of mach_task_self(); for any
    other pid the port is requested with task_read_for_pid().

    :raises BadReturnValueError: if task_read_for_pid() returns a non-zero value
    """
    self_task_port = await self._client.symbols.mach_task_self()

    if self.pid == await self._client.get_pid():
        return self_task_port

    # task_read_for_pid() stores a 32-bit mach_port_name_t, while the 8-byte slot
    # is read back through c_int64. Allocate zeroed memory so the upper half can
    # never contain leftover heap data.
    async with self._client.safe_calloc(8) as p_task:
        ret = await self._client.symbols.task_read_for_pid(self_task_port, self.pid, p_task)
        if ret:
            raise BadReturnValueError("task_read_for_pid() failed")
        return (await p_task.getindex(0)).c_int64

task async

task() -> int

Return the full task port for this process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@cached_async_method
async def task(self) -> int:
    """Return the full task port for this process.

    For the client's own pid this is the result of mach_task_self(); for any
    other pid the port is requested with task_for_pid().

    :raises BadReturnValueError: if task_for_pid() returns a non-zero value
    """
    self_task_port = await self._client.symbols.mach_task_self()

    if self.pid == await self._client.get_pid():
        return self_task_port

    # task_for_pid() stores a 32-bit mach_port_name_t, while the 8-byte slot is
    # read back through c_int64. Allocate zeroed memory so the upper half can
    # never contain leftover heap data.
    async with self._client.safe_calloc(8) as p_task:
        if await self._client.symbols.task_for_pid(self_task_port, self.pid, p_task):
            raise BadReturnValueError("task_for_pid() failed")
        return (await p_task.getindex(0)).c_int64

path async

path() -> str | None

Call proc_pidpath(pid, ...) on the remote side and return the resulting path, or None when no path is returned. See the xnu headers for more details.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@cached_async_method
async def path(self) -> str | None:
    """Return the path reported by the remote proc_pidpath(), or None when its result is falsy."""
    async with self._client.safe_malloc(MAXPATHLEN) as path_buf:
        written = await self._client.symbols.proc_pidpath(self.pid, path_buf, MAXPATHLEN)
        if not written:
            return None
        raw = await path_buf.peek(written)
        return raw.decode()

basename async

basename() -> str | None

Return the basename of the process path.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
@cached_async_method
async def basename(self) -> str | None:
    """Return the final component of the process path, or None when the path is empty or None."""
    full_path = await type(self).path(self)
    if not full_path:
        return None
    return PurePath(full_path).name

name async

name() -> str

Return the process name from task info.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def name(self) -> str:
    """Return ``pbi_name`` from the result of ``_get_pbsd()``."""
    bsd_info = await self._get_pbsd()
    return bsd_info.pbi_name

ppid async

ppid() -> int

Return the parent process id.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def ppid(self) -> int:
    """Return ``pbi_ppid`` from the result of ``_get_pbsd()``."""
    bsd_info = await self._get_pbsd()
    return bsd_info.pbi_ppid

uid async

uid() -> int

Return the process user id.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def uid(self) -> int:
    """Return ``pbi_uid`` from the result of ``_get_pbsd()``."""
    bsd_info = await self._get_pbsd()
    return bsd_info.pbi_uid

gid async

gid() -> int

Return the process group id.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def gid(self) -> int:
    """Return ``pbi_gid`` from the result of ``_get_pbsd()``."""
    bsd_info = await self._get_pbsd()
    return bsd_info.pbi_gid

ruid async

ruid() -> int

Return the real user id.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def ruid(self) -> int:
    """Return ``pbi_ruid`` from the result of ``_get_pbsd()``."""
    bsd_info = await self._get_pbsd()
    return bsd_info.pbi_ruid

rgid async

rgid() -> int

Return the real group id.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def rgid(self) -> int:
    """Return ``pbi_rgid`` from the result of ``_get_pbsd()``."""
    bsd_info = await self._get_pbsd()
    return bsd_info.pbi_rgid

start_time async

start_time() -> datetime

Return the process start time.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def start_time(self) -> datetime:
    """Return the process start time, computing and storing it on first use.

    The value is built from the x0/x1 fields of VMUProcInfo ``startTime``.
    Raises NotImplementedError when it has to be computed on a client whose
    architecture is not ARCH_ARM64.
    """
    if self._start_time is not None:
        return self._start_time

    if self._client.arch != ARCH_ARM64:
        raise NotImplementedError("implemented only on ARCH_ARM64")

    proc_info = await type(self).vmu_proc_info(self)
    raw = await proc_info.objc_call_raw("startTime")
    seconds = raw.x0
    # NOTE(review): x1 is scaled as nanoseconds here — confirm the unit of the returned struct
    nanoseconds = raw.x1
    self._start_time = datetime.fromtimestamp(seconds + (nanoseconds / (10**9)))
    return self._start_time

parent async

parent() -> Self

Return a Process wrapper for the parent pid.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def parent(self) -> Self:
    """Return a Process wrapper for the parent pid."""
    return type(self)(self._client, await type(self).ppid(self))

environ async

environ() -> list[str]

Return the process environment variables.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def environ(self) -> list[str]:
    """Return VMUProcInfo ``envVars`` converted to a Python list."""
    proc_info = await type(self).vmu_proc_info(self)
    env_vars = await proc_info.objc_call("envVars")
    return await env_vars.py(list)

arguments async

arguments() -> list[str]

Return the process argument list.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def arguments(self) -> list[str]:
    """Return VMUProcInfo ``arguments`` converted to a Python list."""
    proc_info = await type(self).vmu_proc_info(self)
    argv = await proc_info.objc_call("arguments")
    return await argv.py(list)

raw_procargs2 async

raw_procargs2() -> bytes

Return the raw PROCARGS2 sysctl buffer.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def raw_procargs2(self) -> bytes:
    """Return the value of the (CTL.KERN, KERN.PROCARGS2, pid) sysctl for this process."""
    sysctl = self._client.sysctl
    raw = await sysctl.get(CTL.KERN, KERN.PROCARGS2, self.pid)
    return raw

procargs2 async

procargs2() -> Container

Parse and return the PROCARGS2 buffer.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def procargs2(self) -> Container:
    """Return the result of raw_procargs2() parsed with procargs2_t."""
    raw = await type(self).raw_procargs2(self)
    return procargs2_t.parse(raw)

get_regions async

get_regions() -> list[Region[DarwinSymbolT_co]]

Return the list of VM regions for the process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_regions(self) -> list[Region[DarwinSymbolT_co]]:
    """Parse the textual description of the region identifier's ``regions`` into Region objects.

    Each description line is split into words: the first word is the region
    type, the first later word containing "-" is the address range, and the
    first word at or after the following one containing "/" holds the current
    and maximum protection. Lines whose address range does not start with "0x"
    are skipped.
    """
    parsed_regions: list[Region[DarwinSymbolT_co]] = []

    # remove the '()' wrapping the list:
    #   (
    #       "item1",
    #       "item2",
    #   )
    regions_sym = await (await type(self).vmu_region_identifier(self)).objc_call("regions")
    description = cast(str, await type(regions_sym).cfdesc(regions_sym))
    description_lines = description.split("\n")[1:-1]

    for raw_line in description_lines:
        # remove line prefix and suffix and split into words
        words = raw_line[_CF_STRING_ARRAY_PREFIX_LEN:-_CF_STRING_ARRAY_SUFFIX_LEN].split()
        region_type = words[0]

        # skip till address range
        pos = 1
        while "-" not in words[pos]:
            pos += 1
        address_range = words[pos]
        pos += 1
        if not address_range.startswith("0x"):
            continue

        start_text, end_text = address_range.split("-")
        start = int(start_text, 16)
        end = int(end_text, 16)

        # the word right after the range may carry the virtual size as "V=...]"
        vsize = words[pos].split("V=", 1)[1].split("]", 1)[0] if "V=" in words[pos] else None

        while "/" not in words[pos]:
            pos += 1
        protection = words[pos].split("/")
        pos += 1

        # anything after the protection word is kept as the region detail
        region_detail = words[pos] if len(words) >= pos + 1 else None

        parsed_regions.append(
            Region(
                region_type=region_type,
                start=self.get_process_symbol(start),
                end=end,
                vsize=vsize,
                protection=protection[0],
                protection_max=protection[1],
                region_detail=region_detail,
            )
        )

    return parsed_regions

regions async

regions() -> list[Region[DarwinSymbolT_co]]

Return the list of VM regions for the process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def regions(self) -> list[Region[DarwinSymbolT_co]]:
    """Return the result of get_regions()."""
    all_regions = await self.get_regions()
    return all_regions

cdhash async

cdhash() -> bytes

Return the code directory hash for the process.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def cdhash(self) -> bytes:
    """Return CDHASH_SIZE bytes filled in by csops() for this pid.

    Raises BadReturnValueError when csops() returns a non-zero value.
    """
    cs_ops_cdhash = 5  # csops operation number, by reversing online-auth-agent
    async with self._client.safe_malloc(CDHASH_SIZE) as digest:
        status = await self._client.symbols.csops(self.pid, cs_ops_cdhash, digest, CDHASH_SIZE)
        if status != 0:
            raise BadReturnValueError(f"failed to get cdhash for {self.pid}")
        return await digest.peek(CDHASH_SIZE)

get_process_symbol

get_process_symbol(address: int) -> ProcessSymbol[DarwinSymbolT_co]

Create a process symbol for the given address.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
def get_process_symbol(self, address: int) -> ProcessSymbol[DarwinSymbolT_co]:
    """Wrap *address* in a `ProcessSymbol` bound to this process."""
    symbol = ProcessSymbol(address, self)
    return symbol

vm_allocate async

vm_allocate(size: int) -> ProcessSymbol[DarwinSymbolT_co]

Allocate memory in the target task and return its address.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def vm_allocate(self, size: int) -> ProcessSymbol[DarwinSymbolT_co]:
    """Allocate *size* bytes in the target task and return the new address as a process symbol.

    Raises BadReturnValueError when the remote `vm_allocate()` returns a non-zero status.
    """
    async with self._client.safe_malloc(8) as out_address:
        target_task = await type(self).task(self)
        status = await self._client.symbols.vm_allocate(target_task, out_address, size, VM_FLAGS_ANYWHERE)
        if status:
            raise BadReturnValueError("vm_allocate() failed")
        # the call stores the allocated address in the 8-byte out buffer
        allocated = await out_address.getindex(0)
        return self.get_process_symbol(allocated)

dump_app async

dump_app(output_dir: str | PurePath, chunk_size: int = CHUNK_SIZE) -> None

Based on: https://github.com/AloneMonkey/frida-ios-dump/blob/master/dump.js

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def dump_app(self, output_dir: str | PurePath, chunk_size: int = CHUNK_SIZE) -> None:
    """
    Dump every app image of the process into `output_dir`.

    For each image returned by `app_images`, the on-disk file is copied (starting at the
    matching slice when the file is a FAT image), then the `cryptid` field is zeroed and
    the range described by the encryption load command is overwritten with bytes taken
    from the in-memory image.

    `output_dir` is the local destination directory (`~` is expanded); files are written
    under it using the part of the image path that follows `APP_SUFFIX`.
    `chunk_size` is the number of bytes requested per remote read while copying.

    Raises RuntimeError when a FAT image has no slice matching the in-memory cputype and
    cpusubtype, or when an image has no LC_ENCRYPTION_INFO / LC_ENCRYPTION_INFO_64 command.

    Based on:
    https://github.com/AloneMonkey/frida-ios-dump/blob/master/dump.js
    """
    for image in await type(self).app_images(self):
        # keep only the part of the remote path that follows APP_SUFFIX
        relative_name = image.path.split(APP_SUFFIX, 1)[1]
        output_file = Path(output_dir).expanduser() / relative_name
        output_file.parent.mkdir(exist_ok=True, parents=True)
        logger.debug(f"dumping: {output_file}")

        # NOTE: `output_file` is rebound here from the local Path to the open file object
        with open(output_file, "wb") as output_file:
            # header of the image as currently loaded in the process
            # NOTE(review): not awaited, while the other parse() calls in this function are;
            # confirm parse_stream() works synchronously on image.address
            macho_in_memory = mach_header_t.parse_stream(image.address)

            async with await self._client.fs.open(image.path, "r") as fat_file:
                # locating the correct MachO offset within the FAT image (if FAT)
                magic_in_fs = await fat_file.parse(Int32ul)
                await fat_file.seek(0, SEEK_SET)
                if magic_in_fs in (FAT_CIGAM, FAT_MAGIC):
                    parsed_fat = await fat_file.parse(fat_header)
                    for arch in parsed_fat.archs:
                        if (
                            arch.cputype == macho_in_memory.cputype
                            and arch.cpusubtype == macho_in_memory.cpusubtype
                        ):
                            # correct MachO offset found
                            file_offset = arch.offset
                            break
                    else:
                        # loop finished without `break`: no slice matches the loaded image
                        raise RuntimeError("Failed to find file offset.")

                    await fat_file.seek(file_offset, SEEK_SET)

                # perform actual MachO dump
                output_file.seek(0, SEEK_SET)
                while True:
                    chunk = await fat_file.read(chunk_size)
                    if len(chunk) == 0:
                        # empty read marks the end of the remote file
                        break
                    output_file.write(chunk)

                offset_cryptid = None

                # if image is encrypted, patch its encryption loader command
                for load_command in macho_in_memory.load_commands:
                    if load_command.cmd in (
                        LOAD_COMMAND_TYPE.LC_ENCRYPTION_INFO_64,
                        LOAD_COMMAND_TYPE.LC_ENCRYPTION_INFO,
                    ):
                        # cryptid_offset is relative to the same base as image.address,
                        # so subtracting the image address gives the offset inside the dump
                        offset_cryptid = load_command.data.cryptid_offset - image.address
                        crypt_offset = load_command.data.cryptoff
                        crypt_size = load_command.data.cryptsize
                        break
                else:
                    # NOTE(review): images without an encryption command abort the whole dump here,
                    # which makes the `offset_cryptid is not None` check below always true
                    raise RuntimeError("Failed to find LC_ENCRYPTION_INFO or LC_ENCRYPTION_INFO_64 command.")

                if offset_cryptid is not None:
                    output_file.seek(offset_cryptid, SEEK_SET)
                    output_file.write(b"\x00" * 4)  # cryptid = 0
                    output_file.seek(crypt_offset, SEEK_SET)
                    output_file.flush()
                    # replace the range with the bytes currently mapped in the process
                    # NOTE(review): other peek() calls visible in this code are awaited;
                    # confirm whether this peek() returns bytes or a coroutine
                    output_file.write((image.address + crypt_offset).peek(crypt_size))

get_mach_port_cross_ref_info async

get_mach_port_cross_ref_info() -> list[MachPortCrossRefInfo]

Get all allocated mach ports and cross-refs to get the recv right owner

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_mach_port_cross_ref_info(self) -> list[MachPortCrossRefInfo]:
    """Cross-reference this process's mach ports with the holders of their receive rights.

    All ports reported by `get_mach_ports()` are grouped by `ipc_object`; for every port
    owned by this process, one entry is produced per port in the same group that holds
    the receive right.
    """
    mine: list[MachPortInfo] = []
    by_object: dict[int, list[MachPortInfo]] = {}
    async for port_info in self._client.processes.get_mach_ports():
        if port_info.pid == self.pid:
            mine.append(port_info)
        by_object.setdefault(port_info.ipc_object, []).append(port_info)

    cross_refs = []
    for own_port in mine:
        for candidate in by_object[own_port.ipc_object]:
            if not candidate.has_recv_right:
                continue
            cross_refs.append(
                MachPortCrossRefInfo(
                    name=own_port.name,
                    ipc_object=own_port.ipc_object,
                    recv_right_pid=candidate.pid,
                    recv_right_proc_name=candidate.proc_name,
                )
            )
    return cross_refs

get_memgraph_snapshot async

get_memgraph_snapshot() -> bytes

Return a memory graph snapshot as plist bytes.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_memgraph_snapshot(self) -> bytes:
    """Capture a memory graph of the process and return it serialized as plist bytes.

    `VMUProcessObjectGraph` is tried first; when a selector is not recognized the
    older `VMUTaskMemoryScanner` flow is used instead.
    """
    scanner = None
    try:
        # first attempt via new API
        graph_class = await self._client.symbols.objc_getClass("VMUProcessObjectGraph")
        read_port = await type(self).task_read(self)
        scanner = await graph_class.objc_call("createWithTask:", read_port)
        plist = await scanner.objc_call("plistRepresentationWithOptions:", 0)
        snapshot_graph = await plist.py(bytes)
    except UnrecognizedSelectorError:
        # if failed, attempt with old API
        scanner_class = await self._client.symbols.objc_getClass("VMUTaskMemoryScanner")
        allocated = await scanner_class.objc_call("alloc")
        read_port = await type(self).task_read(self)
        scanner = await allocated.objc_call("initWithTask:", read_port)
        await scanner.objc_call("addRootNodesFromTask")
        await scanner.objc_call("addMallocNodesFromTask")
        graph = await scanner.objc_call("processSnapshotGraph")
        plist = await graph.objc_call("plistRepresentationWithOptions:", 0)
        snapshot_graph = await plist.py(bytes)
    finally:
        if scanner is not None:
            # free object scanner so process can resume
            await scanner.objc_call("release")
    return snapshot_graph

DarwinProcesses

Bases: Processes['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

manage processes

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
class DarwinProcesses(Processes["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Process subsystem for Darwin clients: look up, list and inspect remote processes."""

    # Process object returned by get_self(); created lazily on first use
    _self_process: Process[DarwinSymbolT_co] | None = None

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """Initialize the Darwin process subsystem and lazily load the Symbolication framework."""
        super().__init__(client)
        client.load_framework_lazy("Symbolication")

    async def get_self(self) -> Process[DarwinSymbolT_co]:
        """Return the process whose pid is reported by the client's `get_pid()` (cached after first call)."""
        if self._self_process is None:
            self._self_process = Process(self._client, await self._client.get_pid())

        return self._self_process

    async def get_by_pid(self, pid: int) -> Process[DarwinSymbolT_co]:
        """Return the running process with the given pid.

        Raises ArgumentError when no listed process has that pid.
        """
        for p in await self.list():
            if p.pid == pid:
                return p
        raise ArgumentError(f"failed to locate process with pid: {pid}")

    async def get_by_basename(self, name: str) -> Process[DarwinSymbolT_co]:
        """Return the first listed process whose basename equals *name*.

        Raises ArgumentError when no process matches.
        """
        for p in await self.list():
            if await type(p).basename(p) == name:
                return p
        raise ArgumentError(f"failed to locate process with name: {name}")

    async def get_by_name(self, name: str) -> Process[DarwinSymbolT_co]:
        """Return the first listed process whose name equals *name*.

        Raises ArgumentError when no process matches.
        """
        for p in await self.list():
            if await type(p).name(p) == name:
                return p
        raise ArgumentError(f"failed to locate process with name: {name}")

    async def grep(self, name: str) -> list[Process[DarwinSymbolT_co]]:
        """Return every listed process whose basename contains *name*."""
        result = []
        for p in await self.list():
            basename = await type(p).basename(p)
            # processes without a basename are skipped
            if basename and name in basename:
                result.append(p)
        return result

    async def get_processes_by_listening_port(self, port: int) -> list[Process[DarwinSymbolT_co]]:
        """Return the processes listening on the specified port (each process at most once)."""
        listening_processes = []
        for process in await self.list():
            try:
                fds = await type(process).fds(process)
            except BadReturnValueError:
                # it's possible to get error if new processes have since died or the rpcserver
                # doesn't have the required permissions to access all the processes
                continue

            # any() reports the process once even when several of its sockets match
            # (e.g. one IPv4 and one IPv6 listener on the same port)
            if any(
                isinstance(fd, (Ipv4SocketFd, Ipv6SocketFd)) and fd.local_port == port and fd.remote_port == 0
                for fd in fds
            ):
                listening_processes.append(process)
        return listening_processes

    async def lsof(self) -> dict[int, list[Fd]]:
        """Return a dictionary mapping each pid to its opened fds.

        Processes whose fds cannot be queried are left out of the result.
        """
        result = {}
        for process in await self.list():
            try:
                fds = await type(process).fds(process)
            except BadReturnValueError:
                # it's possible to get error if new processes have since died or the rpcserver
                # doesn't have the required permissions to access all the processes
                continue

            result[process.pid] = fds
        return result

    async def fuser(self, path: str | PurePath) -> list[Process[DarwinSymbolT_co]]:
        """Return all processes that have an open handle to the specified path (each process at most once)."""
        # normalise the requested path once instead of once per fd
        wanted = str(Path(path).absolute())
        result = []
        for process in await self.list():
            try:
                fds = await type(process).fds(process)
            except BadReturnValueError:
                # it's possible to get error if new processes have since died or the rpcserver
                # doesn't have the required permissions to access all the processes
                continue

            # any() reports the process once even when it holds several fds to the same path
            if any(isinstance(fd, FileFd) and str(Path(fd.path).absolute()) == wanted for fd in fds):
                result.append(process)

        return result

    async def list(self) -> list[Process[DarwinSymbolT_co]]:
        """List all currently running processes."""
        # the first call (null buffer) reports how many pids there are
        n = await self._client.symbols.proc_listallpids(0, 0)
        pid_buf_size = pid_t.sizeof() * n
        async with self._client.safe_malloc(pid_buf_size) as pid_buf:
            pid_buf.item_size = pid_t.sizeof()
            # the second call fills the buffer and returns the number of entries written
            n = await self._client.symbols.proc_listallpids(pid_buf, pid_buf_size)

            result = []
            for i in range(n):
                pid = int(await pid_buf.getindex(i))
                result.append(Process(self._client, pid))
            return result

    async def disable_watchdog(self) -> None:
        """Continuously kill watchdogd to keep it disabled (never returns)."""
        while True:
            # watchdogd may not be running at this moment; just retry on the next round
            with contextlib.suppress(ArgumentError):
                await (await self.get_by_basename("watchdogd")).kill(SIGKILL)

            await asyncio.sleep(1)

    async def get_mach_ports(self, include_thread_info: bool = False) -> AsyncGenerator[MachPortInfo]:
        """Enumerate all mach ports (heavily inspired by lsmp).

        When `getuid()` reports 0, the ports of every task in the processor set are
        enumerated; otherwise only the task of the process returned by `get_self()` is
        inspected. With `include_thread_info` set, the thread ids of each task are
        collected as well and attached to the yielded entries.

        Raises BadReturnValueError when one of the underlying mach calls fails.
        """
        symbols = self._client.symbols
        thread_info = None
        mach_task_self = await symbols.mach_task_self()

        # remote scratch buffers allocated below; all of them are freed in the `finally`
        scratch = []

        async def alloc(size: int):
            """malloc() a remote scratch buffer and remember it for release."""
            buffer = await symbols.malloc(size)
            scratch.append(buffer)
            return buffer

        try:
            p_psets = await alloc(8)
            p_pset_count = await alloc(8)
            p_pset_priv = await alloc(8)
            p_tasks = await alloc(8)
            p_task_count = await alloc(8)
            p_count = await alloc(4)
            ports_info = await alloc(4 * 2 * EXC_TYPES_COUNT)
            masks = await alloc(4 * EXC_TYPES_COUNT)
            behaviors = await alloc(4 * EXC_TYPES_COUNT)
            flavors = await alloc(4 * EXC_TYPES_COUNT)
            p_thread_count = await alloc(4)
            p_thread_ports = await alloc(8)
            info = await alloc(200)
            p_pid = await alloc(4)
            th_info = await alloc(thread_identifier_info.sizeof())
            p_th_kobject = await alloc(4)
            p_th_kotype = await alloc(4)
            p_th_info_count = await alloc(4)
            p_table = await alloc(8)
            unused = await alloc(200)
            proc_name = await alloc(100)

            # these out-parameters are 32-bit values
            p_pid.item_size = 4
            p_count.item_size = 4
            p_thread_count.item_size = 4
            p_th_info_count.item_size = 4
            await p_th_info_count.setindex(0, THREAD_IDENTIFIER_INFO_COUNT)

            if await symbols.getuid() == 0:
                # if privileged, get the info for all tasks so we can match ports up
                if await symbols.host_processor_sets(await symbols.mach_host_self(), p_psets, p_pset_count) != 0:
                    raise BadReturnValueError("host_processor_sets() failed")
                if await p_pset_count.getindex(0) != 1:
                    raise BadReturnValueError("Assertion Failure: pset count greater than one")

                # convert the processor-set-name port to a privileged port
                if (
                    await symbols.host_processor_set_priv(
                        await symbols.mach_host_self(),
                        await p_psets.getindex(0, 0),
                        p_pset_priv,
                    )
                    != 0
                ):
                    raise BadReturnValueError("host_processor_set_priv() failed")

                await symbols.mach_port_deallocate(mach_task_self, await p_psets.getindex(0, 0))
                await symbols.vm_deallocate(
                    mach_task_self,
                    await p_psets.getindex(0),
                    await p_pset_count.getindex(0) * mach_port_t.sizeof(),
                )

                # convert the processor-set-priv to a list of task read ports for the processor set
                if (
                    await symbols.processor_set_tasks_with_flavor(
                        await p_pset_priv.getindex(0),
                        TASK_FLAVOR_READ,
                        p_tasks,
                        p_task_count,
                    )
                    != 0
                ):
                    raise BadReturnValueError("processor_set_tasks_with_flavor() failed")

                # the privileged processor-set value is a port right, not a memory region,
                # so it is released with mach_port_deallocate() (as lsmp does)
                await symbols.mach_port_deallocate(mach_task_self, await p_pset_priv.getindex(0))

                # swap my current instances port to be last to collect all threads and exception port info
                my_task_position = None
                task_count = await p_task_count.getindex(0)
                raw_tasks = await (await p_tasks.getindex(0)).peek(task_count * 4)
                tasks = list(struct.unpack(f"<{int(task_count)}I", raw_tasks))

                for position, candidate in enumerate(tasks):
                    if await symbols.mach_task_is_self(candidate):
                        my_task_position = position
                        break

                if my_task_position is not None:
                    tasks[-1], tasks[my_task_position] = tasks[my_task_position], tasks[-1]
            else:
                logger.warning("should run as root for best output (cross-ref to other tasks' ports)")
                # just the one process
                async with self._client.safe_malloc(8) as p_task:
                    ret = await symbols.task_read_for_pid(
                        mach_task_self,
                        (await self.get_self()).pid,
                        p_task,
                    )
                    if ret != 0:
                        raise BadReturnValueError("task_read_for_pid() failed")
                    tasks = [await p_task.getindex(0)]

            for task in tasks:
                await symbols.pid_for_task(task, p_pid)
                pid = (await p_pid.getindex(0)).c_uint32

                if (
                    await symbols.task_get_exception_ports_info(
                        task, EXC_MASK_ALL, masks, p_count, ports_info, behaviors, flavors
                    )
                    != 0
                ):
                    raise BadReturnValueError("task_get_exception_ports_info() failed")

                if include_thread_info:
                    # collect threads port as well
                    if await symbols.task_threads(task, p_thread_ports, p_thread_count) != 0:
                        raise BadReturnValueError("task_threads() failed")

                    # collect the thread information
                    thread_count = (await p_thread_count.getindex(0)).c_uint32
                    thread_ports = struct.unpack(
                        f"<{thread_count}I",
                        await (await p_thread_ports.getindex(0)).peek(4 * thread_count),
                    )
                    thread_ids = []
                    for thread_port in thread_ports:
                        ret = await symbols.thread_get_exception_ports_info(
                            thread_port,
                            EXC_MASK_ALL,
                            masks,
                            p_count,
                            ports_info,
                            behaviors,
                            flavors,
                        )
                        if ret != 0:
                            raise BadReturnValueError(
                                f"thread_get_exception_ports_info() failed: "
                                f"{await (await symbols.mach_error_string(ret)).peek_str()}"
                            )

                        # the thread id is recorded only when both queries succeed
                        if (
                            await symbols.mach_port_kernel_object(
                                mach_task_self, thread_port, p_th_kotype, p_th_kobject
                            )
                            == 0
                        ) and (
                            await symbols.thread_info(
                                mach_task_self,
                                thread_port,
                                THREAD_IDENTIFIER_INFO,
                                th_info,
                                await p_th_info_count.getindex(0),
                            )
                            == 0
                        ):
                            thread_id = (await th_info.parse(thread_identifier_info)).thread_id
                            thread_ids.append(thread_id)

                        await symbols.mach_port_deallocate(mach_task_self, thread_port)
                    thread_info = MachPortThreadInfo(thread_ids=thread_ids)

                if await symbols.mach_port_space_info(task, info, p_table, p_count, unused, unused) != 0:
                    raise BadReturnValueError("mach_port_space_info() failed")

                # a zero return from proc_name() leaves the name unknown
                proc_name_str = await proc_name.peek_str() if await symbols.proc_name(pid, proc_name, 100) != 0 else None

                count = (await p_count.getindex(0)).c_uint32
                table_struct = Array(count, ipc_info_name_t)

                parsed_table = await (await p_table.getindex(0)).parse(table_struct)

                for entry in parsed_table:
                    if entry.iin_type & MACH_PORT_TYPE_ALL_RIGHTS == 0:
                        # skip empty slots in the table
                        continue

                    if entry.iin_type == MACH_PORT_TYPE_PORT_SET:
                        continue

                    rights = []
                    if entry.iin_type & MACH_PORT_TYPE_SEND:
                        rights.append("send")

                    dnreq = bool(entry.iin_type & MACH_PORT_TYPE_DNREQUEST)

                    if entry.iin_type & MACH_PORT_TYPE_RECEIVE:
                        rights.append("recv")
                    elif entry.iin_type == MACH_PORT_TYPE_DEAD_NAME:
                        continue

                    yield MachPortInfo(
                        task=task,
                        pid=pid,
                        name=entry.iin_name,
                        rights=rights,
                        ipc_object=entry.iin_object,
                        dead=dnreq,
                        proc_name=proc_name_str,
                        thread_info=thread_info,
                    )
        finally:
            # runs on normal completion, on error, and when the generator is closed early
            for buffer in scratch:
                await symbols.free(buffer)

get_self async

get_self() -> Process[DarwinSymbolT_co]

get self process

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_self(self) -> Process[DarwinSymbolT_co]:
    """Return the process whose pid is reported by the client's `get_pid()`, caching it after first use."""
    cached = self._self_process
    if cached is not None:
        return cached

    own_pid = await self._client.get_pid()
    self._self_process = Process(self._client, own_pid)
    return self._self_process

get_by_pid async

get_by_pid(pid: int) -> Process[DarwinSymbolT_co]

get process object by pid

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_by_pid(self, pid: int) -> Process[DarwinSymbolT_co]:
    """Look up a running process by pid; raises ArgumentError when none matches."""
    running = await self.list()
    match = next((candidate for candidate in running if candidate.pid == pid), None)
    if match is None:
        raise ArgumentError(f"failed to locate process with pid: {pid}")
    return match

get_by_basename async

get_by_basename(name: str) -> Process[DarwinSymbolT_co]

get process object by basename

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_by_basename(self, name: str) -> Process[DarwinSymbolT_co]:
    """Return the first listed process whose basename equals *name*; raises ArgumentError otherwise."""
    for candidate in await self.list():
        candidate_basename = await type(candidate).basename(candidate)
        if candidate_basename == name:
            return candidate
    raise ArgumentError(f"failed to locate process with name: {name}")

get_by_name async

get_by_name(name: str) -> Process[DarwinSymbolT_co]

get process object by name

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_by_name(self, name: str) -> Process[DarwinSymbolT_co]:
    """Return the first listed process whose name equals *name*; raises ArgumentError otherwise."""
    for candidate in await self.list():
        candidate_name = await type(candidate).name(candidate)
        if candidate_name == name:
            return candidate
    raise ArgumentError(f"failed to locate process with name: {name}")

grep async

grep(name: str) -> list[Process[DarwinSymbolT_co]]

get process list by basename filter

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def grep(self, name: str) -> list[Process[DarwinSymbolT_co]]:
    """Return every listed process whose basename contains *name* (processes without a basename are skipped)."""
    return [
        process
        for process in await self.list()
        if (base := await type(process).basename(process)) and name in base
    ]

get_processes_by_listening_port async

get_processes_by_listening_port(port: int) -> list[Process[DarwinSymbolT_co]]

Get the list of processes listening on the specified port

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_processes_by_listening_port(self, port: int) -> list[Process[DarwinSymbolT_co]]:
    """Return the processes listening on the specified port (each process at most once).

    A socket counts as listening when its local port equals *port* and its remote port is 0.
    Processes whose fds cannot be queried are skipped.
    """
    listening_processes = []
    for process in await self.list():
        try:
            fds = await type(process).fds(process)
        except BadReturnValueError:
            # it's possible to get error if new processes have since died or the rpcserver
            # doesn't have the required permissions to access all the processes
            continue

        # any() reports the process once even when several of its sockets match
        # (e.g. one IPv4 and one IPv6 listener on the same port)
        if any(
            isinstance(fd, (Ipv4SocketFd, Ipv6SocketFd)) and fd.local_port == port and fd.remote_port == 0
            for fd in fds
        ):
            listening_processes.append(process)
    return listening_processes

lsof async

lsof() -> dict[int, list[Fd]]

get dictionary of pid to its opened fds

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def lsof(self) -> dict[int, list[Fd]]:
    """Map the pid of every listed process to its open fds; processes that cannot be queried are omitted."""
    fds_by_pid = {}
    for proc in await self.list():
        try:
            open_fds = await type(proc).fds(proc)
        except BadReturnValueError:
            # it's possible to get error if new processes have since died or the rpcserver
            # doesn't have the required permissions to access all the processes
            continue
        else:
            fds_by_pid[proc.pid] = open_fds
    return fds_by_pid

fuser async

fuser(path: str | PurePath) -> list[Process[DarwinSymbolT_co]]

Get a list of all processes that have an open handle to the specified path

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def fuser(self, path: str | PurePath) -> list[Process[DarwinSymbolT_co]]:
    """Return all processes that have an open handle to the specified path (each process at most once).

    Processes whose fds cannot be queried are skipped.
    """
    # normalise the requested path once instead of once per fd
    wanted = str(Path(path).absolute())
    result = []
    for process in await self.list():
        try:
            fds = await type(process).fds(process)
        except BadReturnValueError:
            # it's possible to get error if new processes have since died or the rpcserver
            # doesn't have the required permissions to access all the processes
            continue

        # any() reports the process once even when it holds several fds to the same path
        if any(isinstance(fd, FileFd) and str(Path(fd.path).absolute()) == wanted for fd in fds):
            result.append(process)

    return result

list async

list() -> list[Process[DarwinSymbolT_co]]

list all currently running processes

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def list(self) -> list[Process[DarwinSymbolT_co]]:
    """Return a Process object for every pid reported by `proc_listallpids()`."""
    entry_size = pid_t.sizeof()
    # a first call with a null buffer reports how many pids there are
    pid_count = await self._client.symbols.proc_listallpids(0, 0)
    buffer_size = entry_size * pid_count
    async with self._client.safe_malloc(buffer_size) as pid_buf:
        pid_buf.item_size = entry_size
        # the second call fills the buffer and returns the number of entries written
        pid_count = await self._client.symbols.proc_listallpids(pid_buf, buffer_size)
        return [Process(self._client, int(await pid_buf.getindex(index))) for index in range(pid_count)]

disable_watchdog async

disable_watchdog() -> None

Continuously kill watchdogd to keep it disabled.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def disable_watchdog(self) -> None:
    """Keep watchdogd disabled by killing it once per second, forever (never returns)."""
    while True:
        try:
            watchdog = await self.get_by_basename("watchdogd")
            await watchdog.kill(SIGKILL)
        except ArgumentError:
            # raised when watchdogd is not found (or by kill()); retry on the next round
            pass

        await asyncio.sleep(1)

get_mach_ports async

get_mach_ports(include_thread_info: bool = False) -> AsyncGenerator[MachPortInfo]

Enumerate all mach ports (heavily inspired by lsmp)

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/processes.py
async def get_mach_ports(self, include_thread_info: bool = False) -> AsyncGenerator[MachPortInfo]:
    """
    Enumerate all mach ports (heavily inspired by lsmp)

    When the remote process runs as uid 0, the task list is obtained through
    `host_processor_sets()` -> `host_processor_set_priv()` -> `processor_set_tasks_with_flavor()`.
    Otherwise a warning is logged and only the server's own task (via `task_read_for_pid()`) is
    inspected.

    For every task the IPC space is read with `mach_port_space_info()` and one `MachPortInfo` is
    yielded per table entry that is not skipped (see the filters in the inner loop).

    :param include_thread_info: When true, the threads of each task are walked as well and their
        thread ids are attached to every yielded entry of that task as a `MachPortThreadInfo`.
        When false, `thread_info` on the yielded entries is `None`.
    :raises BadReturnValueError: When one of the checked mach calls returns a non-zero status.

    NOTE(review): the scratch buffers obtained below with `malloc`/`calloc` are not freed anywhere
    in this function.
    """
    thread_info = None
    mach_task_self = await self._client.symbols.mach_task_self()

    # scratch buffers living in the remote process, used as out-parameters for the calls below
    malloc = self._client.symbols.malloc.call
    p_psets = await self._client.symbols.malloc(8)
    p_pset_count = await malloc(8)
    p_pset_priv = await malloc(8)
    p_tasks = await malloc(8)
    p_task_count = await malloc(8)
    p_count = await malloc(4)
    ports_info = await malloc(4 * 2 * EXC_TYPES_COUNT)
    masks = await malloc(4 * EXC_TYPES_COUNT)
    behaviors = await malloc(4 * EXC_TYPES_COUNT)
    flavors = await malloc(4 * EXC_TYPES_COUNT)
    p_thread_count = await malloc(4)
    p_thread_ports = await malloc(8)
    info = await malloc(200)
    p_pid = await malloc(4)
    th_info = await malloc(thread_identifier_info.sizeof())
    p_th_kobject = await malloc(4)
    p_th_kotype = await malloc(4)
    p_th_info_count = await malloc(4)
    p_th_voucher = await self._client.symbols.calloc(4, 1)
    p_table = await malloc(8)
    unused = await malloc(200)
    proc_name = await malloc(100)
    p_kotype = await malloc(4)

    # these buffers are accessed through getindex()/setindex() as 4-byte items
    p_pid.item_size = 4
    p_count.item_size = 4
    p_thread_count.item_size = 4
    p_th_info_count.item_size = 4
    p_th_voucher.item_size = 4
    await p_th_info_count.setindex(0, THREAD_IDENTIFIER_INFO_COUNT)
    p_kotype.item_size = 4

    if await self._client.symbols.getuid() == 0:
        # if privileged, get the info for all tasks so we can match ports up
        if (
            await self._client.symbols.host_processor_sets(
                await self._client.symbols.mach_host_self(),
                p_psets,
                p_pset_count,
            )
            != 0
        ):
            raise BadReturnValueError("host_processor_sets() failed")
        if await p_pset_count.getindex(0) != 1:
            raise BadReturnValueError("Assertion Failure: pset count greater than one")

        # convert the processor-set-name port to a privileged port
        if (
            await self._client.symbols.host_processor_set_priv(
                await self._client.symbols.mach_host_self(),
                await p_psets.getindex(0, 0),
                p_pset_priv,
            )
            != 0
        ):
            raise BadReturnValueError("host_processor_set_priv() failed")

        # release the processor-set-name port and the array that carried it
        await self._client.symbols.mach_port_deallocate(mach_task_self, await p_psets.getindex(0, 0))
        await self._client.symbols.vm_deallocate(
            mach_task_self,
            await p_psets.getindex(0),
            await p_pset_count.getindex(0) * mach_port_t.sizeof(),
        )

        # convert the processor-set-priv to a list of task read ports for the processor set
        if (
            await self._client.symbols.processor_set_tasks_with_flavor(
                await p_pset_priv.getindex(0),
                TASK_FLAVOR_READ,
                p_tasks,
                p_task_count,
            )
            != 0
        ):
            raise BadReturnValueError("processor_set_tasks_with_flavor() failed")

        # NOTE(review): this passes a port value to vm_deallocate() with only two arguments;
        # presumably mach_port_deallocate() was intended - TODO confirm
        await self._client.symbols.vm_deallocate(mach_task_self, await p_pset_priv.getindex(0))

        # swap my current instances port to be last to collect all threads and exception port info
        my_task_position = None
        task_count = await p_task_count.getindex(0)
        # the task array is read as `task_count` little-endian 32-bit values
        tasks = list(struct.unpack(f"<{int(task_count)}I", await (await p_tasks.getindex(0)).peek(task_count * 4)))

        for i in range(task_count):
            if await self._client.symbols.mach_task_is_self(tasks[i]):
                my_task_position = i
                break

        if my_task_position is not None:
            swap_holder = tasks[task_count - 1]
            tasks[task_count - 1] = tasks[my_task_position]
            tasks[my_task_position] = swap_holder
    else:
        logger.warning("should run as root for best output (cross-ref to other tasks' ports)")
        # just the one process
        task_count = 1
        async with self._client.safe_malloc(8) as p_task:
            ret = await self._client.symbols.task_read_for_pid(
                mach_task_self,
                (await self.get_self()).pid,
                p_task,
            )
            if ret != 0:
                raise BadReturnValueError("task_read_for_pid() failed")
            tasks = [await p_task.getindex(0)]

    for task in tasks:
        # the return value of pid_for_task() is not checked; pid is whatever p_pid holds afterwards
        await self._client.symbols.pid_for_task(task, p_pid)
        pid = (await p_pid.getindex(0)).c_uint32

        if (
            await self._client.symbols.task_get_exception_ports_info(
                task, EXC_MASK_ALL, masks, p_count, ports_info, behaviors, flavors
            )
            != 0
        ):
            raise BadReturnValueError("task_get_exception_ports_info() failed")

        if include_thread_info:
            # collect threads port as well
            if await self._client.symbols.task_threads(task, p_thread_ports, p_thread_count) != 0:
                raise BadReturnValueError("task_threads() failed")

            # collect the thread information
            thread_count = (await p_thread_count.getindex(0)).c_uint32
            thread_ports = struct.unpack(
                f"<{thread_count}I",
                await (await p_thread_ports.getindex(0)).peek(4 * thread_count),
            )
            thread_ids = []
            for thread_port in thread_ports:
                ret = await self._client.symbols.thread_get_exception_ports_info(
                    thread_port,
                    EXC_MASK_ALL,
                    masks,
                    p_count,
                    ports_info,
                    behaviors,
                    flavors,
                )
                if ret != 0:
                    raise BadReturnValueError(
                        f"thread_get_exception_ports_info() failed: "
                        f"{await (await self._client.symbols.mach_error_string(ret)).peek_str()}"
                    )

                # a thread id is recorded only when both lookups succeed; failures are skipped silently
                if (
                    await self._client.symbols.mach_port_kernel_object(
                        mach_task_self, thread_port, p_th_kotype, p_th_kobject
                    )
                    == 0
                ) and (
                    await self._client.symbols.thread_info(
                        mach_task_self,
                        thread_port,
                        THREAD_IDENTIFIER_INFO,
                        th_info,
                        await p_th_info_count.getindex(0),
                    )
                    == 0
                ):
                    thread_id = (await th_info.parse(thread_identifier_info)).thread_id
                    thread_ids.append(thread_id)

                await self._client.symbols.mach_port_deallocate(mach_task_self, thread_port)
            thread_info = MachPortThreadInfo(thread_ids=thread_ids)

        # p_count is reused here: after this call it holds the number of entries in the table
        if await self._client.symbols.mach_port_space_info(task, info, p_table, p_count, unused, unused) != 0:
            raise BadReturnValueError("mach_port_space_info() failed")

        # proc_name_str stays None when proc_name() returns 0
        proc_name_str = (
            await proc_name.peek_str() if await self._client.symbols.proc_name(pid, proc_name, 100) != 0 else None
        )

        count = (await p_count.getindex(0)).c_uint32
        table_struct = Array(count, ipc_info_name_t)

        parsed_table = await (await p_table.getindex(0)).parse(table_struct)

        for entry in parsed_table:
            dnreq = False
            rights = []

            if entry.iin_type & MACH_PORT_TYPE_ALL_RIGHTS == 0:
                # skip empty slots in the table
                continue

            # entries that are exactly a port set are not reported
            if entry.iin_type == MACH_PORT_TYPE_PORT_SET:
                continue

            if entry.iin_type & MACH_PORT_TYPE_SEND:
                rights.append("send")

            if entry.iin_type & MACH_PORT_TYPE_DNREQUEST:
                dnreq = True

            if entry.iin_type & MACH_PORT_TYPE_RECEIVE:
                rights.append("recv")

            # only evaluated when the entry carries no receive right: pure dead names are not reported
            elif entry.iin_type == MACH_PORT_TYPE_DEAD_NAME:
                continue

            # send-once entries get no special handling; they are yielded with an empty `rights` list
            if entry.iin_type == MACH_PORT_TYPE_SEND_ONCE:
                pass

            # note: `dead` carries the dead-name-request flag computed above
            yield MachPortInfo(
                task=task,
                pid=pid,
                name=entry.iin_name,
                rights=rights,
                ipc_object=entry.iin_object,
                dead=dnreq,
                proc_name=proc_name_str,
                thread_info=thread_info,
            )

HID

rpcclient.clients.darwin.subsystems.hid

Hid

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Control HID devices and simulate events

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/hid.py
class Hid(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Control HID devices and simulate events"""

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        self._client = client

    # ------------------------------------------------------------------ buttons

    async def send_double_home_button_press(self) -> None:
        """Send two consecutive home button presses."""
        for _ in range(2):
            await self.send_home_button_press()

    async def send_rewind_button_press(self) -> None:
        """Press and release the consumer-page "rewind" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_Rewind)

    async def send_random_play_button_press(self) -> None:
        """Press and release the consumer-page "random play" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_RandomPlay)

    async def send_repeat_button_press(self) -> None:
        """Press and release the consumer-page "repeat" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_Repeat)

    async def send_fast_forward_button_press(self) -> None:
        """Press and release the consumer-page "fast forward" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_FastForward)

    async def send_play_button_press(self) -> None:
        """Press and release the consumer-page "play" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_Play)

    async def send_pause_button_press(self) -> None:
        """Press and release the consumer-page "pause" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_Pause)

    async def send_play_pause_button_press(self) -> None:
        """Press and release the consumer-page "play or pause" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_PlayOrPause)

    async def send_search_button_press(self) -> None:
        """Press and release the consumer-page "AC search" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_ACSearch)

    async def send_mute_button_press(self) -> None:
        """Press and release the consumer-page "mute" key."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_Mute)

    async def send_home_button_press(self) -> None:
        """Press and release the home button (sent as the consumer-page "menu" usage)."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_Menu)

    async def send_power_button_press(self, interval: int = 0) -> None:
        """Press the power button, hold it for `interval` seconds, then release it."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_Power, interval=interval)

    async def send_volume_down_button_press(self, interval: int = 0) -> None:
        """Press the volume-down button, hold it for `interval` seconds, then release it."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_VolumeDecrement, interval=interval)

    async def send_volume_up_button_press(self, interval: int = 0) -> None:
        """Press the volume-up button, hold it for `interval` seconds, then release it."""
        await self.send_key_press(kHIDPage_Consumer, kHIDUsage_Csmr_VolumeIncrement, interval=interval)

    # ----------------------------------------------------------------- keyboard

    async def send_key_press(self, page: int, key_code: int, interval: float | int = 0) -> None:
        """
        Send a key-down event followed by a key-up event.

        :param page: HID usage page of the key.
        :param key_code: HID usage of the key within `page`.
        :param interval: Seconds to wait between the two events; a falsy value means no wait.
        """
        await self.send_keyboard_event(page, key_code, True)
        if interval:
            await asyncio.sleep(interval)
        await self.send_keyboard_event(page, key_code, False)

    async def send_keyboard_event(self, page: int, key_code: int, down: bool) -> None:
        """
        Create a single keyboard event stamped with the remote `mach_absolute_time()` and dispatch it.

        :raises BadReturnValueError: When `IOHIDEventCreateKeyboardEvent()` returns a falsy value.
        """
        symbols = self._client.symbols
        keyboard_event = await symbols.IOHIDEventCreateKeyboardEvent(
            kCFAllocatorDefault,
            await symbols.mach_absolute_time(),
            page,
            key_code,
            down,
            0,
        )
        if not keyboard_event:
            raise BadReturnValueError("IOHIDEventCreateKeyboardEvent() failed")

        await self.dispatch(keyboard_event)

    # -------------------------------------------------------------------- touch

    async def send_swipe_right(self) -> None:
        """Swipe from (0.5, 0.5) to (1.0, 0.5)."""
        await self.send_swipe(0.5, 0.5, 1.0, 0.5)

    async def send_swipe_left(self) -> None:
        """Swipe from (0.5, 0.5) to (0.0, 0.5)."""
        await self.send_swipe(0.5, 0.5, 0.0, 0.5)

    async def send_swipe_up(self) -> None:
        """Swipe from (0.5, 1.5) to (0.5, 0.5)."""
        await self.send_swipe(0.5, 1.5, 0.5, 0.5)

    async def send_swipe_down(self) -> None:
        """Swipe from (0.5, 0.5) to (0.5, 1.5)."""
        await self.send_swipe(0.5, 0.5, 0.5, 1.5)

    async def send_swipe(self, from_x: float, from_y: float, to_x: float, to_y: float) -> None:
        """
        Touch down at the start point, pause for `TOUCH_EVENT_SLEEP`, then move to and lift at the end point.
        """
        await self.send_touch_event(TouchEventType.TOUCH_DOWN, from_x, from_y)
        await asyncio.sleep(TOUCH_EVENT_SLEEP)
        for phase in (TouchEventType.TOUCH_MOVE, TouchEventType.TOUCH_UP):
            await self.send_touch_event(phase, to_x, to_y)

    async def send_touch_event(self, event_type: TouchEventType, x: float, y: float, pressure: float = 0.4) -> None:
        """
        Dispatch one touch event made of a "hand" digitizer parent and a finger child.

        The flags, touch state and button mask are taken from `EVENT_TYPE_PARAMS[event_type]`; both
        events share a single timestamp.
        """
        spec = EVENT_TYPE_PARAMS[event_type]

        now = await self._client.symbols.mach_absolute_time()

        flags = spec["event_flags"]
        is_touching = spec["touch"]
        buttons = spec["button_mask"]

        hand_event = await self.create_digitizer_event(
            type_=IOHIDDigitizerTransducerType.kIOHIDDigitizerTransducerTypeHand,
            index=0,
            identity=0,
            event_mask=flags,
            button_mask=0,
            x=x,
            y=y,
            z=0.0,
            tip_pressure=0.0,
            barrel_pressure=0.0,
            range_=is_touching,
            touch=is_touching,
            options=0,
            timestamp=now,
        )

        finger_event = await self.create_digitizer_finger_event(
            index=2,
            identity=2,
            event_mask=flags,
            button_mask=buttons,
            x=x,
            y=y,
            z=0.0,
            tip_pressure=pressure,
            twist=0.0,
            range_=is_touching,
            touch=is_touching,
            options=0,
            timestamp=now,
        )
        await self._client.symbols.IOHIDEventAppendEvent(hand_event, finger_event)
        await self.dispatch(hand_event)

    async def create_digitizer_event(
        self,
        type_: IOHIDDigitizerTransducerType | int,
        index: int,
        identity: int,
        event_mask: int,
        button_mask: int,
        x: float,
        y: float,
        z: float,
        tip_pressure: float,
        barrel_pressure: float,
        range_: bool,
        touch: bool,
        options: int,
        timestamp=None,
    ):
        """
        Create a digitizer event through `IOHIDEventCreateDigitizerEvent()` and return it.

        After creation the event is marked as not built-in and display-integrated, and a fixed
        sender id is set on it.

        :param timestamp: Event timestamp; the remote `mach_absolute_time()` is used when `None`.
        :raises BadReturnValueError: When the event could not be created.
        """
        symbols = self._client.symbols
        if timestamp is None:
            timestamp = await symbols.mach_absolute_time()

        digitizer_event = await symbols.IOHIDEventCreateDigitizerEvent(
            kCFAllocatorDefault,
            timestamp,
            type_,
            index,
            identity,
            event_mask,
            button_mask,
            x,
            y,
            z,
            tip_pressure,
            barrel_pressure,
            range_,
            touch,
            options,
        )
        if not digitizer_event:
            raise BadReturnValueError("IOHIDEventCreateDigitizerEvent() failed")

        await symbols.IOHIDEventSetIntegerValue(digitizer_event, IOHIDEventField.kIOHIDEventFieldIsBuiltIn, 0)
        await symbols.IOHIDEventSetIntegerValue(
            digitizer_event, IOHIDEventFieldDigitizer.kIOHIDEventFieldDigitizerIsDisplayIntegrated, 1
        )
        await symbols.IOHIDEventSetSenderID(digitizer_event, 0x8000000817319375)

        return digitizer_event

    async def create_digitizer_finger_event(
        self,
        index: int,
        identity: int,
        event_mask: int,
        button_mask: int,
        x: float,
        y: float,
        z: float,
        tip_pressure: float,
        twist: float,
        range_: bool,
        touch: bool,
        options: int,
        timestamp=None,
    ) -> DarwinSymbolT_co:
        """
        Create a finger event through `IOHIDEventCreateDigitizerFingerEvent()` and return it.

        The major radius of the created event is set to 0.5.

        :param timestamp: Event timestamp; the remote `mach_absolute_time()` is used when `None`.
        :raises BadReturnValueError: When the event could not be created.
        """
        symbols = self._client.symbols
        if timestamp is None:
            timestamp = await symbols.mach_absolute_time()

        finger_event = await symbols.IOHIDEventCreateDigitizerFingerEvent(
            kCFAllocatorDefault,
            timestamp,
            index,
            identity,
            event_mask,
            button_mask,
            x,
            y,
            z,
            tip_pressure,
            twist,
            range_,
            touch,
            options,
        )
        if not finger_event:
            raise BadReturnValueError("IOHIDEventCreateDigitizerFingerEvent() failed")

        await symbols.IOHIDEventSetFloatValue(
            finger_event, IOHIDEventFieldDigitizer.kIOHIDEventFieldDigitizerMajorRadius, 0.5
        )
        return finger_event

    # ----------------------------------------------------------------- dispatch

    @asynccontextmanager
    async def create_hid_client(self) -> AsyncGenerator[DarwinSymbolT_co]:
        """
        Yield a fresh `IOHIDEventSystemClient`, releasing it with `CFRelease` on exit.

        :raises BadReturnValueError: When `IOHIDEventSystemClientCreate()` returns a falsy value.
        """
        system_client = await self._client.symbols.IOHIDEventSystemClientCreate(0)
        if not system_client:
            raise BadReturnValueError("IOHIDEventSystemClientCreate() failed")

        try:
            yield system_client
        finally:
            await self._client.symbols.CFRelease(system_client)

    async def dispatch(self, event):
        """Dispatch `event` through a short-lived HID event system client."""
        async with self.create_hid_client() as system_client:
            await self._client.symbols.IOHIDEventSystemClientDispatchEvent(system_client, event)

Location

rpcclient.clients.darwin.subsystems.location

Location

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Wrapper to CLLocationManager https://developer.apple.com/documentation/corelocation/cllocationmanager?language=objc

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
class Location(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """
    Wrapper to CLLocationManager
    https://developer.apple.com/documentation/corelocation/cllocationmanager?language=objc
    """

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """Bind to `client` and register the CoreLocation framework for lazy loading."""
        self._client = client
        client.load_framework_lazy("CoreLocation")

    @cached_async_method
    async def _get_CLLocationManager(self) -> DarwinSymbolT_co:
        """Return the `CLLocationManager` Objective-C class (cached)."""
        return await self._client.symbols.objc_getClass("CLLocationManager")

    @cached_async_method
    async def _get_location_manager(self) -> DarwinSymbolT_co:
        """Return the object obtained from `[CLLocationManager sharedManager]` (cached)."""
        return await (await self._get_CLLocationManager()).objc_call("sharedManager")

    async def get_location_services_enabled(self) -> bool:
        """opt-in status for location services"""
        return bool(await (await self._get_location_manager()).objc_call("locationServicesEnabled"))

    async def set_location_services_enabled(self, value: bool) -> None:
        """set the opt-in status for location services (sent to the `CLLocationManager` class itself)"""
        await (await self._get_CLLocationManager()).objc_call("setLocationServicesEnabled:", value)

    async def authorization_status(self) -> CLAuthorizationStatus:
        """authorization status for current server process of accessing location services"""
        manager = await self._get_location_manager()
        # objc_call() is a coroutine: it has to be awaited before its result can be
        # converted, otherwise the enum lookup receives a coroutine object and fails.
        return CLAuthorizationStatus(await manager.objc_call("authorizationStatus"))

    async def last_sample(self) -> dict | None:
        """last taken location sample (or None if there isn't any)"""
        location = await (await self._get_location_manager()).objc_call("location")
        if not location:
            return None
        return await (await location.objc_call("jsonObject")).py(dict)

    async def request_always_authorization(self) -> None:
        """Request authorization to always query location data"""
        await (await self._get_location_manager()).objc_call("requestAlwaysAuthorization")

    async def start_updating_location(self) -> None:
        """
        request location updates from CLLocationManager

        :raises RpcPermissionError: When the authorization status is below "authorized always".
        """
        if (
            await type(self).authorization_status(self)
        ).value < CLAuthorizationStatus.kCLAuthorizationStatusAuthorizedAlways.value:
            raise RpcPermissionError()
        await (await self._get_location_manager()).objc_call("startUpdatingLocation")

    async def stop_updating_location(self) -> None:
        """stop requesting location updates from CLLocationManager"""
        await (await self._get_location_manager()).objc_call("stopUpdatingLocation")

    async def request_oneshot_location(self) -> None:
        """
        requests the one-time delivery of the user's current location

        :raises RpcPermissionError: When the authorization status is below "authorized always".
        """
        if (
            await type(self).authorization_status(self)
        ).value < CLAuthorizationStatus.kCLAuthorizationStatusAuthorizedAlways.value:
            raise RpcPermissionError()
        await (await self._get_location_manager()).objc_call("requestLocation")

get_location_services_enabled async

get_location_services_enabled() -> bool

opt-in status for location services

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
async def get_location_services_enabled(self) -> bool:
    """Return the truthiness of `locationServicesEnabled` as reported by the location manager."""
    manager = await self._get_location_manager()
    enabled = await manager.objc_call("locationServicesEnabled")
    return bool(enabled)

set_location_services_enabled async

set_location_services_enabled(value: bool) -> None

opt-in status for location services

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
async def set_location_services_enabled(self, value: bool) -> None:
    """Send `setLocationServicesEnabled:` with `value` to the `CLLocationManager` class."""
    manager_class = await self._get_CLLocationManager()
    await manager_class.objc_call("setLocationServicesEnabled:", value)

authorization_status async

authorization_status() -> CLAuthorizationStatus

authorization status for current server process of accessing location services

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
async def authorization_status(self) -> CLAuthorizationStatus:
    """
    authorization status for current server process of accessing location services

    :return: The `CLAuthorizationStatus` member matching the value of `authorizationStatus`.
    """
    manager = await self._get_location_manager()
    # objc_call() is a coroutine: it has to be awaited before its result can be
    # converted, otherwise the enum lookup receives a coroutine object and fails.
    return CLAuthorizationStatus(await manager.objc_call("authorizationStatus"))

last_sample async

last_sample() -> dict | None

last taken location sample (or None if there isn't any)

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
async def last_sample(self) -> dict | None:
    """last taken location sample (or None if there isn't any)"""
    location = await (await self._get_location_manager()).objc_call("location")
    if not location:
        return None
    return await (await location.objc_call("jsonObject")).py(dict)

request_always_authorization async

request_always_authorization() -> None

Request authorization to always query location data

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
async def request_always_authorization(self) -> None:
    """Send `requestAlwaysAuthorization` to the location manager."""
    manager = await self._get_location_manager()
    await manager.objc_call("requestAlwaysAuthorization")

start_updating_location async

start_updating_location() -> None

request location updates from CLLocationManager

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
async def start_updating_location(self) -> None:
    """
    Send `startUpdatingLocation` to the location manager.

    :raises RpcPermissionError: When the current authorization status is below
        `kCLAuthorizationStatusAuthorizedAlways`.
    """
    status = await type(self).authorization_status(self)
    required = CLAuthorizationStatus.kCLAuthorizationStatusAuthorizedAlways
    if status.value < required.value:
        raise RpcPermissionError()

    manager = await self._get_location_manager()
    await manager.objc_call("startUpdatingLocation")

stop_updating_location async

stop_updating_location() -> None

stop requesting location updates from CLLocationManager

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
async def stop_updating_location(self) -> None:
    """Send `stopUpdatingLocation` to the location manager."""
    manager = await self._get_location_manager()
    await manager.objc_call("stopUpdatingLocation")

request_oneshot_location async

request_oneshot_location() -> None

requests the one-time delivery of the user's current location

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/location.py
async def request_oneshot_location(self) -> None:
    """
    Send `requestLocation` to the location manager.

    :raises RpcPermissionError: When the current authorization status is below
        `kCLAuthorizationStatusAuthorizedAlways`.
    """
    status = await type(self).authorization_status(self)
    required = CLAuthorizationStatus.kCLAuthorizationStatusAuthorizedAlways
    if status.value < required.value:
        raise RpcPermissionError()

    manager = await self._get_location_manager()
    await manager.objc_call("requestLocation")

Media

rpcclient.clients.darwin.subsystems.media

AVAudioSessionCategory

Bases: Enum

https://developer.apple.com/documentation/avfaudio/AVAudioSessionCategory?language=objc

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/media.py
class AVAudioSessionCategory(Enum):
    """
    Audio session categories accepted by `AudioSession.set_category`.

    https://developer.apple.com/documentation/avfaudio/AVAudioSessionCategory?language=objc
    """

    # Each value is a symbol name: `AudioSession.set_category` resolves it through
    # `self._client.symbols[category.value]` and passes what that symbol points to.
    PlayAndRecord = "AVAudioSessionCategoryPlayAndRecord"
    Ambient = "AVAudioSessionCategoryAmbient"
    MultiRoute = "AVAudioSessionCategoryMultiRoute"
    Playback = "AVAudioSessionCategoryPlayback"
    Record = "AVAudioSessionCategoryRecord"
    SoloAmbient = "AVAudioSessionCategorySoloAmbient"
    AudioProcessing = "AVAudioSessionCategoryAudioProcessing"

AVAudioSessionMode

Bases: Enum

https://developer.apple.com/documentation/avfaudio/avaudiosessionmode?language=objc

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/media.py
class AVAudioSessionMode(Enum):
    """
    Audio session modes accepted by `AudioSession.set_mode` and `AudioSession.set_category`.

    https://developer.apple.com/documentation/avfaudio/avaudiosessionmode?language=objc
    """

    # Each value is a symbol name: the `AudioSession` methods resolve it through
    # `self._client.symbols[mode.value]` and pass what that symbol points to.
    Default = "AVAudioSessionModeDefault"
    GameChat = "AVAudioSessionModeGameChat"
    Measurement = "AVAudioSessionModeMeasurement"
    MoviePlayback = "AVAudioSessionModeMoviePlayback"
    SpokenAudio = "AVAudioSessionModeSpokenAudio"
    VideoChat = "AVAudioSessionModeVideoChat"
    VideoRecording = "AVAudioSessionModeVideoRecording"
    VoiceChat = "AVAudioSessionModeVoiceChat"
    VoicePrompt = "AVAudioSessionModeVoicePrompt"

Recorder

Bases: Allocated['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Wrapper for AVAudioRecorder https://developer.apple.com/documentation/avfaudio/avaudiorecorder?language=objc

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/media.py
class Recorder(Allocated["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """
    Wrapper for AVAudioRecorder
    https://developer.apple.com/documentation/avfaudio/avaudiorecorder?language=objc
    """

    def __init__(
        self, client: "DarwinClient[DarwinSymbolT_co]", session: "AudioSession", recorder: DarwinSymbolT_co
    ) -> None:
        """
        :param client: Client the recorder object lives in.
        :param session: Audio session that is activated/deactivated around recording.
        :param recorder: The remote `AVAudioRecorder` instance being wrapped.
        """
        super().__init__()
        self._client = client
        self._session = session
        self._recorder = recorder

    async def _deallocate(self) -> None:
        """Send `release` to the wrapped recorder."""
        await self._recorder.objc_call("release")

    async def record(self) -> None:
        """
        Switch the session to PlayAndRecord, activate it and start recording.

        :raises RpcFailedToRecordError: When the recorder does not report that it is recording afterwards.
        """
        session = self._session
        await session.set_category(AVAudioSessionCategory.PlayAndRecord)
        await session.set_active(True)
        await self._recorder.objc_call("record")

        is_recording = await type(self).recording(self)
        if not is_recording:
            raise RpcFailedToRecordError()

    async def pause(self) -> None:
        """Pause the recorder, then deactivate the session."""
        await self._recorder.objc_call("pause")
        await self._session.set_active(False)

    async def stop(self) -> None:
        """Stop the recorder, then deactivate the session."""
        await self._recorder.objc_call("stop")
        await self._session.set_active(False)

    async def delete_recording(self) -> None:
        """
        Send `deleteRecording` to the recorder.

        :raises BadReturnValueError: When the call returns a falsy value.
        """
        deleted = await self._recorder.objc_call("deleteRecording")
        if not deleted:
            raise BadReturnValueError("deleteRecording failed")

    async def recording(self) -> bool:
        """Return the truthiness of the recorder's `isRecording`."""
        state = await self._recorder.objc_call("isRecording")
        return bool(state)

Player

Bases: Allocated['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Wrapper for AVAudioPlayer https://developer.apple.com/documentation/avfaudio/avaudioplayer?language=objc

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/media.py
class Player(Allocated["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """
    Wrapper for AVAudioPlayer
    https://developer.apple.com/documentation/avfaudio/avaudioplayer?language=objc
    """

    def __init__(
        self,
        client: "DarwinClient[DarwinSymbolT_co]",
        session: "AudioSession[DarwinSymbolT_co]",
        player: DarwinSymbolT_co,
    ) -> None:
        """
        :param client: Client the player object lives in.
        :param session: Audio session that is activated/deactivated around playback.
        :param player: The remote `AVAudioPlayer` instance being wrapped.
        """
        # Initialise the `Allocated` base exactly like the sibling `Recorder` does, so that
        # whatever state the base sets up in its constructor exists on Player instances too.
        super().__init__()
        self._client = client
        self._session: AudioSession[DarwinSymbolT_co] = session
        self._player: DarwinSymbolT_co = player

    async def _deallocate(self) -> None:
        """Send `release` to the wrapped player."""
        await self._player.objc_call("release")

    async def play(self) -> None:
        """
        Switch the session to PlayAndRecord, activate it and start playback.

        :raises RpcFailedToPlayError: When the player does not report that it is playing afterwards.
        """
        await self._session.set_category(AVAudioSessionCategory.PlayAndRecord)
        await self._session.set_active(True)
        await self._player.objc_call("play")
        if not await type(self).playing(self):
            raise RpcFailedToPlayError()

    async def pause(self) -> None:
        """Pause playback; the session is left as it is."""
        await self._player.objc_call("pause")

    async def stop(self) -> None:
        """Stop playback, then deactivate the session."""
        await self._player.objc_call("stop")
        await self._session.set_active(False)

    async def set_volume(self, value: float) -> None:
        """Send `setVolume:` with `value` packed as a little-endian 32-bit float."""
        await self._player.objc_call("setVolume:", struct.pack("<f", value))

    async def playing(self) -> bool:
        """Return the truthiness of the player's `isPlaying`."""
        return bool(await self._player.objc_call("isPlaying"))

    async def get_loops(self) -> int:
        """Return the player's `numberOfLoops`."""
        return await self._player.objc_call("numberOfLoops")

    async def set_loops(self, value: int) -> None:
        """Send `setNumberOfLoops:` with `value`."""
        await self._player.objc_call("setNumberOfLoops:", value)

AudioSession

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

wrapper for AVAudioSession https://developer.apple.com/documentation/avfaudio/avaudiosession?language=objc

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/media.py
class AudioSession(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """
    wrapper for AVAudioSession
    https://developer.apple.com/documentation/avfaudio/avaudiosession?language=objc
    """

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]", session: DarwinSymbolT_co) -> None:
        """
        :param client: Client the session object lives in.
        :param session: The remote `AVAudioSession` instance being wrapped.
        """
        self._client = client
        self._session: DarwinSymbolT_co = session

    @classmethod
    async def create(cls, client: "DarwinClient[DarwinSymbolT_co]") -> Self:
        """Wrap the object returned by `[AVAudioSession sharedInstance]` on `client`."""
        session_class = await client.symbols.objc_getClass("AVAudioSession")
        shared = await session_class.objc_call("sharedInstance")
        return cls(client, shared)

    async def set_active(self, is_active: bool) -> None:
        """Send `setActive:error:` with a null error pointer."""
        await self._session.objc_call("setActive:error:", is_active, 0)

    async def set_mode(self, mode: AVAudioSessionMode) -> None:
        """Send `setMode:error:` with the constant named by `mode` and a null error pointer."""
        mode_constant = await self._client.symbols[mode.value].getindex(0)
        await self._session.objc_call("setMode:error:", mode_constant, 0)

    async def set_category(
        self,
        category: AVAudioSessionCategory,
        mode: AVAudioSessionMode = AVAudioSessionMode.Default,
        route_sharing_policy: AVAudioSessionRouteSharingPolicy = AVAudioSessionRouteSharingPolicy.Default,
        options: AVAudioSessionCategoryOptions = AVAudioSessionCategoryOptions.DefaultToSpeaker,
    ) -> None:
        """
        Send `setCategory:mode:routeSharingPolicy:options:error:` with a null error pointer.

        `category` and `mode` are translated to the constants their values name; the policy and
        options are passed through as given.
        """
        symbols = self._client.symbols
        category_constant = await symbols[category.value].getindex(0)
        mode_constant = await symbols[mode.value].getindex(0)
        await self._session.objc_call(
            "setCategory:mode:routeSharingPolicy:options:error:",
            category_constant,
            mode_constant,
            route_sharing_policy,
            options,
            0,
        )

    async def set_interruption_priority(self, priority: InterruptionPriority) -> None:
        """Send `setInterruptionPriority:error:` with a null error pointer."""
        await self._session.objc_call("setInterruptionPriority:error:", priority, 0)

    async def override_output_audio_port(self, port: int) -> None:
        """Send `overrideOutputAudioPort:error:` with a null error pointer."""
        await self._session.objc_call("overrideOutputAudioPort:error:", port, 0)

    async def other_audio_playing(self) -> bool:
        """Return the truthiness of `isOtherAudioPlaying`."""
        answer = await self._session.objc_call("isOtherAudioPlaying")
        return bool(answer)

    async def record_permission(self) -> str:
        """Return `recordPermission` rendered as the four characters of its big-endian 32-bit encoding."""
        permission = await self._session.objc_call("recordPermission")
        return struct.pack(">I", permission).decode()

    async def available_categories(self) -> list[str]:
        """Return `availableCategories` converted to a python list."""
        categories = await self._session.objc_call("availableCategories")
        return await categories.py(list)

    async def available_modes(self) -> list[str]:
        """Return `availableModes` converted to a python list."""
        modes = await self._session.objc_call("availableModes")
        return await modes.py(list)

    async def is_active(self) -> bool:
        """Return the truthiness of `isActive`."""
        answer = await self._session.objc_call("isActive")
        return bool(answer)

DarwinMedia

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Media utils

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/media.py
class DarwinMedia(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Media utils"""

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """
        Keep the client and register the AVFoundation framework through ``load_framework_lazy``.

        :param rpcclient.darwin.client.DarwinClient client:
        """
        self._client = client
        client.load_framework_lazy("AVFoundation")

    @cached_async_method
    async def session(self) -> AudioSession[DarwinSymbolT_co]:
        """Create the audio session for this client (decorated with ``cached_async_method``)."""
        return await AudioSession.create(self._client)

    async def _file_url(self, filename: str | PurePath) -> DarwinSymbolT_co:
        """Build a remote ``NSURL`` from ``filename`` using ``fileURLWithPath:``."""
        nsurl_class = await self._client.symbols.objc_getClass("NSURL")
        remote_path = await self._client.cf(str(filename))
        return await nsurl_class.objc_call("fileURLWithPath:", remote_path)

    async def get_recorder(self, filename: str | PurePath) -> Recorder[DarwinSymbolT_co]:
        """
        Create a ``Recorder`` backed by a remote ``AVAudioRecorder`` targeting ``filename``.

        :param filename: path handed to ``fileURLWithPath:`` to build the recorder URL
        :return: recorder wrapper bound to this client and its audio session
        """
        url = await self._file_url(filename)
        # Recorder settings are converted to a remote object before the recorder is allocated.
        settings = await self._client.cf({
            "AVEncoderQualityKey": 100,
            "AVEncoderBitRateKey": 16,
            "AVNumberOfChannelsKey": 1,
            "AVSampleRateKey": 8000.0,
        })
        recorder_class = await self._client.symbols.objc_getClass("AVAudioRecorder")
        allocated = await recorder_class.objc_call("alloc")
        # The trailing 0 fills the ``error:`` argument.
        recorder = await allocated.objc_call("initWithURL:settings:error:", url, settings, 0)

        audio_session = await type(self).session(self)
        return Recorder(self._client, audio_session, recorder)

    async def get_player(self, filename: str | PurePath) -> Player[DarwinSymbolT_co]:
        """
        Create a ``Player`` backed by a remote ``AVAudioPlayer`` reading ``filename``.

        :param filename: path handed to ``fileURLWithPath:`` to build the player URL
        :return: player wrapper bound to this client and its audio session
        """
        url = await self._file_url(filename)

        player_class = await self._client.symbols.objc_getClass("AVAudioPlayer")
        allocated = await player_class.objc_call("alloc")
        # The trailing 0 fills the ``error:`` argument.
        player = await allocated.objc_call("initWithContentsOfURL:error:", url, 0)

        audio_session = await type(self).session(self)
        return Player(self._client, audio_session, player)

Preferences

rpcclient.clients.darwin.subsystems.preferences

Preferences

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Preferences utils

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/preferences.py
class Preferences(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Preferences utils"""

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """
        Build the two preference helpers, ``cf`` first and then ``sc``, from the same client.

        :param rpcclient.darwin.client.DarwinClient client:
        """
        cf_preferences: CFPreferences[DarwinSymbolT_co] = CFPreferences(client)
        sc_preferences: SCPreferences[DarwinSymbolT_co] = SCPreferences(client)
        # Exposed as public attributes: ``cf`` wraps CFPreferences, ``sc`` wraps SCPreferences.
        self.cf: CFPreferences[DarwinSymbolT_co] = cf_preferences
        self.sc: SCPreferences[DarwinSymbolT_co] = sc_preferences

IORegistry

rpcclient.clients.darwin.subsystems.ioregistry

IOService

Bases: Allocated['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

representation of a remote IOService

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/ioregistry.py
class IOService(Allocated["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """representation of a remote IOService"""

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]", service: DarwinSymbolT_co, name: str) -> None:
        """
        :param client: client used for every remote call
        :param service: remote service object this instance wraps
        :param name: registry entry name, exposed as ``self.name``
        """
        super().__init__()
        self._client = client
        self._service: DarwinSymbolT_co = service
        self.name: str = name

    @classmethod
    async def create(cls, client: "DarwinClient[DarwinSymbolT_co]", service: int) -> Self:
        """
        Build an instance for ``service``, reading its name with ``IORegistryEntryGetName``.

        :raises BadReturnValueError: when ``IORegistryEntryGetName`` returns a truthy status
        """
        async with client.safe_malloc(io_name_t.sizeof()) as name_buffer:
            status = await client.symbols.IORegistryEntryGetName(service, name_buffer)
            if status:
                raise BadReturnValueError("IORegistryEntryGetName failed")
            # Read the name before the temporary buffer goes out of scope.
            entry_name = await name_buffer.peek_str()

        return cls(client, client.symbol(service), entry_name)

    async def properties(self) -> dict:
        """
        Return the entry's properties from ``IORegistryEntryCreateCFProperties`` as a dict.

        :raises BadReturnValueError: when the call returns a truthy status
        """
        async with self._client.safe_malloc(8) as properties_out:
            status = await self._client.symbols.IORegistryEntryCreateCFProperties(
                self._service, properties_out, kCFAllocatorDefault, 0
            )
            if status:
                raise BadReturnValueError("IORegistryEntryCreateCFProperties failed")
            remote_properties = await properties_out.getindex(0)
            return await remote_properties.py(dict)

    async def _iter(self) -> "AsyncGenerator[IOService[DarwinSymbolT_co]]":
        """
        Yield an ``IOService`` for each child returned by the entry's child iterator.

        :raises BadReturnValueError: when ``IORegistryEntryGetChildIterator`` returns a truthy status
        """
        async with self._client.safe_malloc(io_object_t.sizeof()) as iterator_out:
            status = await self._client.symbols.IORegistryEntryGetChildIterator(
                self._service, kIOServicePlane, iterator_out
            )
            if status:
                raise BadReturnValueError("IORegistryEntryGetChildIterator failed")
            children = await iterator_out.getindex(0)

        # NOTE(review): the iterator object is not released in this method - confirm whether it should be.
        while True:
            child = await self._client.symbols.IOIteratorNext(children)
            if not child:
                # A falsy value from IOIteratorNext ends the iteration.
                break
            yield await IOService.create(self._client, child)

    def __aiter__(self) -> "AsyncGenerator[IOService[DarwinSymbolT_co]]":
        """Support ``async for child in service`` by delegating to ``_iter()``."""
        return self._iter()

    async def set(self, properties: dict) -> None:
        """Pass ``properties`` (converted with ``client.cf``) to ``IORegistryEntrySetCFProperties``."""
        set_properties = self._client.symbols.IORegistryEntrySetCFProperties
        remote_properties = await self._client.cf(properties)
        await set_properties(self._service, remote_properties)

    async def get(
        self, key: str, typ: type[CfSerializableT] | tuple[type[CfSerializableT], ...] = _CfSerializableAny
    ) -> CfSerializableT:
        """
        Return one property fetched with ``IORegistryEntryCreateCFProperty``.

        :param key: property name, converted with ``client.cf`` before the call
        :param typ: type (or tuple of types) forwarded to ``py()`` for the conversion
        """
        create_property = self._client.symbols.IORegistryEntryCreateCFProperty
        remote_key = await self._client.cf(key)
        remote_value = await create_property(self._service, remote_key, kCFAllocatorDefault, 0)
        return await remote_value.py(typ)

    async def _deallocate(self) -> None:
        """Release the wrapped service with ``IOObjectRelease``."""
        await self._client.symbols.IOObjectRelease(self._service)

    def __repr__(self):
        return "<{} NAME:{}>".format(self.__class__.__name__, self.name)

IORegistry

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

IORegistry utils https://developer.apple.com/library/archive/documentation/DeviceDrivers/Conceptual/IOKitFundamentals/TheRegistry/TheRegistry.html

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/ioregistry.py
class IORegistry(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """
    IORegistry utils
    https://developer.apple.com/library/archive/documentation/DeviceDrivers/Conceptual/IOKitFundamentals/TheRegistry/TheRegistry.html
    """

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """
        :param rpcclient.darwin.client.DarwinClient client:
        """
        self._client = client

    async def backlight_control(self) -> BacklightControlService[DarwinSymbolT_co]:
        """
        Return the service matching the ``backlight-control`` property.

        :raises RpcClientException: when ``IOServiceGetMatchingService`` returns a falsy value
        """
        get_matching_service = self._client.symbols.IOServiceGetMatchingService
        matching = await self._client.cf({"IOPropertyMatch": {"backlight-control": True}})
        service = await get_matching_service(0, matching)
        if service:
            return await BacklightControlService.create(self._client, service)
        raise RpcClientException("IOServiceGetMatchingService failed")

    async def power_source(self) -> PowerSourceService[DarwinSymbolT_co]:
        """
        Return the service matched by ``IOServiceMatching("IOPMPowerSource")``.

        :raises RpcClientException: when ``IOServiceGetMatchingService`` returns a falsy value
        """
        get_matching_service = self._client.symbols.IOServiceGetMatchingService
        matching = await self._client.symbols.IOServiceMatching("IOPMPowerSource")
        service = await get_matching_service(0, matching)
        if service:
            return await PowerSourceService.create(self._client, service)
        raise RpcClientException("IOServiceGetMatchingService failed")

    async def root(self) -> IOService[DarwinSymbolT_co]:
        """Return an ``IOService`` for the entry given by ``IORegistryGetRootEntry``."""
        async with self._client.safe_malloc(mach_port_t.sizeof()) as master_port_out:
            # IOMasterPort writes the port into the temporary buffer; its return value is not inspected.
            await self._client.symbols.IOMasterPort(MACH_PORT_NULL, master_port_out)
            get_root_entry = self._client.symbols.IORegistryGetRootEntry
            master_port = await master_port_out.getindex(0)
            root_entry = await get_root_entry(master_port)
            return await IOService.create(self._client, root_entry)

Reports

rpcclient.clients.darwin.subsystems.reports

Reports

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

equivalent to the data that can be found using the Console app inside the Reports section

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/reports.py
class Reports(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """equivalent to the data that can be found using the Console app inside the Reports section"""

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]", crash_reports_dir: str) -> None:
        """
        :param client: client used for every remote filesystem call
        :param crash_reports_dir: directory forwarded to ``CrashReports``
        """
        self._client = client
        self.crash_reports: CrashReports[DarwinSymbolT_co] = CrashReports(client, crash_reports_dir)

    async def get_logs(self, prefix: str = "") -> list[Path]:
        """
        Collect accessible ``*.log`` files under ``var/log`` and ``Library/Logs`` of every client root.

        :param prefix: only file names starting with this prefix are returned (default: all)
        :return: paths of the matching log files, in walk order
        """
        result: list[Path] = []
        # The set of roots does not depend on the sub-path, so query it once.
        roots = await self._client.roots()
        for sub_path in ("var/log", "Library/Logs"):
            for root_dir in roots:
                log_dir = Path(root_dir) / sub_path
                if not await self._client.fs.accessible(log_dir):
                    continue

                # Walk errors are deliberately ignored (best effort).
                async for root, _dirs, files in self._client.fs.walk(log_dir, onerror=lambda x: None):
                    for name in files:
                        # Apply the cheap local name filter before spending a remote call on the file.
                        if not (name.endswith(".log") and name.startswith(prefix)):
                            continue

                        # Check the file itself; re-checking the already verified directory filters nothing.
                        log_file = Path(root) / name
                        if await self._client.fs.accessible(log_file):
                            result.append(log_file)
        return result

    async def system_log(self) -> str:
        """Return the decoded contents of ``/var/log/system.log``."""
        async with await self._client.fs.open("/var/log/system.log", "r") as f:
            return (await f.read()).decode()

Bluetooth

rpcclient.clients.darwin.subsystems.bluetooth

Bluetooth

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

bluetooth utils

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/bluetooth.py
class Bluetooth(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """bluetooth utils"""

    # bugfix: +[BluetoothManager.setSharedInstanceQueue:] cannot be called twice in the same process,
    # so we use this global to tell if it was already called
    _ENV_QUEUE_SET = "_rpc_server_bluetooth_manager_dispatch_queue_set"

    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """
        Keep the client and register the BluetoothManager framework through ``load_framework_lazy``.

        :param client: client used for every remote call
        """
        self._client = client
        client.load_framework_lazy("BluetoothManager")

    # Filled on first use by _get_bluetooth_manager().
    _bluetooth_manager: DarwinSymbolT_co | None = None

    async def _get_bluetooth_manager(self) -> DarwinSymbolT_co:
        """Return the shared ``BluetoothManager`` instance, setting its queue once per remote process."""
        if self._bluetooth_manager is None:
            bluetooth_manager_class = await self._client.symbols.objc_getClass("BluetoothManager")
            # The environment variable marks that setSharedInstanceQueue: was already called remotely.
            if not await self._client.getenv(self._ENV_QUEUE_SET):
                await bluetooth_manager_class.objc_call(
                    "setSharedInstanceQueue:", await self._client.symbols.dispatch_queue_create(0, 0)
                )
                await self._client.setenv(self._ENV_QUEUE_SET, "1")
            self._bluetooth_manager = await bluetooth_manager_class.objc_call("sharedInstance")

        return self._bluetooth_manager

    async def is_on(self) -> bool:
        """Return whether the manager's ``enabled`` answer equals 1."""
        return await (await self._get_bluetooth_manager()).objc_call("enabled") == 1

    async def turn_on(self) -> None:
        """Power on and enable bluetooth."""
        await self._set(is_on=1)

    async def turn_off(self) -> None:
        """Power off and disable bluetooth."""
        await self._set(is_on=0)

    async def address(self) -> str | None:
        """
        Return the manager's ``localAddress`` converted to a Python value.

        :return: the address string, or ``None`` when the conversion yields ``None``
        """
        addr = await (await (await self._get_bluetooth_manager()).objc_call("localAddress")).py()
        assert addr is None or isinstance(addr, str)
        # The value was previously computed but never returned, so callers always got None.
        return addr

    async def connected(self) -> bool:
        """Return the manager's ``connected`` answer coerced to ``bool``."""
        return bool(await (await self._get_bluetooth_manager()).objc_call("connected"))

    async def get_discoverable(self) -> bool:
        """Return the manager's ``isDiscoverable`` answer coerced to ``bool``."""
        return bool(await (await self._get_bluetooth_manager()).objc_call("isDiscoverable"))

    async def set_discoverable(self, value: bool) -> None:
        """Send ``setDiscoverable:`` with ``value`` to the manager."""
        await (await self._get_bluetooth_manager()).objc_call("setDiscoverable:", value)

    async def _set(self, is_on: int) -> None:
        """Send ``setPowered:`` and then ``setEnabled:`` with ``is_on`` to the manager."""
        await (await self._get_bluetooth_manager()).objc_call("setPowered:", is_on)
        await (await self._get_bluetooth_manager()).objc_call("setEnabled:", is_on)

    def __repr__(self):
        return f"<{type(self).__name__} (async)>"

XPC

rpcclient.clients.darwin.subsystems.xpc

XPCObject

Bases: AbstractSymbol, ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
class XPCObject(AbstractSymbol, ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    """Symbol-like wrapper that forwards memory access to a client symbol created from ``value``."""

    def __init__(self, value: int, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """
        :param value: value handed to ``client.symbol`` to create the wrapped symbol
        :param client: client used for every remote call
        """
        self._client = client
        wrapped: DarwinSymbolT_co = client.symbol(value)
        self._sym: DarwinSymbolT_co = wrapped

    def _symbol_from_value(self, value: int) -> DarwinSymbolT_co:
        """Create a client symbol for ``value``."""
        return self._client.symbol(value)

    async def peek(self, count: int, offset: int = 0) -> bytes:
        """Forward to the wrapped symbol's ``peek(count, offset)``."""
        data = await self._sym.peek(count, offset)
        return data

    async def poke(self, buf: bytes, offset: int = 0) -> Any:
        """Forward to the wrapped symbol's ``poke(buf, offset)``."""
        outcome = await self._sym.poke(buf, offset)
        return outcome

    async def peek_str(self, encoding="utf-8") -> str:
        """peek string at given address"""
        text = await self._sym.peek_str(encoding=encoding)
        return text

    @property
    def arch(self) -> object:
        """The client's ``arch``."""
        return self._client.arch

    @property
    def endianness(self) -> str:
        """The client's ``_endianness``."""
        return self._client._endianness

    async def get_dl_info(self) -> Container:
        """Forward to the wrapped symbol's ``get_dl_info()``."""
        info = await self._sym.get_dl_info()
        return info

    async def type(self) -> int:
        """Return the result of ``xpc_get_type`` called on this object."""
        xpc_type = await self._client.symbols.xpc_get_type(self)
        return xpc_type

peek_str async

peek_str(encoding='utf-8') -> str

peek string at given address

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def peek_str(self, encoding="utf-8") -> str:
    """peek string at given address"""
    text = await self._sym.peek_str(encoding=encoding)
    return text

XPCArray

Bases: XPCObject[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
class XPCArray(XPCObject[DarwinSymbolT_co]):
    async def set_data(self, buf: bytes, index: int = XPC_ARRAY_APPEND) -> None:
        """
        Call ``xpc_array_set_data`` on this array with ``buf`` and its length.

        See https://developer.apple.com/documentation/xpc/1505937-xpc_array_set_data?language=objc

        :param buf: bytes passed as the data argument
        :param index: index argument, ``XPC_ARRAY_APPEND`` by default
        """
        set_array_data = self._client.symbols.xpc_array_set_data
        byte_count = len(buf)
        await set_array_data(self, index, buf, byte_count)

set_data async

set_data(buf: bytes, index: int = XPC_ARRAY_APPEND) -> None

See https://developer.apple.com/documentation/xpc/1505937-xpc_array_set_data?language=objc

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def set_data(self, buf: bytes, index: int = XPC_ARRAY_APPEND) -> None:
    """
    Call ``xpc_array_set_data`` on this array with ``buf`` and its length.

    See https://developer.apple.com/documentation/xpc/1505937-xpc_array_set_data?language=objc

    :param buf: bytes passed as the data argument
    :param index: index argument, ``XPC_ARRAY_APPEND`` by default
    """
    set_array_data = self._client.symbols.xpc_array_set_data
    byte_count = len(buf)
    await set_array_data(self, index, buf, byte_count)

Xpc

Bases: ClientBound['DarwinClient[DarwinSymbolT_co]'], Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
class Xpc(ClientBound["DarwinClient[DarwinSymbolT_co]"], Generic[DarwinSymbolT_co]):
    def __init__(self, client: "DarwinClient[DarwinSymbolT_co]") -> None:
        """
        :param rpcclient.darwin.client.DarwinClient client:
        """
        self._client = client
        self._client.load_framework_lazy("DuetActivityScheduler")
        # Connections keyed by mach service name; filled by _connect_to_mach_service().
        self._service_cache: dict[str, DarwinSymbolT_co] = {}

    async def sharedScheduler(self) -> DarwinSymbolT_co:
        """Return ``[_DASScheduler sharedScheduler]``."""
        scheduler_class = await self._client.symbols.objc_getClass("_DASScheduler")
        return await scheduler_class.objc_call("sharedScheduler")

    async def create_xpc_dictionary(self) -> XPCDictionary:
        """Create a dictionary with ``xpc_dictionary_create(0, 0, 0)`` and wrap it."""
        raw_dictionary = await self._client.symbols.xpc_dictionary_create(0, 0, 0)
        return XPCDictionary(raw_dictionary, self._client)

    async def create_xpc_array(self) -> XPCArray:
        """Create an array with ``xpc_array_create_empty()`` and wrap it."""
        raw_array = await self._client.symbols.xpc_array_create_empty()
        return XPCArray(raw_array, self._client)

    async def send_xpc_dictionary(self, service_name: str, message: XPCDictionary) -> XPCDictionary:
        """
        Send a native XPC dictionary to an XPC service synchronously and return the result.

        :param service_name: mach service name
        :param message: xpc message to send
        :return: received response
        """
        reply = await self.send_message_raw(service_name, message)
        return XPCDictionary(reply, self._client)

    async def send_message_using_cf_serialization(
        self, service_name: str, message: CfSerializable, decode_cf: bool = True
    ) -> CfSerializable:
        """
        Send a CFObject serialized over an XPC object to an XPC service synchronously and return the reply.

        :param service_name: mach service name
        :param message: xpc message to send
        :param decode_cf: should response be decoded as CFObject-over-XPCObject or a native XPC object
        :return: received response
        :raises RpcXpcSerializationError: when the encoded message or the reply equals 0
        """
        encoded = await self.encode_xpc_message_using_cf_serialization(message)
        if encoded == 0:
            raise RpcXpcSerializationError()
        reply = await self.send_message_raw(service_name, encoded)
        if reply == 0:
            raise RpcXpcSerializationError()
        if decode_cf:
            return await self.decode_xpc_message_using_cf_serialization(reply)
        return await self.decode_xpc_object_using_cf_serialization(reply)

    async def send_object_using_cf_serialization(self, service_name: str, message: CfSerializable) -> CfSerializable:
        """
        Send a native XPC object serialized over a CFObject to an XPC service synchronously and return the reply.

        :param service_name: mach service name
        :param message: xpc message to send
        :return: received response
        :raises RpcXpcSerializationError: when the encoded message or the reply equals 0
        """
        encoded = await self.encode_xpc_object_using_cf_serialization(message)
        if encoded == 0:
            raise RpcXpcSerializationError()
        reply = await self.send_message_raw(service_name, encoded)
        if reply == 0:
            raise RpcXpcSerializationError()
        return await self.decode_xpc_object_using_cf_serialization(reply)

    async def decode_xpc_object_using_cf_serialization(self, address: int) -> CfSerializable:
        """
        Convert XPC object to python object using CF serialization.
        """
        cf_object = await self._client.symbols._CFXPCCreateCFObjectFromXPCObject(address)
        return await cf_object.py()

    async def encode_xpc_object_using_cf_serialization(self, obj: CfSerializable) -> DarwinSymbolT_co:
        """
        Convert python object to XPC object using CF conversion.
        """
        create_xpc_object = self._client.symbols._CFXPCCreateXPCObjectFromCFObject
        cf_object = await self._client.cf(obj)
        return await create_xpc_object(cf_object)

    async def decode_xpc_message_using_cf_serialization(self, address: int) -> CfSerializable:
        """
        Convert a CFObject serialized over an XPCObject to python object
        """
        cf_object = await self._client.symbols._CFXPCCreateCFObjectFromXPCMessage(address)
        return await cf_object.py()

    async def encode_xpc_message_using_cf_serialization(self, obj: CfSerializable) -> DarwinSymbolT_co:
        """
        Convert python object to a CFObject serialized over an XPCObject
        """
        create_xpc_message = self._client.symbols._CFXPCCreateXPCMessageWithCFObject
        cf_object = await self._client.cf(obj)
        return await create_xpc_message(cf_object)

    async def send_message_raw(self, service_name: str, message_raw: int) -> DarwinSymbolT_co:
        """Send a RAW xpc object to given service_name and wait reply."""
        connection = await self._connect_to_mach_service(service_name)
        reply = await self._client.symbols.xpc_connection_send_message_with_reply_sync(connection, message_raw)
        return reply

    async def force_run_activities(self, activities: list[str]) -> None:
        """Send ``forceRunActivities:`` with ``activities`` to the shared scheduler."""
        scheduler = await type(self).sharedScheduler(self)
        remote_activities = await self._client.cf(activities)
        await scheduler.objc_call("forceRunActivities:", remote_activities)

    async def loaded_activities(self) -> dict:
        """Return the ``com.apple.xpc.activity2`` preferences dict read for ``root``."""
        return await self._client.preferences.cf.get_dict("com.apple.xpc.activity2", "root")

    async def set_activity_base_date(self, name: str, date: datetime) -> None:
        """
        Set ``ActivityBaseDates[name]`` to ``date`` and write the mapping back to the preferences.

        :param name: key to update inside ``ActivityBaseDates``
        :param date: new value stored under that key
        """
        loaded = await type(self).loaded_activities(self)
        base_dates = loaded["ActivityBaseDates"]
        base_dates[name] = date
        await self._client.preferences.cf.set("ActivityBaseDates", base_dates, "com.apple.xpc.activity2", "root")

    async def _connect_to_mach_service(self, service_name: str) -> DarwinSymbolT_co:
        """
        Return the connection for ``service_name``, creating, resuming and caching it on first use.

        :raises BadReturnValueError: when ``xpc_connection_create_mach_service`` returns 0
        """
        if service_name in self._service_cache:
            return self._service_cache[service_name]

        connection = await self._client.symbols.xpc_connection_create_mach_service(service_name, 0, 0)
        if connection == 0:
            raise BadReturnValueError("failed to create xpc connection")
        # An event handler (the client's dummy block) is installed before the connection is resumed.
        set_event_handler = self._client.symbols.xpc_connection_set_event_handler
        dummy_block = await self._client.get_dummy_block()
        await set_event_handler(connection, dummy_block)
        await self._client.symbols.xpc_connection_resume(connection)
        self._service_cache[service_name] = connection

        return self._service_cache[service_name]

send_xpc_dictionary async

send_xpc_dictionary(service_name: str, message: XPCDictionary) -> XPCDictionary

Send a native XPC dictionary to an XPC service synchronously and return the result.

Parameters:

Name Type Description Default
service_name str

mach service name

required
message XPCDictionary

xpc message to send

required

Returns:

Type Description
XPCDictionary

received response

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def send_xpc_dictionary(self, service_name: str, message: XPCDictionary) -> XPCDictionary:
    """
    Send a native XPC dictionary to an XPC service synchronously and return the result.

    :param service_name: mach service name
    :param message: xpc message to send
    :return: received response
    """
    reply = await self.send_message_raw(service_name, message)
    return XPCDictionary(reply, self._client)

send_message_using_cf_serialization async

send_message_using_cf_serialization(service_name: str, message: CfSerializable, decode_cf: bool = True) -> CfSerializable

Send a CFObject serialized over an XPC object to an XPC service synchronously and return the reply.

Parameters:

Name Type Description Default
service_name str

mach service name

required
message CfSerializable

xpc message to send

required
decode_cf bool

should response be decoded as CFObject-over-XPCObject or a native XPC object

True

Returns:

Type Description
CfSerializable

received response

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def send_message_using_cf_serialization(
    self, service_name: str, message: CfSerializable, decode_cf: bool = True
) -> CfSerializable:
    """
    Send a CFObject serialized over an XPC object to an XPC service synchronously and return the reply.

    :param service_name: mach service name
    :param message: xpc message to send
    :param decode_cf: should response be decoded as CFObject-over-XPCObject or a native XPC object
    :return: received response
    :raises RpcXpcSerializationError: when the encoded message or the reply equals 0
    """
    encoded = await self.encode_xpc_message_using_cf_serialization(message)
    if encoded == 0:
        raise RpcXpcSerializationError()
    reply = await self.send_message_raw(service_name, encoded)
    if reply == 0:
        raise RpcXpcSerializationError()
    if decode_cf:
        return await self.decode_xpc_message_using_cf_serialization(reply)
    return await self.decode_xpc_object_using_cf_serialization(reply)

send_object_using_cf_serialization async

send_object_using_cf_serialization(service_name: str, message: CfSerializable) -> CfSerializable

Send a native XPC object serialized over a CFObject to an XPC service synchronously and return the reply.

Parameters:

Name Type Description Default
service_name str

mach service name

required
message CfSerializable

xpc message to send

required

Returns:

Type Description
CfSerializable

received response

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def send_object_using_cf_serialization(self, service_name: str, message: CfSerializable) -> CfSerializable:
    """
    Send a native XPC object serialized over a CFObject to an XPC service synchronously and return the reply.

    :param service_name: mach service name
    :param message: xpc message to send
    :return: received response
    :raises RpcXpcSerializationError: when the encoded message or the reply equals 0
    """
    encoded = await self.encode_xpc_object_using_cf_serialization(message)
    if encoded == 0:
        raise RpcXpcSerializationError()
    reply = await self.send_message_raw(service_name, encoded)
    if reply == 0:
        raise RpcXpcSerializationError()
    decoded = await self.decode_xpc_object_using_cf_serialization(reply)
    return decoded

decode_xpc_object_using_cf_serialization async

decode_xpc_object_using_cf_serialization(address: int) -> CfSerializable

Convert XPC object to python object using CF serialization.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def decode_xpc_object_using_cf_serialization(self, address: int) -> CfSerializable:
    """
    Convert XPC object to python object using CF serialization.

    :param address: XPC object handed to ``_CFXPCCreateCFObjectFromXPCObject``
    :return: result of calling ``py()`` on the created CF object
    """
    cf_object = await self._client.symbols._CFXPCCreateCFObjectFromXPCObject(address)
    return await cf_object.py()

encode_xpc_object_using_cf_serialization async

encode_xpc_object_using_cf_serialization(obj: CfSerializable) -> DarwinSymbolT_co

Convert python object to XPC object using CF conversion.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def encode_xpc_object_using_cf_serialization(self, obj: CfSerializable) -> DarwinSymbolT_co:
    """
    Convert python object to XPC object using CF conversion.

    :param obj: python object converted with ``client.cf`` first
    :return: result of ``_CFXPCCreateXPCObjectFromCFObject`` for the converted object
    """
    create_xpc_object = self._client.symbols._CFXPCCreateXPCObjectFromCFObject
    cf_object = await self._client.cf(obj)
    return await create_xpc_object(cf_object)

decode_xpc_message_using_cf_serialization async

decode_xpc_message_using_cf_serialization(address: int) -> CfSerializable

Convert a CFObject serialized over an XPCObject to a Python object.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def decode_xpc_message_using_cf_serialization(self, address: int) -> CfSerializable:
    """
    Convert a CFObject serialized over an XPCObject to a python object.

    :param address: XPC message handed to ``_CFXPCCreateCFObjectFromXPCMessage``
    :return: result of calling ``py()`` on the created CF object
    """
    cf_object = await self._client.symbols._CFXPCCreateCFObjectFromXPCMessage(address)
    return await cf_object.py()

encode_xpc_message_using_cf_serialization async

encode_xpc_message_using_cf_serialization(obj: CfSerializable) -> DarwinSymbolT_co

Convert a Python object to a CFObject serialized over an XPCObject.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def encode_xpc_message_using_cf_serialization(self, obj: CfSerializable) -> DarwinSymbolT_co:
    """
    Convert a python object to a CFObject serialized over an XPCObject.

    :param obj: python object converted with ``client.cf`` first
    :return: result of ``_CFXPCCreateXPCMessageWithCFObject`` for the converted object
    """
    create_xpc_message = self._client.symbols._CFXPCCreateXPCMessageWithCFObject
    cf_object = await self._client.cf(obj)
    return await create_xpc_message(cf_object)

send_message_raw async

send_message_raw(service_name: str, message_raw: int) -> DarwinSymbolT_co

Send a RAW xpc object to given service_name and wait reply.

Source code in src/rpcclient/rpcclient/clients/darwin/subsystems/xpc.py
async def send_message_raw(self, service_name: str, message_raw: int) -> DarwinSymbolT_co:
    """
    Send a RAW xpc object to given service_name and wait reply.

    :param service_name: mach service name used to obtain the connection
    :param message_raw: XPC object passed to ``xpc_connection_send_message_with_reply_sync``
    :return: the value returned by that call
    """
    connection = await self._connect_to_mach_service(service_name)
    reply = await self._client.symbols.xpc_connection_send_message_with_reply_sync(connection, message_raw)
    return reply

Objective-C

rpcclient.clients.darwin.objective_c.objc

Method dataclass

Bases: Generic[DarwinSymbolT_co]

Source code in src/rpcclient/rpcclient/clients/darwin/objective_c/objc.py
@dataclass
class Method(Generic[DarwinSymbolT_co]):
    """Description of one Objective-C method; only ``name`` takes part in comparisons."""

    name: str
    client: "DarwinClient[DarwinSymbolT_co]" = field(compare=False)
    address: int = field(compare=False)
    imp: int = field(compare=False)
    type_: str = field(compare=False)
    return_type: str = field(compare=False)
    is_class: bool = field(compare=False)
    args_types: list = field(compare=False)

    @staticmethod
    def from_data(data: dict, client: "DarwinClient[DarwinSymbolT]") -> "Method[DarwinSymbolT]":
        """
        Create Method object from raw data.
        :param data: Data as loaded from get_objectivec_symbol_data.m.
        :param rpcclient.darwin.client.DarwinClient client: Darwin client.
        """
        # Addresses become client symbols; type encodings go through decode_type().
        method_address = client.symbol(data["address"])
        implementation = client.symbol(data["imp"])
        decoded_return_type = decode_type(data["return_type"])
        decoded_args_types = [decode_type(encoded) for encoded in data["args_types"]]
        return Method(
            name=data["name"],
            client=client,
            address=method_address,
            imp=implementation,
            type_=data["type"],
            return_type=decoded_return_type,
            is_class=data["is_class"],
            args_types=decoded_args_types,
        )

    async def set_implementation(self, new_imp: int) -> None:
        """Call ``method_setImplementation`` with ``new_imp`` and record it as ``self.imp``."""
        set_method_imp = self.client.symbols.method_setImplementation
        await set_method_imp(self.address, new_imp)
        self.imp = self.client.symbol(new_imp)

    async def always_return(self, value: bool) -> None:
        """
        Patch the method to always return the given value.
        """
        # Exported from rpcserver binary
        if value:
            stub_name = "get_true"
        else:
            stub_name = "get_false"
        stub_address = await self.client.symbols[stub_name].resolve()
        await self.set_implementation(stub_address)

    def __str__(self) -> str:
        """Render the method as a one-line declaration followed by a newline."""
        prefix = "+" if self.is_class else "-"
        if ":" not in self.name:
            name = self.name
        else:
            # The first two argument types are skipped - presumably the implicit self/_cmd pair (confirm).
            pairs = zip(self.name.split(":"), self.args_types[2:], strict=False)
            name = " ".join(f"{arg_name}:({arg_type})" for arg_name, arg_type in pairs)
        return f"{prefix} {name}; // 0x{self.address:x} (returns: {self.return_type})\n"

from_data staticmethod

from_data(data: dict, client: DarwinClient[DarwinSymbolT]) -> Method[DarwinSymbolT]

Create Method object from raw data.

Parameters:

Name Type Description Default
data dict

Data as loaded from get_objectivec_symbol_data.m.

required
client DarwinClient

Darwin client.

required
Source code in src/rpcclient/rpcclient/clients/darwin/objective_c/objc.py
@staticmethod
def from_data(data: dict, client: "DarwinClient[DarwinSymbolT]") -> "Method[DarwinSymbolT]":
    """
    Create Method object from raw data.
    :param data: Data as loaded from get_objectivec_symbol_data.m.
    :param rpcclient.darwin.client.DarwinClient client: Darwin client.
    """
    # Addresses become client symbols; type encodings go through decode_type().
    method_address = client.symbol(data["address"])
    implementation = client.symbol(data["imp"])
    decoded_return_type = decode_type(data["return_type"])
    decoded_args_types = [decode_type(encoded) for encoded in data["args_types"]]
    return Method(
        name=data["name"],
        client=client,
        address=method_address,
        imp=implementation,
        type_=data["type"],
        return_type=decoded_return_type,
        is_class=data["is_class"],
        args_types=decoded_args_types,
    )

always_return async

always_return(value: bool) -> None

Patch the method to always return the given value.

Source code in src/rpcclient/rpcclient/clients/darwin/objective_c/objc.py
async def always_return(self, value: bool) -> None:
    """
    Patch the method to always return the given value.
    """
    # Exported from rpcserver binary
    if value:
        stub_name = "get_true"
    else:
        stub_name = "get_false"
    stub_address = await self.client.symbols[stub_name].resolve()
    await self.set_implementation(stub_address)