Skip to content

Core client

CoreClient

rpcclient.core.client.CoreClient

Bases: Generic[SymbolT_co], ABC

Main client interface to access the remote rpcserver

Source code in src/rpcclient/rpcclient/core/client.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
class CoreClient(Generic[SymbolT_co], abc.ABC):
    """Main client interface to access the remote rpcserver"""

    DEFAULT_ARGV: ClassVar[list[str]] = ["/bin/sh"]
    DEFAULT_ENVP: ClassVar[list[str]] = []

    def __init__(self, bridge: RpcBridge, dlsym_global_handle: int = RTLD_NEXT) -> None:
        self._bridge: RpcBridge = bridge
        self._old_settings = None
        self._endianness: Literal["<", ">"] = "<"
        self._dlsym_global_handle: int = dlsym_global_handle
        self._logger: logging.Logger = logging.getLogger(self.__module__)
        self.notifier: EventNotifier = EventNotifier()
        self.pre_rpc_call_hooks: list[Callable[[], Coroutine[Any, Any, object]]] = []
        self._protocol_lock: asyncio.Lock = asyncio.Lock()

    @asynccontextmanager
    async def _acquire_protocol_lock(self) -> AsyncGenerator[None]:
        async with self._protocol_lock:
            yield

    @cached_property
    def symbols(self) -> SymbolsJar[SymbolT_co]:
        return SymbolsJar(self)

    @cached_property
    def null(self) -> SymbolT_co:
        return self.symbol(0)

    @classmethod
    async def create(cls, bridge: RpcBridge) -> Self:
        return cls(bridge)

    @subsystem
    @abc.abstractmethod
    def fs(self) -> Fs[Self]: ...

    @subsystem
    def processes(self) -> Processes[Self]:
        return Processes(self)

    @subsystem
    def network(self) -> Network[Self]:
        return Network(self)

    @subsystem
    def lief(self) -> Lief[Self]:
        return Lief(self)

    @subsystem
    def sysctl(self) -> Sysctl[Self]:
        return Sysctl(self)

    async def info(self) -> None:
        """print information about the current target"""
        uname = await self.get_uname()
        print("sysname:", uname.sysname)
        print("nodename:", uname.nodename)
        print("release:", uname.release)
        print("version:", uname.version)
        print("machine:", uname.machine)
        print(f"uid: {(await self.symbols.getuid()):d}")
        print(f"gid: {(await self.symbols.getgid()):d}")
        print(f"pid: {await self.get_pid():d}")
        print(f"ppid: {(await self.symbols.getppid()):d}")
        print(f"progname: {await self.get_progname()}")

    _cached_progname: str | None = None

    async def get_progname(self) -> str:
        """get the program name from remote"""
        if self._cached_progname is None:
            # bugfix https://github.com/doronz88/rpc-project/issues/405
            # Default Linux libraries don't expose `getprogname()`, so instead we use the `__progname` global symbol.
            # Tested on both macOS and Ubuntu.
            self._cached_progname = await (await self.symbols["__progname"].getindex(0)).peek_str()

        return self._cached_progname

    async def progname(self) -> str:
        """get the program name from remote"""
        return await self.get_progname()

    @abc.abstractmethod
    async def get_uname(self) -> Container:
        """get the utsname struct from remote"""

    async def uname(self) -> Container:
        """get the utsname struct from remote"""
        return await self.get_uname()

    @property
    def id(self) -> int:
        return self._bridge.client_id

    @property
    def platform(self) -> str:
        return self._bridge.platform

    @property
    def sysname(self) -> str:
        return self._bridge.sysname

    @property
    def arch(self) -> int:
        return self._bridge.arch

    async def rpc_call(self, msg_id: int, **kwargs: Any) -> Any:
        # Pop all hooks here, to prevent hooks from running out of order due to recursion.
        hooks, self.pre_rpc_call_hooks[:] = self.pre_rpc_call_hooks[::-1], []
        try:
            while hooks:
                await hooks.pop()()
        finally:
            # In case of failure, re-insert the remaining hooks.
            if hooks:
                self.pre_rpc_call_hooks[:0] = hooks

        try:
            return await self._bridge.rpc_call(msg_id, **kwargs)
        except ConnectionError:
            self.notifier.notify(ClientEvent.TERMINATED, self.id)
            raise
        except ServerResponseError:
            raise

    async def dlopen(self, filename: str, mode: int) -> SymbolT_co:
        """Load a shared library on the remote host and return its handle."""
        return self.symbol((await self.rpc_call(MsgId.REQ_DLOPEN, filename=filename, mode=mode)).handle)

    async def dlclose(self, lib: int) -> int:
        """Close a previously opened remote library handle."""
        return (await self.rpc_call(MsgId.REQ_DLCLOSE, handle=ctypes.c_uint64(lib).value)).res

    async def dlsym(self, lib: int, symbol_name: str) -> int:
        """Resolve a symbol name in a remote library handle and return its address.

        Runs and pops any hooks in `self.pre_dlsym_hooks` first.
        """
        return (await self.rpc_call(MsgId.REQ_DLSYM, handle=ctypes.c_uint64(lib).value, symbol_name=symbol_name)).ptr

    @null_pointer_guard
    async def call(
        self,
        address: int,
        argv: Iterable[RemoteCallArg] = (),
        return_float64: bool = False,
        return_float32: bool = False,
        return_raw: bool = False,
        va_list_index: int | None = None,
    ) -> float | SymbolT_co | Any:
        """call a remote function and retrieve its return value as a Symbol object"""
        if va_list_index is None:
            va_list_index = 0xFFFF

        args: list[Argument] = []
        for arg in argv:
            if isinstance(arg, LazySymbol):
                arg = int(await arg.resolve())

            if isinstance(arg, float):
                args.append(Argument(v_double=arg))
            elif isinstance(arg, str):
                args.append(Argument(v_str=arg))
            elif isinstance(arg, int):
                args.append(Argument(v_int=ctypes.c_uint64(arg).value))
            elif isinstance(arg, bytes):
                args.append(Argument(v_bytes=arg))
            elif isinstance(arg, Enum):
                args.append(Argument(v_int=ctypes.c_uint64(arg.value).value))
            elif isinstance(arg, PurePath):
                args.append(Argument(v_str=str(arg)))
            else:
                assert_never(arg)
                raise ArgumentError(f"Can't serialize object of type {type(arg).__name__}")

        ret = await self.rpc_call(MsgId.REQ_CALL, address=address, va_list_index=va_list_index, argv=args)
        if ret.HasField("arm_registers"):
            d0 = ret.arm_registers.d0
            if return_float32:
                return ctypes.c_float(d0).value
            if return_float64:
                return d0
            if return_raw:
                return ret.arm_registers
            return self.symbol(ret.arm_registers.x0)
        return self.symbol(ret.return_value)

    @null_pointer_guard
    async def peek(self, address: int, size: int) -> bytes:
        """peek data at the given address"""
        try:
            return (await self.rpc_call(MsgId.REQ_PEEK, address=address, size=size)).data
        except ServerResponseError as e:
            raise ArgumentError() from e

    @null_pointer_guard
    async def poke(self, address: int, data: bytes) -> Any:
        """poke data at a given address"""
        try:
            return await self.rpc_call(MsgId.REQ_POKE, address=address, data=data)
        except ServerResponseError as e:
            raise ArgumentError() from e

    async def get_dummy_block(self) -> SymbolT_co:
        """Get an address for a stub block containing nothing"""
        block_size = block_literal.sizeof()
        desc_size = block_descriptor.sizeof()

        descriptor = await self.symbols.malloc(desc_size)
        await descriptor.poke(block_descriptor.build({"reserved": 0, "size": block_size}))

        block = await self.symbols.malloc(block_size)
        await block.poke(
            block_literal.build({
                "isa": await self.symbols._NSConcreteGlobalBlock.resolve(),
                "flags": BLOCK_IS_GLOBAL,
                "reserved": 0,
                "invoke": await self.symbols.getpid.resolve(),
                "descriptor": descriptor,
            }),
        )

        return block

    async def listdir(self, path: str | PurePath) -> list[ProtocolDirent]:
        """get an address for a stub block containing nothing"""
        entries: list[ProtocolDirent] = []
        try:
            ret = await self.rpc_call(MsgId.REQ_LIST_DIR, path=str(path))
        except ServerResponseError:
            await self.raise_errno_exception(f"failed to listdir: {path}")

        for entry in ret.dir_entries:
            lstat = ProtocolDitentStat(
                errno=entry.lstat.errno1,
                st_blocks=entry.lstat.st_blocks,
                st_blksize=entry.lstat.st_blksize,
                st_atime=entry.lstat.st_atime1,
                st_ctime=entry.lstat.st_ctime1,
                st_mtime=entry.lstat.st_mtime1,
                st_nlink=entry.lstat.st_nlink,
                st_mode=entry.lstat.st_mode,
                st_rdev=entry.lstat.st_rdev,
                st_size=entry.lstat.st_size,
                st_dev=entry.lstat.st_dev,
                st_gid=entry.lstat.st_gid,
                st_ino=entry.lstat.st_ino,
                st_uid=entry.lstat.st_uid,
            )
            stat = ProtocolDitentStat(
                errno=entry.stat.errno1,
                st_blocks=entry.stat.st_blocks,
                st_blksize=entry.stat.st_blksize,
                st_atime=entry.stat.st_atime1,
                st_ctime=entry.stat.st_ctime1,
                st_mtime=entry.stat.st_mtime1,
                st_nlink=entry.stat.st_nlink,
                st_mode=entry.stat.st_mode,
                st_rdev=entry.stat.st_rdev,
                st_size=entry.stat.st_size,
                st_dev=entry.stat.st_dev,
                st_gid=entry.stat.st_gid,
                st_ino=entry.stat.st_ino,
                st_uid=entry.stat.st_uid,
            )
            entries.append(
                ProtocolDirent(
                    d_inode=entry.lstat.st_ino,
                    d_type=entry.d_type,
                    d_name=entry.d_name,
                    lstat=lstat,
                    stat=stat,
                )
            )
        return entries

    async def spawn(
        self,
        argv: list[str] | None = None,
        envp: list[str] | None = None,
        stdin: StrOrIO = sys.stdin,
        stdout=sys.stdout,
        raw_tty=False,
        background=False,
    ) -> SpawnResult:
        """
        spawn a new process and forward its stdin, stdout & stderr

        :param argv: argv of the process to be executed
        :param envp: envp of the process to be executed
        :param stdin: either a file object to read from OR a string
        :param stdout: a file object to write both stdout and stderr to. None if background is requested
        :param raw_tty: should enable raw tty mode
        :param background: should execute process in background
        :return: a SpawnResult. error is None if background is requested
        """
        if argv is None:
            argv = self.DEFAULT_ARGV

        if envp is None:
            envp = self.DEFAULT_ENVP

        async with self._acquire_protocol_lock():
            pid = await self._execute(argv, envp, background=background)
            self._logger.info(f"shell process started as pid: {pid}")

            if background:
                return SpawnResult(error=None, pid=pid, stdout=None)

            if raw_tty:
                self._prepare_terminal()
            try:
                error = await self.enter_pty_mode(stdin, stdout)
            except Exception:
                # this is important to really catch every exception here, even exceptions not inheriting from Exception
                # so the controlling terminal will remain working with its previous settings
                if raw_tty:
                    self._restore_terminal()
                raise

        return SpawnResult(error=error, pid=pid, stdout=stdout)

    @abc.abstractmethod
    def symbol(self, symbol: int) -> SymbolT_co:
        """Get a symbol object from a given address"""

    async def get_errno(self) -> int:
        return await self.symbols.errno.getindex(0)

    async def set_errno(self, value: int) -> None:
        await self.symbols.errno.setindex(0, value)

    async def get_last_error(self) -> str:
        """get info about the last occurred error"""
        if not (errno := await self.get_errno()):
            return ""
        err_str_ptr = await self.symbols.strerror(errno)
        err_str = await err_str_ptr.peek_str()
        return f"[{errno}] {err_str}"

    async def environ(self) -> list[str]:
        result = []
        environ = await self.symbols.environ.getindex(0)
        i = 0
        while var_ptr := await environ.getindex(i):
            result.append(await var_ptr.peek_str())
            i += 1
        return result

    async def setenv(self, name: str, value: str) -> None:
        """set process environment variable"""
        await (await self.symbols.setenv.resolve()).call(name, value)

    async def getenv(self, name: str) -> str | None:
        """get process environment variable"""
        value = await self.symbols.getenv(name)
        if not isinstance(value, Symbol) or not value:
            return None
        return await value.peek_str()

    _cached_pid: int | None = None

    async def get_pid(self) -> int:
        if self._cached_pid is None:
            self._cached_pid = int(await self.symbols.getpid())

        return self._cached_pid

    async def pid(self) -> int:
        return await self.get_pid()

    @asynccontextmanager
    async def safe_calloc(self, size: int) -> AsyncGenerator[SymbolT_co]:
        async with self.safe_malloc(size) as x:
            await x.poke(b"\x00" * size)
            yield x

    @asynccontextmanager
    async def safe_malloc(self, size: int) -> AsyncGenerator[SymbolT_co]:
        ptr = cast(SymbolT_co, await self.symbols.malloc(size))
        async with self.freeing(ptr) as x:
            yield x

    @asynccontextmanager
    async def freeing(self, symbol: SymbolT) -> AsyncGenerator[SymbolT]:
        try:
            yield symbol
        finally:
            if symbol:
                await self.symbols.free(symbol)

    async def close(self) -> None:
        try:
            await self.rpc_call(MsgId.REQ_CLOSE_CLIENT)
        finally:
            self.notifier.notify(ClientEvent.TERMINATED, self.id)
            self._bridge.close()

    async def _execute(self, argv: list[str], envp: list[str], background=False) -> int:
        try:
            return (await self.rpc_call(MsgId.REQ_EXEC, background=background, argv=argv, envp=envp)).pid
        except ServerResponseError as e:
            raise SpawnError(f"failed to spawn: {argv}") from e

    async def enter_pty_mode(self, stdin=sys.stdin, stdout=sys.stdout):
        # the socket must be non-blocking for using select()
        sock = self._bridge.sock.raw_socket
        blocking = sock.getblocking()
        sock.setblocking(False)
        exit_code = None
        try:
            fds = []
            if hasattr(stdin, "fileno"):
                fds.append(stdin)
            else:
                data = stdin
                if isinstance(data, str):
                    data = data.encode()
                sock.sendall(cast(Buffer, data))
            fds.append(sock)

            running = True
            while running:
                rlist, _, _ = select(fds, [], [])
                for fd in rlist:
                    if fd == stdin or (stdin is sys.stdin and fd == sys.stdin):
                        if stdin is sys.stdin:
                            buf = os.read(stdin.fileno(), ProtocolConstants.RPC_PTY_BUFFER_SIZE)
                        else:
                            buf = stdin.read(ProtocolConstants.RPC_PTY_BUFFER_SIZE)
                            if isinstance(buf, str):
                                buf = buf.encode()
                        if buf:
                            sock.sendall(buf)
                    elif fd == sock:
                        try:
                            response = await self._bridge.sock.rpc_msg_recv_pty()
                        except ConnectionResetError:
                            print("Bye. 👋")
                            running = False
                            break
                        msg_type = response.WhichOneof("type")
                        if msg_type == "buffer":
                            stdout.write(response.buffer.decode())
                            if hasattr(stdout, "flush"):
                                stdout.flush()
                        elif msg_type == "exit_code":
                            exit_code = response.exit_code
                            running = False
                            break
        finally:
            sock.setblocking(blocking)
        return exit_code

    def _restore_terminal(self) -> None:
        if not tty_support:
            return
        assert self._old_settings is not None
        termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, self._old_settings)  # pyright: ignore[reportPossiblyUnboundVariable]

    def _prepare_terminal(self):
        if not tty_support:
            return
        fd = sys.stdin
        self._old_settings = termios.tcgetattr(fd)  # pyright: ignore[reportPossiblyUnboundVariable]
        tty.setraw(fd)  # pyright: ignore[reportPossiblyUnboundVariable]

    async def raise_errno_exception(self, message: str):
        message += f" ({await self.get_last_error()})"
        exceptions = {
            EPERM: RpcPermissionError,
            ENOENT: RpcFileNotFoundError,
            EEXIST: RpcFileExistsError,
            EISDIR: RpcIsADirectoryError,
            ENOTDIR: RpcNotADirectoryError,
            EPIPE: RpcBrokenPipeError,
            ENOTEMPTY: RpcNotEmptyError,
            EAGAIN: RpcResourceTemporarilyUnavailableError,
            ECONNREFUSED: RpcConnectionRefusedError,
        }
        exception = exceptions.get(await self.get_errno())
        if exception:
            raise exception(message)
        raise BadReturnValueError(message)

    def capture_fd(self, fd: int, sock_buf_size: int | None = None) -> CaptureFD[SymbolT_co]:
        """
        Get a context manager, capturing output to `fd`. Read from it using the `read()` method

        sock_buf_size is required for captures above 6KB, as any write above this value would block until a read is performed.

        :param fd: FD to capture
        :param sock_buf_size: Buffer size for the capture socket, if not specified, default value is used.
        :return: CaptureFD object
        """
        return CaptureFD(self, fd, sock_buf_size)

    async def last_error(self) -> str:
        return await self.get_last_error()

    def shell(self) -> None:
        from xonsh.built_ins import XSH
        from xonsh.main import main as xonsh_main

        self._logger.disabled = True

        args = ["--rc"]
        args.append(str((Path(__file__).parent / "xonshrc.py").absolute()))

        XSH.ctx["_client_to_reuse"] = self

        try:
            logging.getLogger("parso.python.diff").disabled = True
            logging.getLogger("parso.cache").disabled = True
            logging.getLogger("asyncio").disabled = True
            xonsh_main(args)
        except SystemExit:
            self._logger.disabled = False

    async def __aenter__(self) -> Self:
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

    def __repr__(self) -> str:
        pid = self._cached_pid if self._cached_pid is not None else "?"
        progname = self._cached_progname if self._cached_progname is not None else "?"
        return f"<{self.__class__.__name__}: {pid} | {progname}>"

info async

info() -> None

print information about the current target

Source code in src/rpcclient/rpcclient/core/client.py
async def info(self) -> None:
    """print information about the current target"""
    uname = await self.get_uname()
    print("sysname:", uname.sysname)
    print("nodename:", uname.nodename)
    print("release:", uname.release)
    print("version:", uname.version)
    print("machine:", uname.machine)
    print(f"uid: {(await self.symbols.getuid()):d}")
    print(f"gid: {(await self.symbols.getgid()):d}")
    print(f"pid: {await self.get_pid():d}")
    print(f"ppid: {(await self.symbols.getppid()):d}")
    print(f"progname: {await self.get_progname()}")

get_progname async

get_progname() -> str

get the program name from remote

Source code in src/rpcclient/rpcclient/core/client.py
async def get_progname(self) -> str:
    """get the program name from remote"""
    if self._cached_progname is None:
        # bugfix https://github.com/doronz88/rpc-project/issues/405
        # Default Linux libraries don't expose `getprogname()`, so instead we use the `__progname` global symbol.
        # Tested on both macOS and Ubuntu.
        self._cached_progname = await (await self.symbols["__progname"].getindex(0)).peek_str()

    return self._cached_progname

progname async

progname() -> str

get the program name from remote

Source code in src/rpcclient/rpcclient/core/client.py
async def progname(self) -> str:
    """get the program name from remote"""
    return await self.get_progname()

get_uname abstractmethod async

get_uname() -> Container

get the utsname struct from remote

Source code in src/rpcclient/rpcclient/core/client.py
@abc.abstractmethod
async def get_uname(self) -> Container:
    """get the utsname struct from remote"""

uname async

uname() -> Container

get the utsname struct from remote

Source code in src/rpcclient/rpcclient/core/client.py
async def uname(self) -> Container:
    """get the utsname struct from remote"""
    return await self.get_uname()

dlopen async

dlopen(filename: str, mode: int) -> SymbolT_co

Load a shared library on the remote host and return its handle.

Source code in src/rpcclient/rpcclient/core/client.py
async def dlopen(self, filename: str, mode: int) -> SymbolT_co:
    """Load a shared library on the remote host and return its handle."""
    return self.symbol((await self.rpc_call(MsgId.REQ_DLOPEN, filename=filename, mode=mode)).handle)

dlclose async

dlclose(lib: int) -> int

Close a previously opened remote library handle.

Source code in src/rpcclient/rpcclient/core/client.py
async def dlclose(self, lib: int) -> int:
    """Close a previously opened remote library handle."""
    return (await self.rpc_call(MsgId.REQ_DLCLOSE, handle=ctypes.c_uint64(lib).value)).res

dlsym async

dlsym(lib: int, symbol_name: str) -> int

Resolve a symbol name in a remote library handle and return its address.

Runs and pops any hooks in self.pre_dlsym_hooks first.

Source code in src/rpcclient/rpcclient/core/client.py
async def dlsym(self, lib: int, symbol_name: str) -> int:
    """Resolve a symbol name in a remote library handle and return its address.

    Runs and pops any hooks in `self.pre_dlsym_hooks` first.
    """
    return (await self.rpc_call(MsgId.REQ_DLSYM, handle=ctypes.c_uint64(lib).value, symbol_name=symbol_name)).ptr

call async

call(address: int, argv: Iterable[RemoteCallArg] = (), return_float64: bool = False, return_float32: bool = False, return_raw: bool = False, va_list_index: int | None = None) -> float | SymbolT_co | Any

call a remote function and retrieve its return value as a Symbol object

Source code in src/rpcclient/rpcclient/core/client.py
@null_pointer_guard
async def call(
    self,
    address: int,
    argv: Iterable[RemoteCallArg] = (),
    return_float64: bool = False,
    return_float32: bool = False,
    return_raw: bool = False,
    va_list_index: int | None = None,
) -> float | SymbolT_co | Any:
    """call a remote function and retrieve its return value as a Symbol object"""
    if va_list_index is None:
        va_list_index = 0xFFFF

    args: list[Argument] = []
    for arg in argv:
        if isinstance(arg, LazySymbol):
            arg = int(await arg.resolve())

        if isinstance(arg, float):
            args.append(Argument(v_double=arg))
        elif isinstance(arg, str):
            args.append(Argument(v_str=arg))
        elif isinstance(arg, int):
            args.append(Argument(v_int=ctypes.c_uint64(arg).value))
        elif isinstance(arg, bytes):
            args.append(Argument(v_bytes=arg))
        elif isinstance(arg, Enum):
            args.append(Argument(v_int=ctypes.c_uint64(arg.value).value))
        elif isinstance(arg, PurePath):
            args.append(Argument(v_str=str(arg)))
        else:
            assert_never(arg)
            raise ArgumentError(f"Can't serialize object of type {type(arg).__name__}")

    ret = await self.rpc_call(MsgId.REQ_CALL, address=address, va_list_index=va_list_index, argv=args)
    if ret.HasField("arm_registers"):
        d0 = ret.arm_registers.d0
        if return_float32:
            return ctypes.c_float(d0).value
        if return_float64:
            return d0
        if return_raw:
            return ret.arm_registers
        return self.symbol(ret.arm_registers.x0)
    return self.symbol(ret.return_value)

peek async

peek(address: int, size: int) -> bytes

peek data at the given address

Source code in src/rpcclient/rpcclient/core/client.py
@null_pointer_guard
async def peek(self, address: int, size: int) -> bytes:
    """peek data at the given address"""
    try:
        return (await self.rpc_call(MsgId.REQ_PEEK, address=address, size=size)).data
    except ServerResponseError as e:
        raise ArgumentError() from e

poke async

poke(address: int, data: bytes) -> Any

poke data at a given address

Source code in src/rpcclient/rpcclient/core/client.py
@null_pointer_guard
async def poke(self, address: int, data: bytes) -> Any:
    """poke data at a given address"""
    try:
        return await self.rpc_call(MsgId.REQ_POKE, address=address, data=data)
    except ServerResponseError as e:
        raise ArgumentError() from e

get_dummy_block async

get_dummy_block() -> SymbolT_co

Get an address for a stub block containing nothing

Source code in src/rpcclient/rpcclient/core/client.py
async def get_dummy_block(self) -> SymbolT_co:
    """Get an address for a stub block containing nothing"""
    block_size = block_literal.sizeof()
    desc_size = block_descriptor.sizeof()

    descriptor = await self.symbols.malloc(desc_size)
    await descriptor.poke(block_descriptor.build({"reserved": 0, "size": block_size}))

    block = await self.symbols.malloc(block_size)
    await block.poke(
        block_literal.build({
            "isa": await self.symbols._NSConcreteGlobalBlock.resolve(),
            "flags": BLOCK_IS_GLOBAL,
            "reserved": 0,
            "invoke": await self.symbols.getpid.resolve(),
            "descriptor": descriptor,
        }),
    )

    return block

listdir async

listdir(path: str | PurePath) -> list[ProtocolDirent]

get an address for a stub block containing nothing

Source code in src/rpcclient/rpcclient/core/client.py
async def listdir(self, path: str | PurePath) -> list[ProtocolDirent]:
    """get an address for a stub block containing nothing"""
    entries: list[ProtocolDirent] = []
    try:
        ret = await self.rpc_call(MsgId.REQ_LIST_DIR, path=str(path))
    except ServerResponseError:
        await self.raise_errno_exception(f"failed to listdir: {path}")

    for entry in ret.dir_entries:
        lstat = ProtocolDitentStat(
            errno=entry.lstat.errno1,
            st_blocks=entry.lstat.st_blocks,
            st_blksize=entry.lstat.st_blksize,
            st_atime=entry.lstat.st_atime1,
            st_ctime=entry.lstat.st_ctime1,
            st_mtime=entry.lstat.st_mtime1,
            st_nlink=entry.lstat.st_nlink,
            st_mode=entry.lstat.st_mode,
            st_rdev=entry.lstat.st_rdev,
            st_size=entry.lstat.st_size,
            st_dev=entry.lstat.st_dev,
            st_gid=entry.lstat.st_gid,
            st_ino=entry.lstat.st_ino,
            st_uid=entry.lstat.st_uid,
        )
        stat = ProtocolDitentStat(
            errno=entry.stat.errno1,
            st_blocks=entry.stat.st_blocks,
            st_blksize=entry.stat.st_blksize,
            st_atime=entry.stat.st_atime1,
            st_ctime=entry.stat.st_ctime1,
            st_mtime=entry.stat.st_mtime1,
            st_nlink=entry.stat.st_nlink,
            st_mode=entry.stat.st_mode,
            st_rdev=entry.stat.st_rdev,
            st_size=entry.stat.st_size,
            st_dev=entry.stat.st_dev,
            st_gid=entry.stat.st_gid,
            st_ino=entry.stat.st_ino,
            st_uid=entry.stat.st_uid,
        )
        entries.append(
            ProtocolDirent(
                d_inode=entry.lstat.st_ino,
                d_type=entry.d_type,
                d_name=entry.d_name,
                lstat=lstat,
                stat=stat,
            )
        )
    return entries

spawn async

spawn(argv: list[str] | None = None, envp: list[str] | None = None, stdin: StrOrIO = sys.stdin, stdout=sys.stdout, raw_tty=False, background=False) -> SpawnResult

spawn a new process and forward its stdin, stdout & stderr

Parameters:

Name Type Description Default
argv list[str] | None

argv of the process to be executed

None
envp list[str] | None

envp of the process to be executed

None
stdin StrOrIO

either a file object to read from OR a string

stdin
stdout

a file object to write both stdout and stderr to. None if background is requested

stdout
raw_tty

should enable raw tty mode

False
background

should execute process in background

False

Returns:

Type Description
SpawnResult

a SpawnResult. error is None if background is requested

Source code in src/rpcclient/rpcclient/core/client.py
async def spawn(
    self,
    argv: list[str] | None = None,
    envp: list[str] | None = None,
    stdin: StrOrIO = sys.stdin,
    stdout=sys.stdout,
    raw_tty=False,
    background=False,
) -> SpawnResult:
    """
    spawn a new process and forward its stdin, stdout & stderr

    :param argv: argv of the process to be executed
    :param envp: envp of the process to be executed
    :param stdin: either a file object to read from OR a string
    :param stdout: a file object to write both stdout and stderr to. None if background is requested
    :param raw_tty: should enable raw tty mode
    :param background: should execute process in background
    :return: a SpawnResult. error is None if background is requested
    """
    if argv is None:
        argv = self.DEFAULT_ARGV

    if envp is None:
        envp = self.DEFAULT_ENVP

    async with self._acquire_protocol_lock():
        pid = await self._execute(argv, envp, background=background)
        self._logger.info(f"shell process started as pid: {pid}")

        if background:
            return SpawnResult(error=None, pid=pid, stdout=None)

        if raw_tty:
            self._prepare_terminal()
        try:
            error = await self.enter_pty_mode(stdin, stdout)
        except Exception:
            # this is important to really catch every exception here, even exceptions not inheriting from Exception
            # so the controlling terminal will remain working with its previous settings
            if raw_tty:
                self._restore_terminal()
            raise

    return SpawnResult(error=error, pid=pid, stdout=stdout)

symbol abstractmethod

symbol(symbol: int) -> SymbolT_co

Get a symbol object from a given address

Source code in src/rpcclient/rpcclient/core/client.py
@abc.abstractmethod
def symbol(self, symbol: int) -> SymbolT_co:
    """Get a symbol object from a given address"""

get_last_error async

get_last_error() -> str

get info about the last occurred error

Source code in src/rpcclient/rpcclient/core/client.py
async def get_last_error(self) -> str:
    """get info about the last occurred error"""
    if not (errno := await self.get_errno()):
        return ""
    err_str_ptr = await self.symbols.strerror(errno)
    err_str = await err_str_ptr.peek_str()
    return f"[{errno}] {err_str}"

setenv async

setenv(name: str, value: str) -> None

set process environment variable

Source code in src/rpcclient/rpcclient/core/client.py
async def setenv(self, name: str, value: str) -> None:
    """set process environment variable"""
    await (await self.symbols.setenv.resolve()).call(name, value)

getenv async

getenv(name: str) -> str | None

get process environment variable

Source code in src/rpcclient/rpcclient/core/client.py
async def getenv(self, name: str) -> str | None:
    """get process environment variable"""
    value = await self.symbols.getenv(name)
    if not isinstance(value, Symbol) or not value:
        return None
    return await value.peek_str()

capture_fd

capture_fd(fd: int, sock_buf_size: int | None = None) -> CaptureFD[SymbolT_co]

Get a context manager, capturing output to fd. Read from it using the read() method

sock_buf_size is required for captures above 6KB, as any write above this value would block until a read is performed.

Parameters:

Name Type Description Default
fd int

FD to capture

required
sock_buf_size int | None

Buffer size for the capture socket, if not specified, default value is used.

None

Returns:

Type Description
CaptureFD[SymbolT_co]

CaptureFD object

Source code in src/rpcclient/rpcclient/core/client.py
def capture_fd(self, fd: int, sock_buf_size: int | None = None) -> CaptureFD[SymbolT_co]:
    """
    Get a context manager, capturing output to `fd`. Read from it using the `read()` method

    sock_buf_size is required for captures above 6KB, as any write above this value would block until a read is performed.

    :param fd: FD to capture
    :param sock_buf_size: Buffer size for the capture socket, if not specified, default value is used.
    :return: CaptureFD object
    """
    return CaptureFD(self, fd, sock_buf_size)

Symbols

rpcclient.core.symbol

AbstractSymbol

Bases: int, ABC

Abstract wrapper for a remote symbol object

Source code in src/rpcclient/rpcclient/core/symbol.py
class AbstractSymbol(int, abc.ABC):
    """Abstract wrapper for a remote symbol object"""

    @final
    def __new__(cls, value: int, *args, **kwargs) -> Self:
        if not isinstance(value, int):
            raise TypeError(f"expected int, got {type(value).__qualname__}")

        value &= 0xFFFFFFFFFFFFFFFF

        return super().__new__(cls, value)

    @abc.abstractmethod
    def _symbol_from_value(self, value: int) -> Self:
        """
        Returns the given value as a symbol.
        The symbol's type can change depending on the value and/or the symbol's state.

        :param value: Symbol address
        :return: Symbol object.
        :rtype: Symbol
        """

    @contextmanager
    def change_item_size(self, new_item_size: int) -> Generator[None]:
        """
        Temporarily change item size
        :param new_item_size: Temporary item size
        """
        save_item_size = self.item_size
        self.item_size = new_item_size
        try:
            yield
        finally:
            self.item_size = save_item_size

    @abc.abstractmethod
    async def peek(self, count: int, offset: int = 0) -> bytes: ...

    @abc.abstractmethod
    async def poke(self, buf: bytes, offset: int = 0) -> Any: ...

    @abc.abstractmethod
    async def peek_str(self, encoding="utf-8") -> str:
        """peek string at given address"""

    def close(self) -> None:
        """Construct compliance."""
        pass

    def seek(self, offset: int, whence: int) -> None:
        """Construct compliance."""
        if whence == os.SEEK_CUR:
            self._offset += offset
        elif whence == os.SEEK_SET:
            self._offset = offset - self
        else:
            raise OSError("Unsupported whence")

    async def read(self, count: int) -> bytes:
        """Construct compliance."""
        val = await (self + self._offset).peek(count)
        self._offset += count
        return val

    async def write(self, buf: bytes) -> None:
        """Construct compliance."""
        val = await (self + self._offset).poke(buf)
        self._offset += len(buf)
        return val

    def tell(self) -> Self:
        """Construct compliance."""
        return self + self._offset

    @property
    @abc.abstractmethod
    def arch(self) -> object: ...

    async def disass(self, size=40) -> list[CsInsn]:
        """peek disassembled lines of 'size' bytes"""
        cs = (
            Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN)
            if self.arch == ARCH_ARM64
            # assume x86_64 by default
            else Cs(CS_ARCH_X86, CS_MODE_LITTLE_ENDIAN | CS_MODE_64)
        )

        return list(cs.disasm(await self.peek(size), self))

    @property
    def c_int64(self) -> int:
        """cast to c_int64"""
        return ctypes.c_int64(self).value

    @property
    def c_uint64(self) -> int:
        """cast to c_uint64"""
        return ctypes.c_uint64(self).value

    @property
    def c_int32(self) -> int:
        """cast to c_int32"""
        return ctypes.c_int32(self).value

    @property
    def c_uint32(self) -> int:
        """cast to c_uint32"""
        return ctypes.c_uint32(self).value

    @property
    def c_int16(self) -> int:
        """cast to c_int16"""
        return ctypes.c_int16(self).value

    @property
    def c_uint16(self) -> int:
        """cast to c_uint16"""
        return ctypes.c_uint16(self).value

    @property
    def c_bool(self) -> bool:
        """cast to c_bool"""
        return ctypes.c_bool(self).value

    @abc.abstractmethod
    async def get_dl_info(self) -> "Container": ...

    async def dl_info(self) -> "Container":
        return await self.get_dl_info()

    async def name(self) -> str:
        return (await self.get_dl_info()).dli_sname

    async def filename(self) -> str:
        return (await self.get_dl_info()).dli_fname

    @property
    @abc.abstractmethod
    def endianness(self) -> str: ...

    async def getindex(self, index: int, *indices: int) -> Self:
        fmt = ADDRESS_SIZE_TO_STRUCT_FORMAT[self.item_size]
        new_symbol = self._symbol_from_value(
            struct.unpack(self.endianness + fmt, await self.peek(self.item_size, offset=index * self.item_size))[0]
        )
        if indices:
            return await new_symbol.getindex(*indices)
        return new_symbol

    async def setindex(self, index: int, value) -> None:
        fmt = ADDRESS_SIZE_TO_STRUCT_FORMAT[self.item_size]
        value = struct.pack(self.endianness + fmt, int(value))
        await self.poke(value, offset=index * self.item_size)

    def __getitem__(self, item: int) -> Coroutine[Any, Any, Self]:
        """Read a remote item: ``await sym[i]`` (awaitable alias for ``getindex``)."""
        return self.getindex(item)

    def __setitem__(self, item: int, value) -> None:
        raise TypeError(
            "item assignment can't be awaited; use `await sym.setindex(index, value)` instead of `sym[index] = value`"
        )

    async def parse(self, struct: "Construct[ParsedType, Any]") -> "ParsedType":
        return struct.parse(await self.peek(struct.sizeof()))

    def __add__(self, other) -> Self:
        try:
            return self._symbol_from_value(int(self) + other)
        except TypeError:
            return int(self) + other

    def __radd__(self, other) -> Self:
        return self.__add__(other)

    def __sub__(self, other) -> Self:
        try:
            return self._symbol_from_value(int(self) - other)
        except TypeError:
            return int(self) - other

    def __rsub__(self, other) -> Self:
        try:
            return self._symbol_from_value(other - int(self))
        except TypeError:
            return other - int(self)

    def __mul__(self, other) -> Self:
        try:
            return self._symbol_from_value(int(self) * other)
        except TypeError:
            return int(self) * other

    def __rmul__(self, other) -> Self:
        return self.__mul__(other)

    def __truediv__(self, other) -> Self:
        return self._symbol_from_value(int(self) / other)

    def __floordiv__(self, other) -> Self:
        return self._symbol_from_value(int(self) // other)

    def __mod__(self, other) -> Self:
        return self._symbol_from_value(int(self) % other)

    def __and__(self, other) -> Self:
        return self._symbol_from_value(int(self) & other)

    def __or__(self, other) -> Self:
        return self._symbol_from_value(int(self) | other)

    def __xor__(self, other) -> Self:
        return self._symbol_from_value(int(self) ^ other)

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: {hex(self)}>"

    def __str__(self) -> str:
        return hex(self)

c_int64 property

c_int64: int

cast to c_int64

c_uint64 property

c_uint64: int

cast to c_uint64

c_int32 property

c_int32: int

cast to c_int32

c_uint32 property

c_uint32: int

cast to c_uint32

c_int16 property

c_int16: int

cast to c_int16

c_uint16 property

c_uint16: int

cast to c_uint16

c_bool property

c_bool: bool

cast to c_bool

change_item_size

change_item_size(new_item_size: int) -> Generator[None]

Temporarily change item size

Parameters:

Name Type Description Default
new_item_size int

Temporary item size

required
Source code in src/rpcclient/rpcclient/core/symbol.py
@contextmanager
def change_item_size(self, new_item_size: int) -> Generator[None]:
    """
    Temporarily change item size
    :param new_item_size: Temporary item size
    """
    save_item_size = self.item_size
    self.item_size = new_item_size
    try:
        yield
    finally:
        self.item_size = save_item_size

peek_str abstractmethod async

peek_str(encoding='utf-8') -> str

peek string at given address

Source code in src/rpcclient/rpcclient/core/symbol.py
@abc.abstractmethod
async def peek_str(self, encoding="utf-8") -> str:
    """peek string at given address"""

close

close() -> None

Construct compliance.

Source code in src/rpcclient/rpcclient/core/symbol.py
def close(self) -> None:
    """Construct compliance."""
    pass

seek

seek(offset: int, whence: int) -> None

Construct compliance.

Source code in src/rpcclient/rpcclient/core/symbol.py
def seek(self, offset: int, whence: int) -> None:
    """Construct compliance."""
    if whence == os.SEEK_CUR:
        self._offset += offset
    elif whence == os.SEEK_SET:
        self._offset = offset - self
    else:
        raise OSError("Unsupported whence")

read async

read(count: int) -> bytes

Construct compliance.

Source code in src/rpcclient/rpcclient/core/symbol.py
async def read(self, count: int) -> bytes:
    """Construct compliance."""
    val = await (self + self._offset).peek(count)
    self._offset += count
    return val

write async

write(buf: bytes) -> None

Construct compliance.

Source code in src/rpcclient/rpcclient/core/symbol.py
async def write(self, buf: bytes) -> None:
    """Construct compliance."""
    val = await (self + self._offset).poke(buf)
    self._offset += len(buf)
    return val

tell

tell() -> Self

Construct compliance.

Source code in src/rpcclient/rpcclient/core/symbol.py
def tell(self) -> Self:
    """Construct compliance."""
    return self + self._offset

disass async

disass(size=40) -> list[CsInsn]

peek disassembled lines of 'size' bytes

Source code in src/rpcclient/rpcclient/core/symbol.py
async def disass(self, size=40) -> list[CsInsn]:
    """peek disassembled lines of 'size' bytes"""
    cs = (
        Cs(CS_ARCH_ARM64, CS_MODE_LITTLE_ENDIAN)
        if self.arch == ARCH_ARM64
        # assume x86_64 by default
        else Cs(CS_ARCH_X86, CS_MODE_LITTLE_ENDIAN | CS_MODE_64)
    )

    return list(cs.disasm(await self.peek(size), self))

Symbol

Bases: AbstractSymbol

wrapper for a remote symbol object

Source code in src/rpcclient/rpcclient/core/symbol.py
class Symbol(AbstractSymbol):
    """wrapper for a remote symbol object"""

    @readonly
    def _client(self) -> "CoreClient[Self]": ...

    def __init__(self, value: int, client: "CoreClient[Self]") -> None:
        """
        Create a Symbol object.
        :param value: Symbol address.
        :param client: client.
        :return: Symbol object.
        :rtype: Symbol
        """
        self.retval_bit_count: int = RETVAL_BIT_COUNT
        self.is_retval_signed: bool = True
        self.item_size: int = 8

        # private members
        __class__._client.set(self, client)
        self._offset: int = 0

    def _symbol_from_value(self, value: int) -> Self:
        """
        Returns the given value as a symbol.
        The symbol's type can change depending on the value and/or the symbol's state.

        :param value: Symbol address
        :return: Symbol object.
        :rtype: Symbol
        """
        return type(self)(value, cast("CoreClient", self._client))

    async def peek(self, count: int, offset: int = 0) -> bytes:
        return await self._client.peek(self + offset, count)

    async def poke(self, buf: bytes, offset: int = 0) -> Any:
        return await self._client.poke(self + offset, buf)

    async def peek_str(self, encoding="utf-8") -> str:
        """peek string at given address"""
        str_len = await self._client.symbols.strlen(self)
        return (await self.peek(str_len)).decode(encoding)

    @property
    def arch(self) -> object:
        return self._client.arch

    async def get_dl_info(self) -> "Container":
        dl_info = Dl_info(self._client)
        sizeof = dl_info.sizeof()
        async with self._client.safe_malloc(sizeof) as info:
            if await self._client.symbols.dladdr(self, info) == 0:
                await self._client.raise_errno_exception(f"failed to extract info for: {self}")
            return dl_info.parse(await info.read(sizeof))

    @property
    def endianness(self) -> str:
        return self._client._endianness

    async def resolve(self) -> Self:
        """Return this symbol unchanged.

        This method exists to allow calling `resolve()` on an object that may be either a symbol or a `LazySymbol`.
        """
        return self

    @overload
    async def call(
        self,
        *args: "RemoteCallArg",
        return_float64: Literal[False] = False,
        return_float32: Literal[False] = False,
        return_raw: Literal[False] = False,
        va_list_index: int | None = None,
    ) -> Self: ...
    @overload
    async def call(
        self, *args: "RemoteCallArg", return_float64: Literal[True], va_list_index: int | None = None
    ) -> float: ...
    @overload
    async def call(
        self, *args: "RemoteCallArg", return_float32: Literal[True], va_list_index: int | None = None
    ) -> float: ...
    @overload
    async def call(
        self, *args: "RemoteCallArg", return_raw: Literal[True], va_list_index: int | None = None
    ) -> Any: ...
    @overload
    async def call(self, *args: "RemoteCallArg", **kwargs) -> "float | Self | Any": ...
    async def call(self, *args: "RemoteCallArg", **kwargs) -> "float | Self | Any":
        """Call this symbol as a function pointer."""
        return await self._client.call(self, args, **kwargs)

    __call__ = call

peek_str async

peek_str(encoding='utf-8') -> str

peek string at given address

Source code in src/rpcclient/rpcclient/core/symbol.py
async def peek_str(self, encoding="utf-8") -> str:
    """peek string at given address"""
    str_len = await self._client.symbols.strlen(self)
    return (await self.peek(str_len)).decode(encoding)

resolve async

resolve() -> Self

Return this symbol unchanged.

This method exists to allow calling resolve() on an object that may be either a symbol or a LazySymbol.

Source code in src/rpcclient/rpcclient/core/symbol.py
async def resolve(self) -> Self:
    """Return this symbol unchanged.

    This method exists to allow calling `resolve()` on an object that may be either a symbol or a `LazySymbol`.
    """
    return self

call async

call(*args: RemoteCallArg, return_float64: Literal[False] = False, return_float32: Literal[False] = False, return_raw: Literal[False] = False, va_list_index: int | None = None) -> Self
call(*args: RemoteCallArg, return_float64: Literal[True], va_list_index: int | None = None) -> float
call(*args: RemoteCallArg, return_float32: Literal[True], va_list_index: int | None = None) -> float
call(*args: RemoteCallArg, return_raw: Literal[True], va_list_index: int | None = None) -> Any
call(*args: RemoteCallArg, **kwargs) -> float | Self | Any
call(*args: RemoteCallArg, **kwargs) -> float | Self | Any

Call this symbol as a function pointer.

Source code in src/rpcclient/rpcclient/core/symbol.py
async def call(self, *args: "RemoteCallArg", **kwargs) -> "float | Self | Any":
    """Call this symbol as a function pointer."""
    return await self._client.call(self, args, **kwargs)

Capturing file descriptors

rpcclient.core.capture_fd

CaptureFD

Bases: Allocated['CoreClient[SymbolT_co]'], Generic[SymbolT_co]

Context manager, capturing output to a given fd. Read from it using the read() method.

Source code in src/rpcclient/rpcclient/core/capture_fd.py
class CaptureFD(Allocated["CoreClient[SymbolT_co]"], Generic[SymbolT_co]):
    """
    Context manager, capturing output to a given `fd`. Read from it using the `read()` method.
    """

    def __init__(self, client: "CoreClient[SymbolT_co]", fd: int, sock_buf_size: int | None = None) -> None:
        """
        sock_buf_size is required for captures above 6KB, as any write above this value would block until a read is performed.

        :param rpcclient.client.client.Client client: Current client
        :param fd: FD to capture
        :param sock_buf_size: Buffer size for the capture socket, if not specified, default value is used.
        """
        self._client = client
        self.fd: int = fd
        self._backupfd: int | None = None
        self._socket_pair: tuple[int, int] | None = None
        self._sock_buf_size: int | None = sock_buf_size

    async def _allocate(self) -> None:
        async with self._client.safe_malloc(FD_SIZE * 2) as socket_pair:
            socket_pair.item_size = FD_SIZE
            if await self._client.symbols.socketpair(AF_UNIX, SOCK_STREAM, 0, socket_pair) != 0:
                await self._client.raise_errno_exception("socketpair failed")
            self._socket_pair = ((await socket_pair.getindex(0)).c_int32, (await socket_pair.getindex(1)).c_int32)
        if self._sock_buf_size is not None:
            await Socket(self._client, self._socket_pair[0]).setbufsize(self._sock_buf_size)
        self._backupfd = (await self._client.symbols.dup(self.fd)).c_int32
        if self._backupfd == -1:
            self._backupfd = None
            await self._client.raise_errno_exception("dup fd failed")
        if await self._client.symbols.dup2(self._socket_pair[0], self.fd) < 0:
            await self._client.raise_errno_exception("dup2 sock-fd failed")

    async def _deallocate(self) -> None:
        if self._backupfd is not None:
            if await self._client.symbols.dup2(self._backupfd, self.fd) < 0:
                await self._client.raise_errno_exception("dup2 backup-fd failed")
            if await self._client.symbols.close(self._backupfd) != 0:
                await self._client.raise_errno_exception("close backupfd failed")
            self._backupfd = None
        if self._socket_pair is not None:
            if await self._client.symbols.close(self._socket_pair[0]) != 0:
                await self._client.raise_errno_exception(f"close _socket_pair[0] {self._socket_pair[0]} failed")
            if await self._client.symbols.close(self._socket_pair[1]) != 0:
                await self._client.raise_errno_exception(f"close _socket_pair[1] {self._socket_pair[1]} failed")
            self._socket_pair = None

    async def read(self) -> bytes:
        """Read the bytes captured from `fd` so far."""
        data = b""
        if self._socket_pair is not None:
            async with self._client.safe_malloc(READ_SIZE) as buff:
                read = READ_SIZE
                while read == READ_SIZE:
                    async with self._client.safe_malloc(pollfd.sizeof()) as pfds:
                        await pfds.poke(pollfd.build({"fd": self._socket_pair[1], "events": POLLIN, "revents": 0}))
                        if await self._client.symbols.poll(pfds, 1, 0) != 1:
                            return data
                    read = (await self._client.symbols.read(self._socket_pair[1], buff, READ_SIZE)).c_int32
                    if read == -1:
                        await self._client.raise_errno_exception("read fd failed")
                    data += await buff.peek(read)
        return data

read async

read() -> bytes

Read the bytes captured from fd so far.

Source code in src/rpcclient/rpcclient/core/capture_fd.py
async def read(self) -> bytes:
    """Read the bytes captured from `fd` so far."""
    data = b""
    if self._socket_pair is not None:
        async with self._client.safe_malloc(READ_SIZE) as buff:
            read = READ_SIZE
            while read == READ_SIZE:
                async with self._client.safe_malloc(pollfd.sizeof()) as pfds:
                    await pfds.poke(pollfd.build({"fd": self._socket_pair[1], "events": POLLIN, "revents": 0}))
                    if await self._client.symbols.poll(pfds, 1, 0) != 1:
                        return data
                read = (await self._client.symbols.read(self._socket_pair[1], buff, READ_SIZE)).c_int32
                if read == -1:
                    await self._client.raise_errno_exception("read fd failed")
                data += await buff.peek(read)
    return data

Exceptions

rpcclient.exceptions

RpcClientException

Bases: Exception

general exception

Source code in src/rpcclient/rpcclient/exceptions.py
1
2
3
4
class RpcClientException(Exception):
    """general exception"""

    pass

InvalidServerVersionMagicError

Bases: RpcClientException

server handshake failed due to an invalid magic

Source code in src/rpcclient/rpcclient/exceptions.py
class InvalidServerVersionMagicError(RpcClientException):
    """server handshake failed due to an invalid magic"""

    pass

ResponseNotFoundError

Bases: RpcClientException

Response not found

Source code in src/rpcclient/rpcclient/exceptions.py
class ResponseNotFoundError(RpcClientException):
    """Response not found"""

    pass

SubsystemInitError

Bases: RpcClientException

Raised when unavailable subsystems are used.

Source code in src/rpcclient/rpcclient/exceptions.py
class SubsystemInitError(RpcClientException):
    """Raised when unavailable subsystems are used."""

    pass

ServerResponseError

Bases: RpcClientException

Server returned error

Source code in src/rpcclient/rpcclient/exceptions.py
class ServerResponseError(RpcClientException):
    """Server returned error"""

    pass

ServerDiedError

Bases: RpcClientException

server became disconnected during an operation

Source code in src/rpcclient/rpcclient/exceptions.py
class ServerDiedError(RpcClientException):
    """server became disconnected during an operation"""

    pass

SymbolAbsentError

Bases: RpcClientException

trying to access a symbol which is not exported from any library currently loaded into the server's memory

Source code in src/rpcclient/rpcclient/exceptions.py
class SymbolAbsentError(RpcClientException):
    """trying to access a symbol which is not exported from any library currently loaded into the server's memory"""

    pass

ProcessSymbolAbsentError

Bases: RpcClientException

trying to access a symbol which is not exported from any library currently loaded into the process's memory

Source code in src/rpcclient/rpcclient/exceptions.py
class ProcessSymbolAbsentError(RpcClientException):
    """trying to access a symbol which is not exported from any library currently loaded into the process's memory"""

    pass

ArgumentError

Bases: RpcClientException

at least one of the supplied arguments for a given function was invalid

Source code in src/rpcclient/rpcclient/exceptions.py
class ArgumentError(RpcClientException):
    """at least one of the supplied arguments for a given function was invalid"""

    pass

BadReturnValueError

Bases: RpcClientException

remote c function returned an error

Source code in src/rpcclient/rpcclient/exceptions.py
class BadReturnValueError(RpcClientException):
    """remote c function returned an error"""

    pass

NoSuchPreferenceError

Bases: RpcClientException

attempt to read a preference data which doesn't exist

Source code in src/rpcclient/rpcclient/exceptions.py
class NoSuchPreferenceError(RpcClientException):
    """attempt to read a preference data which doesn't exist"""

    pass

CfSerializationError

Bases: RpcClientException

failed to encode/decode a cfobject into/from a python object

Source code in src/rpcclient/rpcclient/exceptions.py
class CfSerializationError(RpcClientException):
    """failed to encode/decode a cfobject into/from a python object"""

    pass

SpawnError

Bases: RpcClientException

failed to spawn a child process

Source code in src/rpcclient/rpcclient/exceptions.py
class SpawnError(RpcClientException):
    """failed to spawn a child process"""

    pass

FailedToConnectError

Bases: RpcClientException

failed to connect to rpcserver

Source code in src/rpcclient/rpcclient/exceptions.py
class FailedToConnectError(RpcClientException):
    """failed to connect to rpcserver"""

    pass

UnrecognizedSelectorError

Bases: RpcClientException

tried to access a non-existing objc object selector

Source code in src/rpcclient/rpcclient/exceptions.py
class UnrecognizedSelectorError(RpcClientException):
    """tried to access a non-existing objc object selector"""

    pass

GettingObjectiveCClassError

Bases: RpcClientException

failed to create an objc class wrapper for a given object

Source code in src/rpcclient/rpcclient/exceptions.py
class GettingObjectiveCClassError(RpcClientException):
    """failed to create an objc class wrapper for a given object"""

    pass

MissingLibraryError

Bases: RpcClientException

a required library could not be found

Source code in src/rpcclient/rpcclient/exceptions.py
class MissingLibraryError(RpcClientException):
    """a required library could not be found"""

    pass

NoEntitlementsError

Bases: RpcClientException

binary contains no entitlements

Source code in src/rpcclient/rpcclient/exceptions.py
class NoEntitlementsError(RpcClientException):
    """binary contains no entitlements"""

    pass

HarGlobalNotFoundError

Bases: RpcClientException

Failed to find Harlogger global

Source code in src/rpcclient/rpcclient/exceptions.py
class HarGlobalNotFoundError(RpcClientException):
    """Failed to find Harlogger global"""

    pass

ElementNotFoundError

Bases: RpcClientException

Failed to find element

Source code in src/rpcclient/rpcclient/exceptions.py
class ElementNotFoundError(RpcClientException):
    """Failed to find element"""

    pass

FirstElementNotFoundError

Bases: ElementNotFoundError

Failed to find the first element

Source code in src/rpcclient/rpcclient/exceptions.py
class FirstElementNotFoundError(ElementNotFoundError):
    """Failed to find the first element"""

    pass

LastElementNotFoundError

Bases: ElementNotFoundError

Failed to find the last element

Source code in src/rpcclient/rpcclient/exceptions.py
class LastElementNotFoundError(ElementNotFoundError):
    """Failed to find the last element"""

    pass

LaunchError

Bases: BadReturnValueError

failed to launch a child process

Source code in src/rpcclient/rpcclient/exceptions.py
class LaunchError(BadReturnValueError):
    """failed to launch a child process"""

    pass

RpcFileExistsError

Bases: BadReturnValueError

RPC version for FileExistsError (errno = EEXIST)

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcFileExistsError(BadReturnValueError):
    """RPC version for FileExistsError (errno = EEXIST)"""

    pass

RpcFileNotFoundError

Bases: BadReturnValueError

RPC version for FileNotFoundError (errno = ENOENTRY)

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcFileNotFoundError(BadReturnValueError):
    """RPC version for FileNotFoundError (errno = ENOENTRY)"""

    pass

RpcBrokenPipeError

Bases: BadReturnValueError

RPC version for BrokenPipeError (errno = EPIPE)

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcBrokenPipeError(BadReturnValueError):
    """RPC version for BrokenPipeError (errno = EPIPE)"""

    pass

RpcNotEmptyError

Bases: BadReturnValueError

raised when errno = ENOTEMPTY

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcNotEmptyError(BadReturnValueError):
    """raised when errno = ENOTEMPTY"""

    pass

RpcIsADirectoryError

Bases: BadReturnValueError

RPC version for IsADirectoryError (errno = ENOTEMPTY)

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcIsADirectoryError(BadReturnValueError):
    """RPC version for IsADirectoryError (errno = ENOTEMPTY)"""

    pass

RpcNotADirectoryError

Bases: BadReturnValueError

RPC version for NotADirectoryError (errno = ENOTDIR)

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcNotADirectoryError(BadReturnValueError):
    """RPC version for NotADirectoryError (errno = ENOTDIR)"""

    pass

RpcPermissionError

Bases: BadReturnValueError

RPC version for PermissionError (errno = EPERM)

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcPermissionError(BadReturnValueError):
    """RPC version for PermissionError (errno = EPERM)"""

    pass

RpcAccessibilityTurnedOffError

Bases: BadReturnValueError

Application AX and Automation is turned off

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcAccessibilityTurnedOffError(BadReturnValueError):
    """Application AX and Automation is turned off"""

    pass

RpcFailedToRecordError

Bases: BadReturnValueError

An attempt to record has failed

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcFailedToRecordError(BadReturnValueError):
    """An attempt to record has failed"""

    pass

RpcFailedToPlayError

Bases: BadReturnValueError

An attempt to play has failed

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcFailedToPlayError(BadReturnValueError):
    """An attempt to play has failed"""

    pass

RpcResourceTemporarilyUnavailableError

Bases: BadReturnValueError

raised when errno = EAGAIN

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcResourceTemporarilyUnavailableError(BadReturnValueError):
    """raised when errno = EAGAIN"""

    pass

RpcConnectionRefusedError

Bases: BadReturnValueError

RPC version for ConnectionRefusedError (errno = ECONNREFUSED)

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcConnectionRefusedError(BadReturnValueError):
    """RPC version for ConnectionRefusedError (errno = ECONNREFUSED)"""

    pass

RpcDeadClientError

Bases: BadReturnValueError

RPC client in a dead state

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcDeadClientError(BadReturnValueError):
    """RPC client in a dead state"""

    pass

RpcFailedLaunchingAppError

Bases: BadReturnValueError

Failed to launch application

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcFailedLaunchingAppError(BadReturnValueError):
    """Failed to launch application"""

    pass

RpcAppleScriptError

Bases: BadReturnValueError

Failed to execute given AppleScript

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcAppleScriptError(BadReturnValueError):
    """Failed to execute given AppleScript"""

    pass

RpcXpcError

Bases: BadReturnValueError

XPC-related error

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcXpcError(BadReturnValueError):
    """XPC-related error"""

    pass

RpcXpcSerializationError

Bases: RpcXpcError

Failed to serialize/deserialize XPC message

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcXpcSerializationError(RpcXpcError):
    """Failed to serialize/deserialize XPC message"""

    pass

RpcSetDeveloperModeError

Bases: BadReturnValueError

Failed to set Developer Mode

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcSetDeveloperModeError(BadReturnValueError):
    """Failed to set Developer Mode"""

    pass

RpcFailedToGetPrimaryAppError

Bases: BadReturnValueError

Failed to get [AXElement primaryApp]

Source code in src/rpcclient/rpcclient/exceptions.py
class RpcFailedToGetPrimaryAppError(BadReturnValueError):
    """Failed to get [AXElement primaryApp]"""

    pass